Files

750 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 坐席会话全局可见 + 接手功能 测试
# =============================================================================
# 测试覆盖:
# 一、会话列表接口(GET /api/conversations)
# 1. 返回全部活跃会话(queued + serving + 其他坐席 serving)
# 2. is_mine / assigned_agent_name / can_grab 字段标注正确
# 3. N+1 查询优化(坐席信息批量查询)
#
# 二、接手接口(POST /api/conversations/{id}/grab)
# 1. 成功接手:原坐席 load-1,新坐席 load+1,assigned_agent_id 切换
# 2. 不能接手未分配坐席的会话 → 3011
# 3. 不能接手自己的会话 → 3012
# 4. 不能接手非 serving 状态会话 → 3013
# 5. 不能接手已结单会话 → 3002
# 6. 接手后 WebSocket 广播 conversation_updated
#
# 三、边界情况
# 1. 满负荷坐席接手 → 3005
# 2. 会话不存在 → 3003
# 3. 接手成功后返回字段验证(is_mine=True, can_grab=False)
# =============================================================================
import uuid
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.agent import Agent
from app.models.conversation import Conversation
from app.services.session_service import SessionService
from app.utils.response import AppException
from tests.conftest import create_test_conversation, create_test_agent, MockRedis
# =============================================================================
# 辅助函数:登录坐席并获取认证头
# =============================================================================
async def login_agent(client: AsyncClient, user_id: str, name: str) -> dict:
    """Log an agent in and return the auth header dict for later requests.

    Sends ``POST /agents/login`` with the agent's ``user_id`` and ``name`` and
    reads the token from ``data.token`` of the JSON response body.

    Args:
        client: HTTP client used to call the application under test.
        user_id: Agent user id sent in the login payload.
        name: Agent display name sent in the login payload.

    Returns:
        ``{"Authorization": "Bearer <token>"}``.

    Raises:
        AssertionError: If the response body carries no token. This makes a
            broken login fail with the full payload in the message instead of
            an opaque ``KeyError``/``TypeError`` inside every dependent test.
    """
    response = await client.post(
        "/agents/login",
        json={"user_id": user_id, "name": name},
    )
    payload = response.json()

    # Walk the payload defensively: error responses may omit "data" or set it
    # to a non-dict value.
    body = payload.get("data") if isinstance(payload, dict) else None
    token = body.get("token") if isinstance(body, dict) else None
    if not token:
        raise AssertionError(
            f"login failed for agent {user_id!r}: unexpected response {payload!r}"
        )
    return {"Authorization": f"Bearer {token}"}
async def create_and_assign_conversation(
    db_session: AsyncSession,
    employee_id: str = "emp_001",
    agent_user_id: str = "agent_001",
) -> Conversation:
    """Create a ``serving`` conversation that is already assigned to an agent.

    Args:
        db_session: Session the new conversation is added to and flushed on.
        employee_id: Employee id passed to ``create_test_conversation``.
        agent_user_id: Value stored in ``assigned_agent_id``.

    Returns:
        The conversation object after it has been added and flushed.
    """
    conversation = create_test_conversation(employee_id=employee_id, status="serving")
    # The assignee is set on the instance after construction, before it is
    # handed to the session.
    conversation.assigned_agent_id = agent_user_id

    db_session.add(conversation)
    await db_session.flush()
    return conversation
