# ============================================================================= # 企微IT智能服务台 — 外部系统适配器抽象基类 + 统一数据模型 # ============================================================================= # 说明: # 1. 定义所有外部系统共用的抽象接口(ABC) # 2. 定义统一的DTO模型(TerminalInfo/SecurityStatus/VpnSession) # 3. 每个外部系统实现此接口,上层业务只依赖抽象接口 # # 设计原则: # - 默认返回None/空 — 子类按需覆写自己支持的方法 # - 不支持的能力不报错,返回None让调用方走降级逻辑 # - raw_data字段保留原始响应,调试用,生产环境可关闭 # ============================================================================= import logging from abc import ABC, abstractmethod from datetime import datetime from typing import Dict, List, Optional from pydantic import BaseModel, Field logger = logging.getLogger(__name__) # ============================================================================= # 统一数据模型(DTO) # ============================================================================= class TerminalInfo(BaseModel): """统一终端信息模型 — 所有Adapter返回同一结构 做什么:把联软/火绒/aTrust不同格式的终端数据映射到统一结构 为什么:上层业务代码不需要关心数据来自哪个系统 """ # ── 来源标识 ── source_system: str = Field(..., description="数据来源系统标识: lianruan/huorong/atrust/ehr") # ── 基础标识 ── terminal_id: Optional[str] = Field(None, description="终端在来源系统中的唯一ID") computer_name: str = Field(..., description="计算机名") # ── 网络信息 ── ip_addresses: List[str] = Field(default_factory=list, description="IP地址列表(含VPN虚拟IP)") mac_addresses: List[str] = Field(default_factory=list, description="MAC地址列表") # ── 系统信息 ── os_version: Optional[str] = Field(None, description="操作系统版本") is_online: bool = Field(False, description="是否在线") # ── 用户映射(核心字段)── logged_in_user: Optional[str] = Field(None, description="当前登录用户账号 — 映射核心字段") logged_in_user_name: Optional[str] = Field(None, description="用户姓名") department: Optional[str] = Field(None, description="所属部门") # ── 硬件摘要 ── hardware_summary: Optional[Dict] = Field(None, description="硬件摘要(CPU/内存/磁盘使用率等)") # ── 时间信息 ── last_seen: Optional[datetime] = Field(None, description="最后在线时间") # ── 调试用 ── raw_data: Optional[Dict] = Field(None, description="原始响应数据(调试用,生产可关闭)") class VulnerabilityItem(BaseModel): """漏洞条目""" name: str = Field(..., description="漏洞名称") level: str = Field("info", description="严重程度: critical/high/medium/low/info") description: Optional[str] = Field(None, description="漏洞描述") publish_time: Optional[str] = Field(None, description="发布时间") class SecurityStatus(BaseModel): """统一安全状态模型 做什么:聚合火绒的病毒/漏洞/隔离数据 为什么:坐席需要一目了然看到终端安全全貌 """ source_system: str = Field(..., description="数据来源系统标识") terminal_id: str = Field(..., description="终端ID") computer_name: Optional[str] = Field(None, description="计算机名") # ── 安全指标 ── virus_total: int = Field(0, description="病毒事件总数") virus_uncleaned: int = Field(0, description="未处理病毒数") vulnerabilities: List[VulnerabilityItem] = Field(default_factory=list, description="高危漏洞列表") high_vuln_count: int = Field(0, description="高危漏洞数量") # ── 隔离状态 ── is_isolated: bool = Field(False, description="是否被隔离") isolation_source: Optional[str] = Field(None, description="隔离来源系统") # ── 检查时间 ── checked_at: datetime = Field(default_factory=datetime.now, description="检查时间") class VpnSession(BaseModel): """VPN会话模型(仅aTrust) 做什么:描述一个aTrust VPN在线会话 为什么:坐席需要知道远程员工是否通过VPN在线、VPN IP是什么 """ source_system: str = "atrust" session_id: Optional[str] = Field(None, description="会话ID(用于踢出操作)") username: str = Field(..., description="用户名(登录名)") display_name: Optional[str] = Field(None, description="显示名") remote_ip: str = Field(..., description="接入IP(公网IP或'内网IP')") vpn_ip: Optional[str] = Field(None, description="VPN虚拟内网IP — 火绒交叉匹配关键字段") is_trusted: bool = Field(False, description="终端是否已授信") os: Optional[str] = Field(None, description="接入终端操作系统") last_login: Optional[datetime] = Field(None, description="最后登录时间") domain: Optional[str] = Field(None, description="登录域") # ============================================================================= # 适配器抽象基类 # ============================================================================= class ExternalSystemAdapter(ABC): """外部系统适配器抽象基类 做什么:定义所有外部系统共用的接口规范 为什么:让上层业务代码只依赖抽象接口,不感知底层系统差异 设计原则: - 默认方法返回None/空列表/False,子类按需覆写自己支持的能力 - 不支持的能力不报错,让调用方走降级逻辑 - 每个Adapter只负责一个外部系统的对接 """ @property @abstractmethod def system_name(self) -> str: """系统标识名称 返回值: 'lianruan' / 'huorong' / 'atrust' / 'ehr' / 'mock' """ ... @property @abstractmethod def is_available(self) -> bool: """当前系统是否可用(凭证已配置+网络可达) 做什么:检查配置是否完整,不实际发起网络请求 为什么:调用方可据此决定是否跳过本系统 """ ... @abstractmethod async def health_check(self) -> bool: """健康检查 — 验证凭证和网络连通性 做什么:实际发起一次轻量级API调用,确认系统可达 为什么:定期健康检查可提前发现连接问题 """ ... # ========================================================================= # 终端查询能力 # ========================================================================= async def get_terminal_by_user(self, username: str) -> Optional[TerminalInfo]: """通过员工账号查询终端信息(映射核心方法) 做什么:输入员工账号,返回该员工使用的终端信息 为什么:这是员工→终端映射的核心入口 各系统实现方式: - 联软:queryDevByParams(strusername=xxx) — 精确匹配 - 火绒:_list(ip=xxx) — 需配合联软IP交叉匹配 - aTrust:queryAll(bindUserList) — 终端绑定用户 - eHR:不提供终端数据,返回None Args: username: 员工账号(如 'songxian') Returns: TerminalInfo 或 None(系统不支持或未找到) """ return None async def get_terminal_by_computer(self, computer_name: str) -> Optional[TerminalInfo]: """通过计算机名查询终端信息 Args: computer_name: 计算机名(如 'IT-SONGXIAN') """ return None async def get_terminal_detail(self, terminal_id: str) -> Optional[TerminalInfo]: """查询终端详细信息(硬件/软件/网络配置) 做什么:返回比 get_terminal_by_user 更详细的信息 为什么:排查时需要硬件配置、磁盘使用率、已安装软件等 各系统实现方式: - 联软:getDevAllInfo — 极详细(主板/CPU/内存/硬盘/网卡/显示器) - 火绒:_info2 — 中等详细(硬件/软件/网络配置) - aTrust/eHR:不支持 Args: terminal_id: 终端在来源系统中的唯一ID """ return None # ========================================================================= # 安全能力 # ========================================================================= async def get_security_status(self, terminal_id: str) -> Optional[SecurityStatus]: """获取终端安全状态(病毒/漏洞/隔离状态) 做什么:聚合安全指标,坐席一目了然 为什么:安全问题通常需要紧急处理 仅火绒支持此接口。 Args: terminal_id: 终端ID(火绒的client_id) """ return None async def isolate_terminal(self, terminal_id: str, reason: str) -> bool: """隔离终端(断网) 做什么:调用火绒 _create(type=netctrl) 隔离终端 为什么:安全事件紧急处理,阻断威胁扩散 仅火绒支持。调用前必须二次确认+审计日志记录。 Args: terminal_id: 终端ID reason: 隔离原因(记入审计日志) Returns: True=成功, False=失败 Raises: NotImplementedError: 本系统不支持隔离操作 """ raise NotImplementedError(f"{self.system_name} 不支持终端隔离") async def unisolate_terminal(self, terminal_id: str) -> bool: """解除终端隔离(恢复网络) 仅火绒支持。 Args: terminal_id: 终端ID Returns: True=成功, False=失败 """ raise NotImplementedError(f"{self.system_name} 不支持解除隔离") # ========================================================================= # VPN/在线状态能力 # ========================================================================= async def get_vpn_sessions(self, username: Optional[str] = None) -> List[VpnSession]: """查询VPN在线会话 做什么:获取当前通过aTrust在线的VPN会话 为什么:坐席需要知道远程员工VPN状态和IP 仅aTrust支持。 Args: username: 可选,过滤指定用户 Returns: VPN会话列表 """ return [] async def get_online_status(self, username: str) -> bool: """查询用户是否在线 做什么:检查用户终端是否当前在线 为什么:坐席需要知道用户是否可达 各系统实现方式: - 联软:existOnlineUser - 火绒:_list(is_online=True) + IP交叉匹配 - aTrust:getUserStatus Args: username: 员工账号 Returns: True=在线, False=离线或未知 """ return False # ========================================================================= # 辅助方法 # ========================================================================= def _log_not_implemented(self, method_name: str) -> None: """记录未实现方法的调试日志 做什么:当子类未覆写某个方法时记录DEBUG级日志 为什么:开发期帮助发现调用链路问题,生产环境可关闭DEBUG """ logger.debug( f"[{self.system_name}] {method_name} 未实现," f"将走降级逻辑" )