Files

672 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 消息路由核心服务
# =============================================================================
# 说明:消息路由层是整个系统的"大脑",负责:
# 1. 接收企微回调消息,路由到不同处理逻辑
# 2. 查找或创建会话
# 3. AI 自动回复(新会话 / AI 处理中的会话)
# 4. 触发 VIP 检测
# 5. 触发标记检测(举手/需介入/情绪)
# 6. 触发紧急度评分
# 7. 消息入库
#
# 路由策略(含 AI):
# - 新会话 → ai_handling → AIHandler 处理 → 命中回复 / 未命中转 queued
# - AI 处理中的会话 → AIHandler 处理 → 命中回复 / 未命中转 queued
# - 排队中/服务中的会话 → 追加消息(坐席人工处理)
#
# 重构记录(2026-06):
# - 替换 ai_service 为 ai_handler(统一 AI 调用逻辑)
# - AIHandler 包含打招呼检测和呼叫人工拦截,两端行为完全一致
# - 举手检测仅用于标记,不再强制跳过 AI(由 AIHandler 统一处理呼叫人工)
# =============================================================================
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.conversation import Conversation
from app.models.message import Message
from app.services.ai_handler import AIHandler, AIReplyResult
from app.services.cache_service import CacheService
from app.services.scoring_service import ScoringService
from app.services.wecom_service import WecomService
logger = logging.getLogger(__name__)
class MessageRouter:
"""消息路由核心服务。
接收企微回调消息后,按流程处理:
1. find_or_create_conversation — 查找或创建会话(新会话默认 ai_handling
2. AI 自动回复(仅对 ai_handling 状态的会话,通过 AIHandler 统一处理)
3. VIP 检测(从企微通讯录获取员工信息)
4. 标记检测(举手/情绪/需介入)
5. 紧急度评分
6. 更新会话标记和紧急度
7. 创建消息记录
"""
def __init__(
self,
db: AsyncSession,
wecom_service: WecomService,
scoring_service: ScoringService,
ai_handler: Optional[AIHandler] = None,
cache_service: Optional[CacheService] = None,
):
"""初始化消息路由器。
Args:
db: 异步数据库会话
wecom_service: 企微 API 服务(发送消息、获取用户信息)
scoring_service: 评分服务(标记检测 + 紧急度计算)
ai_handler: AI 处理器(可选,为 None 时跳过 AI 处理)
cache_service: 缓存服务(可选,为 None 时跳过去重检查)
"""
self.db = db
self.wecom_service = wecom_service
self.scoring_service = scoring_service
self.ai_handler = ai_handler
self.cache_service = cache_service
# --------------------------------------------------------------------------
# 路由消息(核心入口方法)
# --------------------------------------------------------------------------
async def route_message(
self,
from_user_id: str,
content: str,
msg_type: str = "text",
msg_id: Optional[str] = None,
# 非文本消息扩展参数(轻量版:只存元数据,不下载媒体文件)
media_id: Optional[str] = None,
extra_data: Optional[Dict[str, Any]] = None,
file_name: Optional[str] = None,
file_size: Optional[int] = None,
) -> Optional[Conversation]:
"""路由消息的核心方法。
处理流程:
0. 消息去重检查(MsgId 去重 + 用户+内容去重)
1. 非文本消息 → _handle_non_text_message(自动回复 + 入库,不触发 AI)
2. 文本消息:
a. 查找或创建会话(新会话默认 ai_handling
b. AI 自动回复(仅对 ai_handling 状态的会话,通过 AIHandler 统一处理)
c. VIP 检测
d. 标记检测(举手/情绪/需介入)
e. 紧急度评分
f. 更新会话
g. 创建消息记录
h. 广播 WebSocket 事件
重构说明:举手检测不再强制跳过 AI,由 AIHandler 统一处理呼叫人工拦截。
举手关键词仍用于设置 tag(影响紧急度评分),但不影响 AI 调用决策。
Args:
from_user_id: 发送消息的员工企微 UserID
content: 消息内容(非文本消息时可能为空)
msg_type: 消息类型(默认 text
msg_id: 企微消息唯一 IDMsgId),用于去重
media_id: 企微媒体文件ID(非文本消息时使用)
extra_data: 扩展元数据(pic_url/format/location 等)
file_name: 文件名(文件消息时使用)
file_size: 文件大小(字节,文件消息时使用)
Returns:
Optional[Conversation]: 更新后的会话对象,去重命中时返回 None
"""
logger.info(
f"收到员工消息: employee_id={from_user_id}, "
f"content={content[:50]}{'...' if len(content) > 50 else ''}, "
f"msg_type={msg_type}, msg_id={msg_id}"
)
# ----------------------------------------------------------
# 0. 消息去重检查(幂等保护,防止企微重复推送)
# ----------------------------------------------------------
if self.cache_service:
# 0a. 基于 MsgId 去重(与企微重试窗口一致,5 分钟)
if msg_id and await self.cache_service.is_duplicate(msg_id):
logger.info(
f"MsgId 去重命中,跳过处理: msg_id={msg_id}, "
f"from_user_id={from_user_id}"
)
return None
# 0b. 基于 user_id + content 去重(防快速重复发送,60 秒窗口)
if content and await self.cache_service.is_duplicate_content(
user_id=from_user_id, content=content
):
logger.info(
f"内容去重命中,跳过处理: from_user_id={from_user_id}, "
f"content={content[:30]}{'...' if len(content) > 30 else ''}"
)
return None
# 非文本消息走独立处理路径(不触发 AI、不评分、不标记检测)
if msg_type != "text":
return await self._handle_non_text_message(
from_user_id=from_user_id,
content=content,
msg_type=msg_type,
media_id=media_id,
extra_data=extra_data,
file_name=file_name,
file_size=file_size,
)
# 1. 查找或创建会话(新会话默认 ai_handling
conversation = await self._find_or_create_conversation(
from_user_id, content
)
# 2. 举手检测(仅用于标记,不跳过 AI)
is_hand_raise = self.scoring_service.detect_hand_raise(content)
# 3. AI 自动回复(仅对 ai_handling 状态的会话)
# AIHandler 内部会处理打招呼/呼叫人工/AI调用,统一行为
ai_replied = False
if (
self.ai_handler
and conversation.status == "ai_handling"
):
ai_replied = await self._try_ai_reply(
conversation=conversation,
content=content,
from_user_id=from_user_id,
)
# 4. VIP 检测(只在会话首次创建或未检测过时执行)
if not conversation.is_vip and conversation.department == "":
await self._check_vip(conversation)
# 5. 标记检测
tags = dict(conversation.tags) if conversation.tags else {}
# 5a. 举手标记检测
if is_hand_raise:
tags["hand_raise"] = True
logger.info(f"举手标记触发: employee_id={from_user_id}")
# 5b. 情绪标记检测
emotion = self.scoring_service.detect_emotion(content)
if emotion != "neutral":
tags["emotion"] = emotion
# 记录触发情绪标记的关键词
emotion_keywords = self.scoring_service.get_emotion_keywords(content, emotion)
if emotion_keywords:
tags["emotion_keywords"] = emotion_keywords
logger.info(f"情绪标记触发: employee_id={from_user_id}, emotion={emotion}")
# 5c. 需介入标记检测(基于追问轮次)
is_need_intervene = await self.scoring_service.detect_need_intervene(
conversation.id, self.db
)
if is_need_intervene:
tags["need_intervene"] = True
logger.info(f"需介入标记触发: employee_id={from_user_id}")
# 5d. 更新追问轮次计数
repeat_count = tags.get("repeat_count", 0)
tags["repeat_count"] = repeat_count + 1
# 6. 紧急度评分
urgency_score = await self.scoring_service.calculate_urgency(
content=content,
tags=tags,
is_vip=conversation.is_vip,
)
logger.info(
f"会话标记更新: conv_id={conversation.id}, "
f"tags={json.dumps(tags, ensure_ascii=False)}, urgency={urgency_score}"
)
# 7. 更新会话
conversation.tags = tags
conversation.urgency_score = urgency_score
conversation.last_message_at = datetime.now()
conversation.last_message_summary = content[:256]
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
# 8. 创建消息记录(员工消息)
message = Message(
conversation_id=conversation.id,
sender_type="employee",
sender_id=from_user_id,
sender_name=conversation.employee_name,
content=content,
msg_type=msg_type,
is_read=False,
)
self.db.add(message)
await self.db.flush()
logger.info(
f"消息路由完成: conv_id={conversation.id}, "
f"status={conversation.status}, urgency={urgency_score}, "
f"ai_replied={ai_replied}"
)
# ----------------------------------------------------------------------
# 9. 广播 WebSocket 事件
# ----------------------------------------------------------------------
from app.services.ws_manager import manager as ws_manager
try:
await ws_manager.broadcast({
"type": "new_message",
"data": {
"conversation_id": str(conversation.id),
"message_id": str(message.id),
"sender_type": "employee",
"sender_id": from_user_id,
"content": content,
"urgency_score": urgency_score,
"tags": tags,
"ai_replied": ai_replied,
}
})
except Exception as e:
logger.warning(f"WebSocket广播失败(不阻塞流程): {e}")
return conversation
# --------------------------------------------------------------------------
# 非文本消息处理(轻量版:自动回复 + 入库,不触发 AI)
# --------------------------------------------------------------------------
async def _handle_non_text_message(
self,
from_user_id: str,
content: str,
msg_type: str,
media_id: Optional[str] = None,
extra_data: Optional[Dict[str, Any]] = None,
file_name: Optional[str] = None,
file_size: Optional[int] = None,
) -> Conversation:
"""处理非文本消息(图片/语音/视频/文件/位置)。
轻量版策略:
- 图片:礼貌回复引导用户补充文字描述
- 其余类型:统一回复暂不支持
- 所有消息存入数据库
- 不触发 AI 分析(不调用 Dify API
- 不改变会话状态(非文本不影响 AI 对话状态)
- 不下载媒体文件,只存储企微回传的元数据
Args:
from_user_id: 发送消息的员工企微 UserID
content: 消息内容(非文本通常为空)
msg_type: 消息类型(image/voice/video/file/location
media_id: 企微媒体文件ID
extra_data: 扩展元数据
file_name: 文件名
file_size: 文件大小
Returns:
Conversation: 更新后的会话对象
"""
# 1. 查找或创建会话(复用现有逻辑)
conversation = await self._find_or_create_conversation(
from_user_id, content or f"[{msg_type}]"
)
# 2. 构建非文本消息的展示文本(存入 content 字段,用于前端展示)
display_text = self._get_non_text_display(msg_type, file_name, extra_data)
# 3. 生成自动回复文本
reply_text = self._get_non_text_reply(msg_type)
# 4. 创建员工消息记录(存储非文本消息元数据)
message = Message(
conversation_id=conversation.id,
sender_type="employee",
sender_id=from_user_id,
sender_name=conversation.employee_name or from_user_id,
content=display_text, # 展示用文本,如 "[图片消息]"
msg_type=msg_type,
media_id=media_id,
file_name=file_name,
file_size=file_size,
extra_data=extra_data,
is_read=False,
)
self.db.add(message)
# 5. 发送自动回复到企微
try:
await self.wecom_service.send_text_message(
user_id=from_user_id,
content=reply_text,
)
except Exception as e:
logger.error(f"发送非文本消息自动回复失败: {e}")
# 6. 创建自动回复消息记录
reply_message = Message(
conversation_id=conversation.id,
sender_type="ai",
sender_id="ai_bot",
sender_name="AI智能助手",
content=reply_text,
msg_type="text",
is_read=False,
)
self.db.add(reply_message)
# 7. 更新会话(不改变状态,只更新时间戳和摘要)
conversation.last_message_at = datetime.now()
conversation.last_message_summary = display_text[:256]
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
logger.info(
f"非文本消息处理完成: conv_id={conversation.id}, "
f"msg_type={msg_type}, reply={reply_text[:30]}..."
)
# 8. 广播 WebSocket 事件
from app.services.ws_manager import manager as ws_manager
try:
await ws_manager.broadcast({
"type": "new_message",
"data": {
"conversation_id": str(conversation.id),
"message_id": str(message.id),
"sender_type": "employee",
"sender_id": from_user_id,
"content": display_text,
"msg_type": msg_type,
"media_id": media_id,
"file_name": file_name,
"file_size": file_size,
"urgency_score": conversation.urgency_score,
"tags": conversation.tags,
"ai_replied": True,
}
})
except Exception as e:
logger.warning(f"WebSocket广播失败(不阻塞流程): {e}")
return conversation
def _get_non_text_display(
self,
msg_type: str,
file_name: Optional[str] = None,
extra_data: Optional[Dict[str, Any]] = None,
) -> str:
"""根据消息类型生成展示文本。
Args:
msg_type: 消息类型(image/voice/video/file/location
file_name: 文件名(文件消息时使用)
extra_data: 扩展元数据
Returns:
str: 展示文本,如 "[图片消息]""[文件消息: report.pdf]"
"""
displays: dict[str, str] = {
"image": "[图片消息]",
"voice": "[语音消息]",
"video": "[视频消息]",
"file": f"[文件消息: {file_name}]" if file_name else "[文件消息]",
"location": "[位置消息]",
}
return displays.get(msg_type, f"[{msg_type}消息]")
def _get_non_text_reply(self, msg_type: str) -> str:
"""根据消息类型生成自动回复文本(发给员工)。
Args:
msg_type: 消息类型(image/voice/video/file/location
Returns:
str: 自动回复文本
"""
if msg_type == "image":
return (
"收到您的截图 📷\n"
"请补充文字描述您遇到的问题,以便更快为您处理。\n"
"例如:\n"
"• 这是什么软件的报错截图?\n"
"• 您在操作什么时出现的?\n"
"• 错误信息的具体内容是什么?"
)
type_names: dict[str, str] = {
"voice": "语音",
"video": "视频",
"file": "文件",
"location": "位置",
}
type_name = type_names.get(msg_type, msg_type)
return (
f"暂不支持{type_name}消息 😅\n"
"请用文字描述您的问题,我会尽快为您处理。"
)
async def _try_ai_reply(
self,
conversation: Conversation,
content: str,
from_user_id: str,
) -> bool:
"""尝试让 AI 回复员工消息。
重构说明:使用 AIHandler 统一处理打招呼检测、呼叫人工拦截、
AI 调用、命中判断、计数规则和转人工逻辑,确保与 H5 端行为完全一致。
流程:
1. 调用 AIHandler.handle_message() 获取统一结果
2. 根据结果类型:
- greeting/call_human → 发送引导话术到企微(不计数,不转人工)
- ai_hit → 发送 AI 回复到企微(计数+1,不转人工)
- ai_miss → 发送转人工提示到企微(不计数,转人工)
- ai_fallback → 发送降级模板到企微(不计数,不转人工)
3. 创建消息记录
4. 更新会话状态和计数
Args:
conversation: 当前会话
content: 员工消息内容
from_user_id: 员工企微 UserID
Returns:
bool: True=AI 已回复(含引导),False=需转人工或出错
"""
if not self.ai_handler:
logger.warning("AI 处理器不可用,跳过 AI 回复")
return False
# 调用 AIHandler 统一处理
result: AIReplyResult = await self.ai_handler.handle_message(
content=content,
dify_conversation_id=conversation.dify_conversation_id,
user_id=from_user_id,
)
# 更新 Dify 会话ID(多轮对话上下文)
if result.dify_conversation_id:
conversation.dify_conversation_id = result.dify_conversation_id
# 发送回复到企微(员工在企微中看到回复)
try:
await self.wecom_service.send_text_message(
user_id=from_user_id,
content=result.content,
)
except Exception as e:
logger.error(f"发送AI回复到企微失败: {e}")
# 企微发送失败不阻塞流程,坐席仍然能看
# 创建消息记录(根据类型选择 sender_type
if result.should_transfer:
# 转人工消息用系统消息类型
sender_type = "system"
sender_id = "system"
sender_name = "系统"
else:
# AI 回复/引导/降级均用 AI 消息类型
sender_type = "ai"
sender_id = "ai_bot"
sender_name = "AI智能助手"
ai_message = Message(
conversation_id=conversation.id,
sender_type=sender_type,
sender_id=sender_id,
sender_name=sender_name,
content=result.content,
msg_type="text",
is_read=False,
)
self.db.add(ai_message)
await self.db.flush()
# 更新 AI 实质性回复计数(仅命中时 +1)
if result.should_count:
conversation.ai_substantive_reply_count += 1
logger.info(
f"AI 命中并回复: conv_id={conversation.id}, "
f"ai_count={conversation.ai_substantive_reply_count}"
)
# 转人工处理
if result.should_transfer:
conversation.status = "queued"
logger.info(
f"AI 未命中,转人工: conv_id={conversation.id}"
)
return False
# 记录其他类型日志
if result.is_guidance:
logger.info(
f"AI 引导回复: conv_id={conversation.id}, "
f"type={result.reply_type}"
)
elif result.reply_type == "ai_fallback":
logger.info(
f"AI 降级模板回复: conv_id={conversation.id}"
)
return True
# --------------------------------------------------------------------------
# 查找或创建会话
# --------------------------------------------------------------------------
async def _find_or_create_conversation(
self, employee_id: str, content: str
) -> Conversation:
"""查找员工当前活跃的会话,如果不存在则创建新会话。
规则:
- 如果员工有 status 为 ai_handling 或 queued 或 serving 的会话,继续使用该会话
- 否则创建新会话,状态为 ai_handling(先让 AI 尝试回答)
Args:
employee_id: 员工企微 UserID
content: 消息内容(用于创建会话时设置摘要)
Returns:
Conversation: 找到的或新创建的会话对象
"""
# 查找当前活跃会话(ai_handling/queued/serving 状态)
stmt = select(Conversation).where(
Conversation.employee_id == employee_id,
Conversation.status.in_(["ai_handling", "queued", "serving"]),
).order_by(Conversation.created_at.desc())
result = await self.db.execute(stmt)
conversation = result.scalars().first()
if conversation:
logger.debug(f"找到活跃会话: conv_id={conversation.id}, status={conversation.status}")
return conversation
# 没有活跃会话,创建新会话
# 默认状态 ai_handling:先让 AI 尝试回答,AI 未命中再转 queued
conversation = Conversation(
employee_id=employee_id,
employee_name="", # 稍后通过 VIP 检测补充
department="",
position="",
level="",
status="ai_handling", # 先让 AI 尝试回答
is_vip=False,
is_pinned=False,
is_todo=False,
urgency_score=1,
tags={},
last_message_at=datetime.now(),
last_message_summary=content[:256],
)
self.db.add(conversation)
await self.db.flush() # 刷新以获取生成的 ID
logger.info(
f"创建新会话: conv_id={conversation.id}, "
f"employee_id={employee_id}, status=ai_handling"
)
return conversation
# --------------------------------------------------------------------------
# VIP 检测
# --------------------------------------------------------------------------
async def _check_vip(self, conversation: Conversation) -> None:
"""检测员工是否为 VIP 并更新会话信息。
通过企微通讯录 API 获取员工信息:
- 判断 VIP 规则:总监及以上 或 关键部门
- 补充员工姓名、部门、岗位、等级等信息
Args:
conversation: 会话对象(会被就地修改)
"""
# 已检测过 VIP 的会话不再重复检测
if conversation.is_vip:
return
try:
user_info = await self.wecom_service.get_user_info(
conversation.employee_id
)
# 补充员工信息
conversation.employee_name = user_info.get("name", "")
conversation.department = user_info.get("department", "") # 部门ID列表,JSON字符串
conversation.position = user_info.get("position", "")
conversation.level = user_info.get("position", "") # 企微无单独等级字段,暂用岗位
# VIP 规则:总监及以上 或 关键部门
# 第一步简单规则:职位中包含"总监"/"总经理"/"VP"/"CEO" 为 VIP
position_text = user_info.get("position", "")
vip_keywords = ["总监", "总经理", "VP", "CEO", "CIO", "CTO", "CFO", "COO"]
is_vip = any(kw in position_text for kw in vip_keywords)
conversation.is_vip = is_vip
if is_vip:
logger.info(
f"VIP标记: employee_id={conversation.employee_id}, "
f"position={position_text}"
)
# 缓存 VIP 结果到 Redis1 小时)
# 避免每次消息都调企微 API
# 这里暂不实现 Redis 缓存,后续优化
except Exception as e:
# VIP 检测失败不应阻塞消息路由
logger.warning(
f"VIP检测失败(不阻塞流程): employee_id={conversation.employee_id}, "
f"error={e}"
)