# =============================================================================
# 一、会话列表接口测试
# =============================================================================
class TestConversationListGlobalVisibility:
    """Tests for the globally visible conversation list (``GET /conversations``)."""

    @staticmethod
    async def _list_items_by_id(client, headers):
        """Fetch the conversation list and return its items keyed by ``id``.

        Asserts that the request succeeded (HTTP 200 and body ``code == 0``)
        so that a failing endpoint is reported as such rather than as a
        ``KeyError`` in the field checks that follow.
        """
        response = await client.get("/conversations", headers=headers)
        assert response.status_code == 200
        payload = response.json()
        assert payload["code"] == 0
        return {item["id"]: item for item in payload["data"]["items"]}

    @pytest.mark.asyncio
    async def test_list_returns_all_active_conversations(
        self, client, db_session, mock_redis
    ):
        """The list returns every active conversation.

        Scenario: the database holds a queued conversation, a serving
        conversation owned by the caller and a serving conversation owned by
        another agent. The logged-in agent must see all three.
        """
        # Arrange: the viewing agent and a second agent.
        viewer = create_test_agent(user_id="viewer_001", name="查看坐席")
        other_agent = create_test_agent(user_id="other_001", name="其他坐席")
        db_session.add_all([viewer, other_agent])
        await db_session.flush()

        # One conversation per visibility category.
        conv_queued = create_test_conversation(
            employee_id="emp_queued", status="queued"
        )
        conv_my_serving = create_test_conversation(
            employee_id="emp_my", status="serving"
        )
        conv_my_serving.assigned_agent_id = "viewer_001"
        conv_other_serving = create_test_conversation(
            employee_id="emp_other", status="serving"
        )
        conv_other_serving.assigned_agent_id = "other_001"
        db_session.add_all([conv_queued, conv_my_serving, conv_other_serving])
        await db_session.flush()

        # Act: log in and request the list.
        headers = await login_agent(client, "viewer_001", "查看坐席")
        response = await client.get("/conversations", headers=headers)

        assert response.status_code == 200
        payload = response.json()
        assert payload["code"] == 0

        # At least the three conversations created above must be counted.
        assert payload["data"]["total"] >= 3

        # All three categories appear in the result.
        listed_ids = {item["id"] for item in payload["data"]["items"]}
        assert str(conv_queued.id) in listed_ids
        assert str(conv_my_serving.id) in listed_ids
        assert str(conv_other_serving.id) in listed_ids

    @pytest.mark.asyncio
    async def test_is_mine_field_correctness(
        self, client, db_session, mock_redis
    ):
        """``is_mine`` is flagged correctly.

        - own conversation -> ``is_mine`` is True
        - another agent's conversation -> ``is_mine`` is False
        - unassigned conversation -> ``is_mine`` is False
        """
        me = create_test_agent(user_id="mine_agent", name="我的坐席")
        other = create_test_agent(user_id="other_agent", name="他人坐席")
        db_session.add_all([me, other])
        await db_session.flush()

        conv_mine = create_test_conversation(
            employee_id="emp_mine", status="serving"
        )
        conv_mine.assigned_agent_id = "mine_agent"
        conv_other = create_test_conversation(
            employee_id="emp_other2", status="serving"
        )
        conv_other.assigned_agent_id = "other_agent"
        conv_unassigned = create_test_conversation(
            employee_id="emp_unassigned", status="queued"
        )
        db_session.add_all([conv_mine, conv_other, conv_unassigned])
        await db_session.flush()

        headers = await login_agent(client, "mine_agent", "我的坐席")
        item_map = await self._list_items_by_id(client, headers)

        assert item_map[str(conv_mine.id)]["is_mine"] is True
        assert item_map[str(conv_other.id)]["is_mine"] is False
        assert item_map[str(conv_unassigned.id)]["is_mine"] is False

    @pytest.mark.asyncio
    async def test_assigned_agent_name_field(
        self, client, db_session, mock_redis
    ):
        """``assigned_agent_name`` carries the assignee's display name.

        - assigned conversation -> the agent's name
        - unassigned conversation -> ``None``
        """
        agent = create_test_agent(user_id="name_agent", name="坐席张三")
        db_session.add(agent)
        await db_session.flush()

        conv_assigned = create_test_conversation(
            employee_id="emp_assigned", status="serving"
        )
        conv_assigned.assigned_agent_id = "name_agent"
        conv_unassigned = create_test_conversation(
            employee_id="emp_no_agent", status="queued"
        )
        db_session.add_all([conv_assigned, conv_unassigned])
        await db_session.flush()

        headers = await login_agent(client, "name_agent", "坐席张三")
        item_map = await self._list_items_by_id(client, headers)

        assert item_map[str(conv_assigned.id)]["assigned_agent_name"] == "坐席张三"
        assert item_map[str(conv_unassigned.id)]["assigned_agent_name"] is None

    @pytest.mark.asyncio
    async def test_can_grab_field_correctness(
        self, client, db_session, mock_redis
    ):
        """``can_grab`` is flagged correctly.

        Expected rule: ``can_grab`` is True only when ``assigned_agent_id`` is
        set, is not the caller, and the status is ``serving``.

        - another agent's serving conversation -> True
        - own conversation -> False
        - unassigned queued conversation -> False
        - another agent's queued conversation -> False
        - another agent's resolved conversation -> False
        """
        checker = create_test_agent(user_id="grab_checker", name="检查坐席")
        other = create_test_agent(user_id="grab_other", name="他人坐席")
        db_session.add_all([checker, other])
        await db_session.flush()

        # Another agent's serving conversation: grabbable.
        conv_other_serving = create_test_conversation(
            employee_id="emp_other_serving", status="serving"
        )
        conv_other_serving.assigned_agent_id = "grab_other"

        # Own conversation: not grabbable.
        conv_my_serving = create_test_conversation(
            employee_id="emp_my_serving", status="serving"
        )
        conv_my_serving.assigned_agent_id = "grab_checker"

        # Unassigned queued conversation: not grabbable.
        conv_queued = create_test_conversation(
            employee_id="emp_queued_grab", status="queued"
        )

        # Another agent's queued conversation: assigned but not serving, so
        # not grabbable. This case is listed in the rule above.
        conv_other_queued = create_test_conversation(
            employee_id="emp_other_queued_grab", status="queued"
        )
        conv_other_queued.assigned_agent_id = "grab_other"

        # Resolved conversation: not grabbable.
        conv_resolved = create_test_conversation(
            employee_id="emp_resolved_grab", status="resolved"
        )
        conv_resolved.assigned_agent_id = "grab_other"

        db_session.add_all([
            conv_other_serving,
            conv_my_serving,
            conv_queued,
            conv_other_queued,
            conv_resolved,
        ])
        await db_session.flush()

        headers = await login_agent(client, "grab_checker", "检查坐席")
        item_map = await self._list_items_by_id(client, headers)

        assert item_map[str(conv_other_serving.id)]["can_grab"] is True
        assert item_map[str(conv_my_serving.id)]["can_grab"] is False
        assert item_map[str(conv_queued.id)]["can_grab"] is False
        assert item_map[str(conv_other_queued.id)]["can_grab"] is False
        assert item_map[str(conv_resolved.id)]["can_grab"] is False
