"""Endpoint security comparison service: Huorong vs. Lianruan.

Features:
    1. List the computers that do not have Huorong installed.
    2. Scheduled-task push.
    3. Manual trigger.

Dependencies:
    - Lianruan LV7000: ``get_dev_all_info()``
    - Huorong Enterprise Edition: ``list_terminals()``

Comparison logic: devices are matched by host name, ignoring letter case
and surrounding whitespace.
"""

from datetime import datetime
from typing import Optional
import logging

from app.integrations.huorong.client import HuorongClient
from app.integrations.lianruan.client import LianruanClient

logger = logging.getLogger(__name__)


def _normalize_hostname(value: object) -> str:
    """Return the comparison key for a host name.

    Missing or empty values (``None``, ``""``) map to ``""`` so callers can
    skip them with a plain truthiness test instead of crashing on
    ``None.lower()``.
    """
    if not value:
        return ""
    return str(value).strip().lower()


class TerminalSecurityComparison:
    """Compare the Lianruan device inventory with the Huorong terminal list."""

    def __init__(self) -> None:
        self.huorong = HuorongClient()
        self.lianruan = LianruanClient()

    async def close(self) -> None:
        """Close both client connections.

        The Lianruan client is closed even when closing the Huorong client
        raises, so one failing client cannot leak the other connection.
        """
        try:
            await self.huorong.close()
        finally:
            await self.lianruan.close()

    async def get_no_huorong_devices(self) -> list[dict]:
        """Return the Lianruan devices that have no Huorong terminal.

        Devices are matched by host name. Each returned dict has the keys
        ``hostname``, ``ip``, ``useraccount``, ``dept``, ``last_login``,
        ``osver`` and ``status``.
        """
        _, _, no_huorong = await self._run_comparison()
        return no_huorong

    async def _run_comparison(self) -> tuple[list[dict], list[dict], list[dict]]:
        """Fetch both inventories once and compare them.

        Returns:
            ``(lianruan_devices, huorong_devices, no_huorong)`` so callers
            that need counts and the missing list work on one snapshot.
        """
        logger.info("开始比对终端安全数据...")

        # 1. Fetch every device known to Lianruan.
        lianruan_devices = await self._get_all_lianruan_devices()
        logger.info(f"联软设备数: {len(lianruan_devices)}")

        # 2. Fetch every terminal known to Huorong.
        huorong_devices = await self._get_all_huorong_devices()
        logger.info(f"火绒终端数: {len(huorong_devices)}")

        # 3./4. Present in Lianruan but absent from Huorong = not installed.
        no_huorong = self._find_missing(lianruan_devices, huorong_devices)
        logger.info(f"未安装火绒设备数: {len(no_huorong)}")

        return lianruan_devices, huorong_devices, no_huorong

    @staticmethod
    def _find_missing(
        lianruan_devices: list[dict], huorong_devices: list[dict]
    ) -> list[dict]:
        """Return report rows for Lianruan devices with no Huorong host name match.

        Lianruan devices without a host name cannot be matched and are
        skipped rather than reported as missing.
        """
        normalized = (
            _normalize_hostname(dev.get("hostname")) for dev in huorong_devices
        )
        huorong_hostnames = {name for name in normalized if name}

        no_huorong = []
        for dev in lianruan_devices:
            # Lianruan stores the computer name in ``strdevname``.
            hostname = _normalize_hostname(dev.get("strdevname"))
            if not hostname or hostname in huorong_hostnames:
                continue
            no_huorong.append({
                "hostname": dev.get("strdevname"),
                "ip": dev.get("strip1"),  # Lianruan IP field
                "useraccount": dev.get("strusername"),  # user name
                "dept": dev.get("strdeptname"),  # department
                "last_login": dev.get("dtlastlogin"),
                "osver": dev.get("strosver"),
                "status": dev.get("strstatus"),
            })
        return no_huorong

    async def _get_all_lianruan_devices(self) -> list[dict]:
        """Return all Lianruan devices as a list of dicts.

        Returns an empty list when the response is empty or has no
        ``devices`` attribute.
        """
        # TODO: fetch all devices page by page
        result = await self.lianruan.get_dev_all_info()
        devices = getattr(result, "devices", None) if result else None
        if not devices:
            return []
        # Convert model objects to plain dicts; entries without
        # ``model_dump`` are passed through unchanged.
        return [
            d.model_dump() if hasattr(d, "model_dump") else d for d in devices
        ]

    async def _get_all_huorong_devices(self) -> list[dict]:
        """Return all Huorong terminals, fetched page by page.

        Each returned dict has the keys ``hostname``, ``ip`` and ``status``.
        """
        all_devices: list[dict] = []
        page = 1
        per_page = 200

        while True:
            result = await self.huorong.list_terminals(
                page=page, per_page=per_page
            )
            # Treat a missing response or a missing/None "clients" entry as
            # an empty page instead of raising.
            clients = (result or {}).get("clients") or []
            if not clients:
                break

            # Huorong fields: hostname, computer_name, ip_addr, local_ip
            all_devices.extend(
                {
                    "hostname": c.get("hostname") or c.get("computer_name"),
                    "ip": c.get("ip_addr") or c.get("local_ip"),
                    "status": c.get("stat"),
                }
                for c in clients
            )

            # A short page means there is nothing more to fetch.
            if len(clients) < per_page:
                break
            page += 1

        return all_devices

    async def compare_summary(self) -> dict:
        """Return summary figures for the comparison.

        Both inventories are fetched once, so the counts and the rate all
        describe the same snapshot.

        Returns:
            A dict with ``lianruan_count``, ``huorong_count``,
            ``no_huorong_count``, ``compliance_rate`` and ``generated_at``.
            ``compliance_rate`` is the share of Lianruan devices that were
            not reported as missing Huorong, formatted like ``"87.5%"``, or
            ``"N/A"`` when Lianruan reports no devices.
        """
        lianruan_devices, huorong_devices, no_huorong = (
            await self._run_comparison()
        )

        total = len(lianruan_devices)
        missing = len(no_huorong)
        if total:
            # Derived from the host name match rather than the raw count
            # ratio, which could exceed 100% when Huorong knows terminals
            # that Lianruan does not.
            compliance_rate = f"{(total - missing) / total * 100:.1f}%"
        else:
            compliance_rate = "N/A"

        return {
            "lianruan_count": total,
            "huorong_count": len(huorong_devices),
            "no_huorong_count": missing,
            "compliance_rate": compliance_rate,
            "generated_at": datetime.now().isoformat(),
        }


class ComparisonTaskConfig:
    """In-memory registry of scheduled-task configurations keyed by task id."""

    def __init__(self) -> None:
        self.tasks: dict[str, dict] = {}

    def add_task(self, task_id: str, config: dict) -> None:
        """Store ``config`` under ``task_id``, replacing any existing entry."""
        self.tasks[task_id] = config

    def get_task(self, task_id: str) -> Optional[dict]:
        """Return the config stored under ``task_id``, or ``None`` if absent."""
        return self.tasks.get(task_id)

    def list_tasks(self) -> list[dict]:
        """Return every task as a new dict of its config plus ``task_id``."""
        listing = []
        for task_id, config in self.tasks.items():
            entry = {"task_id": task_id, **config}
            # The registry key is authoritative: a "task_id" entry inside
            # the stored config must not overwrite the real id.
            entry["task_id"] = task_id
            listing.append(entry)
        return listing

    def delete_task(self, task_id: str) -> bool:
        """Remove ``task_id``; return ``True`` if it existed, else ``False``."""
        try:
            del self.tasks[task_id]
        except KeyError:
            return False
        return True


comparison_task_config = ComparisonTaskConfig()