v0.5.5: 应急页 v0.5.4 + 移除IT设备升级 + admin登录修复 + 内容审核架构 + 知识库

This commit is contained in:
Simon
2026-06-16 10:07:42 +08:00
parent 10b37a6acc
commit 60e67b0681
59 changed files with 4195 additions and 110 deletions
@@ -0,0 +1,166 @@
"""
终端安全对比 API
路径: /api/admin/security/comparison
鉴权: require_admin
"""
from datetime import datetime
from typing import Optional
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from app.api.admin import require_admin
from app.services.security_comparison import (
TerminalSecurityComparison,
comparison_task_config,
)
router = APIRouter(prefix="/security/comparison", tags=["终端安全对比"])
# --- Request/Response Models ---
class CompareRequest(BaseModel):
"""手动触发比对请求"""
pass # 无参数,手动触发
class CompareSummaryResponse(BaseModel):
"""比对汇总响应"""
lianruan_count: int
huorong_count: int
no_huorong_count: int
compliance_rate: str
generated_at: str
class NoHuorongDevice(BaseModel):
"""未安装火绒设备"""
hostname: str
ip: str
useraccount: Optional[str] = None
dept: Optional[str] = None
last_login: Optional[str] = None
osver: Optional[str] = None
status: Optional[str] = None
class TaskConfigRequest(BaseModel):
"""任务配置请求"""
name: str # 任务名称
cron: str # Cron 表达式,如 "0 9 * * 1" 每周一9点
recipients: list[str] # 企微接收人user_id列表
enabled: bool = True
class TaskConfigResponse(BaseModel):
"""任务配置响应"""
task_id: str
name: str
cron: str
recipients: list[str]
enabled: bool
last_run: Optional[str] = None
next_run: Optional[str] = None
# --- API Endpoints ---
@router.get("/summary", response_model=CompareSummaryResponse)
async def get_comparison_summary(current_user=Depends(require_admin)):
"""获取比对汇总数据"""
service = TerminalSecurityComparison()
try:
summary = await service.compare_summary()
return summary
finally:
await service.close()
@router.get("/no-huorong", response_model=list[NoHuorongDevice])
async def get_no_huorong_devices(current_user=Depends(require_admin)):
"""获取未安装火绒的电脑清单"""
service = TerminalSecurityComparison()
try:
devices = await service.get_no_huorong_devices()
return devices
finally:
await service.close()
@router.post("/trigger")
async def trigger_comparison(current_user=Depends(require_admin)):
"""手动触发比对并推送企微消息"""
service = TerminalSecurityComparison()
try:
# 1. 执行比对
no_huorong = await service.get_no_huorong_devices()
# 2. 生成消息
if no_huorong:
msg = f"⚠️ 终端安全检查:发现 {len(no_huorong)} 台电脑未安装火绒\n\n"
for dev in no_huorong[:10]: # 只显示前10条
msg += f"{dev.get('hostname')} ({dev.get('ip')})\n"
if len(no_huorong) > 10:
msg += f"... 还有 {len(no_huorong)-10}"
else:
msg = "✅ 终端安全检查:所有电脑已安装火绒"
# 3. TODO: 推送到企微(需要企微消息API)
logger.info(f"比对结果: {msg}")
return {
"success": True,
"no_huorong_count": len(no_huorong),
"message": msg,
}
finally:
await service.close()
# --- 任务配置 API ---
@router.get("/tasks", response_model=list[TaskConfigResponse])
async def list_tasks(current_user=Depends(require_admin)):
"""列出所有定时任务"""
tasks = comparison_task_config.list_tasks()
return tasks
@router.post("/tasks", response_model=TaskConfigResponse)
async def create_task(
config: TaskConfigRequest,
current_user=Depends(require_admin)
):
"""创建定时任务"""
task_id = str(uuid4())[:8]
comparison_task_config.add_task(task_id, {
"name": config.name,
"cron": config.cron,
"recipients": config.recipients,
"enabled": config.enabled,
"created_at": datetime.now().isoformat(),
})
return TaskConfigResponse(
task_id=task_id,
**config.model_dump(),
)
@router.delete("/tasks/{task_id}")
async def delete_task(
task_id: str,
current_user=Depends(require_admin)
):
"""删除定时任务"""
success = comparison_task_config.delete_task(task_id)
if not success:
raise HTTPException(status_code=404, detail="任务不存在")
return {"success": True}
# 日志记录
import logging
logger = logging.getLogger(__name__)
+2 -2
View File
@@ -212,7 +212,7 @@ async def agent_login(
if not existing_agent:
# 新坐席注册必须通过企微验证,防止任意 user_id 冒充
raise AppException(
1003,
ErrorCode.AUTH_TOKEN_INVALID,
"企微通讯录验证失败,新坐席注册需要企微身份验证。请稍后重试或联系管理员。"
)
logger.warning(
@@ -223,7 +223,7 @@ async def agent_login(
if existing_agent.password_hash is None:
# 已注册坐席但未设置密码,要求先设置密码
raise AppException(
1012,
ErrorCode.AUTH_PASSWORD_REQUIRED,
"首次登录请先设置密码。管理后台 → 坐席管理 → 设置本地密码"
)
if not body.password:
+7 -4
View File
@@ -829,18 +829,21 @@ async def h5_poll_messages(
).order_by(Message.created_at.asc())
if after_message_id:
# 转换为UUID类型查询,确保和数据库UUID字段类型匹配
# 校验 UUID 格式,然后转字符串(兼容 SQLite/PG 的 String(36) 列,避免类型匹配)
from uuid import UUID as UUIDType
try:
msg_uuid = UUIDType(after_message_id)
UUIDType(after_message_id) # 仅校验
except ValueError:
# 无效的UUID格式返回空列表
# 无效的UUID格式,返回空列表
items = []
return success_response(data={"items": items, "has_more": False})
# 必须用字符串比较,Message.id 在 DB 里是 String(36)/VARCHAR,
# 传 UUID 对象会被 SQLAlchemy 推断成 UUID 类型 → PostgreSQL 报
# "operator does not exist: character varying = uuid"
after_stmt = select(Message.created_at).where(
Message.id == msg_uuid
Message.id == str(after_message_id)
)
after_result = await db.execute(after_stmt)
after_time = after_result.scalar_one_or_none()
+14
View File
@@ -24,7 +24,9 @@ from app.api.upload import router as upload_router
from app.api.admin import router as admin_router
from app.api.portal import router as portal_router
from app.api.admin_roles import router as admin_roles_router
from app.api.admin.security_comparison import router as security_comparison_router
from app.api.approval import router as approval_router
from app.api.wecom_jsapi import router as wecom_jsapi_router # v0.5.4 应急页 JS-SDK 签名
# 创建 API 路由器
# 所有子路由都会挂载到这个路由器上
@@ -157,6 +159,14 @@ api_router.include_router(portal_router, tags=["统一入口"])
# DELETE /api/admin/roles/mapping-rules/{id} — 删除映射规则
api_router.include_router(admin_roles_router, tags=["角色管理"])
# 终端安全对比 API
# GET /api/admin/security/comparison/summary — 比对汇总
# GET /api/admin/security/comparison/no-huorong — 未安装火绒清单
# POST /api/admin/security/comparison/trigger — 手动触发
# GET /api/admin/security/comparison/tasks — 任务列表
# POST /api/admin/security/comparison/tasks — 创建定时任务
api_router.include_router(security_comparison_router, tags=["终端安全对比"])
# 审批流程 API
# GET /api/approval/templates — 获取审批模板列表
# GET /api/approval/templates/{id} — 获取审批模板详情
@@ -164,3 +174,7 @@ api_router.include_router(admin_roles_router, tags=["角色管理"])
# POST /api/approval/submit — API提交审批
# GET /api/approval/keywords — 获取审批关键词
api_router.include_router(approval_router, tags=["审批流程"])
# 企微 JS-SDK 签名 API (v0.5.4 应急页身份检测用)
# GET /api/wecom/jsapi-config?url=xxx — 返回 corp_id/agent_id/timestamp/nonce_str/signature
api_router.include_router(wecom_jsapi_router, tags=["企微JS-SDK"])
+181
View File
@@ -0,0 +1,181 @@
# =============================================================================
# 企微IT智能服务台 — 企微 JS-SDK 签名 API (v0.5.4 应急页用)
# =============================================================================
# 说明:提供前端 wx.config / wx.agentConfig 所需的鉴权签名。
# 对应企微文档:https://developer.work.weixin.qq.com/document/path/90506
#
# 流程:
# 1. 前端调 GET /api/wecom/jsapi-config?url=xxx 拿签名
# 2. 后端用 jsapi_ticket + url 算 sha1 签名
# 3. 前端用 wx.config({...}) 鉴权后,即可调企微 JS-SDK(如 wx.agentConfig)
#
# BC/DR 设计:不依赖 session/auth,公开访问(只返回签名,不返回敏感数据)
# =============================================================================
import logging
import secrets
import time
from fastapi import APIRouter, Query
from app.config import settings
from app.dependencies import get_shared_wecom_service
from app.utils.response import AppException, success_response
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/wecom/jsapi-config")
async def get_jsapi_config(
url: str = Query(..., description="当前页面 URL(不含 # 及其后)"),
):
"""获取企微 JS-SDK 鉴权配置。
供前端 wx.config 和 wx.agentConfig 使用。
Returns:
{
"code": 0,
"data": {
"corp_id": "wwa8c87970b2011f41",
"agent_id": "1000133",
"timestamp": 1718500000,
"nonce_str": "5K8264ILTKCH...",
"signature": "f7c8e9..."
}
}
"""
try:
wecom_service = get_shared_wecom_service()
# 1. 获取 jsapi_ticket
ticket = await wecom_service.get_jsapi_ticket()
# 2. 生成时间戳和随机串
timestamp = int(time.time())
nonce_str = secrets.token_hex(8) # 16 字符
# 3. 计算签名
signature = wecom_service.generate_jsapi_signature(
ticket=ticket,
nonce_str=nonce_str,
timestamp=timestamp,
url=url,
)
logger.info(
f"生成 JS-SDK 签名: url={url[:80]}... timestamp={timestamp}"
)
return success_response(
{
"corp_id": settings.wecom_corp_id,
"agent_id": str(settings.wecom_agent_id),
"timestamp": timestamp,
"nonce_str": nonce_str,
"signature": signature,
}
)
except Exception as e:
logger.error(f"生成 JS-SDK 签名失败: {e}", exc_info=True)
raise AppException(
code=5001,
message=f"生成 JS-SDK 签名失败: {str(e)}",
) from e
# =============================================================================
# 应急页身份检测 (v0.5.4)
# =============================================================================
# 流程:
# 1. 前端用 wx.agentConfig 拿到当前 userid
# 2. 前端调 GET /api/wecom/check-role?userid=xxx
# 3. 后端用企微通讯录 API 查 userid 是否在"IT支持-咨询坐席"标签里
# 4. 返回 "user" 或 "agent"
# =============================================================================
@router.get("/wecom/check-role")
async def check_emergency_role(
userid: str = Query(..., description="企微 userid"),
):
"""检测当前账号在应急页场景下的角色。
实现方式(优先级递减)
1. 企微通讯录标签检测(若配置 WECOM_AGENT_TAG_ID)
2. 后台硬编码名单(若配置 WECOM_AGENT_USERIDS 环境变量)
3. 默认 "user" (兜底)
Args:
userid: 企微 userid(从 wx.agentConfig 拿)
Returns:
{
"code": 0,
"data": {
"role": "user" | "agent",
"userid": "...",
"method": "tag" | "hardcoded" | "default"
}
}
"""
wecom_service = get_shared_wecom_service()
# 方式 1:企微标签检测
tag_id = getattr(settings, "wecom_agent_tag_id", None)
if tag_id:
try:
access_token = await wecom_service.get_access_token()
url = f"https://qyapi.weixin.qq.com/cgi-bin/tag/get?access_token={access_token}&tagid={tag_id}"
import httpx
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url)
result = resp.json()
if result.get("errcode", 0) == 0:
user_list = result.get("userlist", [])
# userlist 元素可能是 str(老版)或 dict(新版带 name)
user_ids = [
u if isinstance(u, str) else u.get("userid", "")
for u in user_list
]
if userid in user_ids:
logger.info(f"标签检测: userid={userid} 是坐席")
return success_response(
{"role": "agent", "userid": userid, "method": "tag"}
)
else:
logger.info(f"标签检测: userid={userid} 是员工")
return success_response(
{"role": "user", "userid": userid, "method": "tag"}
)
else:
logger.warning(
f"标签 API 失败: errcode={result.get('errcode')}, "
f"errmsg={result.get('errmsg')}, 降级到硬编码"
)
except Exception as e:
logger.warning(f"标签检测失败(降级): {e}")
# 方式 2:硬编码名单
hardcoded = getattr(settings, "wecom_agent_userids", None)
if hardcoded:
agent_ids = [x.strip() for x in hardcoded.split(",") if x.strip()]
if userid in agent_ids:
logger.info(f"硬编码名单: userid={userid} 是坐席")
return success_response(
{"role": "agent", "userid": userid, "method": "hardcoded"}
)
else:
return success_response(
{"role": "user", "userid": userid, "method": "hardcoded"}
)
# 方式 3:默认 user
logger.info(f"未配置检测方式, userid={userid} 默认 user")
return success_response(
{"role": "user", "userid": userid, "method": "default"}
)
+12 -11
View File
@@ -20,7 +20,6 @@
import logging
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from starlette.requests import Request
from app.services.ws_manager import manager as ws_manager
from app.services.cache_service import cache_service
@@ -39,7 +38,6 @@ WS_CLOSE_UNAUTHORIZED = 4001
async def websocket_endpoint(
websocket: WebSocket,
agent_id: str,
request: Request,
) -> None:
"""坐席 WebSocket 端点主循环(含 WS-01 token 认证)。
@@ -61,10 +59,12 @@ async def websocket_endpoint(
- 兼容从 ?token= URL 参数获取(向后兼容)
- 不再将 token 暴露在 URL 中,避免 access_log 泄露
v0.5.1 修复:移除 `request: Request` 参数(部分 Starlette 版本注入 Request 失败,
改用 `websocket.headers` 和 `websocket.query_params` 读取 header/query)
Args:
websocket: FastAPI WebSocket 对象(框架自动注入)
agent_id: 坐席ID(从 URL 路径参数获取)
request: Starlette Request(用于获取 header
"""
# ======================================================================
# WS-01: Token 认证(从 subprotocol / header / query 获取)
@@ -74,17 +74,17 @@ async def websocket_endpoint(
# 格式: Sec-WebSocket-Protocol: bearer.{token}
# 说明: 浏览器原生 WebSocket API 不支持 headers 参数,但支持 subprotocols (第2参数数组)
# 前端用 new WebSocket(url, ["bearer.{token}"]) 传递,服务端从 sec-websocket-protocol 头读取
subprotocol = request.headers.get("sec-websocket-protocol", "")
subprotocol = websocket.headers.get("sec-websocket-protocol", "")
if subprotocol.startswith("bearer."):
token = subprotocol[7:] # 去掉 "bearer." 前缀
else:
# 其次从 Authorization header 获取
auth_header = request.headers.get("Authorization", "")
auth_header = websocket.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:] # 去掉 "Bearer " 前缀
else:
# 向后兼容:从 query param 获取(即将废弃)
token = request.query_params.get("token", "")
token = websocket.query_params.get("token", "")
# 步骤2: 检查 token 是否为空
if not token:
@@ -197,7 +197,6 @@ async def websocket_endpoint(
async def h5_websocket_endpoint(
websocket: WebSocket,
employee_id: str,
request: Request,
) -> None:
"""H5员工 WebSocket 端点主循环(含 token 认证)。
@@ -223,10 +222,12 @@ async def h5_websocket_endpoint(
- (与H5登录 API /api/h5/mock-login 存储格式一致)
- token 缺失、无效、过期、与 employee_id 不匹配均拒绝连接
v0.5.1 修复:移除 `request: Request` 参数(部分 Starlette 版本注入 Request 失败,
改用 `websocket.headers` 和 `websocket.query_params` 读取 header/query)
Args:
websocket: FastAPI WebSocket 对象(框架自动注入)
employee_id: 员工企微 UserID(从 URL 路径参数获取)
request: Starlette Request(用于获取 header
"""
# ======================================================================
# Token 认证(从 subprotocol / header / query 获取)
@@ -234,17 +235,17 @@ async def h5_websocket_endpoint(
# 步骤1: 优先从 Sec-WebSocket-Protocol (subprotocol) 获取 token,其次从 Authorization header,最后从 query(向后兼容)
# 格式: Sec-WebSocket-Protocol: bearer.{token}
subprotocol = request.headers.get("sec-websocket-protocol", "")
subprotocol = websocket.headers.get("sec-websocket-protocol", "")
if subprotocol.startswith("bearer."):
token = subprotocol[7:] # 去掉 "bearer." 前缀
else:
# 其次从 Authorization header 获取
auth_header = request.headers.get("Authorization", "")
auth_header = websocket.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:] # 去掉 "Bearer " 前缀
else:
# 向后兼容:从 query param 获取(即将废弃)
token = request.query_params.get("token", "")
token = websocket.query_params.get("token", "")
# 步骤2: 检查 token 是否为空
if not token:
+19
View File
@@ -107,6 +107,25 @@ class Settings(BaseSettings):
# 设备申请审批模板ID(在企微审批应用设置中获取)
approval_template_device: str = ""
# ----------------------------------------------------------------------
# v0.5.4 应急页身份检测配置
# ----------------------------------------------------------------------
# IT支持-咨询坐席 通讯录标签 ID(在企微管理后台 > 通讯录管理 > 标签管理 中查看)
# 配置后,应急页会通过此标签判断当前用户是否为坐席
# 留空则降级到下面的硬编码名单
wecom_agent_tag_id: str = ""
# 硬编码坐席 userid 列表(逗号分隔),作为标签检测的降级方案
# 例:"zhangsan,lisi,wangwu"(生产环境建议用标签方案)
wecom_agent_userids: str = ""
# ----------------------------------------------------------------------
# v0.6.0 内容审核报警配置(占位,后续完善)
# ----------------------------------------------------------------------
# 合规通知企微群机器人 webhook
content_audit_webhook: str = ""
# 主管接收报警的 userid(多个用逗号分隔)
content_audit_supervisor_userids: str = ""
# ----------------------------------------------------------------------
# Pydantic-settings 配置
# ----------------------------------------------------------------------
+23 -8
View File
@@ -290,14 +290,29 @@ async def _init_approval_links(db, ApprovalLink):
return
links = [
ApprovalLink(category="IT", title="软件安装申请", url="https://审批系统地址/software-install", sort_order=1),
ApprovalLink(category="IT", title="设备报修工单", url="https://审批系统地址/device-repair", sort_order=2),
ApprovalLink(category="IT", title="VPN开通申请", url="https://审批系统地址/vpn-apply", sort_order=3),
ApprovalLink(category="IT", title="权限申请", url="https://审批系统地址/permission-apply", sort_order=4),
ApprovalLink(category="HR", title="入职手续", url="https://审批系统地址/onboarding", sort_order=5),
ApprovalLink(category="HR", title="离职手续", url="https://审批系统地址/offboarding", sort_order=6),
ApprovalLink(category="行政", title="办公用品申领", url="https://审批系统地址/office-supplies", sort_order=7),
ApprovalLink(category="财务", title="报销申请", url="https://审批系统地址/reimbursement", sort_order=8),
# v0.5.2:一站式运维平台真实工单链接(域名 devops.dc.servyou-it.com,已实现企微免登录)
# v0.5.3 更新:去掉 "IT设备升级与硬件维修" (申请单冲突,后续移除)
ApprovalLink(category="IT", title="零信任(原VPN)账号申请",
url="https://devops.dc.servyou-it.com/ITSM/workflow/service/createTicket?name=%E5%91%98%E5%B7%A5%E9%9B%B6%E4%BF%A1%E4%BB%BB%EF%BC%88%E5%8E%9FVPN%EF%BC%89%E8%B4%A6%E5%8F%B7%E7%94%B3%E8%AF%B7IT",
sort_order=1),
ApprovalLink(category="IT", title="活动与会议技术支持",
url="https://devops.dc.servyou-it.com/ITSM/workflow/service/createTicket?name=%E6%B4%BB%E5%8A%A8%E4%B8%8E%E4%BC%9A%E8%AE%AE%E6%8A%80%E6%9C%AF%E6%94%AF%E6%8C%81",
sort_order=2),
# sort_order=3 故意空缺:旧版本是"IT设备升级与硬件维修",已与一站式运维平台冲突,不再提供
ApprovalLink(category="IT", title="员工IT支持与故障报修",
url="https://devops.dc.servyou-it.com/ITSM/workflow/service/createTicket?name=%E5%91%98%E5%B7%A5IT%E6%94%AF%E6%8C%81%E4%B8%8E%E6%95%85%E9%9A%9C%E6%8A%A5%E4%BF%AE",
sort_order=4),
ApprovalLink(category="IT", title="终端设备网络准入申请",
url="https://devops.dc.servyou-it.com/ITSM/workflow/service/createTicket?name=%E7%BB%88%E7%AB%AF%E8%AE%BE%E5%A4%87%E7%BD%91%E7%BB%9C%E5%87%86%E5%85%A5%E7%94%B3%E8%AF%B7",
sort_order=5),
ApprovalLink(category="IT", title="公共邮箱账号申请",
url="https://devops.dc.servyou-it.com/ITSM/workflow/service/createTicket?name=%E5%85%AC%E5%85%B1%E9%82%AE%E7%AE%B1%E8%B4%A6%E5%8F%B7%E7%94%B3%E8%AF%B7",
sort_order=6),
# HR / 行政 / 财务 占位(待后续接入真实流程)
ApprovalLink(category="HR", title="入职手续", url="https://审批系统地址/onboarding", sort_order=7),
ApprovalLink(category="HR", title="离职手续", url="https://审批系统地址/offboarding", sort_order=8),
ApprovalLink(category="行政", title="办公用品申领", url="https://审批系统地址/office-supplies", sort_order=9),
ApprovalLink(category="财务", title="报销申请", url="https://审批系统地址/reimbursement", sort_order=10),
]
db.add_all(links)
+149
View File
@@ -0,0 +1,149 @@
"""
终端安全对比服务 - 火绒 vs 联软
功能:
1. 获取未安装火绒的电脑清单
2. 定时任务推送
3. 手动触发
依赖:
- 联软 LV7000: get_dev_all_info()
- 火绒企业版: list_terminals()
比对逻辑:按主机名精确匹配
"""
from datetime import datetime
from typing import Optional
import logging
from app.integrations.huorong.client import HuorongClient
from app.integrations.lianruan.client import LianruanClient
logger = logging.getLogger(__name__)
class TerminalSecurityComparison:
"""终端安全对比服务"""
def __init__(self):
self.huorong = HuorongClient()
self.lianruan = LianruanClient()
async def close(self):
"""关闭连接"""
await self.huorong.close()
await self.lianruan.close()
async def get_no_huorong_devices(self) -> list[dict]:
"""获取未安装火绒的电脑清单(按主机名匹配)"""
logger.info("开始比对终端安全数据...")
# 1. 获取联软所有设备
lianruan_devices = await self._get_all_lianruan_devices()
logger.info(f"联软设备数: {len(lianruan_devices)}")
# 2. 获取火绒所有终端
huorong_devices = await self._get_all_huorong_devices()
logger.info(f"火绒终端数: {len(huorong_devices)}")
# 3. 构建火绒主机名集合(转小写匹配)
huorong_hostnames = {
dev.get("hostname", "").lower()
for dev in huorong_devices
if dev.get("hostname")
}
# 4. 比对:联软有,火绒无 = 未安装火绒
no_huorong = []
for dev in lianruan_devices:
# 联软用 strdevname (计算机名)
hostname = dev.get("strdevname", "").lower()
if hostname and hostname not in huorong_hostnames:
no_huorong.append({
"hostname": dev.get("strdevname"),
"ip": dev.get("strip1"), # 联软IP字段
"useraccount": dev.get("strusername"), # 用户名
"dept": dev.get("strdeptname"), # 部门
"last_login": dev.get("dtlastlogin"),
"osver": dev.get("strosver"),
"status": dev.get("strstatus"),
})
logger.info(f"未安装火绒设备数: {len(no_huorong)}")
return no_huorong
async def _get_all_lianruan_devices(self) -> list[dict]:
"""获取联软所有设备"""
# TODO: 分页获取全部设备
result = await self.lianruan.get_dev_all_info()
if result and hasattr(result, 'devices') and result.devices:
# 转换为字典列表
return [d.model_dump() if hasattr(d, 'model_dump') else d for d in result.devices]
return []
async def _get_all_huorong_devices(self) -> list[dict]:
"""获取火绒所有终端(分页获取)"""
all_devices = []
page = 1
per_page = 200
while True:
result = await self.huorong.list_terminals(page=page, per_page=per_page)
clients = result.get("clients", [])
if not clients:
break
for c in clients:
# 火绒字段:hostname, computer_name, ip_addr, local_ip
all_devices.append({
"hostname": c.get("hostname") or c.get("computer_name"),
"ip": c.get("ip_addr") or c.get("local_ip"),
"status": c.get("stat"),
})
# 检查是否还有更多
if len(clients) < per_page:
break
page += 1
return all_devices
async def compare_summary(self) -> dict:
"""比对汇总数据"""
lianruan_devices = await self._get_all_lianruan_devices()
huorong_devices = await self._get_all_huorong_devices()
no_huorong = await self.get_no_huorong_devices()
return {
"lianruan_count": len(lianruan_devices),
"huorong_count": len(huorong_devices),
"no_huorong_count": len(no_huorong),
"compliance_rate": f"{len(huorong_devices)/len(lianruan_devices)*100:.1f}%" if lianruan_devices else "N/A",
"generated_at": datetime.now().isoformat(),
}
class ComparisonTaskConfig:
"""定时任务配置"""
def __init__(self):
self.tasks: dict[str, dict] = {}
def add_task(self, task_id: str, config: dict):
self.tasks[task_id] = config
def get_task(self, task_id: str) -> Optional[dict]:
return self.tasks.get(task_id)
def list_tasks(self) -> list[dict]:
return [{"task_id": k, **v} for k, v in self.tasks.items()]
def delete_task(self, task_id: str) -> bool:
if task_id in self.tasks:
del self.tasks[task_id]
return True
return False
comparison_task_config = ComparisonTaskConfig()
+95
View File
@@ -463,6 +463,101 @@ class WecomService:
logger.error(f"获取部门成员网络错误: dept_id={department_id}, error={e}")
raise Exception(f"获取部门成员网络错误: {e}") from e
# --------------------------------------------------------------------------
# JS-SDK 票据 (v0.5.4:应急页身份检测用)
# --------------------------------------------------------------------------
async def get_jsapi_ticket(self) -> str:
"""获取企微 JS-SDK 票据 jsapi_ticket。
对应企微API:
GET https://qyapi.weixin.qq.com/cgi-bin/get_jsapi_ticket?access_token=TOKEN
jsapi_ticket 用于计算 JS-SDK 签名(sha1),让前端 wx.config/wx.agentConfig 鉴权通过。
有效期 7200 秒,缓存到 Redis(提前 300 秒刷新)。
Returns:
str: jsapi_ticket 字符串
Raises:
Exception: 获取失败
"""
cache_key = "wecom:jsapi_ticket"
# 1. Redis 缓存
if self.redis:
try:
cached = await self.redis.get(cache_key)
if cached:
logger.debug("从缓存获取 jsapi_ticket")
return cached.decode("utf-8")
except Exception as e:
logger.warning(f"Redis 读取 jsapi_ticket 失败(降级): {e}")
# 2. 调用企微 API
access_token = await self.get_access_token()
url = f"https://qyapi.weixin.qq.com/cgi-bin/get_jsapi_ticket?access_token={access_token}"
try:
response = await self.client.get(url)
result = response.json()
if result.get("errcode", 0) != 0:
logger.error(
f"获取 jsapi_ticket 失败: "
f"errcode={result.get('errcode')}, errmsg={result.get('errmsg')}"
)
raise Exception(f"获取 jsapi_ticket 失败: {result.get('errmsg')}")
ticket = result.get("ticket", "")
expires_in = result.get("expires_in", 7200)
# 3. 缓存到 Redis(TTL = expires_in - 300s)
cache_ttl = max(expires_in - 300, 60)
if self.redis:
try:
await self.redis.setex(cache_key, cache_ttl, ticket)
except Exception as e:
logger.warning(f"Redis 写入 jsapi_ticket 失败(降级): {e}")
logger.info(f"jsapi_ticket 获取成功,缓存 TTL={cache_ttl}")
return ticket
except httpx.HTTPError as e:
logger.error(f"获取 jsapi_ticket 网络错误: {e}")
raise Exception(f"企微API网络错误: {e}") from e
@staticmethod
def generate_jsapi_signature(
ticket: str, nonce_str: str, timestamp: int, url: str
) -> str:
"""生成 JS-SDK 签名(sha1)。
对应企微JS-SDK签名算法:
1. 拼接:jsapi_ticket={ticket}&noncestr={nonce_str}&timestamp={timestamp}&url={url}
2. sha1(拼接字符串)
注意:
- url 不含 # 及其后面部分
- url 不含 ?
- url 是前端调用 wx.config 的页面 URL
Args:
ticket: jsapi_ticket
nonce_str: 随机字符串(前端生成,16位)
timestamp: 当前时间戳(秒)
url: 当前页面 URL(不含 # 后面)
Returns:
str: sha1 签名字符串(40 字符)
"""
import hashlib
# 拼接签名字符串
raw = f"jsapi_ticket={ticket}&noncestr={nonce_str}&timestamp={timestamp}&url={url}"
# sha1 哈希
signature = hashlib.sha1(raw.encode("utf-8")).hexdigest()
return signature
# --------------------------------------------------------------------------
# 上传临时素材
# --------------------------------------------------------------------------