""" 终端安全对比 API 路径: /api/admin/security/comparison 鉴权: require_admin """ from datetime import datetime from typing import Optional from uuid import uuid4 from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from app.api.admin_api import require_admin from app.services.security_comparison import ( TerminalSecurityComparison, comparison_task_config, ) router = APIRouter(prefix="/security/comparison", tags=["终端安全对比"]) # --- Request/Response Models --- class CompareRequest(BaseModel): """手动触发比对请求""" pass # 无参数,手动触发 class CompareSummaryResponse(BaseModel): """比对汇总响应""" lianruan_count: int huorong_count: int no_huorong_count: int compliance_rate: str generated_at: str class NoHuorongDevice(BaseModel): """未安装火绒设备""" hostname: str ip: str useraccount: Optional[str] = None dept: Optional[str] = None last_login: Optional[str] = None osver: Optional[str] = None status: Optional[str] = None class TaskConfigRequest(BaseModel): """任务配置请求""" name: str # 任务名称 cron: str # Cron 表达式,如 "0 9 * * 1" 每周一9点 recipients: list[str] # 企微接收人user_id列表 enabled: bool = True class TaskConfigResponse(BaseModel): """任务配置响应""" task_id: str name: str cron: str recipients: list[str] enabled: bool last_run: Optional[str] = None next_run: Optional[str] = None # --- API Endpoints --- @router.get("/summary", response_model=CompareSummaryResponse) async def get_comparison_summary(current_user=Depends(require_admin)): """获取比对汇总数据""" service = TerminalSecurityComparison() try: summary = await service.compare_summary() return summary finally: await service.close() @router.get("/no-huorong", response_model=list[NoHuorongDevice]) async def get_no_huorong_devices(current_user=Depends(require_admin)): """获取未安装火绒的电脑清单""" service = TerminalSecurityComparison() try: devices = await service.get_no_huorong_devices() return devices finally: await service.close() @router.post("/trigger") async def trigger_comparison(current_user=Depends(require_admin)): """手动触发比对并推送企微消息""" service = TerminalSecurityComparison() try: # 1. 执行比对 no_huorong = await service.get_no_huorong_devices() # 2. 生成消息 if no_huorong: msg = f"⚠️ 终端安全检查:发现 {len(no_huorong)} 台电脑未安装火绒\n\n" for dev in no_huorong[:10]: # 只显示前10条 msg += f"• {dev.get('hostname')} ({dev.get('ip')})\n" if len(no_huorong) > 10: msg += f"... 还有 {len(no_huorong)-10} 台" else: msg = "✅ 终端安全检查:所有电脑已安装火绒" # 3. TODO: 推送到企微(需要企微消息API) logger.info(f"比对结果: {msg}") return { "success": True, "no_huorong_count": len(no_huorong), "message": msg, } finally: await service.close() # --- 任务配置 API --- @router.get("/tasks", response_model=list[TaskConfigResponse]) async def list_tasks(current_user=Depends(require_admin)): """列出所有定时任务""" tasks = comparison_task_config.list_tasks() return tasks @router.post("/tasks", response_model=TaskConfigResponse) async def create_task( config: TaskConfigRequest, current_user=Depends(require_admin) ): """创建定时任务""" task_id = str(uuid4())[:8] comparison_task_config.add_task(task_id, { "name": config.name, "cron": config.cron, "recipients": config.recipients, "enabled": config.enabled, "created_at": datetime.now().isoformat(), }) return TaskConfigResponse( task_id=task_id, **config.model_dump(), ) @router.delete("/tasks/{task_id}") async def delete_task( task_id: str, current_user=Depends(require_admin) ): """删除定时任务""" success = comparison_task_config.delete_task(task_id) if not success: raise HTTPException(status_code=404, detail="任务不存在") return {"success": True} # 日志记录 import logging logger = logging.getLogger(__name__)