# =============================================================================
# 二、接手接口测试
# =============================================================================
class TestGrabConversation:
    """Tests for the grab endpoint, ``POST /conversations/{id}/grab``."""

    @staticmethod
    async def _fetch_agent(db_session, user_id):
        """Re-read the agent row with the given ``user_id`` through the test session."""
        result = await db_session.execute(
            select(Agent).where(Agent.user_id == user_id)
        )
        return result.scalars().first()

    @pytest.mark.asyncio
    async def test_grab_success_switches_agent_and_load(
        self, client, db_session, mock_redis
    ):
        """A successful grab moves the conversation and adjusts both loads.

        The previous agent's ``current_load`` drops by one, the new agent's
        rises by one, and ``assigned_agent_id`` switches to the new agent.
        """
        # Previous owner, already handling one conversation.
        old_agent = create_test_agent(
            user_id="old_agent", name="原坐席", status="online"
        )
        old_agent.current_load = 1
        old_agent.max_load = 5

        # Agent that will take the conversation over.
        new_agent = create_test_agent(
            user_id="new_agent", name="新坐席", status="online"
        )
        new_agent.current_load = 0
        new_agent.max_load = 5
        db_session.add_all([old_agent, new_agent])
        await db_session.flush()

        # A serving conversation owned by the previous agent.
        conv = create_test_conversation(
            employee_id="emp_grab_success", status="serving"
        )
        conv.assigned_agent_id = "old_agent"
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "new_agent", "新坐席")
        # The broadcast is patched out; its arguments are checked in
        # test_grab_broadcasts_websocket, so the mock is not bound here.
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/grab",
                headers=headers,
            )

        assert response.status_code == 200
        payload = response.json()
        assert payload["code"] == 0
        assert payload["data"]["assigned_agent_id"] == "new_agent"

        refreshed_old = await self._fetch_agent(db_session, "old_agent")
        assert refreshed_old.current_load == 0  # 1 - 1

        refreshed_new = await self._fetch_agent(db_session, "new_agent")
        assert refreshed_new.current_load == 1  # 0 + 1

    @pytest.mark.asyncio
    async def test_grab_no_agent_error_3011(
        self, client, db_session, mock_redis
    ):
        """Grabbing a conversation with no assigned agent yields code 3011."""
        agent = create_test_agent(user_id="grab_no_agent_user", name="测试坐席")
        db_session.add(agent)
        await db_session.flush()

        # Queued conversation, no assignee.
        conv = create_test_conversation(
            employee_id="emp_no_agent", status="queued"
        )
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "grab_no_agent_user", "测试坐席")
        response = await client.post(
            f"/conversations/{conv.id}/grab",
            headers=headers,
        )

        payload = response.json()
        assert payload["code"] == 3011
        assert "尚未分配坐席" in payload["message"]

    @pytest.mark.asyncio
    async def test_grab_self_error_3012(
        self, client, db_session, mock_redis
    ):
        """Grabbing one's own conversation yields code 3012."""
        agent = create_test_agent(user_id="grab_self_user", name="自接坐席")
        db_session.add(agent)
        await db_session.flush()

        # Serving conversation already owned by the caller.
        conv = create_test_conversation(
            employee_id="emp_self_grab", status="serving"
        )
        conv.assigned_agent_id = "grab_self_user"
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "grab_self_user", "自接坐席")
        response = await client.post(
            f"/conversations/{conv.id}/grab",
            headers=headers,
        )

        payload = response.json()
        assert payload["code"] == 3012
        assert "不能接手自己的会话" in payload["message"]

    @pytest.mark.asyncio
    async def test_grab_not_serving_error_3013(
        self, client, db_session, mock_redis
    ):
        """Grabbing a conversation that is not ``serving`` yields code 3013."""
        other_agent = create_test_agent(user_id="other_for_3013", name="他人坐席")
        grabber = create_test_agent(user_id="grabber_for_3013", name="接手坐席")
        db_session.add_all([other_agent, grabber])
        await db_session.flush()

        # Edge case: the conversation is assigned but still queued.
        conv = create_test_conversation(
            employee_id="emp_not_serving", status="queued"
        )
        conv.assigned_agent_id = "other_for_3013"
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "grabber_for_3013", "接手坐席")
        response = await client.post(
            f"/conversations/{conv.id}/grab",
            headers=headers,
        )

        payload = response.json()
        assert payload["code"] == 3013
        assert "只能接手服务中的会话" in payload["message"]

    @pytest.mark.asyncio
    async def test_grab_resolved_error_3002(
        self, client, db_session, mock_redis
    ):
        """Grabbing a resolved conversation yields code 3002.

        Per the original author's note, the service checks for ``resolved``
        before the generic "status is not serving" check, so 3002 is expected
        rather than 3013. That ordering lives in the service code, which is
        not part of this file.
        """
        other_agent = create_test_agent(user_id="other_for_3002", name="他人坐席")
        grabber = create_test_agent(user_id="grabber_for_3002", name="接手坐席")
        db_session.add_all([other_agent, grabber])
        await db_session.flush()

        # Resolved conversation that still has an assignee.
        conv = create_test_conversation(
            employee_id="emp_resolved_grab_test", status="resolved"
        )
        conv.assigned_agent_id = "other_for_3002"
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "grabber_for_3002", "接手坐席")
        response = await client.post(
            f"/conversations/{conv.id}/grab",
            headers=headers,
        )

        payload = response.json()
        assert payload["code"] == 3002
        assert "已结单" in payload["message"]

    @pytest.mark.asyncio
    async def test_grab_broadcasts_websocket(
        self, client, db_session, mock_redis
    ):
        """A successful grab broadcasts a ``conversation_updated`` event."""
        old_agent = create_test_agent(
            user_id="ws_old_agent", name="原坐席", status="online"
        )
        old_agent.current_load = 1
        new_agent = create_test_agent(
            user_id="ws_new_agent", name="新坐席", status="online"
        )
        db_session.add_all([old_agent, new_agent])
        await db_session.flush()

        conv = create_test_conversation(
            employee_id="emp_ws_grab", status="serving"
        )
        conv.assigned_agent_id = "ws_old_agent"
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "ws_new_agent", "新坐席")
        with patch(
            "app.services.ws_manager.manager.broadcast", new_callable=AsyncMock
        ) as mock_broadcast:
            response = await client.post(
                f"/conversations/{conv.id}/grab",
                headers=headers,
            )

        # Confirm the grab itself succeeded first, so a failing endpoint is
        # not misreported as a missing broadcast.
        assert response.status_code == 200
        assert response.json()["code"] == 0

        mock_broadcast.assert_called_once()
        # First positional argument of the single broadcast call.
        event = mock_broadcast.call_args[0][0]
        assert event["type"] == "conversation_updated"
        assert event["data"]["conversation_id"] == str(conv.id)
        assert event["data"]["old_agent_id"] == "ws_old_agent"
        assert event["data"]["new_agent_id"] == "ws_new_agent"

    @pytest.mark.asyncio
    async def test_grab_success_response_fields(
        self, client, db_session, mock_redis
    ):
        """The grab response carries ``is_mine``, ``can_grab`` and ``assigned_agent_name``."""
        old_agent = create_test_agent(
            user_id="resp_old_agent", name="原坐席", status="online"
        )
        old_agent.current_load = 1
        new_agent = create_test_agent(
            user_id="resp_new_agent", name="新坐席", status="online"
        )
        db_session.add_all([old_agent, new_agent])
        await db_session.flush()

        conv = create_test_conversation(
            employee_id="emp_resp_grab", status="serving"
        )
        conv.assigned_agent_id = "resp_old_agent"
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "resp_new_agent", "新坐席")
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/grab",
                headers=headers,
            )

        payload = response.json()
        assert payload["code"] == 0
        grabbed = payload["data"]
        # After the grab the conversation belongs to the caller...
        assert grabbed["is_mine"] is True
        # ...so it can no longer be grabbed by the caller...
        assert grabbed["can_grab"] is False
        # ...and the assignee name is the new agent's name.
        assert grabbed["assigned_agent_name"] == "新坐席"
