Files

277 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 企微回调 API
# =============================================================================
# 说明:处理企微服务器的回调请求,包括:
# 1. GET /api/wecom/callback — 验证URL有效性(企微配置回调URL时调用)
# 2. POST /api/wecom/callback — 接收企微推送的消息
#
# 重构记录(2026-06):
# - 移除手动创建 Redis/WecomService/AIService 实例的模式
# - 改用 dependencies 模块提供的共享服务实例
# - 不再手动 close() 服务实例(由应用生命周期管理)
# =============================================================================
import logging
from fastapi import APIRouter, Query, Request
from fastapi.responses import Response
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import _get_session_factory
from app.dependencies import (
get_shared_redis,
get_shared_wecom_service,
get_shared_ai_handler,
)
from app.services.ai_handler import AIHandler
from app.services.cache_service import CacheService
from app.services.message_router import MessageRouter
from app.services.scoring_service import ScoringService
from app.services.wecom_service import WecomService
from app.utils.wecom_crypto import WecomCrypto
logger = logging.getLogger(__name__)
# 创建路由器
router = APIRouter()
# 加解密工具实例(懒加载单例,避免导入时因无效配置导致 base64 解码失败)
_wecom_crypto: WecomCrypto | None = None
def _get_wecom_crypto() -> WecomCrypto:
"""获取加解密工具单例(延迟初始化)。
在测试环境中,settings 中的 EncodingAESKey 可能是无效的占位值,
延迟初始化可以避免模块导入时就触发 base64 解码错误。
"""
global _wecom_crypto
if _wecom_crypto is None:
from app.config import settings
_wecom_crypto = WecomCrypto(
token=settings.wecom_token,
encoding_aes_key=settings.wecom_encoding_aes_key,
corp_id=settings.wecom_corp_id,
)
return _wecom_crypto
@router.get("/wecom/callback")
async def verify_url(
msg_signature: str = Query(..., description="企微签名"),
timestamp: str = Query(..., description="时间戳"),
nonce: str = Query(..., description="随机数"),
echostr: str = Query(..., description="加密的验证字符串"),
):
"""验证企微回调URL有效性。
企微管理后台配置回调URL时,会发送 GET 请求验证。
验证流程:
1. 验证签名 SHA1(sort(token, timestamp, nonce, echostr))
2. 解密 echostr
3. 返回解密后的明文
Args:
msg_signature: 企微签名
timestamp: 时间戳
nonce: 随机数
echostr: 加密的验证字符串
Returns:
str: 解密后的 echostr 明文
"""
try:
# 验证签名并解密 echostr
plaintext = _get_wecom_crypto().decrypt_echostr(
msg_signature=msg_signature,
timestamp=timestamp,
nonce=nonce,
echostr=echostr,
)
logger.info("企微回调URL验证成功")
return Response(content=plaintext, media_type="text/plain")
except ValueError as e:
logger.error(f"企微回调URL验证失败: {e}")
return Response(content=f"验证失败: {e}", media_type="text/plain", status_code=400)
@router.post("/wecom/callback")
async def receive_message(
request: Request,
msg_signature: str = Query(..., description="企微签名"),
timestamp: str = Query(..., description="时间戳"),
nonce: str = Query(..., description="随机数"),
):
"""接收企微推送的消息。
企微将员工发送的消息通过此接口推送过来。
处理流程:
1. 读取 XML 请求体
2. 解密消息(验证签名 + AES 解密)
3. 解析消息内容
4. 路由到 MessageRouter 处理
5. 返回 "success" 字符串(企微要求)
重构说明:使用 dependencies 模块提供的共享服务实例,
不再手动创建/关闭 Redis、WecomService、AIService。
企微推送的消息格式(加密后):
<xml>
<ToUserName><![CDATA[corp_id]]></ToUserName>
<AgentID>1000002</AgentID>
<Encrypt><![CDATA[加密内容]]></Encrypt>
</xml>
Args:
request: FastAPI 请求对象(读取 XML 请求体)
msg_signature: 企微签名
timestamp: 时间戳
nonce: 随机数
Returns:
str: "success" 字符串(企微要求的固定响应)
"""
try:
# 1. 读取 XML 请求体
xml_body = (await request.body()).decode("utf-8")
logger.debug(f"收到企微回调: xml_length={len(xml_body)}")
# 2. 解密消息
message_dict = _get_wecom_crypto().decrypt_message(
xml_body=xml_body,
msg_signature=msg_signature,
timestamp=timestamp,
nonce=nonce,
)
# 3. 提取消息关键字段
from_user_id = message_dict.get("FromUserName", "")
content = message_dict.get("Content", "")
msg_type = message_dict.get("MsgType", "text")
agent_id = message_dict.get("AgentID", "")
event = message_dict.get("Event", "")
msg_id = message_dict.get("MsgId", "")
# 提取非文本消息的媒体字段(图片/语音/视频/文件/位置)
media_id: str = message_dict.get("MediaId", "")
pic_url: str = message_dict.get("PicUrl", "")
msg_format: str = message_dict.get("Format", "")
file_name: str = message_dict.get("FileName", "")
file_size: str = message_dict.get("FileSize", "")
# 位置消息字段
location_x: str = message_dict.get("Location_X", "")
location_y: str = message_dict.get("Location_Y", "")
location_label: str = message_dict.get("Label", "")
# 4. 处理事件消息(如员工进入应用)
if event:
await _handle_event(event, from_user_id, message_dict)
return Response(content="success", media_type="text/plain")
# 5. 处理各类消息(文本 + 非文本)
# 文本消息必须有 Content 字段;非文本消息(image/voice/video/file/location
# 没有 Content 字段,content 可能为空字符串,这是正常的
if msg_type == "text" and (not from_user_id or not content):
logger.warning("文本消息缺少发送者或内容,忽略")
return Response(content="success", media_type="text/plain")
elif msg_type != "text" and not from_user_id:
logger.warning("非文本消息缺少发送者,忽略")
return Response(content="success", media_type="text/plain")
# 6. 路由消息到 MessageRouter(使用共享服务实例)
session_factory = _get_session_factory()
async with session_factory() as db:
try:
# 获取共享服务实例(不再手动创建/关闭)
wecom_service = get_shared_wecom_service()
ai_handler = get_shared_ai_handler()
redis_client = get_shared_redis()
# ScoringService 需要当前 db 会话,仍需按请求创建
scoring_service = ScoringService(db)
# CacheService 使用共享 Redis 客户端
cache_service = CacheService(redis_client)
# 创建消息路由器
message_router = MessageRouter(
db=db,
wecom_service=wecom_service,
scoring_service=scoring_service,
ai_handler=ai_handler,
cache_service=cache_service,
)
# 构建 extra_data(存储各消息类型的额外元数据)
extra_data: dict = {}
if msg_type == "image":
extra_data["pic_url"] = pic_url
elif msg_type == "voice":
extra_data["format"] = msg_format
elif msg_type == "video":
extra_data["thumb_media_id"] = message_dict.get("ThumbMediaId", "")
elif msg_type == "location":
extra_data["location_x"] = location_x
extra_data["location_y"] = location_y
extra_data["label"] = location_label
extra_data["scale"] = message_dict.get("Scale", "")
# 路由消息
await message_router.route_message(
from_user_id=from_user_id,
content=content,
msg_type=msg_type,
msg_id=msg_id if msg_id else None,
media_id=media_id if media_id else None,
extra_data=extra_data if extra_data else None,
file_name=file_name if file_name else None,
file_size=int(file_size) if file_size else None,
)
# 提交事务
await db.commit()
except Exception as e:
await db.rollback()
logger.error(f"消息路由处理失败: {e}", exc_info=True)
# 即使处理失败,也返回 "success" 避免企微重试
# 但记录错误日志以便排查
return Response(content="success", media_type="text/plain")
except ValueError as e:
# 解密失败,记录日志但仍返回 success 避免企微重试
logger.error(f"消息解密失败: {e}")
return Response(content="success", media_type="text/plain")
except Exception as e:
# 其他未知错误,记录日志但仍返回 success
logger.error(f"消息处理未知错误: {e}", exc_info=True)
return Response(content="success", media_type="text/plain")
async def _handle_event(
event: str, from_user_id: str, message_dict: dict
) -> None:
"""处理企微事件消息。
事件类型:
- subscribe: 员工关注应用
- unsubscribe: 员工取消关注
- enter_agent: 员工进入应用
Args:
event: 事件类型
from_user_id: 发送者企微 UserID
message_dict: 完整消息字典
"""
if event == "enter_agent":
logger.info(f"员工进入应用: user_id={from_user_id}")
elif event == "subscribe":
logger.info(f"员工关注应用: user_id={from_user_id}")
elif event == "unsubscribe":
logger.info(f"员工取消关注: user_id={from_user_id}")
else:
logger.info(f"收到事件消息: event={event}, user_id={from_user_id}")