# =============================================================================
# WeCom IT Smart Service Desk: tests for the 摇人 (call for backup)
# multi-agent collaboration feature
# =============================================================================
# Test coverage:
#   I. Invite to collaborate (POST /api/conversations/{id}/invite)
#     1. Successful invite: collaborating_agent_ids updated, WS broadcast,
#        targeted WS push
#     2. Inviting into a resolved conversation → 3002
#     3. Inviting into a conversation not yet picked up (queued) → 3020
#     4. Invite by an agent who is neither owner nor collaborator → 3021
#     5. Inviting the owning agent themself → 3022
#     6. Inviting an agent who is already collaborating → 3023
#     7. Inviting an offline agent → 3024
#     8. A collaborator can also invite (bring in a third agent)
#     9. Inviting a nonexistent agent → 3004
#     10. Inviting into a nonexistent conversation → 3003
#
#   II. Leave collaboration (POST /api/conversations/{id}/leave)
#     1. Successful leave: removed from collaborating_agent_ids, WS broadcast
#     2. Owning agent attempts to leave → 3025
#     3. Non-collaborator attempts to leave → 3026
#     4. The currently selected conversation is cleared after leaving
#
#   III. List integration tests
#     1. collaborating_agent_ids field is correct
#     2. collaborating_agent_names name mapping is correct
#     3. is_collaborator field is correct
#     4. Collaborators can still view and reply
#
#   IV. Permission matrix verification (end to end)
#     1. Collaborators cannot resolve
#     2. Collaborators cannot transfer
#     3. Collaborators do not count toward load
# =============================================================================

import uuid
from datetime import datetime
from typing import Optional
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.agent import Agent
from app.models.conversation import Conversation
from app.services.session_service import SessionService
from app.utils.response import AppException
from tests.conftest import create_test_conversation, create_test_agent, MockRedis


# =============================================================================
# Helpers
# =============================================================================

async def login_agent(client: AsyncClient, user_id: str, name: str) -> dict:
    """Log in an agent and return the auth header dict."""
    response = await client.post(
        "/agents/login",
        json={"user_id": user_id, "name": name},
    )
    data = response.json()
    token = data["data"]["token"]
    return {"Authorization": f"Bearer {token}"}


async def create_serving_conversation(
    db_session: AsyncSession,
    employee_id: str = "emp_001",
    agent_user_id: str = "agent_owner",
    collab_ids: Optional[list] = None,
) -> Conversation:
    """Create a ``serving`` conversation with an owning agent.

    Existing collaborators can optionally be supplied via ``collab_ids``.
    """
    conv = create_test_conversation(
        employee_id=employee_id,
        status="serving",
    )
    conv.assigned_agent_id = agent_user_id
    conv.collaborating_agent_ids = collab_ids or []
    db_session.add(conv)
    await db_session.flush()
    return conv


# =============================================================================
# I. Invite-to-collaborate tests
# =============================================================================

class TestInviteCollaborator:
    """Tests for the invite endpoint POST /api/conversations/{id}/invite."""

    @pytest.mark.asyncio
    async def test_invite_success_updates_collaborating_ids(
        self, client, db_session, mock_redis
    ):
        """A successful invite updates collaborating_agent_ids and sends a
        WS broadcast plus a targeted push.

        Scenario: agent A (owner) is handling a conversation and invites
        online agent B to collaborate.
        """
        # Create the agents
        owner = create_test_agent(user_id="owner_001", name="坐席A", status="online")
        invitee = create_test_agent(user_id="invitee_001", name="坐席B", status="online")
        db_session.add_all([owner, invitee])
        await db_session.flush()

        # Create a serving conversation assigned to the owner
        conv = await create_serving_conversation(
            db_session, employee_id="emp_invite", agent_user_id="owner_001"
        )

        # Agent A logs in and sends the invite
        headers = await login_agent(client, "owner_001", "坐席A")

        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock) as mock_broadcast, \
             patch("app.services.ws_manager.manager.send_to_agent", new_callable=AsyncMock) as mock_send:
            response = await client.post(
                f"/conversations/{conv.id}/invite",
                json={"agent_id": "invitee_001"},
                headers=headers,
            )

        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0

        # collaborating_agent_ids contains the invited agent
        result = data["data"]
        assert "invitee_001" in result["collaborating_agent_ids"]

        # The WS broadcast was sent (collaborator_joined)
        mock_broadcast.assert_called_once()
        broadcast_msg = mock_broadcast.call_args[0][0]
        assert broadcast_msg["type"] == "collaborator_joined"
        assert broadcast_msg["data"]["agent_id"] == "invitee_001"
        assert broadcast_msg["data"]["inviter_agent_id"] == "owner_001"

        # The targeted WS push was sent (collaborator_invited).
        # The message is read back from the recorded call, so the assertion
        # below checks the call count and the target agent ID; the payload
        # itself is verified by the asserts that follow.
        sent_msg = mock_send.call_args[0][1]
        mock_send.assert_called_once_with("invitee_001", sent_msg)
        assert sent_msg["type"] == "collaborator_invited"
        assert sent_msg["data"]["invitee_agent_id"] == "invitee_001"

        # The change was persisted to the database
        stmt = select(Conversation).where(Conversation.id == conv.id)
        result_db = await db_session.execute(stmt)
        db_conv = result_db.scalars().first()
        assert "invitee_001" in db_conv.collaborating_agent_ids

    @pytest.mark.asyncio
    async def test_invite_resolved_conversation_error_3002(
        self, client, db_session, mock_redis
    ):
        """Inviting into a resolved conversation is rejected → 3002."""
        owner = create_test_agent(user_id="owner_resolved", name="坐席A", status="online")
        invitee = create_test_agent(user_id="invitee_resolved", name="坐席B", status="online")
        db_session.add_all([owner, invitee])
        await db_session.flush()

        conv = create_test_conversation(
            employee_id="emp_resolved_invite", status="resolved"
        )
        conv.assigned_agent_id = "owner_resolved"
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "owner_resolved", "坐席A")
        response = await client.post(
            f"/conversations/{conv.id}/invite",
            json={"agent_id": "invitee_resolved"},
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3002  # ERR_CONVERSATION_RESOLVED

    @pytest.mark.asyncio
    async def test_invite_queued_conversation_error_3020(
        self, client, db_session, mock_redis
    ):
        """Inviting into a queued (not yet picked up) conversation is rejected → 3020."""
        owner = create_test_agent(user_id="owner_queued", name="坐席A", status="online")
        invitee = create_test_agent(user_id="invitee_queued", name="坐席B", status="online")
        db_session.add_all([owner, invitee])
        await db_session.flush()

        # A queued conversation with no assigned agent would hit 3021 first
        # (the caller is neither owner nor collaborator). With
        # assigned_agent_id set to the owner, the queued status itself is
        # what triggers 3020.
        conv = create_test_conversation(
            employee_id="emp_queued_invite", status="queued"
        )
        conv.assigned_agent_id = "owner_queued"
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "owner_queued", "坐席A")
        response = await client.post(
            f"/conversations/{conv.id}/invite",
            json={"agent_id": "invitee_queued"},
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3020
        assert "服务中" in data["message"]  # message mentions "in service"

    @pytest.mark.asyncio
    async def test_invite_by_non_owner_error_3021(
        self, client, db_session, mock_redis
    ):
        """An agent who is neither owner nor collaborator cannot invite → 3021."""
        owner = create_test_agent(user_id="owner_3021", name="坐席A", status="online")
        invitee = create_test_agent(user_id="invitee_3021", name="坐席B", status="online")
        stranger = create_test_agent(user_id="stranger_3021", name="路过的坐席", status="online")
        db_session.add_all([owner, invitee, stranger])
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session, employee_id="emp_3021", agent_user_id="owner_3021"
        )

        # Log in as the bystander agent (neither owner nor collaborator)
        headers = await login_agent(client, "stranger_3021", "路过的坐席")
        response = await client.post(
            f"/conversations/{conv.id}/invite",
            json={"agent_id": "invitee_3021"},
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3021
        assert "摇人" in data["message"]  # message mentions "call for backup"

    @pytest.mark.asyncio
    async def test_invite_owner_self_error_3022(
        self, client, db_session, mock_redis
    ):
        """The owning agent cannot be invited → 3022."""
        owner = create_test_agent(user_id="owner_3022", name="坐席A", status="online")
        db_session.add(owner)
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session, employee_id="emp_3022", agent_user_id="owner_3022"
        )

        headers = await login_agent(client, "owner_3022", "坐席A")
        response = await client.post(
            f"/conversations/{conv.id}/invite",
            json={"agent_id": "owner_3022"},  # inviting oneself
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3022

    @pytest.mark.asyncio
    async def test_invite_duplicate_collaborator_error_3023(
        self, client, db_session, mock_redis
    ):
        """An agent who is already collaborating cannot be invited again → 3023."""
        owner = create_test_agent(user_id="owner_3023", name="坐席A", status="online")
        invitee = create_test_agent(user_id="invitee_3023", name="坐席B", status="online")
        db_session.add_all([owner, invitee])
        await db_session.flush()

        # Agent B is already in the collaborator list
        conv = await create_serving_conversation(
            db_session,
            employee_id="emp_3023",
            agent_user_id="owner_3023",
            collab_ids=["invitee_3023"],
        )

        headers = await login_agent(client, "owner_3023", "坐席A")
        response = await client.post(
            f"/conversations/{conv.id}/invite",
            json={"agent_id": "invitee_3023"},
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3023

    @pytest.mark.asyncio
    async def test_invite_offline_agent_error_3024(
        self, client, db_session, mock_redis
    ):
        """An offline agent cannot be invited → 3024."""
        owner = create_test_agent(user_id="owner_3024", name="坐席A", status="online")
        offline_agent = create_test_agent(user_id="offline_3024", name="离线坐席", status="offline")
        db_session.add_all([owner, offline_agent])
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session, employee_id="emp_3024", agent_user_id="owner_3024"
        )

        headers = await login_agent(client, "owner_3024", "坐席A")
        response = await client.post(
            f"/conversations/{conv.id}/invite",
            json={"agent_id": "offline_3024"},
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3024
        assert "不在线" in data["message"]  # message mentions "not online"

    @pytest.mark.asyncio
    async def test_invite_nonexistent_agent_error_3004(
        self, client, db_session, mock_redis
    ):
        """Inviting a nonexistent agent → 3004."""
        owner = create_test_agent(user_id="owner_3004", name="坐席A", status="online")
        db_session.add(owner)
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session, employee_id="emp_3004", agent_user_id="owner_3004"
        )

        headers = await login_agent(client, "owner_3004", "坐席A")
        response = await client.post(
            f"/conversations/{conv.id}/invite",
            json={"agent_id": "nonexistent_agent"},
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3004  # ERR_AGENT_NOT_FOUND

    @pytest.mark.asyncio
    async def test_invite_nonexistent_conversation_error_3003(
        self, client, db_session, mock_redis
    ):
        """Inviting into a nonexistent conversation → 3003."""
        agent = create_test_agent(user_id="agent_3003", name="坐席A", status="online")
        invitee = create_test_agent(user_id="invitee_3003", name="坐席B", status="online")
        db_session.add_all([agent, invitee])
        await db_session.flush()

        fake_id = str(uuid.uuid4())
        headers = await login_agent(client, "agent_3003", "坐席A")
        response = await client.post(
            f"/conversations/{fake_id}/invite",
            json={"agent_id": "invitee_3003"},
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3003  # ERR_CONVERSATION_NOT_FOUND

    @pytest.mark.asyncio
    async def test_collaborator_can_also_invite_others(
        self, client, db_session, mock_redis
    ):
        """A collaborator can also invite (bring in a third agent).

        Scenario: agent A (owner) has invited agent B → agent B then invites
        agent C.
        """
        owner = create_test_agent(user_id="chain_owner", name="坐席A", status="online")
        collab1 = create_test_agent(user_id="chain_collab1", name="坐席B", status="online")
        collab2 = create_test_agent(user_id="chain_collab2", name="坐席C", status="online")
        db_session.add_all([owner, collab1, collab2])
        await db_session.flush()

        # Agent A owns the conversation and agent B is already a collaborator
        conv = await create_serving_conversation(
            db_session,
            employee_id="emp_chain",
            agent_user_id="chain_owner",
            collab_ids=["chain_collab1"],
        )

        # Agent B logs in and invites agent C
        headers = await login_agent(client, "chain_collab1", "坐席B")
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock), \
             patch("app.services.ws_manager.manager.send_to_agent", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/invite",
                json={"agent_id": "chain_collab2"},
                headers=headers,
            )

        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0

        # collaborating_agent_ids contains both collaborators
        result = data["data"]
        assert "chain_collab1" in result["collaborating_agent_ids"]
        assert "chain_collab2" in result["collaborating_agent_ids"]

    @pytest.mark.asyncio
    async def test_invite_without_auth_returns_unauthorized(
        self, client, db_session, mock_redis
    ):
        """Inviting without being logged in returns an unauthorized error."""
        owner = create_test_agent(user_id="noauth_owner", name="坐席A", status="online")
        invitee = create_test_agent(user_id="noauth_invitee", name="坐席B", status="online")
        db_session.add_all([owner, invitee])
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session, employee_id="emp_noauth", agent_user_id="noauth_owner"
        )

        # No Authorization header is sent
        response = await client.post(
            f"/conversations/{conv.id}/invite",
            json={"agent_id": "noauth_invitee"},
        )
        data = response.json()
        assert data["code"] == 1002  # ERR_UNAUTHORIZED


# =============================================================================
# II. Leave-collaboration tests
# =============================================================================

class TestLeaveCollaboration:
    """Tests for the leave endpoint POST /api/conversations/{id}/leave."""

    @pytest.mark.asyncio
    async def test_leave_success_removes_from_list(
        self, client, db_session, mock_redis
    ):
        """A successful leave removes the agent from collaborating_agent_ids
        and sends a WS broadcast."""
        owner = create_test_agent(user_id="leave_owner", name="坐席A", status="online")
        collab = create_test_agent(user_id="leave_collab", name="坐席B", status="online")
        db_session.add_all([owner, collab])
        await db_session.flush()

        # Agent B is already in the collaborator list
        conv = await create_serving_conversation(
            db_session,
            employee_id="emp_leave",
            agent_user_id="leave_owner",
            collab_ids=["leave_collab"],
        )

        headers = await login_agent(client, "leave_collab", "坐席B")
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock) as mock_broadcast:
            response = await client.post(
                f"/conversations/{conv.id}/leave",
                headers=headers,
            )

        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0

        # collaborating_agent_ids no longer contains agent B
        result = data["data"]
        assert "leave_collab" not in result["collaborating_agent_ids"]

        # The WS broadcast was sent (collaborator_left)
        mock_broadcast.assert_called_once()
        broadcast_msg = mock_broadcast.call_args[0][0]
        assert broadcast_msg["type"] == "collaborator_left"
        assert broadcast_msg["data"]["agent_id"] == "leave_collab"

        # The change was persisted to the database
        stmt = select(Conversation).where(Conversation.id == conv.id)
        result_db = await db_session.execute(stmt)
        db_conv = result_db.scalars().first()
        assert "leave_collab" not in db_conv.collaborating_agent_ids

    @pytest.mark.asyncio
    async def test_leave_owner_error_3025(
        self, client, db_session, mock_redis
    ):
        """The owning agent cannot leave the collaboration → 3025."""
        owner = create_test_agent(user_id="leave_owner_3025", name="坐席A", status="online")
        db_session.add(owner)
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session,
            employee_id="emp_leave_owner",
            agent_user_id="leave_owner_3025",
            collab_ids=["some_collab"],
        )

        headers = await login_agent(client, "leave_owner_3025", "坐席A")
        response = await client.post(
            f"/conversations/{conv.id}/leave",
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3025
        assert "主责坐席" in data["message"]  # message mentions "owning agent"

    @pytest.mark.asyncio
    async def test_leave_non_collaborator_error_3026(
        self, client, db_session, mock_redis
    ):
        """An agent who is not in the collaborator list cannot leave → 3026."""
        owner = create_test_agent(user_id="leave_owner_3026", name="坐席A", status="online")
        stranger = create_test_agent(user_id="stranger_3026", name="路过的坐席", status="online")
        db_session.add_all([owner, stranger])
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session,
            employee_id="emp_leave_stranger",
            agent_user_id="leave_owner_3026",
        )

        headers = await login_agent(client, "stranger_3026", "路过的坐席")
        response = await client.post(
            f"/conversations/{conv.id}/leave",
            headers=headers,
        )
        data = response.json()
        assert data["code"] == 3026
        assert "协作列表" in data["message"]  # message mentions "collaborator list"

    @pytest.mark.asyncio
    async def test_leave_without_auth_returns_unauthorized(
        self, client, db_session, mock_redis
    ):
        """Leaving without being logged in returns an unauthorized error."""
        owner = create_test_agent(user_id="noauth_leave_owner", name="坐席A", status="online")
        collab = create_test_agent(user_id="noauth_leave_collab", name="坐席B", status="online")
        db_session.add_all([owner, collab])
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session,
            employee_id="emp_noauth_leave",
            agent_user_id="noauth_leave_owner",
            collab_ids=["noauth_leave_collab"],
        )

        # No Authorization header is sent
        response = await client.post(
            f"/conversations/{conv.id}/leave",
        )
        data = response.json()
        assert data["code"] == 1002  # ERR_UNAUTHORIZED


# =============================================================================
# III. List integration tests
# =============================================================================

class TestCollaborationListIntegration:
    """Tests for the collaboration fields in the conversation list endpoint."""

    @pytest.mark.asyncio
    async def test_list_includes_collaboration_fields(
        self, client, db_session, mock_redis
    ):
        """The list endpoint returns collaborating_agent_ids and
        collaborating_agent_names."""
        owner = create_test_agent(user_id="list_owner", name="坐席A", status="online")
        collab1 = create_test_agent(user_id="list_collab1", name="坐席B", status="online")
        collab2 = create_test_agent(user_id="list_collab2", name="坐席C", status="online")
        db_session.add_all([owner, collab1, collab2])
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session,
            employee_id="emp_list_collab",
            agent_user_id="list_owner",
            collab_ids=["list_collab1", "list_collab2"],
        )

        # View the list as agent A
        headers = await login_agent(client, "list_owner", "坐席A")
        response = await client.get("/conversations", headers=headers)
        data = response.json()
        assert data["code"] == 0

        items = data["data"]["items"]
        item_map = {item["id"]: item for item in items}
        conv_item = item_map[str(conv.id)]

        # Check collaborating_agent_ids
        assert "list_collab1" in conv_item["collaborating_agent_ids"]
        assert "list_collab2" in conv_item["collaborating_agent_ids"]
        assert len(conv_item["collaborating_agent_ids"]) == 2

        # Check collaborating_agent_names
        assert conv_item["collaborating_agent_names"]["list_collab1"] == "坐席B"
        assert conv_item["collaborating_agent_names"]["list_collab2"] == "坐席C"

        # Check is_collaborator (agent A is the owner, not a collaborator)
        assert conv_item["is_collaborator"] is False

    @pytest.mark.asyncio
    async def test_list_is_collaborator_field_correctness(
        self, client, db_session, mock_redis
    ):
        """The is_collaborator field is set correctly for each viewer.

        - Owning agent → is_collaborator=False
        - Collaborator (who is not the owner) → is_collaborator=True
        - Neither owner nor collaborator → is_collaborator=False
        """
        owner = create_test_agent(user_id="iscoll_owner", name="主责坐席", status="online")
        collab = create_test_agent(user_id="iscoll_collab", name="协作坐席", status="online")
        stranger = create_test_agent(user_id="iscoll_stranger", name="路人坐席", status="online")
        db_session.add_all([owner, collab, stranger])
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session,
            employee_id="emp_iscoll",
            agent_user_id="iscoll_owner",
            collab_ids=["iscoll_collab"],
        )

        # Viewed by the owner → is_collaborator=False
        headers_owner = await login_agent(client, "iscoll_owner", "主责坐席")
        resp = await client.get("/conversations", headers=headers_owner)
        items = resp.json()["data"]["items"]
        item_map = {item["id"]: item for item in items}
        assert item_map[str(conv.id)]["is_collaborator"] is False

        # Viewed by the collaborator → is_collaborator=True
        headers_collab = await login_agent(client, "iscoll_collab", "协作坐席")
        resp = await client.get("/conversations", headers=headers_collab)
        items = resp.json()["data"]["items"]
        item_map = {item["id"]: item for item in items}
        assert item_map[str(conv.id)]["is_collaborator"] is True

        # Viewed by a bystander → is_collaborator=False
        headers_stranger = await login_agent(client, "iscoll_stranger", "路人坐席")
        resp = await client.get("/conversations", headers=headers_stranger)
        items = resp.json()["data"]["items"]
        item_map = {item["id"]: item for item in items}
        assert item_map[str(conv.id)]["is_collaborator"] is False

    @pytest.mark.asyncio
    async def test_list_no_collaborators_returns_empty_arrays(
        self, client, db_session, mock_redis
    ):
        """With no collaborators, the list returns an empty array and an
        empty object."""
        owner = create_test_agent(user_id="empty_owner", name="坐席A", status="online")
        db_session.add(owner)
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session, employee_id="emp_empty_collab", agent_user_id="empty_owner"
        )

        headers = await login_agent(client, "empty_owner", "坐席A")
        response = await client.get("/conversations", headers=headers)
        data = response.json()
        items = data["data"]["items"]
        item_map = {item["id"]: item for item in items}
        conv_item = item_map[str(conv.id)]

        assert conv_item["collaborating_agent_ids"] == []
        assert conv_item["collaborating_agent_names"] == {}
        assert conv_item["is_collaborator"] is False


# =============================================================================
# IV. Permission matrix verification (end to end)
# =============================================================================

class TestCollaborationPermissions:
    """Tests for the permission boundaries of collaborators.

    Collaborators can: view the conversation, send replies, and invite
    further agents.
    Collaborators cannot: resolve, transfer, or pin / mark as to-do.
    Collaborators do not count toward load.
    """

    @pytest.mark.asyncio
    async def test_collaborator_does_not_count_load(
        self, client, db_session, mock_redis
    ):
        """Load is unchanged after a collaborator joins.

        The owner's current_load should stay at 1 and the collaborator's
        current_load should stay as it was.
        """
        owner = create_test_agent(user_id="load_owner", name="坐席A", status="online")
        owner.current_load = 1
        collab = create_test_agent(user_id="load_collab", name="坐席B", status="online")
        collab.current_load = 0
        db_session.add_all([owner, collab])
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session, employee_id="emp_load", agent_user_id="load_owner"
        )

        headers = await login_agent(client, "load_owner", "坐席A")
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock), \
             patch("app.services.ws_manager.manager.send_to_agent", new_callable=AsyncMock):
            await client.post(
                f"/conversations/{conv.id}/invite",
                json={"agent_id": "load_collab"},
                headers=headers,
            )

        # The owner's load is unchanged (= 1)
        stmt = select(Agent).where(Agent.user_id == "load_owner")
        result = await db_session.execute(stmt)
        db_owner = result.scalars().first()
        assert db_owner.current_load == 1

        # The collaborator's load is unchanged (= 0)
        stmt = select(Agent).where(Agent.user_id == "load_collab")
        result = await db_session.execute(stmt)
        db_collab = result.scalars().first()
        assert db_collab.current_load == 0  # collaborating does not add load

    @pytest.mark.asyncio
    async def test_collaborator_cannot_resolve_conversation(
        self, client, db_session, mock_redis
    ):
        """A collaborator cannot resolve the conversation.

        Scenario: agent A (owner) has invited agent B → agent B tries to
        resolve (and should fail).
        """
        owner = create_test_agent(user_id="perm_owner", name="坐席A", status="online")
        collab = create_test_agent(user_id="perm_collab", name="坐席B", status="online")
        db_session.add_all([owner, collab])
        await db_session.flush()

        conv = await create_serving_conversation(
            db_session,
            employee_id="emp_perm",
            agent_user_id="perm_owner",
            collab_ids=["perm_collab"],
        )

        # The collaborator logs in and tries to resolve
        headers = await login_agent(client, "perm_collab", "坐席B")
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/resolve",
                headers=headers,
            )

        data = response.json()
        # A collaborator is not the owner, so resolve should return 3027
        # (only the owning agent may resolve).
        assert data["code"] == 3027, (
            f"a collaborator must not be able to resolve; "
            f"expected code=3027, got code={data['code']}"
        )

    @pytest.mark.asyncio
    async def test_full_invite_leave_cycle(
        self, client, db_session, mock_redis
    ):
        """End-to-end test: invite → view list → leave → verify cleanup.

        Full flow:
        1. Agent A invites agent B
        2. Agent B views the list and sees the collaborative conversation
        3. Agent A invites agent C
        4. Agent A views the list and sees both collaborators
        5. Agent B leaves the collaboration
        6. Agent B is no longer in the collaborator list; agent C still is
        """
        # Create the agents
        owner = create_test_agent(user_id="e2e_owner", name="坐席A", status="online")
        collab_b = create_test_agent(user_id="e2e_b", name="坐席B", status="online")
        collab_c = create_test_agent(user_id="e2e_c", name="坐席C", status="online")
        db_session.add_all([owner, collab_b, collab_c])
        await db_session.flush()

        # Create the conversation
        conv = await create_serving_conversation(
            db_session, employee_id="emp_e2e", agent_user_id="e2e_owner"
        )

        headers_a = await login_agent(client, "e2e_owner", "坐席A")

        # Step 1: agent A invites agent B
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock), \
             patch("app.services.ws_manager.manager.send_to_agent", new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{conv.id}/invite",
                json={"agent_id": "e2e_b"},
                headers=headers_a,
            )
        assert resp.json()["code"] == 0

        # Step 2: agent B views the list and sees the collaborative conversation
        headers_b = await login_agent(client, "e2e_b", "坐席B")
        resp = await client.get("/conversations", headers=headers_b)
        items = resp.json()["data"]["items"]
        item_map = {item["id"]: item for item in items}
        assert str(conv.id) in item_map
        assert item_map[str(conv.id)]["is_collaborator"] is True

        # Step 3: agent A invites agent C
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock), \
             patch("app.services.ws_manager.manager.send_to_agent", new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{conv.id}/invite",
                json={"agent_id": "e2e_c"},
                headers=headers_a,
            )
        assert resp.json()["code"] == 0

        # Step 4: agent A views the list and sees both collaborators
        resp = await client.get("/conversations", headers=headers_a)
        items = resp.json()["data"]["items"]
        item_map = {item["id"]: item for item in items}
        conv_item = item_map[str(conv.id)]
        assert "e2e_b" in conv_item["collaborating_agent_ids"]
        assert "e2e_c" in conv_item["collaborating_agent_ids"]
        assert conv_item["collaborating_agent_names"]["e2e_b"] == "坐席B"
        assert conv_item["collaborating_agent_names"]["e2e_c"] == "坐席C"

        # Step 5: agent B leaves the collaboration
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{conv.id}/leave",
                headers=headers_b,
            )
        assert resp.json()["code"] == 0

        # Step 6: agent B has been removed; agent C is still present
        resp = await client.get("/conversations", headers=headers_a)
        items = resp.json()["data"]["items"]
        item_map = {item["id"]: item for item in items}
        conv_item = item_map[str(conv.id)]
        assert "e2e_b" not in conv_item["collaborating_agent_ids"]
        assert "e2e_c" in conv_item["collaborating_agent_ids"]
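

# =============================================================================
# Appendix: illustrative sketch of the is_collaborator rule
# =============================================================================
# The list tests in this module expect is_collaborator to follow a simple rule.
# The function below is a minimal sketch of that rule so it can be read at a
# glance. `expected_is_collaborator` is a hypothetical helper introduced here
# for illustration only: it is not part of the application, it does not call
# the real service, and it assumes the rule is exactly what the test docstrings
# describe (owner → False, collaborator who is not the owner → True,
# anyone else → False).

def expected_is_collaborator(viewer_id, owner_id, collaborating_agent_ids):
    """Return the is_collaborator value the list tests expect for a viewer."""
    # The owning agent is never reported as a collaborator.
    if viewer_id == owner_id:
        return False
    # Treat a missing list the same as an empty one.
    return viewer_id in (collaborating_agent_ids or [])


# Possible use inside a test, with the IDs from
# test_list_is_collaborator_field_correctness:
#     expected_is_collaborator("iscoll_collab", "iscoll_owner", ["iscoll_collab"])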