Files

1235 lines
45 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 会话状态管理服务
# =============================================================================
# 说明:管理会话的完整生命周期,包括:
# 1. 创建会话(新员工发消息时自动创建)
# 2. 更新会话状态(queued → serving → resolved
# 3. 分配坐席
# 4. 结单
# 5. 获取会话列表(支持排序:紧急→举手→需介入→活跃→已结单)
# 6. 获取坐席当前服务的会话列表
# =============================================================================
import logging
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from sqlalchemy import and_, case, desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.agent import Agent
from app.models.conversation import Conversation
from app.services.wecom_service import WecomService
from app.utils.response import (
AppException,
ERR_AGENT_BUSY,
ERR_AGENT_NOT_FOUND,
ERR_CONVERSATION_NOT_FOUND,
ERR_CONVERSATION_RESOLVED,
ERR_DUPLICATE_ASSIGN,
)
logger = logging.getLogger(__name__)
class SessionService:
"""会话状态管理服务。
管理会话的完整生命周期,实现会话状态流转和坐席分配逻辑。
"""
def __init__(
self,
db: AsyncSession,
wecom_service: Optional[WecomService] = None,
):
"""初始化会话状态管理服务。
Args:
db: 异步数据库会话
wecom_service: 企微 API 服务(用于坐席接入时发送通知,可选)
"""
self.db = db
self.wecom_service = wecom_service
# --------------------------------------------------------------------------
# 创建会话
# --------------------------------------------------------------------------
async def create_conversation(
self,
employee_id: str,
employee_name: str = "",
department: str = "",
position: str = "",
level: str = "",
) -> Conversation:
"""创建新会话。
当员工首次发消息或摇人时自动创建。
新会话默认状态为 queued(排队等坐席)。
Args:
employee_id: 企微员工 UserID
employee_name: 员工姓名
department: 部门
position: 岗位
level: 等级
Returns:
Conversation: 新创建的会话对象
"""
conversation = Conversation(
employee_id=employee_id,
employee_name=employee_name,
department=department,
position=position,
level=level,
status="queued",
is_vip=False,
is_pinned=False,
is_todo=False,
urgency_score=1,
tags={},
)
self.db.add(conversation)
await self.db.flush()
logger.info(f"创建会话: conv_id={conversation.id}, employee={employee_id}")
return conversation
# --------------------------------------------------------------------------
# 更新会话状态
# --------------------------------------------------------------------------
async def update_status(
self, conversation_id: UUID, new_status: str
) -> Conversation:
"""更新会话状态。
状态流转规则:
- queued → serving: 坐席接单
- serving → resolved: 结单
- queued → resolved: 直接结单(排队中员工问题已自行解决)
Args:
conversation_id: 会话ID
new_status: 新状态(queued/serving/resolved/ai_handling
Returns:
Conversation: 更新后的会话对象
Raises:
AppException: 会话不存在或状态流转不合法
"""
conversation = await self._get_conversation(conversation_id)
# 校验状态流转合法性
valid_transitions = {
"queued": ["serving", "resolved"],
"serving": ["resolved"],
"ai_handling": ["queued", "serving", "resolved"],
"resolved": [], # 已结单不能再改状态
}
allowed = valid_transitions.get(conversation.status, [])
if new_status not in allowed and new_status != conversation.status:
raise AppException(
3010,
f"会话状态流转不合法: {conversation.status}{new_status}",
)
# 如果是已结单,不能再改状态
if conversation.status == "resolved":
raise ERR_CONVERSATION_RESOLVED
conversation.status = new_status
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
logger.info(
f"会话状态更新: conv_id={conversation_id}, "
f"{conversation.status}{new_status}"
)
return conversation
# --------------------------------------------------------------------------
# 分配坐席(接单)
# --------------------------------------------------------------------------
async def assign_agent(
self, conversation_id: UUID, agent_id: str
) -> Conversation:
"""分配坐席接入会话。
流程:
1. 校验会话存在且未结单
2. 校验坐席存在且在线
3. 校验坐席未满负荷
4. 更新会话状态为 serving
5. 更新坐席当前服务数 +1
6. 通过企微 API 向员工发送接入通知
Args:
conversation_id: 会话ID
agent_id: 坐席ID
Returns:
Conversation: 更新后的会话对象
Raises:
AppException: 校验不通过
"""
# 1. 校验会话
conversation = await self._get_conversation(conversation_id)
if conversation.status == "resolved":
raise ERR_CONVERSATION_RESOLVED
if conversation.assigned_agent_id and conversation.status == "serving":
raise ERR_DUPLICATE_ASSIGN
# 2. 校验坐席(本地开发模式:自动创建不存在的坐席)
stmt = select(Agent).where(Agent.user_id == agent_id)
result = await self.db.execute(stmt)
agent = result.scalars().first()
if not agent:
# DEV MODE: 本地开发时自动创建坐席,避免必须先登录才能测试接单
logger.warning(f"坐席不存在,自动创建: user_id={agent_id}")
agent = Agent(
user_id=agent_id,
name="未知坐席",
status="online",
current_load=0,
max_load=5,
)
self.db.add(agent)
await self.db.flush()
if agent.status == "offline":
raise AppException(3007, "坐席不在线,无法接单")
if agent.current_load >= agent.max_load:
raise ERR_AGENT_BUSY
# 3. 更新会话
conversation.status = "serving"
conversation.assigned_agent_id = agent_id
conversation.updated_at = datetime.now()
self.db.add(conversation)
# 4. 更新坐席服务数
agent.current_load += 1
self.db.add(agent)
await self.db.flush()
# 5. 发送接入通知给员工
if self.wecom_service:
try:
await self.wecom_service.send_text_message(
conversation.employee_id,
"人摇来了!IT坐席为您服务",
)
except Exception as e:
logger.warning(f"发送接入通知失败(不阻塞流程): {e}")
logger.info(
f"坐席接单: conv_id={conversation_id}, agent={agent_id}"
)
# ----------------------------------------------------------------------
# 广播 WebSocket 事件:会话状态变更
# ----------------------------------------------------------------------
# 做什么:通知所有在线坐席,有会话被接单了
# 为什么:其他坐席需要实时看到该会话从"排队"变为"服务中"
# 用 try/except 包裹:广播失败不能阻塞接单主流程
# ----------------------------------------------------------------------
from app.services.ws_manager import manager as ws_manager
try:
await ws_manager.broadcast({
"type": "conversation_updated",
"data": {
"conversation_id": str(conversation.id),
"status": conversation.status,
"assigned_agent_id": conversation.assigned_agent_id,
}
})
except Exception as e:
logger.warning(f"WebSocket广播失败: {e}")
return conversation
# --------------------------------------------------------------------------
# 结单
# --------------------------------------------------------------------------
async def resolve_conversation(
self, conversation_id: UUID, agent_id: Optional[str] = None
) -> Conversation:
"""结单。
流程:
1. 校验会话存在
2. 更新会话状态为 resolved
3. 如果有坐席,更新坐席当前服务数 -1
Args:
conversation_id: 会话ID
agent_id: 坐席ID(可选,用于更新坐席服务数)
Returns:
Conversation: 更新后的会话对象
"""
conversation = await self._get_conversation(conversation_id)
if conversation.status == "resolved":
raise ERR_CONVERSATION_RESOLVED
# 更新会话状态
conversation.status = "resolved"
conversation.updated_at = datetime.now()
self.db.add(conversation)
# 更新坐席服务数
assigned_agent_id = agent_id or conversation.assigned_agent_id
if assigned_agent_id:
stmt = select(Agent).where(Agent.user_id == assigned_agent_id)
result = await self.db.execute(stmt)
agent = result.scalars().first()
if agent and agent.current_load > 0:
agent.current_load -= 1
self.db.add(agent)
await self.db.flush()
logger.info(f"会话结单: conv_id={conversation_id}")
# ----------------------------------------------------------------------
# 广播 WebSocket 事件:会话已结单
# ----------------------------------------------------------------------
# 做什么:通知所有在线坐席,有会话已结单
# 为什么:坐席需要实时看到该会话从"服务中"变为"已结单"
# 用 try/except 包裹:广播失败不能阻塞结单主流程
# ----------------------------------------------------------------------
from app.services.ws_manager import manager as ws_manager
try:
await ws_manager.broadcast({
"type": "conversation_updated",
"data": {
"conversation_id": str(conversation.id),
"status": conversation.status,
"assigned_agent_id": conversation.assigned_agent_id,
}
})
except Exception as e:
logger.warning(f"WebSocket广播失败: {e}")
return conversation
# --------------------------------------------------------------------------
# 置顶/取消置顶
# --------------------------------------------------------------------------
async def toggle_pin(self, conversation_id: UUID) -> Conversation:
"""切换会话置顶状态。
Args:
conversation_id: 会话ID
Returns:
Conversation: 更新后的会话对象
"""
conversation = await self._get_conversation(conversation_id)
conversation.is_pinned = not conversation.is_pinned
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
logger.info(
f"会话置顶{'开启' if conversation.is_pinned else '取消'}: "
f"conv_id={conversation_id}"
)
return conversation
# --------------------------------------------------------------------------
# 代办/取消代办
# --------------------------------------------------------------------------
async def toggle_todo(self, conversation_id: UUID) -> Conversation:
"""切换会话代办状态。
Args:
conversation_id: 会话ID
Returns:
Conversation: 更新后的会话对象
"""
conversation = await self._get_conversation(conversation_id)
conversation.is_todo = not conversation.is_todo
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
logger.info(
f"会话代办{'开启' if conversation.is_todo else '取消'}: "
f"conv_id={conversation_id}"
)
return conversation
# --------------------------------------------------------------------------
# 摇人 — 邀请坐席加入协作
# --------------------------------------------------------------------------
async def invite_collaborator(
self,
conversation_id: UUID,
inviter_agent_id: str,
invitee_agent_id: str,
) -> Conversation:
"""邀请坐席加入协作。
做什么:坐席A在处理会话时发现需要坐席B的专业知识,点击「摇人」
将坐席B加入 collaborating_agent_ids 列表,坐席B可查看和回复但不能结单。
校验规则:
1. 会话存在且为 serving(已结单的不能摇人)
2. 邀请人在主责坐席或协作坐席列表中
3. 被邀请人不能是主责坐席,也不能已在协作列表中(防重复)
4. 被邀请坐席存在且在线
Args:
conversation_id: 会话ID
inviter_agent_id: 邀请人坐席ID
invitee_agent_id: 被邀请坐席ID
Returns:
Conversation: 更新后的会话对象
Raises:
AppException: 校验不通过
"""
# 1. 校验会话
conversation = await self._get_conversation(conversation_id)
if conversation.status == "resolved":
raise ERR_CONVERSATION_RESOLVED
if conversation.status != "serving":
raise AppException(3020, "只能邀请协作服务中的会话")
# 2. 校验邀请人权限:必须是主责坐席或已在协作列表中
is_owner = conversation.assigned_agent_id == inviter_agent_id
is_collaborator = inviter_agent_id in (conversation.collaborating_agent_ids or [])
if not is_owner and not is_collaborator:
raise AppException(3021, "只有主责坐席或协作坐席才能摇人")
# 3. 校验被邀请人:不能是主责坐席
if conversation.assigned_agent_id == invitee_agent_id:
raise AppException(3022, "不能邀请主责坐席协作(他已经在处理了)")
# 4. 校验被邀请人:不能已在协作列表中
if invitee_agent_id in (conversation.collaborating_agent_ids or []):
raise AppException(3023, "该坐席已在协作中,无需重复邀请")
# 5. 校验被邀请坐席存在且在线
stmt = select(Agent).where(Agent.user_id == invitee_agent_id)
result = await self.db.execute(stmt)
invitee = result.scalars().first()
if not invitee:
raise ERR_AGENT_NOT_FOUND
if invitee.status == "offline":
raise AppException(3024, "被邀请坐席不在线")
# 6. 将被邀请人加入协作列表
collab_ids = list(conversation.collaborating_agent_ids or [])
collab_ids.append(invitee_agent_id)
conversation.collaborating_agent_ids = collab_ids
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
logger.info(
f"摇人成功: conv_id={conversation_id}, "
f"inviter={inviter_agent_id}, invitee={invitee_agent_id}"
)
# ----------------------------------------------------------------------
# 广播 + 定向推送 WebSocket 事件
# ----------------------------------------------------------------------
# 做什么:
# 1. broadcast — 所有坐席看到协作关系变化(刷新会话列表)
# 2. send_to_agent — 被邀请人收到定向通知(弹窗提示)
# 用 try/except 包裹:推送失败不能阻塞主流程
# ----------------------------------------------------------------------
from app.services.ws_manager import manager as ws_manager
inviter_name = inviter_agent_id # 前端会从 agent_name_map 获取真实姓名
try:
# 广播:会话协作关系变更(所有坐席刷新列表)
await ws_manager.broadcast({
"type": "collaborator_joined",
"data": {
"conversation_id": str(conversation.id),
"agent_id": invitee_agent_id,
"inviter_agent_id": inviter_agent_id,
}
})
# 定向推送:通知被邀请人
await ws_manager.send_to_agent(invitee_agent_id, {
"type": "collaborator_invited",
"data": {
"conversation_id": str(conversation.id),
"inviter_agent_id": inviter_agent_id,
"invitee_agent_id": invitee_agent_id,
"employee_name": conversation.employee_name,
"last_message_summary": conversation.last_message_summary or "",
}
})
except Exception as e:
logger.warning(f"WebSocket推送失败(不阻塞流程): {e}")
return conversation
# --------------------------------------------------------------------------
# 摇人 — 退出协作
# --------------------------------------------------------------------------
async def leave_collaboration(
self,
conversation_id: UUID,
agent_id: str,
) -> Conversation:
"""坐席退出协作。
做什么:将坐席从 collaborating_agent_ids 中移除。
校验规则:
1. 坐席必须在协作列表中
2. 坐席不能是主责坐席(主责坐席不能"退出",只能转接或结单)
Args:
conversation_id: 会话ID
agent_id: 退出协作的坐席ID
Returns:
Conversation: 更新后的会话对象
Raises:
AppException: 校验不通过
"""
# 1. 校验会话
conversation = await self._get_conversation(conversation_id)
# 2. 校验:不能是主责坐席
if conversation.assigned_agent_id == agent_id:
raise AppException(3025, "主责坐席不能退出协作,请使用转接或结单")
# 3. 校验:必须在协作列表中
collab_ids = list(conversation.collaborating_agent_ids or [])
if agent_id not in collab_ids:
raise AppException(3026, "您不在该会话的协作列表中")
# 4. 移除
collab_ids.remove(agent_id)
conversation.collaborating_agent_ids = collab_ids
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
logger.info(f"退出协作: conv_id={conversation_id}, agent={agent_id}")
# ----------------------------------------------------------------------
# 广播 WebSocket 事件:协作坐席退出
# ----------------------------------------------------------------------
from app.services.ws_manager import manager as ws_manager
try:
await ws_manager.broadcast({
"type": "collaborator_left",
"data": {
"conversation_id": str(conversation.id),
"agent_id": agent_id,
}
})
except Exception as e:
logger.warning(f"WebSocket推送失败(不阻塞流程): {e}")
return conversation
# --------------------------------------------------------------------------
# 转接会话
# --------------------------------------------------------------------------
async def transfer_conversation(
self, conversation_id: UUID, target_agent_id: str
) -> Conversation:
"""转接会话到另一个坐席。
第一步简化版:只更换坐席,不做转接通知。
Args:
conversation_id: 会话ID
target_agent_id: 目标坐席ID
Returns:
Conversation: 更新后的会话对象
"""
conversation = await self._get_conversation(conversation_id)
# 校验目标坐席
stmt = select(Agent).where(Agent.user_id == target_agent_id)
result = await self.db.execute(stmt)
target_agent = result.scalars().first()
if not target_agent:
raise ERR_AGENT_NOT_FOUND
if target_agent.current_load >= target_agent.max_load:
raise ERR_AGENT_BUSY
# 旧坐席服务数 -1
old_agent_id = conversation.assigned_agent_id
if old_agent_id:
stmt = select(Agent).where(Agent.user_id == old_agent_id)
result = await self.db.execute(stmt)
old_agent = result.scalars().first()
if old_agent and old_agent.current_load > 0:
old_agent.current_load -= 1
self.db.add(old_agent)
# 新坐席服务数 +1
target_agent.current_load += 1
self.db.add(target_agent)
# 更新会话
conversation.assigned_agent_id = target_agent_id
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
logger.info(
f"会话转接: conv_id={conversation_id}, "
f"from={old_agent_id} to={target_agent_id}"
)
# ----------------------------------------------------------------------
# 广播 WebSocket 事件:会话转接
# ----------------------------------------------------------------------
# 做什么:通知所有在线坐席,有会话被转接了
# 为什么:原坐席和目标坐席都需要实时看到会话归属变化
# 用 try/except 包裹:广播失败不能阻塞转接主流程
# ----------------------------------------------------------------------
from app.services.ws_manager import manager as ws_manager
try:
await ws_manager.broadcast({
"type": "conversation_updated",
"data": {
"conversation_id": str(conversation.id),
"status": conversation.status,
"assigned_agent_id": conversation.assigned_agent_id,
}
})
except Exception as e:
logger.warning(f"WebSocket广播失败: {e}")
return conversation
# --------------------------------------------------------------------------
# 获取会话列表(坐席端)
# --------------------------------------------------------------------------
async def get_conversations(
self,
status: Optional[str] = None,
agent_id: Optional[str] = None,
page: int = 1,
page_size: int = 50,
) -> tuple[List[Conversation], int]:
"""获取会话列表,支持过滤和排序。
排序规则(PRD 定义):
紧急 → 举手 → 需介入 → 活跃 → 已结单
同级别按 last_message_at 倒序
实现方式:先按数据库基础排序(状态+置顶+紧急度),
再在 Python 侧按完整规则精细排序(含 JSON tags 字段)。
Args:
status: 按状态过滤(可选)
agent_id: 按坐席ID过滤(可选,查看某坐席的会话)
page: 页码(从1开始)
page_size: 每页数量
Returns:
tuple[List[Conversation], int]: (会话列表, 总数)
"""
# 构建查询条件
conditions = []
if status:
conditions.append(Conversation.status == status)
if agent_id:
conditions.append(Conversation.assigned_agent_id == agent_id)
# 查询总数
count_stmt = select(func.count(Conversation.id))
if conditions:
count_stmt = count_stmt.where(and_(*conditions))
total_result = await self.db.execute(count_stmt)
total = total_result.scalar() or 0
# 数据库侧基础排序(快速过滤):
# 置顶 > 紧急度5 > 紧急度4 > 紧急度3 > 状态排序 > 最后消息时间
# JSON tags 字段的排序在 Python 侧完成(SQLite 不支持 JSON 操作符)
db_order_weight = case(
(Conversation.is_pinned == True, 1000),
(Conversation.urgency_score >= 5, 900),
(Conversation.urgency_score >= 4, 600),
(Conversation.urgency_score >= 3, 300),
(Conversation.status == "queued", 200),
(Conversation.status == "ai_handling", 150),
(Conversation.status == "serving", 100),
else_=0,
)
stmt = select(Conversation)
if conditions:
stmt = stmt.where(and_(*conditions))
# 数据库侧先按基础权重 + 最后消息时间排序
stmt = stmt.order_by(desc(db_order_weight), desc(Conversation.last_message_at))
# 查询所有符合条件的会话(数据量不大时可行;生产环境建议改用 PostgreSQL + JSONB 操作符)
result = await self.db.execute(stmt)
all_conversations = list(result.scalars().all())
# ===== Python 侧精细排序(支持 JSON tags 字段)=====
def _sort_key(conv: Conversation):
"""计算完整排序权重(数值越大越靠前)"""
weight = 0
tags = conv.tags or {}
# 置顶(最高优先级)
if conv.is_pinned:
weight += 10000
# 紧急度评分(越高越靠前)
urgency = conv.urgency_score or 0
if urgency >= 5:
weight += 9000
elif urgency >= 4:
weight += 6000
elif urgency >= 3:
weight += 3000
# 举手标记
if tags.get("hand_raise"):
weight += 8000
# 需介入标记
if tags.get("need_intervene"):
weight += 7000
# 情绪标记(非 neutral
emotion = tags.get("emotion", "neutral")
if emotion and emotion != "neutral":
weight += 5000
# 状态排序
status_order = {
"queued": 2000,
"ai_handling": 1500,
"serving": 1000,
"resolved": 0,
}
weight += status_order.get(conv.status, 0)
# 最后消息时间(时间戳越大越靠前,除以 1e6 归一化到合理范围)
import time
if conv.last_message_at:
ts = conv.last_message_at.timestamp()
else:
ts = 0
# 用 (weight, ts) 元组排序:先按 weight 降序,再按 ts 降序
return (weight, ts)
# 按完整规则排序(降序)
all_conversations.sort(key=_sort_key, reverse=True)
# 分页
offset = (page - 1) * page_size
conversations = all_conversations[offset:offset + page_size]
return conversations, total
# --------------------------------------------------------------------------
# 获取坐席当前服务的会话列表
# --------------------------------------------------------------------------
async def get_agent_conversations(
self, agent_id: str
) -> List[Conversation]:
"""获取坐席当前正在服务的会话列表。
只返回状态为 serving 且分配给该坐席的会话。
Args:
agent_id: 坐席ID
Returns:
List[Conversation]: 会话列表
"""
stmt = (
select(Conversation)
.where(
Conversation.assigned_agent_id == agent_id,
Conversation.status == "serving",
)
.order_by(desc(Conversation.last_message_at))
)
result = await self.db.execute(stmt)
return list(result.scalars().all())
# --------------------------------------------------------------------------
# 获取会话详情
# --------------------------------------------------------------------------
async def _get_conversation(self, conversation_id: UUID) -> Conversation:
"""获取会话对象,不存在则抛异常。
Args:
conversation_id: 会话ID
Returns:
Conversation: 会话对象
Raises:
AppException: 会话不存在
"""
# 将 UUID 转为字符串,确保与 String(36) 列类型匹配
conv_id_str = str(conversation_id)
stmt = select(Conversation).where(Conversation.id == conv_id_str)
result = await self.db.execute(stmt)
conversation = result.scalars().first()
if not conversation:
raise ERR_CONVERSATION_NOT_FOUND
return conversation
async def get_conversation(self, conversation_id: UUID) -> Conversation:
"""获取会话详情(公开方法)。
Args:
conversation_id: 会话ID
Returns:
Conversation: 会话对象
Raises:
AppException: 会话不存在
"""
return await self._get_conversation(conversation_id)
# ======================================================================
# 邀请功能(P0-09~P0-11):坐席邀请员工/部门加入会话
# ======================================================================
async def _get_employee_avatar(self, employee_id: str) -> str:
"""获取员工头像URL。
做什么:从 employees 表或企微通讯录API获取头像
为什么:邀请参与者时需要展示头像,前端无法单独获取被邀请人头像
优先级:employees 表(本地缓存) > 企微API(实时获取)
Args:
employee_id: 企微员工UserID
Returns:
str: 头像URL,获取不到返回空字符串
"""
# 1. 优先从 employees 表查(本地缓存,速度快)
from app.models.employee import Employee
result = await self.db.execute(
select(Employee.avatar).where(Employee.employee_id == employee_id)
)
row = result.first()
if row and row[0]:
return row[0]
# 2. employees 表没有,尝试从企微通讯录API获取
if self.wecom_service:
try:
user_info = await self.wecom_service.get_user_info(employee_id)
avatar = user_info.get("avatar", "")
return avatar
except Exception as e:
logger.warning(f"从企微API获取头像失败: employee_id={employee_id}, error={e}")
return ""
async def invite_participants(
self,
conversation_id: UUID,
inviter_agent_id: str,
participants: list[dict],
history_mode: str = "recent10",
) -> Conversation:
"""邀请员工/部门加入会话(P0-09)。
做什么:主责坐席邀请非坐席人员(员工/部门)参与会话
为什么:复杂IT问题可能需要业务方人员补充信息
权限:只有主责坐席可以发起邀请
Args:
conversation_id: 会话ID
inviter_agent_id: 邀请人坐席ID
participants: 被邀请人列表 [{"id": "xxx", "name": "xxx", "department": "xxx", "type": "employee"}]
history_mode: 历史共享模式 — recent10/all/none
Returns:
Conversation: 更新后的会话对象
Raises:
AppException: 校验不通过
"""
# 1. 校验会话
conversation = await self._get_conversation(conversation_id)
# 2. 权限:只有主责坐席可以邀请
if conversation.assigned_agent_id != inviter_agent_id:
raise AppException(3030, "只有主责坐席才能邀请人员加入会话")
# 3. 校验:会话必须是服务中状态
if conversation.status != "serving":
raise AppException(3031, f"只有服务中的会话才能邀请,当前状态: {conversation.status}")
# 4. 合并参与者(去重),同时补充头像
existing_participants = list(conversation.participants or [])
existing_ids = {p.get("id") for p in existing_participants}
new_added = []
for p in participants:
if p.get("id") not in existing_ids:
# 补充头像:员工类型的参与者,从 employees 表或企微API获取头像
if p.get("type", "employee") == "employee" and not p.get("avatar"):
try:
avatar_url = await self._get_employee_avatar(p["id"])
if avatar_url:
p["avatar"] = avatar_url
except Exception as e:
logger.warning(f"获取参与者头像失败(不阻塞流程): employee_id={p['id']}, error={e}")
existing_participants.append(p)
existing_ids.add(p.get("id"))
new_added.append(p)
if not new_added:
raise AppException(3032, "所有被邀请人已在该会话中")
# 5. 更新 participants 字段
conversation.participants = existing_participants
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
logger.info(
f"邀请参与者: conv_id={conversation_id}, "
f"inviter={inviter_agent_id}, new_participants={len(new_added)}"
)
# 6. 发送企微卡片通知给被邀请人
if self.wecom_service:
for p in new_added:
if p.get("type") == "employee":
try:
# 生成邀请链接:H5 端加入会话的 URL
# 格式:https://itsupport.servyou.com.cn/itdesk/?invite={conv_id}&eid={employee_id}
invite_url = (
f"{getattr(self, '_h5_base_url', '')}/itdesk/"
f"?invite={conversation.id}&eid={p['id']}"
)
await self.wecom_service.send_card_message(
user_id=p["id"],
title="IT服务台邀请您加入会话",
description=(
f"坐席邀请您加入一个IT服务会话,"
f"请点击「加入会话」查看详情并参与讨论。"
),
url=invite_url,
btntxt="加入会话",
)
except Exception as e:
logger.warning(f"发送邀请通知失败(不阻塞流程): user_id={p['id']}, error={e}")
# 7. 创建系统消息广播
await self._create_system_message(
conversation_id=conversation.id,
content=f"坐席邀请 {', '.join(p['name'] for p in new_added)} 加入会话",
extra_data={"action": "participant_invited", "participants": new_added, "history_mode": history_mode},
)
# 8. WebSocket 广播:参与者变更通知
await self._broadcast_participant_change(conversation, "participant_invited", new_added)
return conversation
async def join_conversation(
self,
conversation_id: UUID,
employee_id: str,
) -> Conversation:
"""被邀请人通过链接加入会话(P0-10)。
做什么:被邀请人点击企微卡片链接后加入会话
为什么:实现邀请-加入闭环
校验:该员工必须在 participants 列表中
Args:
conversation_id: 会话ID
employee_id: 加入的员工企微UserID
Returns:
Conversation: 会话对象
Raises:
AppException: 校验不通过
"""
# 1. 校验会话
conversation = await self._get_conversation(conversation_id)
# 2. 校验:会话必须是服务中状态
if conversation.status != "serving":
raise AppException(3033, "该会话已结束,无法加入")
# 3. 校验:该员工必须在 participants 列表中(被邀请过才能加入)
participants = list(conversation.participants or [])
is_invited = any(p.get("id") == employee_id for p in participants)
if not is_invited:
raise AppException(3034, "您未被邀请加入该会话")
# 4. 更新参与者的 joined 状态
for p in participants:
if p.get("id") == employee_id:
p["joined"] = True
p["joined_at"] = datetime.now().isoformat()
break
conversation.participants = participants
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
# 5. 获取参与者姓名
participant_name = next(
(p["name"] for p in participants if p.get("id") == employee_id),
"未知"
)
# 6. 系统消息
await self._create_system_message(
conversation_id=conversation.id,
content=f"{participant_name} 已加入会话",
extra_data={"action": "participant_joined", "employee_id": employee_id},
)
# 7. WebSocket 广播
await self._broadcast_participant_change(
conversation, "participant_joined", [{"id": employee_id, "name": participant_name}]
)
return conversation
async def remove_participant(
self,
conversation_id: UUID,
remover_agent_id: str,
target_user_id: str,
) -> Conversation:
"""移除参与者(P0-11)。
做什么:主责坐席将参与者移出会话
为什么:参与者不再需要参与时,主责坐席可以移除
权限:只有主责坐席可以移除参与者
Args:
conversation_id: 会话ID
remover_agent_id: 操作坐席ID
target_user_id: 被移除的员工UserID
Returns:
Conversation: 更新后的会话对象
Raises:
AppException: 校验不通过
"""
# 1. 校验会话
conversation = await self._get_conversation(conversation_id)
# 2. 权限:只有主责坐席可以移除
if conversation.assigned_agent_id != remover_agent_id:
raise AppException(3035, "只有主责坐席才能移除参与者")
# 3. 查找并移除
participants = list(conversation.participants or [])
removed_name = None
new_participants = []
for p in participants:
if p.get("id") == target_user_id:
removed_name = p.get("name", "未知")
else:
new_participants.append(p)
if removed_name is None:
raise AppException(3036, "该人员不在会话参与者列表中")
conversation.participants = new_participants
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
# 4. 系统消息
await self._create_system_message(
conversation_id=conversation.id,
content=f"{removed_name} 已被移出会话",
extra_data={"action": "participant_removed", "user_id": target_user_id},
)
# 5. WebSocket 广播
await self._broadcast_participant_change(
conversation, "participant_removed", [{"id": target_user_id, "name": removed_name}]
)
return conversation
async def leave_as_participant(
self,
conversation_id: UUID,
employee_id: str,
) -> Conversation:
"""参与者主动退出会话。
做什么:被邀请人主动退出会话
为什么:参与者不再需要参与时,可以自行退出
Args:
conversation_id: 会话ID
employee_id: 退出的员工企微UserID
Returns:
Conversation: 更新后的会话对象
Raises:
AppException: 校验不通过
"""
# 1. 校验会话
conversation = await self._get_conversation(conversation_id)
# 2. 查找并移除
participants = list(conversation.participants or [])
leaving_name = None
new_participants = []
for p in participants:
if p.get("id") == employee_id:
leaving_name = p.get("name", "未知")
else:
new_participants.append(p)
if leaving_name is None:
raise AppException(3037, "您不在该会话的参与者列表中")
conversation.participants = new_participants
conversation.updated_at = datetime.now()
self.db.add(conversation)
await self.db.flush()
# 3. 系统消息
await self._create_system_message(
conversation_id=conversation.id,
content=f"{leaving_name} 已退出会话",
extra_data={"action": "participant_left", "employee_id": employee_id},
)
# 4. WebSocket 广播
await self._broadcast_participant_change(
conversation, "participant_left", [{"id": employee_id, "name": leaving_name}]
)
return conversation
# ======================================================================
# 邀请功能 — 内部辅助方法
# ======================================================================
async def _create_system_message(
self,
conversation_id: str,
content: str,
extra_data: Optional[dict] = None,
) -> None:
"""创建系统消息并保存到数据库。
做什么:在会话中插入一条系统类型的消息
为什么:邀请/加入/退出等事件需要在消息流中可见
Args:
conversation_id: 会话ID(字符串)
content: 消息内容
extra_data: 扩展元数据(JSON
"""
from app.models.message import Message
system_msg = Message(
conversation_id=conversation_id,
sender_type="system",
sender_id="system",
sender_name="系统",
content=content,
msg_type="system",
extra_data=extra_data,
)
self.db.add(system_msg)
await self.db.flush()
logger.info(f"系统消息已创建: conv_id={conversation_id}, content={content[:50]}")
async def _broadcast_participant_change(
self,
conversation: Conversation,
action: str,
changed_participants: list[dict],
) -> None:
"""通过 WebSocket 广播参与者变更事件。
做什么:
1. 通知所有在线坐席参与者列表发生变化
2. 通知相关H5员工(会话的原始员工 + 被邀请参与者)参与者变更
为什么:
- 坐席端需要实时更新参与者展示
- H5员工端需要实时更新参与者面板(如新参与者加入、有人退出)
降级策略:广播失败不阻塞主流程
Args:
conversation: 会话对象
action: 事件类型(participant_invited/joined/removed/left
changed_participants: 变更的参与者列表
"""
from app.services.ws_manager import manager as ws_manager
event_data = {
"type": action,
"data": {
"conversation_id": str(conversation.id),
"participants": conversation.participants or [],
"changed": changed_participants,
}
}
try:
# 1. 广播给所有在线坐席
await ws_manager.broadcast(event_data)
except Exception as e:
logger.warning(f"WebSocket广播参与者变更失败(坐席端,不阻塞流程): {e}")
try:
# 2. 推送给相关H5员工(原始员工 + 所有参与者中已加入的员工)
# 做什么:收集会话相关员工的ID,通过 WS 推送
# 为什么:H5端需要实时刷新参与者面板,不用等3秒轮询
employee_ids = set()
# 原始员工(会话发起人)
if conversation.employee_id:
employee_ids.add(conversation.employee_id)
# 所有参与者中的员工类型
for p in (conversation.participants or []):
if p.get("type", "employee") == "employee" and p.get("id"):
employee_ids.add(p["id"])
if employee_ids:
await ws_manager.broadcast_to_employees(list(employee_ids), event_data)
except Exception as e:
logger.warning(f"WebSocket推送参与者变更失败(H5员工端,不阻塞流程): {e}")