Files
wecom_it_smart_desk/backend/app/services/ws_manager.py
T

328 lines
13 KiB
Python
Raw Normal View History

# =============================================================================
# 企微IT智能服务台 — WebSocket 连接管理器
# =============================================================================
# 说明:管理所有坐席和H5员工的 WebSocket 连接,提供:
# 1. 坐席连接注册/注销(坐席上线/下线)
# 2. H5员工连接注册/注销(员工打开H5页面时建立)
# 3. 向指定坐席/员工发送消息(定向推送)
# 4. 广播消息给所有在线坐席(全员推送)
# 5. 向指定员工推送消息(参与者事件定向推送)
# 6. 自动清理断连的 WebSocket 连接
#
# 设计决策:
# - 使用模块级单例,全局共享同一个 ConnectionManager 实例
# - broadcast 遇到发送失败自动断开该连接,避免僵尸连接积累
# - 所有发送方法都不阻塞调用方,失败只记 warning 不抛异常
# - 坐席和员工连接分开管理(不同认证体系、不同推送需求)
# =============================================================================
import logging
from typing import Dict, List
from fastapi import WebSocket
logger = logging.getLogger(__name__)
class ConnectionManager:
"""管理所有坐席和H5员工的 WebSocket 连接。
核心职责:
- 维护 agent_id → WebSocket 的映射表(坐席连接)
- 维护 employee_id → WebSocket 的映射表(H5员工连接)
- 提供定向推送和广播能力
- 自动清理无效连接
为什么需要这个类:
- FastAPI 的 WebSocket 是无状态的,需要一个集中管理器来跟踪所有活跃连接
- 后端服务(消息路由、会话管理等)需要通过此管理器向前端推送实时事件
"""
def __init__(self) -> None:
"""初始化连接管理器。
active_connections: 字典,key=坐席IDvalue=WebSocket连接对象
同一个坐席只保留最新的连接(后连接的会替换旧连接)
employee_connections: 字典,key=员工IDvalue=WebSocket连接对象
同一个员工只保留最新的连接(后连接的会替换旧连接)
"""
# 坐席连接(agent_id → WebSocket
self.active_connections: Dict[str, WebSocket] = {}
# H5员工连接(employee_id → WebSocket
self.employee_connections: Dict[str, WebSocket] = {}
# ==========================================================================
# 坐席连接管理
# ==========================================================================
async def connect(self, agent_id: str, websocket: WebSocket) -> None:
"""接受坐席 WebSocket 握手并注册连接。
做什么:完成 WebSocket 握手(accept),然后将连接存入映射表
为什么:必须在 send_json 之前 accept,否则客户端收不到消息
如果同一坐席重复连接(如刷新页面),旧连接会被覆盖,
旧连接的 onclose 回调会触发 disconnect 做清理。
Args:
agent_id: 坐席ID(企微 UserID
websocket: FastAPI WebSocket 对象
"""
# 完成 WebSocket 握手(必须先 accept 才能收发消息)
await websocket.accept()
# 如果该坐席已有连接,先关闭旧连接
# 场景:坐席刷新页面或重新登录,会产生新连接
if agent_id in self.active_connections:
old_ws = self.active_connections[agent_id]
try:
await old_ws.close()
except Exception:
# 旧连接可能已经断开,忽略关闭错误
pass
# 注册新连接
self.active_connections[agent_id] = websocket
logger.info(
f"坐席 WebSocket 连接建立: agent_id={agent_id}, "
f"当前在线坐席数={len(self.active_connections)}"
)
def disconnect(self, agent_id: str) -> None:
"""从坐席映射表中移除连接。
做什么:删除 agent_id 对应的 WebSocket 映射
为什么:坐席关闭页面或网络断开时,需要清理映射表,避免向已断开的连接发消息
注意:只做映射表清理,不主动关闭 WebSocket(由调用方或 onclose 回调处理)
Args:
agent_id: 坐席ID
"""
if agent_id in self.active_connections:
del self.active_connections[agent_id]
logger.info(
f"坐席 WebSocket 连接断开: agent_id={agent_id}, "
f"当前在线坐席数={len(self.active_connections)}"
)
async def send_to_agent(self, agent_id: str, data: dict) -> None:
"""向指定坐席发送消息。
做什么:通过 WebSocket 向指定坐席推送 JSON 数据
为什么:某些事件只需要通知特定坐席(如会话分配给你了)
如果发送失败(连接已断开),自动清理该连接。
Args:
agent_id: 目标坐席ID
data: 要发送的数据(会被序列化为 JSON)
"""
websocket = self.active_connections.get(agent_id)
if not websocket:
# 该坐席不在线,跳过
logger.debug(f"坐席不在线,跳过推送: agent_id={agent_id}")
return
try:
await websocket.send_json(data)
except Exception as e:
# 发送失败 → 连接已断开,自动清理
logger.warning(f"WebSocket 发送失败,清理坐席连接: agent_id={agent_id}, error={e}")
self.disconnect(agent_id)
async def broadcast(self, data: dict) -> None:
"""向所有在线坐席广播消息。
做什么:遍历所有活跃连接,逐一发送 JSON 数据
为什么:新消息、会话状态变更等事件需要通知所有坐席
关键设计:
- 发送失败的连接会被自动断开和清理,避免僵尸连接
- 不因单个连接失败而中断整次广播
- 使用 list() 拷贝映射表的 key,避免遍历时字典大小改变
Args:
data: 要广播的数据(会被序列化为 JSON)
"""
# 拷贝 key 列表,避免遍历过程中字典被修改(disconnect 会删条目)
agent_ids = list(self.active_connections.keys())
if not agent_ids:
logger.debug("没有在线坐席,跳过广播")
return
for agent_id in agent_ids:
# send_to_agent 内部已有异常处理,会自动清理断连的 WS
await self.send_to_agent(agent_id, data)
logger.debug(f"广播完成: 在线坐席数={len(agent_ids)}, 事件类型={data.get('type', 'unknown')}")
# ==========================================================================
# H5员工连接管理
# ==========================================================================
async def connect_employee(self, employee_id: str, websocket: WebSocket) -> None:
"""接受H5员工 WebSocket 握手并注册连接。
做什么:完成 WebSocket 握手(accept),然后将连接存入员工映射表
为什么:H5员工需要实时接收参与者变更、新消息等事件
如果同一员工重复连接(如刷新页面),旧连接会被覆盖。
Args:
employee_id: 员工企微 UserID
websocket: FastAPI WebSocket 对象
"""
# 完成 WebSocket 握手
await websocket.accept()
# 如果该员工已有连接,先关闭旧连接
if employee_id in self.employee_connections:
old_ws = self.employee_connections[employee_id]
try:
await old_ws.close()
except Exception:
pass
# 注册新连接
self.employee_connections[employee_id] = websocket
logger.info(
f"H5员工 WebSocket 连接建立: employee_id={employee_id}, "
f"当前在线员工数={len(self.employee_connections)}"
)
def disconnect_employee(self, employee_id: str) -> None:
"""从H5员工映射表中移除连接。
做什么:删除 employee_id 对应的 WebSocket 映射
为什么:员工关闭H5页面或网络断开时,需要清理映射表
Args:
employee_id: 员工企微 UserID
"""
if employee_id in self.employee_connections:
del self.employee_connections[employee_id]
logger.info(
f"H5员工 WebSocket 连接断开: employee_id={employee_id}, "
f"当前在线员工数={len(self.employee_connections)}"
)
async def send_to_employee(self, employee_id: str, data: dict) -> None:
"""向指定H5员工发送消息。
做什么:通过 WebSocket 向指定H5员工推送 JSON 数据
为什么:参与者事件、新消息等需要推送给相关员工
如果发送失败(连接已断开),自动清理该连接。
Args:
employee_id: 目标员工企微 UserID
data: 要发送的数据(会被序列化为 JSON)
"""
websocket = self.employee_connections.get(employee_id)
if not websocket:
# 该员工不在线(未打开H5页面),跳过
logger.debug(f"H5员工不在线,跳过推送: employee_id={employee_id}")
return
try:
await websocket.send_json(data)
except Exception as e:
# 发送失败 → 连接已断开,自动清理
logger.warning(f"H5员工 WebSocket 发送失败,清理连接: employee_id={employee_id}, error={e}")
self.disconnect_employee(employee_id)
async def broadcast_to_employees(self, employee_ids: List[str], data: dict) -> None:
"""向指定的多个H5员工推送消息。
做什么:遍历 employee_ids,逐一推送 JSON 数据
为什么:参与者变更事件只需通知该会话的参与者(非全员广播)
Args:
employee_ids: 目标员工ID列表
data: 要发送的数据
"""
if not employee_ids:
return
for employee_id in employee_ids:
await self.send_to_employee(employee_id, data)
# ==========================================================================
# 消息状态广播(P1-4
# ==========================================================================
async def broadcast_message_status(
self,
conv_id: str,
msg_id: str,
status: str,
participant_ids: List[str],
extra: dict = None,
) -> int:
"""向会话所有参与方广播消息状态变更。
用于撤回/已读/删除等事件的实时推送。
Args:
conv_id: 会话ID
msg_id: 消息ID
status: 新状态 (sent / delivered / read / recalled / deleted)
participant_ids: 参与方ID列表 (agent_id + employee_id)
extra: 额外数据 (可选,如 recall_by / recall_at)
Returns:
推送到客户端数量
"""
# 构建消息
payload = {
"type": "message_status",
"conv_id": conv_id,
"msg_id": msg_id,
"status": status,
**(extra or {}),
}
# 分别推送给坐席和员工
sent_count = 0
for pid in participant_ids:
# 判断是坐席还是员工
if pid in self.active_connections:
await self.send_to_agent(pid, payload)
sent_count += 1
elif pid in self.employee_connections:
await self.send_to_employee(pid, payload)
sent_count += 1
return sent_count
# ==========================================================================
# 辅助方法
# ==========================================================================
def is_employee_online(self, employee_id: str) -> bool:
"""检查指定员工是否在线(有活跃的H5 WS连接)。
做什么:查询员工是否在 employee_connections 中
为什么:某些场景需要判断员工是否在线(如是否需要通过企微消息降级推送)
Args:
employee_id: 员工企微 UserID
Returns:
bool: 是否在线
"""
return employee_id in self.employee_connections
# --------------------------------------------------------------------------
# 模块级单例
# --------------------------------------------------------------------------
# 为什么用单例:所有后端服务共享同一个 ConnectionManager 实例,
# 确保 WebSocket 连接映射表全局唯一,消息路由和会话服务都能推送事件
# --------------------------------------------------------------------------
manager = ConnectionManager()