Files

492 lines
19 KiB
Python
Raw Permalink Normal View History

# =============================================================================
# 企微IT智能服务台 — 外部系统统一门面服务
# =============================================================================
# 说明:
# 1. 上层业务代码(AI Wingman、会话管理等)只依赖此类
# 2. 按优先级链式查询(联软 → aTrust → eHR
# 3. 自动处理降级(系统不可用时跳到下一个)
# 4. 所有方法均有详细行内注释(做什么 + 为什么)
#
# 使用方式:
# from app.services.external import get_external_service
# svc = get_external_service()
# terminal = await svc.find_user_terminal("songxian")
# =============================================================================
import logging
from typing import Any, Dict, List, Optional
from app.services.cache_service import CacheService
from app.services.external.base import (
ExternalSystemAdapter,
TerminalInfo,
SecurityStatus,
VpnSession,
)
from app.services.external.config import ExternalSystemConfig
from app.services.external.cache import ExternalSystemCache
from app.services.external.mock import MockAdapter
logger = logging.getLogger(__name__)
# =============================================================================
# 全局单例(懒加载)
# =============================================================================
_external_service_instance: Optional["ExternalSystemService"] = None
def get_external_service() -> "ExternalSystemService":
"""获取外部系统服务单例
做什么:返回全局唯一的 ExternalSystemService 实例
为什么:避免重复初始化 Adapter,节省连接资源
"""
global _external_service_instance
if _external_service_instance is None:
raise RuntimeError(
"ExternalSystemService 尚未初始化,"
"请在应用启动时调用 init_external_service()"
)
return _external_service_instance
def init_external_service(
config: Optional[ExternalSystemConfig] = None,
cache_service: Optional[Any] = None,
) -> ExternalSystemService:
"""初始化外部系统服务(应用启动时调用一次)
做什么:根据配置创建所有 Adapter,组装成 ExternalSystemService
为什么:集中初始化,避免分散在各处创建 Adapter 实例
Args:
config: 外部系统配置,None 时自动从环境变量加载
cache_service: Redis 缓存服务实例,None 时降级为无缓存
Returns:
初始化完成的 ExternalSystemService 实例
"""
global _external_service_instance
if config is None:
from app.services.external.config import load_external_config
config = load_external_config()
# 创建缓存层
cache = ExternalSystemCache(cache_service) if cache_service else None
# 按优先级组装 Adapter 字典
adapters: Dict[str, ExternalSystemAdapter] = {}
if config.mock_mode:
# Mock 模式:所有系统走 MockAdapter
logger.info("[External] Mock模式已启用,所有外部系统查询走Mock数据")
mock = MockAdapter()
adapters = {
"lianruan": mock,
"huorong": mock,
"atrust": mock,
"ehr": mock,
}
else:
# ── 联软(主映射源P0)─────────────────────────────
# 做什么:创建联软 Adapter(凭证已配置时启用)
# 为什么:联软的 strusername 字段是员工→终端映射最可靠来源
if config.lianruan_enabled:
from app.services.external.lianruan_adapter import LianruanAdapter
adapters["lianruan"] = LianruanAdapter(config)
logger.info("[External] 联软适配器已启用")
else:
logger.warning(
"[External] 联软适配器未启用(凭证未配置),"
"终端映射功能将降级"
)
# ── 火绒(安全源P0)─────────────────────────────────
# 做什么:创建火绒 Adapter(凭证已配置时启用)
# 为什么:火绒提供终端安全状态(病毒/漏洞/隔离),不参与映射
if config.huorong_enabled:
from app.services.external.huorong_adapter import HuorongAdapter
adapters["huorong"] = HuorongAdapter(config)
logger.info("[External] 火绒适配器已启用")
else:
logger.info("[External] 火绒适配器未启用(凭证未配置)")
# ── aTrust(VPN源P1)────────────────────────────────
# 做什么:创建 aTrust Adapter(凭证已配置时启用)
# 为什么:aTrust 提供 VPN 在线状态和虚拟IP,用于远程员工排查
if config.atrust_enabled:
from app.services.external.atrust_adapter import ATrustAdapter
adapters["atrust"] = ATrustAdapter(config)
logger.info("[External] aTrust适配器已启用")
else:
logger.info("[External] aTrust适配器未启用(凭证未配置)")
# ── eHR(辅助静态数据P2)───────────────────────────
# 做什么:创建 eHR Adapter(凭证已配置时启用)
# 为什么:eHR 提供员工基础信息和任职信息,作为静态数据补充
if config.ehr_enabled:
from app.services.external.ehr_adapter import EHRAdapter
adapters["ehr"] = EHRAdapter(config)
logger.info("[External] eHR适配器已启用")
else:
logger.info("[External] eHR适配器未启用(凭证未配置)")
_external_service_instance = ExternalSystemService(adapters, cache)
logger.info(
f"[External] 服务初始化完成,已加载适配器: "
f"{list(adapters.keys())}"
)
return _external_service_instance
# =============================================================================
# 统一门面服务
# =============================================================================
class ExternalSystemService:
"""外部系统统一门面 — 上层业务唯一依赖的入口
做什么:按优先级链式查询外部系统,对上层屏蔽底层差异
为什么:上层代码不需要知道终端数据来自联软还是火绒
查询优先级(映射场景):
1. 联软(主源,strusername 精确匹配)
2. aTrustVPN源,bindUserList 匹配)
3. eHR(静态辅助,无终端数据,返回None)
安全能力(仅火绒):
- 获取安全状态(病毒/漏洞)
- 隔离/解除终端(需admin角色+二次确认)
VPN能力(仅aTrust):
- 查询在线会话
- 踢出用户
"""
def __init__(
self,
adapters: Dict[str, ExternalSystemAdapter],
cache: Optional[ExternalSystemCache] = None,
):
"""初始化统一门面
Args:
adapters: 系统标识 → Adapter 实例 的字典
cache: 外部数据缓存层(可为None,降级为无缓存)
"""
self._adapters = adapters
self._cache = cache
# =========================================================================
# 终端查询(映射核心)
# =========================================================================
async def find_user_terminal(self, username: str) -> Optional[TerminalInfo]:
"""查找用户终端 — 按优先级链式查询
做什么:根据员工账号查找其使用的终端信息
为什么:这是员工→终端映射的核心入口,坐席排查时首先需要知道
员工用哪台电脑
查询顺序:
1. 联软 queryDevByParams(strusername=xxx) — 最精确
2. aTrust queryAll(bindUserList) — VPN 场景补充
3. eHR — 无终端数据,返回 None
降级:某系统不可用时自动跳过,不影响整体结果。
Args:
username: 员工账号(如 'songxian'
Returns:
TerminalInfo 或 None(所有系统均未找到)
"""
logger.info(f"[External] 查找用户终端: username={username}")
# ── 第1优先级:联软(主映射源)────────────────────
# 做什么:优先用联软查,它有 strusername 精确字段
# 为什么:联软直接建立员工账号→终端的映射,比IP交叉匹配可靠
lianruan = self._adapters.get("lianruan")
if lianruan and lianruan.is_available:
try:
result = await self._query_with_cache(
"lianruan", "get_terminal_by_user", username
)
if result:
logger.info(
f"[External] 联软命中: username={username}, "
f"computer={result.computer_name}"
)
return result
except Exception as e:
# 联软不可用 → 降级到 aTrust,不阻断
logger.warning(
f"[External] 联软查询失败(降级到aTrust): {e}"
)
else:
logger.debug("[External] 联软不可用或未启用,跳过")
# ── 第2优先级:aTrust(VPN源)─────────────────────
# 做什么:联软未命中时,用 aTrust 查 VPN 终端
# 为什么:远程办公员工可能不在联软覆盖范围内
atrust = self._adapters.get("atrust")
if atrust and atrust.is_available:
try:
result = await self._query_with_cache(
"atrust", "get_terminal_by_user", username
)
if result:
logger.info(
f"[External] aTrust命中: username={username}, "
f"vpn_ip={result.ip_addresses}"
)
return result
except Exception as e:
logger.warning(
f"[External] aTrust查询失败(降级到eHR: {e}"
)
else:
logger.debug("[External] aTrust不可用或未启用,跳过")
# ── 第3优先级:eHR(辅助静态数据)────────────────
# 做什么:eHR 不提供终端数据,此方法返回 None
# 为什么:保留接口一致性,未来可能扩展
ehr = self._adapters.get("ehr")
if ehr and ehr.is_available:
result = await self._query_with_cache(
"ehr", "get_terminal_by_user", username
)
if result:
return result
# 所有系统均未命中
logger.info(f"[External] 所有系统均未找到用户终端: username={username}")
return None
async def get_terminal_detail(self, terminal_id: str) -> Optional[TerminalInfo]:
"""查询终端详细信息(硬件/软件/网络配置)
做什么:获取比 find_user_terminal 更详细的终端信息
为什么:排查硬件故障(卡慢)时需要CPU/内存/磁盘使用率等数据
优先联软(getDevAllInfo 比火绒 _info2 更详细)。
Args:
terminal_id: 终端在来源系统中的唯一ID
Returns:
TerminalInfo(含 hardware_summary)或 None
"""
# 联软详细信息最全(含主板/CPU/内存/硬盘/显示器)
lianruan = self._adapters.get("lianruan")
if lianruan and lianruan.is_available:
try:
return await lianruan.get_terminal_detail(terminal_id)
except Exception as e:
logger.warning(f"[External] 联软详细信息查询失败: {e}")
# 火绒作为备选(_info2 含硬件/软件/网络配置)
huorong = self._adapters.get("huorong")
if huorong and huorong.is_available:
try:
return await huorong.get_terminal_detail(terminal_id)
except Exception as e:
logger.warning(f"[External] 火绒详细信息查询失败: {e}")
return None
# =========================================================================
# 安全能力(仅火绒)
# =========================================================================
async def get_terminal_security(self, terminal_id: str) -> Optional[SecurityStatus]:
"""获取终端安全状态
做什么:查询终端的病毒事件、高危漏洞、隔离状态
为什么:坐席排查安全问题时需要一目了然
仅火绒支持此能力。
Args:
terminal_id: 火绒的 client_id
Returns:
SecurityStatus 或 None(火绒不可用)
"""
huorong = self._adapters.get("huorong")
if not huorong or not huorong.is_available:
logger.warning("[External] 火绒不可用,无法获取安全状态")
return None
try:
return await self._query_with_cache(
"huorong", "get_security_status", terminal_id
)
except Exception as e:
logger.error(f"[External] 获取安全状态失败: {e}")
return None
async def isolate_terminal(
self, terminal_id: str, reason: str, operator: str
) -> bool:
"""隔离终端(断网)
做什么:调用火绒 _create(type=netctrl) 隔离终端
为什么:安全事件紧急处理,阻断威胁扩散
仅火绒支持。调用前必须在上层做:
1. 操作者角色校验(仅 admin 可操作)
2. 二次确认弹窗
3. 审计日志记录
Args:
terminal_id: 火绒的 client_id
reason: 隔离原因(记入审计日志)
operator: 操作者账号
Returns:
True=成功, False=失败
"""
huorong = self._adapters.get("huorong")
if not huorong or not huorong.is_available:
logger.error("[External] 火绒不可用,无法执行隔离")
return False
logger.warning(
f"[External] 执行终端隔离: terminal={terminal_id}, "
f"operator={operator}, reason={reason}"
)
try:
return await huorong.isolate_terminal(terminal_id, reason)
except Exception as e:
logger.error(f"[External] 隔离失败: {e}")
return False
async def unisolate_terminal(self, terminal_id: str) -> bool:
"""解除终端隔离(恢复网络)"""
huorong = self._adapters.get("huorong")
if not huorong or not huorong.is_available:
return False
try:
return await huorong.unisolate_terminal(terminal_id)
except Exception as e:
logger.error(f"[External] 解除隔离失败: {e}")
return False
# =========================================================================
# VPN/在线状态(仅aTrust
# =========================================================================
async def get_vpn_sessions(
self, username: Optional[str] = None
) -> List[VpnSession]:
"""查询VPN在线会话
做什么:获取当前通过aTrust在线的VPN会话列表
为什么:坐席需要知道远程员工是否在线、VPN IP是什么
仅aTrust支持。
Args:
username: 可选,过滤指定用户的会话
Returns:
VPN会话列表(可能为空)
"""
atrust = self._adapters.get("atrust")
if not atrust or not atrust.is_available:
return []
try:
return await atrust.get_vpn_sessions(username)
except Exception as e:
logger.warning(f"[External] 查询VPN会话失败: {e}")
return []
async def get_online_status(self, username: str) -> bool:
"""查询用户是否在线
做什么:检查用户当前是否在线(任何方式接入)
为什么:坐席发起协作或推送消息前需要知道用户是否可达
Args:
username: 员工账号
Returns:
True=在线, False=离线或未知
"""
# 优先联软(内网接入)
lianruan = self._adapters.get("lianruan")
if lianruan and lianruan.is_available:
try:
if await lianruan.get_online_status(username):
return True
except Exception:
pass
# 再查 aTrustVPN接入)
atrust = self._adapters.get("atrust")
if atrust and atrust.is_available:
try:
if await atrust.get_online_status(username):
return True
except Exception:
pass
return False
# =========================================================================
# 内部方法
# =========================================================================
async def _query_with_cache(
self, system: str, method: str, param: str
) -> Any:
"""带缓存的查询(内部方法)
做什么:先查缓存,未命中则调Adapter,结果写回缓存
为什么:减少外部API调用频率,降低延迟
Args:
system: 系统标识
method: 方法名(用于缓存key
param: 查询参数(用于缓存key)
Returns:
Adapter返回的数据(可能经缓存)
"""
# 步骤1:尝试从缓存读取
if self._cache:
cached = await self._cache.get(system, method, param)
if cached:
return cached
# 步骤2:缓存未命中,调用Adapter
adapter = self._adapters.get(system)
if not adapter:
raise RuntimeError(f"Adapter不存在: {system}")
method_map = {
"get_terminal_by_user": adapter.get_terminal_by_user,
"get_terminal_by_computer": adapter.get_terminal_by_computer,
"get_terminal_detail": adapter.get_terminal_detail,
"get_security_status": adapter.get_security_status,
"get_vpn_sessions": adapter.get_vpn_sessions,
"get_online_status": adapter.get_online_status,
}
if method not in method_map:
raise RuntimeError(f"未知方法: {method}")
result = await method_map[method](param)
# 步骤3:写入缓存(仅非None结果)
if result and self._cache:
# 转成可序列化的字典
data = result.dict() if hasattr(result, "dict") else result
await self._cache.set(system, method, param, data)
return result