# ============================================================================= # 企微IT智能服务台 — WebSocket 连接管理器 # ============================================================================= # 说明:管理所有坐席和H5员工的 WebSocket 连接,提供: # 1. 坐席连接注册/注销(坐席上线/下线) # 2. H5员工连接注册/注销(员工打开H5页面时建立) # 3. 向指定坐席/员工发送消息(定向推送) # 4. 广播消息给所有在线坐席(全员推送) # 5. 向指定员工推送消息(参与者事件定向推送) # 6. 自动清理断连的 WebSocket 连接 # # 设计决策: # - 使用模块级单例,全局共享同一个 ConnectionManager 实例 # - broadcast 遇到发送失败自动断开该连接,避免僵尸连接积累 # - 所有发送方法都不阻塞调用方,失败只记 warning 不抛异常 # - 坐席和员工连接分开管理(不同认证体系、不同推送需求) # ============================================================================= import logging from typing import Dict, List from fastapi import WebSocket logger = logging.getLogger(__name__) class ConnectionManager: """管理所有坐席和H5员工的 WebSocket 连接。 核心职责: - 维护 agent_id → WebSocket 的映射表(坐席连接) - 维护 employee_id → WebSocket 的映射表(H5员工连接) - 提供定向推送和广播能力 - 自动清理无效连接 为什么需要这个类: - FastAPI 的 WebSocket 是无状态的,需要一个集中管理器来跟踪所有活跃连接 - 后端服务(消息路由、会话管理等)需要通过此管理器向前端推送实时事件 """ def __init__(self) -> None: """初始化连接管理器。 active_connections: 字典,key=坐席ID,value=WebSocket连接对象 同一个坐席只保留最新的连接(后连接的会替换旧连接) employee_connections: 字典,key=员工ID,value=WebSocket连接对象 同一个员工只保留最新的连接(后连接的会替换旧连接) """ # 坐席连接(agent_id → WebSocket) self.active_connections: Dict[str, WebSocket] = {} # H5员工连接(employee_id → WebSocket) self.employee_connections: Dict[str, WebSocket] = {} # ========================================================================== # 坐席连接管理 # ========================================================================== async def connect(self, agent_id: str, websocket: WebSocket) -> None: """接受坐席 WebSocket 握手并注册连接。 做什么:完成 WebSocket 握手(accept),然后将连接存入映射表 为什么:必须在 send_json 之前 accept,否则客户端收不到消息 如果同一坐席重复连接(如刷新页面),旧连接会被覆盖, 旧连接的 onclose 回调会触发 disconnect 做清理。 Args: agent_id: 坐席ID(企微 UserID) websocket: FastAPI WebSocket 对象 """ # 完成 WebSocket 握手(必须先 accept 才能收发消息) await websocket.accept() # 如果该坐席已有连接,先关闭旧连接 # 场景:坐席刷新页面或重新登录,会产生新连接 if agent_id in self.active_connections: old_ws = self.active_connections[agent_id] try: await old_ws.close() except Exception: # 旧连接可能已经断开,忽略关闭错误 pass # 注册新连接 self.active_connections[agent_id] = websocket logger.info( f"坐席 WebSocket 连接建立: agent_id={agent_id}, " f"当前在线坐席数={len(self.active_connections)}" ) def disconnect(self, agent_id: str) -> None: """从坐席映射表中移除连接。 做什么:删除 agent_id 对应的 WebSocket 映射 为什么:坐席关闭页面或网络断开时,需要清理映射表,避免向已断开的连接发消息 注意:只做映射表清理,不主动关闭 WebSocket(由调用方或 onclose 回调处理) Args: agent_id: 坐席ID """ if agent_id in self.active_connections: del self.active_connections[agent_id] logger.info( f"坐席 WebSocket 连接断开: agent_id={agent_id}, " f"当前在线坐席数={len(self.active_connections)}" ) async def send_to_agent(self, agent_id: str, data: dict) -> None: """向指定坐席发送消息。 做什么:通过 WebSocket 向指定坐席推送 JSON 数据 为什么:某些事件只需要通知特定坐席(如会话分配给你了) 如果发送失败(连接已断开),自动清理该连接。 Args: agent_id: 目标坐席ID data: 要发送的数据(会被序列化为 JSON) """ websocket = self.active_connections.get(agent_id) if not websocket: # 该坐席不在线,跳过 logger.debug(f"坐席不在线,跳过推送: agent_id={agent_id}") return try: await websocket.send_json(data) except Exception as e: # 发送失败 → 连接已断开,自动清理 logger.warning(f"WebSocket 发送失败,清理坐席连接: agent_id={agent_id}, error={e}") self.disconnect(agent_id) async def broadcast(self, data: dict) -> None: """向所有在线坐席广播消息。 做什么:遍历所有活跃连接,逐一发送 JSON 数据 为什么:新消息、会话状态变更等事件需要通知所有坐席 关键设计: - 发送失败的连接会被自动断开和清理,避免僵尸连接 - 不因单个连接失败而中断整次广播 - 使用 list() 拷贝映射表的 key,避免遍历时字典大小改变 Args: data: 要广播的数据(会被序列化为 JSON) """ # 拷贝 key 列表,避免遍历过程中字典被修改(disconnect 会删条目) agent_ids = list(self.active_connections.keys()) if not agent_ids: logger.debug("没有在线坐席,跳过广播") return for agent_id in agent_ids: # send_to_agent 内部已有异常处理,会自动清理断连的 WS await self.send_to_agent(agent_id, data) logger.debug(f"广播完成: 在线坐席数={len(agent_ids)}, 事件类型={data.get('type', 'unknown')}") # ========================================================================== # H5员工连接管理 # ========================================================================== async def connect_employee(self, employee_id: str, websocket: WebSocket) -> None: """接受H5员工 WebSocket 握手并注册连接。 做什么:完成 WebSocket 握手(accept),然后将连接存入员工映射表 为什么:H5员工需要实时接收参与者变更、新消息等事件 如果同一员工重复连接(如刷新页面),旧连接会被覆盖。 Args: employee_id: 员工企微 UserID websocket: FastAPI WebSocket 对象 """ # 完成 WebSocket 握手 await websocket.accept() # 如果该员工已有连接,先关闭旧连接 if employee_id in self.employee_connections: old_ws = self.employee_connections[employee_id] try: await old_ws.close() except Exception: pass # 注册新连接 self.employee_connections[employee_id] = websocket logger.info( f"H5员工 WebSocket 连接建立: employee_id={employee_id}, " f"当前在线员工数={len(self.employee_connections)}" ) def disconnect_employee(self, employee_id: str) -> None: """从H5员工映射表中移除连接。 做什么:删除 employee_id 对应的 WebSocket 映射 为什么:员工关闭H5页面或网络断开时,需要清理映射表 Args: employee_id: 员工企微 UserID """ if employee_id in self.employee_connections: del self.employee_connections[employee_id] logger.info( f"H5员工 WebSocket 连接断开: employee_id={employee_id}, " f"当前在线员工数={len(self.employee_connections)}" ) async def send_to_employee(self, employee_id: str, data: dict) -> None: """向指定H5员工发送消息。 做什么:通过 WebSocket 向指定H5员工推送 JSON 数据 为什么:参与者事件、新消息等需要推送给相关员工 如果发送失败(连接已断开),自动清理该连接。 Args: employee_id: 目标员工企微 UserID data: 要发送的数据(会被序列化为 JSON) """ websocket = self.employee_connections.get(employee_id) if not websocket: # 该员工不在线(未打开H5页面),跳过 logger.debug(f"H5员工不在线,跳过推送: employee_id={employee_id}") return try: await websocket.send_json(data) except Exception as e: # 发送失败 → 连接已断开,自动清理 logger.warning(f"H5员工 WebSocket 发送失败,清理连接: employee_id={employee_id}, error={e}") self.disconnect_employee(employee_id) async def broadcast_to_employees(self, employee_ids: List[str], data: dict) -> None: """向指定的多个H5员工推送消息。 做什么:遍历 employee_ids,逐一推送 JSON 数据 为什么:参与者变更事件只需通知该会话的参与者(非全员广播) Args: employee_ids: 目标员工ID列表 data: 要发送的数据 """ if not employee_ids: return for employee_id in employee_ids: await self.send_to_employee(employee_id, data) # ========================================================================== # 消息状态广播(P1-4) # ========================================================================== async def broadcast_message_status( self, conv_id: str, msg_id: str, status: str, participant_ids: List[str], extra: dict = None, ) -> int: """向会话所有参与方广播消息状态变更。 用于撤回/已读/删除等事件的实时推送。 Args: conv_id: 会话ID msg_id: 消息ID status: 新状态 (sent / delivered / read / recalled / deleted) participant_ids: 参与方ID列表 (agent_id + employee_id) extra: 额外数据 (可选,如 recall_by / recall_at) Returns: 推送到客户端数量 """ # 构建消息 payload = { "type": "message_status", "conv_id": conv_id, "msg_id": msg_id, "status": status, **(extra or {}), } # 分别推送给坐席和员工 sent_count = 0 for pid in participant_ids: # 判断是坐席还是员工 if pid in self.active_connections: await self.send_to_agent(pid, payload) sent_count += 1 elif pid in self.employee_connections: await self.send_to_employee(pid, payload) sent_count += 1 return sent_count # ========================================================================== # 辅助方法 # ========================================================================== def is_employee_online(self, employee_id: str) -> bool: """检查指定员工是否在线(有活跃的H5 WS连接)。 做什么:查询员工是否在 employee_connections 中 为什么:某些场景需要判断员工是否在线(如是否需要通过企微消息降级推送) Args: employee_id: 员工企微 UserID Returns: bool: 是否在线 """ return employee_id in self.employee_connections # -------------------------------------------------------------------------- # 模块级单例 # -------------------------------------------------------------------------- # 为什么用单例:所有后端服务共享同一个 ConnectionManager 实例, # 确保 WebSocket 连接映射表全局唯一,消息路由和会话服务都能推送事件 # -------------------------------------------------------------------------- manager = ConnectionManager()