# ============================================================================= # 企微IT智能服务台 — 外部系统统一门面服务 # ============================================================================= # 说明: # 1. 上层业务代码(AI Wingman、会话管理等)只依赖此类 # 2. 按优先级链式查询(联软 → aTrust → eHR) # 3. 自动处理降级(系统不可用时跳到下一个) # 4. 所有方法均有详细行内注释(做什么 + 为什么) # # 使用方式: # from app.services.external import get_external_service # svc = get_external_service() # terminal = await svc.find_user_terminal("songxian") # ============================================================================= import logging from typing import Any, Dict, List, Optional from app.services.cache_service import CacheService from app.services.external.base import ( ExternalSystemAdapter, TerminalInfo, SecurityStatus, VpnSession, ) from app.services.external.config import ExternalSystemConfig from app.services.external.cache import ExternalSystemCache from app.services.external.mock import MockAdapter logger = logging.getLogger(__name__) # ============================================================================= # 全局单例(懒加载) # ============================================================================= _external_service_instance: Optional["ExternalSystemService"] = None def get_external_service() -> "ExternalSystemService": """获取外部系统服务单例 做什么:返回全局唯一的 ExternalSystemService 实例 为什么:避免重复初始化 Adapter,节省连接资源 """ global _external_service_instance if _external_service_instance is None: raise RuntimeError( "ExternalSystemService 尚未初始化," "请在应用启动时调用 init_external_service()" ) return _external_service_instance def init_external_service( config: Optional[ExternalSystemConfig] = None, cache_service: Optional[Any] = None, ) -> ExternalSystemService: """初始化外部系统服务(应用启动时调用一次) 做什么:根据配置创建所有 Adapter,组装成 ExternalSystemService 为什么:集中初始化,避免分散在各处创建 Adapter 实例 Args: config: 外部系统配置,None 时自动从环境变量加载 cache_service: Redis 缓存服务实例,None 时降级为无缓存 Returns: 初始化完成的 ExternalSystemService 实例 """ global _external_service_instance if config is None: from app.services.external.config import load_external_config config = load_external_config() # 创建缓存层 cache = ExternalSystemCache(cache_service) if cache_service else None # 按优先级组装 Adapter 字典 adapters: Dict[str, ExternalSystemAdapter] = {} if config.mock_mode: # Mock 模式:所有系统走 MockAdapter logger.info("[External] Mock模式已启用,所有外部系统查询走Mock数据") mock = MockAdapter() adapters = { "lianruan": mock, "huorong": mock, "atrust": mock, "ehr": mock, } else: # ── 联软(主映射源P0)───────────────────────────── # 做什么:创建联软 Adapter(凭证已配置时启用) # 为什么:联软的 strusername 字段是员工→终端映射最可靠来源 if config.lianruan_enabled: from app.services.external.lianruan_adapter import LianruanAdapter adapters["lianruan"] = LianruanAdapter(config) logger.info("[External] 联软适配器已启用") else: logger.warning( "[External] 联软适配器未启用(凭证未配置)," "终端映射功能将降级" ) # ── 火绒(安全源P0)───────────────────────────────── # 做什么:创建火绒 Adapter(凭证已配置时启用) # 为什么:火绒提供终端安全状态(病毒/漏洞/隔离),不参与映射 if config.huorong_enabled: from app.services.external.huorong_adapter import HuorongAdapter adapters["huorong"] = HuorongAdapter(config) logger.info("[External] 火绒适配器已启用") else: logger.info("[External] 火绒适配器未启用(凭证未配置)") # ── aTrust(VPN源P1)──────────────────────────────── # 做什么:创建 aTrust Adapter(凭证已配置时启用) # 为什么:aTrust 提供 VPN 在线状态和虚拟IP,用于远程员工排查 if config.atrust_enabled: from app.services.external.atrust_adapter import ATrustAdapter adapters["atrust"] = ATrustAdapter(config) logger.info("[External] aTrust适配器已启用") else: logger.info("[External] aTrust适配器未启用(凭证未配置)") # ── eHR(辅助静态数据P2)─────────────────────────── # 做什么:创建 eHR Adapter(凭证已配置时启用) # 为什么:eHR 提供员工基础信息和任职信息,作为静态数据补充 if config.ehr_enabled: from app.services.external.ehr_adapter import EHRAdapter adapters["ehr"] = EHRAdapter(config) logger.info("[External] eHR适配器已启用") else: logger.info("[External] eHR适配器未启用(凭证未配置)") _external_service_instance = ExternalSystemService(adapters, cache) logger.info( f"[External] 服务初始化完成,已加载适配器: " f"{list(adapters.keys())}" ) return _external_service_instance # ============================================================================= # 统一门面服务 # ============================================================================= class ExternalSystemService: """外部系统统一门面 — 上层业务唯一依赖的入口 做什么:按优先级链式查询外部系统,对上层屏蔽底层差异 为什么:上层代码不需要知道终端数据来自联软还是火绒 查询优先级(映射场景): 1. 联软(主源,strusername 精确匹配) 2. aTrust(VPN源,bindUserList 匹配) 3. eHR(静态辅助,无终端数据,返回None) 安全能力(仅火绒): - 获取安全状态(病毒/漏洞) - 隔离/解除终端(需admin角色+二次确认) VPN能力(仅aTrust): - 查询在线会话 - 踢出用户 """ def __init__( self, adapters: Dict[str, ExternalSystemAdapter], cache: Optional[ExternalSystemCache] = None, ): """初始化统一门面 Args: adapters: 系统标识 → Adapter 实例 的字典 cache: 外部数据缓存层(可为None,降级为无缓存) """ self._adapters = adapters self._cache = cache # ========================================================================= # 终端查询(映射核心) # ========================================================================= async def find_user_terminal(self, username: str) -> Optional[TerminalInfo]: """查找用户终端 — 按优先级链式查询 做什么:根据员工账号查找其使用的终端信息 为什么:这是员工→终端映射的核心入口,坐席排查时首先需要知道 员工用哪台电脑 查询顺序: 1. 联软 queryDevByParams(strusername=xxx) — 最精确 2. aTrust queryAll(bindUserList) — VPN 场景补充 3. eHR — 无终端数据,返回 None 降级:某系统不可用时自动跳过,不影响整体结果。 Args: username: 员工账号(如 'songxian') Returns: TerminalInfo 或 None(所有系统均未找到) """ logger.info(f"[External] 查找用户终端: username={username}") # ── 第1优先级:联软(主映射源)──────────────────── # 做什么:优先用联软查,它有 strusername 精确字段 # 为什么:联软直接建立员工账号→终端的映射,比IP交叉匹配可靠 lianruan = self._adapters.get("lianruan") if lianruan and lianruan.is_available: try: result = await self._query_with_cache( "lianruan", "get_terminal_by_user", username ) if result: logger.info( f"[External] 联软命中: username={username}, " f"computer={result.computer_name}" ) return result except Exception as e: # 联软不可用 → 降级到 aTrust,不阻断 logger.warning( f"[External] 联软查询失败(降级到aTrust): {e}" ) else: logger.debug("[External] 联软不可用或未启用,跳过") # ── 第2优先级:aTrust(VPN源)───────────────────── # 做什么:联软未命中时,用 aTrust 查 VPN 终端 # 为什么:远程办公员工可能不在联软覆盖范围内 atrust = self._adapters.get("atrust") if atrust and atrust.is_available: try: result = await self._query_with_cache( "atrust", "get_terminal_by_user", username ) if result: logger.info( f"[External] aTrust命中: username={username}, " f"vpn_ip={result.ip_addresses}" ) return result except Exception as e: logger.warning( f"[External] aTrust查询失败(降级到eHR): {e}" ) else: logger.debug("[External] aTrust不可用或未启用,跳过") # ── 第3优先级:eHR(辅助静态数据)──────────────── # 做什么:eHR 不提供终端数据,此方法返回 None # 为什么:保留接口一致性,未来可能扩展 ehr = self._adapters.get("ehr") if ehr and ehr.is_available: result = await self._query_with_cache( "ehr", "get_terminal_by_user", username ) if result: return result # 所有系统均未命中 logger.info(f"[External] 所有系统均未找到用户终端: username={username}") return None async def get_terminal_detail(self, terminal_id: str) -> Optional[TerminalInfo]: """查询终端详细信息(硬件/软件/网络配置) 做什么:获取比 find_user_terminal 更详细的终端信息 为什么:排查硬件故障(卡慢)时需要CPU/内存/磁盘使用率等数据 优先联软(getDevAllInfo 比火绒 _info2 更详细)。 Args: terminal_id: 终端在来源系统中的唯一ID Returns: TerminalInfo(含 hardware_summary)或 None """ # 联软详细信息最全(含主板/CPU/内存/硬盘/显示器) lianruan = self._adapters.get("lianruan") if lianruan and lianruan.is_available: try: return await lianruan.get_terminal_detail(terminal_id) except Exception as e: logger.warning(f"[External] 联软详细信息查询失败: {e}") # 火绒作为备选(_info2 含硬件/软件/网络配置) huorong = self._adapters.get("huorong") if huorong and huorong.is_available: try: return await huorong.get_terminal_detail(terminal_id) except Exception as e: logger.warning(f"[External] 火绒详细信息查询失败: {e}") return None # ========================================================================= # 安全能力(仅火绒) # ========================================================================= async def get_terminal_security(self, terminal_id: str) -> Optional[SecurityStatus]: """获取终端安全状态 做什么:查询终端的病毒事件、高危漏洞、隔离状态 为什么:坐席排查安全问题时需要一目了然 仅火绒支持此能力。 Args: terminal_id: 火绒的 client_id Returns: SecurityStatus 或 None(火绒不可用) """ huorong = self._adapters.get("huorong") if not huorong or not huorong.is_available: logger.warning("[External] 火绒不可用,无法获取安全状态") return None try: return await self._query_with_cache( "huorong", "get_security_status", terminal_id ) except Exception as e: logger.error(f"[External] 获取安全状态失败: {e}") return None async def isolate_terminal( self, terminal_id: str, reason: str, operator: str ) -> bool: """隔离终端(断网) 做什么:调用火绒 _create(type=netctrl) 隔离终端 为什么:安全事件紧急处理,阻断威胁扩散 仅火绒支持。调用前必须在上层做: 1. 操作者角色校验(仅 admin 可操作) 2. 二次确认弹窗 3. 审计日志记录 Args: terminal_id: 火绒的 client_id reason: 隔离原因(记入审计日志) operator: 操作者账号 Returns: True=成功, False=失败 """ huorong = self._adapters.get("huorong") if not huorong or not huorong.is_available: logger.error("[External] 火绒不可用,无法执行隔离") return False logger.warning( f"[External] 执行终端隔离: terminal={terminal_id}, " f"operator={operator}, reason={reason}" ) try: return await huorong.isolate_terminal(terminal_id, reason) except Exception as e: logger.error(f"[External] 隔离失败: {e}") return False async def unisolate_terminal(self, terminal_id: str) -> bool: """解除终端隔离(恢复网络)""" huorong = self._adapters.get("huorong") if not huorong or not huorong.is_available: return False try: return await huorong.unisolate_terminal(terminal_id) except Exception as e: logger.error(f"[External] 解除隔离失败: {e}") return False # ========================================================================= # VPN/在线状态(仅aTrust) # ========================================================================= async def get_vpn_sessions( self, username: Optional[str] = None ) -> List[VpnSession]: """查询VPN在线会话 做什么:获取当前通过aTrust在线的VPN会话列表 为什么:坐席需要知道远程员工是否在线、VPN IP是什么 仅aTrust支持。 Args: username: 可选,过滤指定用户的会话 Returns: VPN会话列表(可能为空) """ atrust = self._adapters.get("atrust") if not atrust or not atrust.is_available: return [] try: return await atrust.get_vpn_sessions(username) except Exception as e: logger.warning(f"[External] 查询VPN会话失败: {e}") return [] async def get_online_status(self, username: str) -> bool: """查询用户是否在线 做什么:检查用户当前是否在线(任何方式接入) 为什么:坐席发起协作或推送消息前需要知道用户是否可达 Args: username: 员工账号 Returns: True=在线, False=离线或未知 """ # 优先联软(内网接入) lianruan = self._adapters.get("lianruan") if lianruan and lianruan.is_available: try: if await lianruan.get_online_status(username): return True except Exception: pass # 再查 aTrust(VPN接入) atrust = self._adapters.get("atrust") if atrust and atrust.is_available: try: if await atrust.get_online_status(username): return True except Exception: pass return False # ========================================================================= # 内部方法 # ========================================================================= async def _query_with_cache( self, system: str, method: str, param: str ) -> Any: """带缓存的查询(内部方法) 做什么:先查缓存,未命中则调Adapter,结果写回缓存 为什么:减少外部API调用频率,降低延迟 Args: system: 系统标识 method: 方法名(用于缓存key) param: 查询参数(用于缓存key) Returns: Adapter返回的数据(可能经缓存) """ # 步骤1:尝试从缓存读取 if self._cache: cached = await self._cache.get(system, method, param) if cached: return cached # 步骤2:缓存未命中,调用Adapter adapter = self._adapters.get(system) if not adapter: raise RuntimeError(f"Adapter不存在: {system}") method_map = { "get_terminal_by_user": adapter.get_terminal_by_user, "get_terminal_by_computer": adapter.get_terminal_by_computer, "get_terminal_detail": adapter.get_terminal_detail, "get_security_status": adapter.get_security_status, "get_vpn_sessions": adapter.get_vpn_sessions, "get_online_status": adapter.get_online_status, } if method not in method_map: raise RuntimeError(f"未知方法: {method}") result = await method_map[method](param) # 步骤3:写入缓存(仅非None结果) if result and self._cache: # 转成可序列化的字典 data = result.dict() if hasattr(result, "dict") else result await self._cache.set(system, method, param, data) return result