# =============================================================================
# 三、边界情况测试
# =============================================================================
class TestGrabEdgeCases:
    """Edge-case tests for the grab endpoint."""

    @pytest.mark.asyncio
    async def test_grab_when_agent_at_max_load_error_3005(
        self, client, db_session, mock_redis
    ):
        """An agent already at maximum load cannot grab; expects code 3005."""
        # Current owner with one conversation.
        owner = create_test_agent(
            user_id="max_old_agent", name="原坐席", status="online"
        )
        owner.current_load = 1

        # Grabbing agent whose load already equals its limit.
        saturated = create_test_agent(
            user_id="full_agent", name="满负荷坐席", status="online"
        )
        saturated.current_load = 5
        saturated.max_load = 5

        db_session.add_all([owner, saturated])
        await db_session.flush()

        conversation = create_test_conversation(
            employee_id="emp_max_load", status="serving"
        )
        conversation.assigned_agent_id = "max_old_agent"
        db_session.add(conversation)
        await db_session.flush()

        auth = await login_agent(client, "full_agent", "满负荷坐席")
        resp = await client.post(
            f"/conversations/{conversation.id}/grab",
            headers=auth,
        )

        body = resp.json()
        assert body["code"] == 3005
        assert "满负荷" in body["message"]

    @pytest.mark.asyncio
    async def test_grab_nonexistent_conversation_error_3003(
        self, client, db_session, mock_redis
    ):
        """Grabbing an id that matches no conversation expects code 3003."""
        ghost = create_test_agent(
            user_id="grab_ghost_agent", name="幽灵坐席", status="online"
        )
        db_session.add(ghost)
        await db_session.flush()

        # A freshly generated UUID that was never stored.
        missing_id = str(uuid.uuid4())

        auth = await login_agent(client, "grab_ghost_agent", "幽灵坐席")
        resp = await client.post(
            f"/conversations/{missing_id}/grab",
            headers=auth,
        )

        # Per the original author's note, 3003 is the service's
        # "conversation not found" error code.
        assert resp.json()["code"] == 3003

    @pytest.mark.asyncio
    async def test_grab_ai_handling_status_error_3013(
        self, client, db_session, mock_redis
    ):
        """A conversation in ``ai_handling`` status cannot be grabbed; expects 3013."""
        owner = create_test_agent(
            user_id="ai_other", name="AI坐席", status="online"
        )
        taker = create_test_agent(
            user_id="ai_grabber", name="接手坐席", status="online"
        )
        db_session.add_all([owner, taker])
        await db_session.flush()

        conversation = create_test_conversation(
            employee_id="emp_ai_handling", status="ai_handling"
        )
        conversation.assigned_agent_id = "ai_other"
        db_session.add(conversation)
        await db_session.flush()

        auth = await login_agent(client, "ai_grabber", "接手坐席")
        resp = await client.post(
            f"/conversations/{conversation.id}/grab",
            headers=auth,
        )

        # ai_handling is not serving, so the "not serving" code is expected.
        assert resp.json()["code"] == 3013

    @pytest.mark.asyncio
    async def test_grab_without_auth_returns_unauthorized(
        self, client, db_session, mock_redis
    ):
        """A grab request without credentials is rejected with code 1002."""
        conversation = create_test_conversation(
            employee_id="emp_no_auth", status="serving"
        )
        conversation.assigned_agent_id = "some_agent"
        db_session.add(conversation)
        await db_session.flush()

        # Deliberately sent without an Authorization header.
        resp = await client.post(
            f"/conversations/{conversation.id}/grab",
        )

        assert resp.json()["code"] == 1002  # ERR_UNAUTHORIZED

    @pytest.mark.asyncio
    async def test_grab_old_agent_load_never_goes_negative(
        self, client, db_session, mock_redis
    ):
        """The previous agent's ``current_load`` stays at 0 instead of going negative.

        The original author notes that the service guards the decrement with a
        ``> 0`` check; this test exercises that with inconsistent data.
        """
        # Inconsistent data: the owner has a conversation but a load of 0.
        owner = create_test_agent(
            user_id="zero_load_old", name="零负荷原坐席", status="online"
        )
        owner.current_load = 0
        taker = create_test_agent(
            user_id="zero_load_new", name="新坐席", status="online"
        )
        taker.current_load = 0
        db_session.add_all([owner, taker])
        await db_session.flush()

        conversation = create_test_conversation(
            employee_id="emp_zero_load", status="serving"
        )
        conversation.assigned_agent_id = "zero_load_old"
        db_session.add(conversation)
        await db_session.flush()

        auth = await login_agent(client, "zero_load_new", "新坐席")
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{conversation.id}/grab",
                headers=auth,
            )

        assert resp.json()["code"] == 0

        # Previous owner: load is still 0.
        owner_rows = await db_session.execute(
            select(Agent).where(Agent.user_id == "zero_load_old")
        )
        assert owner_rows.scalars().first().current_load == 0

        # New owner: load went from 0 to 1.
        taker_rows = await db_session.execute(
            select(Agent).where(Agent.user_id == "zero_load_new")
        )
        assert taker_rows.scalars().first().current_load == 1
