# ============================================================================= # 企微IT智能服务台 — 会话状态管理服务 # ============================================================================= # 说明:管理会话的完整生命周期,包括: # 1. 创建会话(新员工发消息时自动创建) # 2. 更新会话状态(queued → serving → resolved) # 3. 分配坐席 # 4. 结单 # 5. 获取会话列表(支持排序:紧急→举手→需介入→活跃→已结单) # 6. 获取坐席当前服务的会话列表 # ============================================================================= import logging from datetime import datetime from typing import List, Optional from uuid import UUID from sqlalchemy import and_, case, desc, func, select from sqlalchemy.ext.asyncio import AsyncSession from app.models.agent import Agent from app.models.conversation import Conversation from app.services.wecom_service import WecomService from app.utils.response import ( AppException, ERR_AGENT_BUSY, ERR_AGENT_NOT_FOUND, ERR_CONVERSATION_NOT_FOUND, ERR_CONVERSATION_RESOLVED, ERR_DUPLICATE_ASSIGN, ) logger = logging.getLogger(__name__) class SessionService: """会话状态管理服务。 管理会话的完整生命周期,实现会话状态流转和坐席分配逻辑。 """ def __init__( self, db: AsyncSession, wecom_service: Optional[WecomService] = None, ): """初始化会话状态管理服务。 Args: db: 异步数据库会话 wecom_service: 企微 API 服务(用于坐席接入时发送通知,可选) """ self.db = db self.wecom_service = wecom_service # -------------------------------------------------------------------------- # 创建会话 # -------------------------------------------------------------------------- async def create_conversation( self, employee_id: str, employee_name: str = "", department: str = "", position: str = "", level: str = "", ) -> Conversation: """创建新会话。 当员工首次发消息或摇人时自动创建。 新会话默认状态为 queued(排队等坐席)。 Args: employee_id: 企微员工 UserID employee_name: 员工姓名 department: 部门 position: 岗位 level: 等级 Returns: Conversation: 新创建的会话对象 """ conversation = Conversation( employee_id=employee_id, employee_name=employee_name, department=department, position=position, level=level, status="queued", is_vip=False, is_pinned=False, is_todo=False, urgency_score=1, tags={}, ) self.db.add(conversation) await self.db.flush() logger.info(f"创建会话: conv_id={conversation.id}, employee={employee_id}") return conversation # -------------------------------------------------------------------------- # 更新会话状态 # -------------------------------------------------------------------------- async def update_status( self, conversation_id: UUID, new_status: str ) -> Conversation: """更新会话状态。 状态流转规则: - queued → serving: 坐席接单 - serving → resolved: 结单 - queued → resolved: 直接结单(排队中员工问题已自行解决) Args: conversation_id: 会话ID new_status: 新状态(queued/serving/resolved/ai_handling) Returns: Conversation: 更新后的会话对象 Raises: AppException: 会话不存在或状态流转不合法 """ conversation = await self._get_conversation(conversation_id) # 校验状态流转合法性 valid_transitions = { "queued": ["serving", "resolved"], "serving": ["resolved"], "ai_handling": ["queued", "serving", "resolved"], "resolved": [], # 已结单不能再改状态 } allowed = valid_transitions.get(conversation.status, []) if new_status not in allowed and new_status != conversation.status: raise AppException( 3010, f"会话状态流转不合法: {conversation.status} → {new_status}", ) # 如果是已结单,不能再改状态 if conversation.status == "resolved": raise ERR_CONVERSATION_RESOLVED conversation.status = new_status conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() logger.info( f"会话状态更新: conv_id={conversation_id}, " f"{conversation.status} → {new_status}" ) return conversation # -------------------------------------------------------------------------- # 分配坐席(接单) # -------------------------------------------------------------------------- async def assign_agent( self, conversation_id: UUID, agent_id: str ) -> Conversation: """分配坐席接入会话。 流程: 1. 校验会话存在且未结单 2. 校验坐席存在且在线 3. 校验坐席未满负荷 4. 更新会话状态为 serving 5. 更新坐席当前服务数 +1 6. 通过企微 API 向员工发送接入通知 Args: conversation_id: 会话ID agent_id: 坐席ID Returns: Conversation: 更新后的会话对象 Raises: AppException: 校验不通过 """ # 1. 校验会话 conversation = await self._get_conversation(conversation_id) if conversation.status == "resolved": raise ERR_CONVERSATION_RESOLVED if conversation.assigned_agent_id and conversation.status == "serving": raise ERR_DUPLICATE_ASSIGN # 2. 校验坐席(本地开发模式:自动创建不存在的坐席) stmt = select(Agent).where(Agent.user_id == agent_id) result = await self.db.execute(stmt) agent = result.scalars().first() if not agent: # DEV MODE: 本地开发时自动创建坐席,避免必须先登录才能测试接单 logger.warning(f"坐席不存在,自动创建: user_id={agent_id}") agent = Agent( user_id=agent_id, name="未知坐席", status="online", current_load=0, max_load=5, ) self.db.add(agent) await self.db.flush() if agent.status == "offline": raise AppException(3007, "坐席不在线,无法接单") if agent.current_load >= agent.max_load: raise ERR_AGENT_BUSY # 3. 更新会话 conversation.status = "serving" conversation.assigned_agent_id = agent_id conversation.updated_at = datetime.now() self.db.add(conversation) # 4. 更新坐席服务数 agent.current_load += 1 self.db.add(agent) await self.db.flush() # 5. 发送接入通知给员工 if self.wecom_service: try: await self.wecom_service.send_text_message( conversation.employee_id, "人摇来了!IT坐席为您服务", ) except Exception as e: logger.warning(f"发送接入通知失败(不阻塞流程): {e}") logger.info( f"坐席接单: conv_id={conversation_id}, agent={agent_id}" ) # ---------------------------------------------------------------------- # 广播 WebSocket 事件:会话状态变更 # ---------------------------------------------------------------------- # 做什么:通知所有在线坐席,有会话被接单了 # 为什么:其他坐席需要实时看到该会话从"排队"变为"服务中" # 用 try/except 包裹:广播失败不能阻塞接单主流程 # ---------------------------------------------------------------------- from app.services.ws_manager import manager as ws_manager try: await ws_manager.broadcast({ "type": "conversation_updated", "data": { "conversation_id": str(conversation.id), "status": conversation.status, "assigned_agent_id": conversation.assigned_agent_id, } }) except Exception as e: logger.warning(f"WebSocket广播失败: {e}") return conversation # -------------------------------------------------------------------------- # 结单 # -------------------------------------------------------------------------- async def resolve_conversation( self, conversation_id: UUID, agent_id: Optional[str] = None ) -> Conversation: """结单。 流程: 1. 校验会话存在 2. 更新会话状态为 resolved 3. 如果有坐席,更新坐席当前服务数 -1 Args: conversation_id: 会话ID agent_id: 坐席ID(可选,用于更新坐席服务数) Returns: Conversation: 更新后的会话对象 """ conversation = await self._get_conversation(conversation_id) if conversation.status == "resolved": raise ERR_CONVERSATION_RESOLVED # 更新会话状态 conversation.status = "resolved" conversation.updated_at = datetime.now() self.db.add(conversation) # 更新坐席服务数 assigned_agent_id = agent_id or conversation.assigned_agent_id if assigned_agent_id: stmt = select(Agent).where(Agent.user_id == assigned_agent_id) result = await self.db.execute(stmt) agent = result.scalars().first() if agent and agent.current_load > 0: agent.current_load -= 1 self.db.add(agent) await self.db.flush() logger.info(f"会话结单: conv_id={conversation_id}") # ---------------------------------------------------------------------- # 广播 WebSocket 事件:会话已结单 # ---------------------------------------------------------------------- # 做什么:通知所有在线坐席,有会话已结单 # 为什么:坐席需要实时看到该会话从"服务中"变为"已结单" # 用 try/except 包裹:广播失败不能阻塞结单主流程 # ---------------------------------------------------------------------- from app.services.ws_manager import manager as ws_manager try: await ws_manager.broadcast({ "type": "conversation_updated", "data": { "conversation_id": str(conversation.id), "status": conversation.status, "assigned_agent_id": conversation.assigned_agent_id, } }) except Exception as e: logger.warning(f"WebSocket广播失败: {e}") return conversation # -------------------------------------------------------------------------- # 置顶/取消置顶 # -------------------------------------------------------------------------- async def toggle_pin(self, conversation_id: UUID) -> Conversation: """切换会话置顶状态。 Args: conversation_id: 会话ID Returns: Conversation: 更新后的会话对象 """ conversation = await self._get_conversation(conversation_id) conversation.is_pinned = not conversation.is_pinned conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() logger.info( f"会话置顶{'开启' if conversation.is_pinned else '取消'}: " f"conv_id={conversation_id}" ) return conversation # -------------------------------------------------------------------------- # 代办/取消代办 # -------------------------------------------------------------------------- async def toggle_todo(self, conversation_id: UUID) -> Conversation: """切换会话代办状态。 Args: conversation_id: 会话ID Returns: Conversation: 更新后的会话对象 """ conversation = await self._get_conversation(conversation_id) conversation.is_todo = not conversation.is_todo conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() logger.info( f"会话代办{'开启' if conversation.is_todo else '取消'}: " f"conv_id={conversation_id}" ) return conversation # -------------------------------------------------------------------------- # 摇人 — 邀请坐席加入协作 # -------------------------------------------------------------------------- async def invite_collaborator( self, conversation_id: UUID, inviter_agent_id: str, invitee_agent_id: str, ) -> Conversation: """邀请坐席加入协作。 做什么:坐席A在处理会话时发现需要坐席B的专业知识,点击「摇人」 将坐席B加入 collaborating_agent_ids 列表,坐席B可查看和回复但不能结单。 校验规则: 1. 会话存在且为 serving(已结单的不能摇人) 2. 邀请人在主责坐席或协作坐席列表中 3. 被邀请人不能是主责坐席,也不能已在协作列表中(防重复) 4. 被邀请坐席存在且在线 Args: conversation_id: 会话ID inviter_agent_id: 邀请人坐席ID invitee_agent_id: 被邀请坐席ID Returns: Conversation: 更新后的会话对象 Raises: AppException: 校验不通过 """ # 1. 校验会话 conversation = await self._get_conversation(conversation_id) if conversation.status == "resolved": raise ERR_CONVERSATION_RESOLVED if conversation.status != "serving": raise AppException(3020, "只能邀请协作服务中的会话") # 2. 校验邀请人权限:必须是主责坐席或已在协作列表中 is_owner = conversation.assigned_agent_id == inviter_agent_id is_collaborator = inviter_agent_id in (conversation.collaborating_agent_ids or []) if not is_owner and not is_collaborator: raise AppException(3021, "只有主责坐席或协作坐席才能摇人") # 3. 校验被邀请人:不能是主责坐席 if conversation.assigned_agent_id == invitee_agent_id: raise AppException(3022, "不能邀请主责坐席协作(他已经在处理了)") # 4. 校验被邀请人:不能已在协作列表中 if invitee_agent_id in (conversation.collaborating_agent_ids or []): raise AppException(3023, "该坐席已在协作中,无需重复邀请") # 5. 校验被邀请坐席存在且在线 stmt = select(Agent).where(Agent.user_id == invitee_agent_id) result = await self.db.execute(stmt) invitee = result.scalars().first() if not invitee: raise ERR_AGENT_NOT_FOUND if invitee.status == "offline": raise AppException(3024, "被邀请坐席不在线") # 6. 将被邀请人加入协作列表 collab_ids = list(conversation.collaborating_agent_ids or []) collab_ids.append(invitee_agent_id) conversation.collaborating_agent_ids = collab_ids conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() logger.info( f"摇人成功: conv_id={conversation_id}, " f"inviter={inviter_agent_id}, invitee={invitee_agent_id}" ) # ---------------------------------------------------------------------- # 广播 + 定向推送 WebSocket 事件 # ---------------------------------------------------------------------- # 做什么: # 1. broadcast — 所有坐席看到协作关系变化(刷新会话列表) # 2. send_to_agent — 被邀请人收到定向通知(弹窗提示) # 用 try/except 包裹:推送失败不能阻塞主流程 # ---------------------------------------------------------------------- from app.services.ws_manager import manager as ws_manager inviter_name = inviter_agent_id # 前端会从 agent_name_map 获取真实姓名 try: # 广播:会话协作关系变更(所有坐席刷新列表) await ws_manager.broadcast({ "type": "collaborator_joined", "data": { "conversation_id": str(conversation.id), "agent_id": invitee_agent_id, "inviter_agent_id": inviter_agent_id, } }) # 定向推送:通知被邀请人 await ws_manager.send_to_agent(invitee_agent_id, { "type": "collaborator_invited", "data": { "conversation_id": str(conversation.id), "inviter_agent_id": inviter_agent_id, "invitee_agent_id": invitee_agent_id, "employee_name": conversation.employee_name, "last_message_summary": conversation.last_message_summary or "", } }) except Exception as e: logger.warning(f"WebSocket推送失败(不阻塞流程): {e}") return conversation # -------------------------------------------------------------------------- # 摇人 — 退出协作 # -------------------------------------------------------------------------- async def leave_collaboration( self, conversation_id: UUID, agent_id: str, ) -> Conversation: """坐席退出协作。 做什么:将坐席从 collaborating_agent_ids 中移除。 校验规则: 1. 坐席必须在协作列表中 2. 坐席不能是主责坐席(主责坐席不能"退出",只能转接或结单) Args: conversation_id: 会话ID agent_id: 退出协作的坐席ID Returns: Conversation: 更新后的会话对象 Raises: AppException: 校验不通过 """ # 1. 校验会话 conversation = await self._get_conversation(conversation_id) # 2. 校验:不能是主责坐席 if conversation.assigned_agent_id == agent_id: raise AppException(3025, "主责坐席不能退出协作,请使用转接或结单") # 3. 校验:必须在协作列表中 collab_ids = list(conversation.collaborating_agent_ids or []) if agent_id not in collab_ids: raise AppException(3026, "您不在该会话的协作列表中") # 4. 移除 collab_ids.remove(agent_id) conversation.collaborating_agent_ids = collab_ids conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() logger.info(f"退出协作: conv_id={conversation_id}, agent={agent_id}") # ---------------------------------------------------------------------- # 广播 WebSocket 事件:协作坐席退出 # ---------------------------------------------------------------------- from app.services.ws_manager import manager as ws_manager try: await ws_manager.broadcast({ "type": "collaborator_left", "data": { "conversation_id": str(conversation.id), "agent_id": agent_id, } }) except Exception as e: logger.warning(f"WebSocket推送失败(不阻塞流程): {e}") return conversation # -------------------------------------------------------------------------- # 转接会话 # -------------------------------------------------------------------------- async def transfer_conversation( self, conversation_id: UUID, target_agent_id: str ) -> Conversation: """转接会话到另一个坐席。 第一步简化版:只更换坐席,不做转接通知。 Args: conversation_id: 会话ID target_agent_id: 目标坐席ID Returns: Conversation: 更新后的会话对象 """ conversation = await self._get_conversation(conversation_id) # 校验目标坐席 stmt = select(Agent).where(Agent.user_id == target_agent_id) result = await self.db.execute(stmt) target_agent = result.scalars().first() if not target_agent: raise ERR_AGENT_NOT_FOUND if target_agent.current_load >= target_agent.max_load: raise ERR_AGENT_BUSY # 旧坐席服务数 -1 old_agent_id = conversation.assigned_agent_id if old_agent_id: stmt = select(Agent).where(Agent.user_id == old_agent_id) result = await self.db.execute(stmt) old_agent = result.scalars().first() if old_agent and old_agent.current_load > 0: old_agent.current_load -= 1 self.db.add(old_agent) # 新坐席服务数 +1 target_agent.current_load += 1 self.db.add(target_agent) # 更新会话 conversation.assigned_agent_id = target_agent_id conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() logger.info( f"会话转接: conv_id={conversation_id}, " f"from={old_agent_id} to={target_agent_id}" ) # ---------------------------------------------------------------------- # 广播 WebSocket 事件:会话转接 # ---------------------------------------------------------------------- # 做什么:通知所有在线坐席,有会话被转接了 # 为什么:原坐席和目标坐席都需要实时看到会话归属变化 # 用 try/except 包裹:广播失败不能阻塞转接主流程 # ---------------------------------------------------------------------- from app.services.ws_manager import manager as ws_manager try: await ws_manager.broadcast({ "type": "conversation_updated", "data": { "conversation_id": str(conversation.id), "status": conversation.status, "assigned_agent_id": conversation.assigned_agent_id, } }) except Exception as e: logger.warning(f"WebSocket广播失败: {e}") return conversation # -------------------------------------------------------------------------- # 获取会话列表(坐席端) # -------------------------------------------------------------------------- async def get_conversations( self, status: Optional[str] = None, agent_id: Optional[str] = None, page: int = 1, page_size: int = 50, ) -> tuple[List[Conversation], int]: """获取会话列表,支持过滤和排序。 排序规则(PRD 定义): 紧急 → 举手 → 需介入 → 活跃 → 已结单 同级别按 last_message_at 倒序 实现方式:先按数据库基础排序(状态+置顶+紧急度), 再在 Python 侧按完整规则精细排序(含 JSON tags 字段)。 Args: status: 按状态过滤(可选) agent_id: 按坐席ID过滤(可选,查看某坐席的会话) page: 页码(从1开始) page_size: 每页数量 Returns: tuple[List[Conversation], int]: (会话列表, 总数) """ # 构建查询条件 conditions = [] if status: conditions.append(Conversation.status == status) if agent_id: conditions.append(Conversation.assigned_agent_id == agent_id) # 查询总数 count_stmt = select(func.count(Conversation.id)) if conditions: count_stmt = count_stmt.where(and_(*conditions)) total_result = await self.db.execute(count_stmt) total = total_result.scalar() or 0 # 数据库侧基础排序(快速过滤): # 置顶 > 紧急度5 > 紧急度4 > 紧急度3 > 状态排序 > 最后消息时间 # JSON tags 字段的排序在 Python 侧完成(SQLite 不支持 JSON 操作符) db_order_weight = case( (Conversation.is_pinned == True, 1000), (Conversation.urgency_score >= 5, 900), (Conversation.urgency_score >= 4, 600), (Conversation.urgency_score >= 3, 300), (Conversation.status == "queued", 200), (Conversation.status == "ai_handling", 150), (Conversation.status == "serving", 100), else_=0, ) stmt = select(Conversation) if conditions: stmt = stmt.where(and_(*conditions)) # 数据库侧先按基础权重 + 最后消息时间排序 stmt = stmt.order_by(desc(db_order_weight), desc(Conversation.last_message_at)) # 查询所有符合条件的会话(数据量不大时可行;生产环境建议改用 PostgreSQL + JSONB 操作符) result = await self.db.execute(stmt) all_conversations = list(result.scalars().all()) # ===== Python 侧精细排序(支持 JSON tags 字段)===== def _sort_key(conv: Conversation): """计算完整排序权重(数值越大越靠前)""" weight = 0 tags = conv.tags or {} # 置顶(最高优先级) if conv.is_pinned: weight += 10000 # 紧急度评分(越高越靠前) urgency = conv.urgency_score or 0 if urgency >= 5: weight += 9000 elif urgency >= 4: weight += 6000 elif urgency >= 3: weight += 3000 # 举手标记 if tags.get("hand_raise"): weight += 8000 # 需介入标记 if tags.get("need_intervene"): weight += 7000 # 情绪标记(非 neutral) emotion = tags.get("emotion", "neutral") if emotion and emotion != "neutral": weight += 5000 # 状态排序 status_order = { "queued": 2000, "ai_handling": 1500, "serving": 1000, "resolved": 0, } weight += status_order.get(conv.status, 0) # 最后消息时间(时间戳越大越靠前,除以 1e6 归一化到合理范围) import time if conv.last_message_at: ts = conv.last_message_at.timestamp() else: ts = 0 # 用 (weight, ts) 元组排序:先按 weight 降序,再按 ts 降序 return (weight, ts) # 按完整规则排序(降序) all_conversations.sort(key=_sort_key, reverse=True) # 分页 offset = (page - 1) * page_size conversations = all_conversations[offset:offset + page_size] return conversations, total # -------------------------------------------------------------------------- # 获取坐席当前服务的会话列表 # -------------------------------------------------------------------------- async def get_agent_conversations( self, agent_id: str ) -> List[Conversation]: """获取坐席当前正在服务的会话列表。 只返回状态为 serving 且分配给该坐席的会话。 Args: agent_id: 坐席ID Returns: List[Conversation]: 会话列表 """ stmt = ( select(Conversation) .where( Conversation.assigned_agent_id == agent_id, Conversation.status == "serving", ) .order_by(desc(Conversation.last_message_at)) ) result = await self.db.execute(stmt) return list(result.scalars().all()) # -------------------------------------------------------------------------- # 获取会话详情 # -------------------------------------------------------------------------- async def _get_conversation(self, conversation_id: UUID) -> Conversation: """获取会话对象,不存在则抛异常。 Args: conversation_id: 会话ID Returns: Conversation: 会话对象 Raises: AppException: 会话不存在 """ # 将 UUID 转为字符串,确保与 String(36) 列类型匹配 conv_id_str = str(conversation_id) stmt = select(Conversation).where(Conversation.id == conv_id_str) result = await self.db.execute(stmt) conversation = result.scalars().first() if not conversation: raise ERR_CONVERSATION_NOT_FOUND return conversation async def get_conversation(self, conversation_id: UUID) -> Conversation: """获取会话详情(公开方法)。 Args: conversation_id: 会话ID Returns: Conversation: 会话对象 Raises: AppException: 会话不存在 """ return await self._get_conversation(conversation_id) # ====================================================================== # 邀请功能(P0-09~P0-11):坐席邀请员工/部门加入会话 # ====================================================================== async def _get_employee_avatar(self, employee_id: str) -> str: """获取员工头像URL。 做什么:从 employees 表或企微通讯录API获取头像 为什么:邀请参与者时需要展示头像,前端无法单独获取被邀请人头像 优先级:employees 表(本地缓存) > 企微API(实时获取) Args: employee_id: 企微员工UserID Returns: str: 头像URL,获取不到返回空字符串 """ # 1. 优先从 employees 表查(本地缓存,速度快) from app.models.employee import Employee result = await self.db.execute( select(Employee.avatar).where(Employee.employee_id == employee_id) ) row = result.first() if row and row[0]: return row[0] # 2. employees 表没有,尝试从企微通讯录API获取 if self.wecom_service: try: user_info = await self.wecom_service.get_user_info(employee_id) avatar = user_info.get("avatar", "") return avatar except Exception as e: logger.warning(f"从企微API获取头像失败: employee_id={employee_id}, error={e}") return "" async def invite_participants( self, conversation_id: UUID, inviter_agent_id: str, participants: list[dict], history_mode: str = "recent10", ) -> Conversation: """邀请员工/部门加入会话(P0-09)。 做什么:主责坐席邀请非坐席人员(员工/部门)参与会话 为什么:复杂IT问题可能需要业务方人员补充信息 权限:只有主责坐席可以发起邀请 Args: conversation_id: 会话ID inviter_agent_id: 邀请人坐席ID participants: 被邀请人列表 [{"id": "xxx", "name": "xxx", "department": "xxx", "type": "employee"}] history_mode: 历史共享模式 — recent10/all/none Returns: Conversation: 更新后的会话对象 Raises: AppException: 校验不通过 """ # 1. 校验会话 conversation = await self._get_conversation(conversation_id) # 2. 权限:只有主责坐席可以邀请 if conversation.assigned_agent_id != inviter_agent_id: raise AppException(3030, "只有主责坐席才能邀请人员加入会话") # 3. 校验:会话必须是服务中状态 if conversation.status != "serving": raise AppException(3031, f"只有服务中的会话才能邀请,当前状态: {conversation.status}") # 4. 合并参与者(去重),同时补充头像 existing_participants = list(conversation.participants or []) existing_ids = {p.get("id") for p in existing_participants} new_added = [] for p in participants: if p.get("id") not in existing_ids: # 补充头像:员工类型的参与者,从 employees 表或企微API获取头像 if p.get("type", "employee") == "employee" and not p.get("avatar"): try: avatar_url = await self._get_employee_avatar(p["id"]) if avatar_url: p["avatar"] = avatar_url except Exception as e: logger.warning(f"获取参与者头像失败(不阻塞流程): employee_id={p['id']}, error={e}") existing_participants.append(p) existing_ids.add(p.get("id")) new_added.append(p) if not new_added: raise AppException(3032, "所有被邀请人已在该会话中") # 5. 更新 participants 字段 conversation.participants = existing_participants conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() logger.info( f"邀请参与者: conv_id={conversation_id}, " f"inviter={inviter_agent_id}, new_participants={len(new_added)}" ) # 6. 发送企微卡片通知给被邀请人 if self.wecom_service: for p in new_added: if p.get("type") == "employee": try: # 生成邀请链接:H5 端加入会话的 URL # 格式:https://itsupport.servyou.com.cn/itdesk/?invite={conv_id}&eid={employee_id} invite_url = ( f"{getattr(self, '_h5_base_url', '')}/itdesk/" f"?invite={conversation.id}&eid={p['id']}" ) await self.wecom_service.send_card_message( user_id=p["id"], title="IT服务台邀请您加入会话", description=( f"坐席邀请您加入一个IT服务会话," f"请点击「加入会话」查看详情并参与讨论。" ), url=invite_url, btntxt="加入会话", ) except Exception as e: logger.warning(f"发送邀请通知失败(不阻塞流程): user_id={p['id']}, error={e}") # 7. 创建系统消息广播 await self._create_system_message( conversation_id=conversation.id, content=f"坐席邀请 {', '.join(p['name'] for p in new_added)} 加入会话", extra_data={"action": "participant_invited", "participants": new_added, "history_mode": history_mode}, ) # 8. WebSocket 广播:参与者变更通知 await self._broadcast_participant_change(conversation, "participant_invited", new_added) return conversation async def join_conversation( self, conversation_id: UUID, employee_id: str, ) -> Conversation: """被邀请人通过链接加入会话(P0-10)。 做什么:被邀请人点击企微卡片链接后加入会话 为什么:实现邀请-加入闭环 校验:该员工必须在 participants 列表中 Args: conversation_id: 会话ID employee_id: 加入的员工企微UserID Returns: Conversation: 会话对象 Raises: AppException: 校验不通过 """ # 1. 校验会话 conversation = await self._get_conversation(conversation_id) # 2. 校验:会话必须是服务中状态 if conversation.status != "serving": raise AppException(3033, "该会话已结束,无法加入") # 3. 校验:该员工必须在 participants 列表中(被邀请过才能加入) participants = list(conversation.participants or []) is_invited = any(p.get("id") == employee_id for p in participants) if not is_invited: raise AppException(3034, "您未被邀请加入该会话") # 4. 更新参与者的 joined 状态 for p in participants: if p.get("id") == employee_id: p["joined"] = True p["joined_at"] = datetime.now().isoformat() break conversation.participants = participants conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() # 5. 获取参与者姓名 participant_name = next( (p["name"] for p in participants if p.get("id") == employee_id), "未知" ) # 6. 系统消息 await self._create_system_message( conversation_id=conversation.id, content=f"{participant_name} 已加入会话", extra_data={"action": "participant_joined", "employee_id": employee_id}, ) # 7. WebSocket 广播 await self._broadcast_participant_change( conversation, "participant_joined", [{"id": employee_id, "name": participant_name}] ) return conversation async def remove_participant( self, conversation_id: UUID, remover_agent_id: str, target_user_id: str, ) -> Conversation: """移除参与者(P0-11)。 做什么:主责坐席将参与者移出会话 为什么:参与者不再需要参与时,主责坐席可以移除 权限:只有主责坐席可以移除参与者 Args: conversation_id: 会话ID remover_agent_id: 操作坐席ID target_user_id: 被移除的员工UserID Returns: Conversation: 更新后的会话对象 Raises: AppException: 校验不通过 """ # 1. 校验会话 conversation = await self._get_conversation(conversation_id) # 2. 权限:只有主责坐席可以移除 if conversation.assigned_agent_id != remover_agent_id: raise AppException(3035, "只有主责坐席才能移除参与者") # 3. 查找并移除 participants = list(conversation.participants or []) removed_name = None new_participants = [] for p in participants: if p.get("id") == target_user_id: removed_name = p.get("name", "未知") else: new_participants.append(p) if removed_name is None: raise AppException(3036, "该人员不在会话参与者列表中") conversation.participants = new_participants conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() # 4. 系统消息 await self._create_system_message( conversation_id=conversation.id, content=f"{removed_name} 已被移出会话", extra_data={"action": "participant_removed", "user_id": target_user_id}, ) # 5. WebSocket 广播 await self._broadcast_participant_change( conversation, "participant_removed", [{"id": target_user_id, "name": removed_name}] ) return conversation async def leave_as_participant( self, conversation_id: UUID, employee_id: str, ) -> Conversation: """参与者主动退出会话。 做什么:被邀请人主动退出会话 为什么:参与者不再需要参与时,可以自行退出 Args: conversation_id: 会话ID employee_id: 退出的员工企微UserID Returns: Conversation: 更新后的会话对象 Raises: AppException: 校验不通过 """ # 1. 校验会话 conversation = await self._get_conversation(conversation_id) # 2. 查找并移除 participants = list(conversation.participants or []) leaving_name = None new_participants = [] for p in participants: if p.get("id") == employee_id: leaving_name = p.get("name", "未知") else: new_participants.append(p) if leaving_name is None: raise AppException(3037, "您不在该会话的参与者列表中") conversation.participants = new_participants conversation.updated_at = datetime.now() self.db.add(conversation) await self.db.flush() # 3. 系统消息 await self._create_system_message( conversation_id=conversation.id, content=f"{leaving_name} 已退出会话", extra_data={"action": "participant_left", "employee_id": employee_id}, ) # 4. WebSocket 广播 await self._broadcast_participant_change( conversation, "participant_left", [{"id": employee_id, "name": leaving_name}] ) return conversation # ====================================================================== # 邀请功能 — 内部辅助方法 # ====================================================================== async def _create_system_message( self, conversation_id: str, content: str, extra_data: Optional[dict] = None, ) -> None: """创建系统消息并保存到数据库。 做什么:在会话中插入一条系统类型的消息 为什么:邀请/加入/退出等事件需要在消息流中可见 Args: conversation_id: 会话ID(字符串) content: 消息内容 extra_data: 扩展元数据(JSON) """ from app.models.message import Message system_msg = Message( conversation_id=conversation_id, sender_type="system", sender_id="system", sender_name="系统", content=content, msg_type="system", extra_data=extra_data, ) self.db.add(system_msg) await self.db.flush() logger.info(f"系统消息已创建: conv_id={conversation_id}, content={content[:50]}") async def _broadcast_participant_change( self, conversation: Conversation, action: str, changed_participants: list[dict], ) -> None: """通过 WebSocket 广播参与者变更事件。 做什么: 1. 通知所有在线坐席参与者列表发生变化 2. 通知相关H5员工(会话的原始员工 + 被邀请参与者)参与者变更 为什么: - 坐席端需要实时更新参与者展示 - H5员工端需要实时更新参与者面板(如新参与者加入、有人退出) 降级策略:广播失败不阻塞主流程 Args: conversation: 会话对象 action: 事件类型(participant_invited/joined/removed/left) changed_participants: 变更的参与者列表 """ from app.services.ws_manager import manager as ws_manager event_data = { "type": action, "data": { "conversation_id": str(conversation.id), "participants": conversation.participants or [], "changed": changed_participants, } } try: # 1. 广播给所有在线坐席 await ws_manager.broadcast(event_data) except Exception as e: logger.warning(f"WebSocket广播参与者变更失败(坐席端,不阻塞流程): {e}") try: # 2. 推送给相关H5员工(原始员工 + 所有参与者中已加入的员工) # 做什么:收集会话相关员工的ID,通过 WS 推送 # 为什么:H5端需要实时刷新参与者面板,不用等3秒轮询 employee_ids = set() # 原始员工(会话发起人) if conversation.employee_id: employee_ids.add(conversation.employee_id) # 所有参与者中的员工类型 for p in (conversation.participants or []): if p.get("type", "employee") == "employee" and p.get("id"): employee_ids.add(p["id"]) if employee_ids: await ws_manager.broadcast_to_employees(list(employee_ids), event_data) except Exception as e: logger.warning(f"WebSocket推送参与者变更失败(H5员工端,不阻塞流程): {e}")