# =============================================================================
# 四、会话列表 N+1 查询优化验证
# =============================================================================
class TestConversationListN1Optimization:
    """Checks around the N+1 query optimization of the conversation list."""

    @pytest.mark.asyncio
    async def test_batch_query_agent_names(
        self, client, db_session, mock_redis
    ):
        """Every conversation gets the right ``assigned_agent_name`` with several agents.

        This test does not prove that a single batch query is used; it would
        pass without the optimization too. It verifies that the agent names
        are mapped to the correct conversations when several agents are
        involved.
        """
        # Three agents: batch_agent_0..2.
        agents = []
        for i in range(3):
            agents.append(
                create_test_agent(user_id=f"batch_agent_{i}", name=f"坐席{i+1}")
            )
        db_session.add_all(agents)
        await db_session.flush()

        # Three serving conversations, the i-th one owned by the i-th agent.
        convs = []
        for i in range(3):
            conv = create_test_conversation(
                employee_id=f"emp_batch_{i}", status="serving"
            )
            conv.assigned_agent_id = f"batch_agent_{i}"
            convs.append(conv)
        db_session.add_all(convs)
        await db_session.flush()

        auth = await login_agent(client, "batch_agent_0", "坐席1")
        resp = await client.get("/conversations", headers=auth)

        body = resp.json()
        assert body["code"] == 0
        item_map = {entry["id"]: entry for entry in body["data"]["items"]}

        # Each conversation must carry its own agent's name.
        for i, conv in enumerate(convs):
            item = item_map[str(conv.id)]
            assert item["assigned_agent_name"] == f"坐席{i+1}", (
                f"会话 {conv.id} 的坐席姓名应为 '坐席{i+1}',实际为 '{item['assigned_agent_name']}'"
            )