# =============================================================================
# WeCom IT Smart Service Desk — unit tests for the invitation (participant) feature
# =============================================================================
# Coverage:
#   1. Invite participants (POST /api/conversations/{id}/invite-participant)
#      1. Successful invite: participants updated, system message created
#      2. Invite by a non-owner agent → 3030
#      3. Invite on a conversation that is not being served → 3031
#      4. Duplicate invite (every invitee is already present) → 3032
#      5. Invite on a nonexistent conversation → 3003
#      6. Unauthenticated invite → 401 (the test asserts business code 1002)
#
#   2. Join a conversation (POST /api/conversations/{id}/join)
#      1. Successful join: joined flag updated, system message created
#      2. Join by someone who was not invited → 3034
#      3. Join on a finished conversation → 3033
#      4. Join on a nonexistent conversation → 3003
#
#   3. Remove a participant (DELETE /api/conversations/{id}/participants/{user_id})
#      1. Successful removal: removed from participants, system message created
#      2. Removal by a non-owner agent → 3035
#      3. Removal of someone not in the list → 3036
#      4. Unauthenticated removal → 401 (the test asserts business code 1002)
#
#   4. Participant leaves (POST /api/conversations/{id}/leave-participant)
#      1. Successful leave: removed from participants, system message created
#      2. Leave by a non-participant → 3037
#      3. Leave on a nonexistent conversation → 3003
#
#   5. End-to-end flows
#      1. Invite → join → leave (full lifecycle)
#      2. Invite → join → removed by the agent (admin action)
#
# NOTE(review): "system message created" is listed above, but none of the tests
# visible here assert on system messages — confirm whether that is covered
# elsewhere.
# =============================================================================

import uuid
from datetime import datetime
from typing import Optional
from unittest.mock import AsyncMock, patch

import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.agent import Agent
from app.models.conversation import Conversation
from tests.conftest import create_test_conversation, create_test_agent, MockRedis


# =============================================================================
# Helpers
# =============================================================================

async def login_agent(client, user_id: str, name: str) -> dict:
    """Log an agent in and return the authentication header dict.

    What: calls the login API to obtain a token and builds the Authorization
    header.
    Why: the invite-participant and remove-participant endpoints require
    agent authentication.

    Args:
        client: httpx async test client
        user_id: agent ID
        name: agent display name

    Returns:
        dict: {"Authorization": "Bearer xxx"}

    Raises:
        AssertionError: if the login response does not carry a token.
    """
    response = await client.post(
        "/agents/login",
        json={"user_id": user_id, "name": name},
    )
    payload = response.json()

    # Surface a failed login with the whole payload instead of an opaque
    # KeyError/TypeError from blind indexing.
    body = payload.get("data") if isinstance(payload, dict) else None
    token = body.get("token") if isinstance(body, dict) else None
    assert token, f"login failed for {user_id!r}: {payload!r}"

    return {"Authorization": f"Bearer {token}"}


async def create_serving_conversation_with_participants(
    db_session: AsyncSession,
    employee_id: str = "emp_001",
    agent_user_id: str = "agent_owner",
    participants: Optional[list] = None,
) -> Conversation:
    """Create a conversation in "serving" status that has an owner agent.

    What: builds a test conversation, sets assigned_agent_id and participants,
    adds it to the session and flushes.
    Why: the invitation tests need a serving conversation as a precondition.

    Args:
        db_session: database session
        employee_id: employee ID
        agent_user_id: ID of the owner agent
        participants: pre-existing participants; None means an empty list

    Returns:
        Conversation: the created conversation object
    """
    conv = create_test_conversation(
        employee_id=employee_id,
        status="serving",
    )
    conv.assigned_agent_id = agent_user_id
    # Store a copy so the model never aliases the caller's list object.
    conv.participants = list(participants or [])
    db_session.add(conv)
    await db_session.flush()
    return conv


# =============================================================================
# 1. Invite-participant tests
# =============================================================================

class TestInviteParticipant:
    """Tests for POST /api/conversations/{id}/invite-participant."""

    @pytest.mark.asyncio
    async def test_invite_success_updates_participants(
        self, client, db_session, mock_redis
    ):
        """A successful invite returns a participants list containing the invitees.

        Scenario: the owner agent invites two employees into the conversation.
        """
        # Create the agent
        owner = create_test_agent(user_id="owner_001", name="坐席A", status="online")
        db_session.add(owner)
        await db_session.flush()

        # Create a serving conversation assigned to the owner
        conv = await create_serving_conversation_with_participants(
            db_session, employee_id="emp_invite", agent_user_id="owner_001"
        )

        # The owner logs in and sends the invite
        headers = await login_agent(client, "owner_001", "坐席A")

        # Mock the WebSocket broadcast (avoid a real WS connection)
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_zhang", "name": "张三", "department": "技术部", "type": "employee"},
                        {"id": "emp_li", "name": "李四", "department": "财务部", "type": "employee"},
                    ],
                    "history_mode": "recent10",
                },
                headers=headers,
            )

        # Check the response envelope
        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0
        participants = data["data"]["participants"]

        # Both new invitees must be present
        participant_ids = [p["id"] for p in participants]
        assert "emp_zhang" in participant_ids
        assert "emp_li" in participant_ids

    @pytest.mark.asyncio
    async def test_invite_non_owner_agent_rejected(
        self, client, db_session, mock_redis
    ):
        """A non-owner agent cannot invite → error code 3030.

        Scenario: a collaborating (non-owner) agent tries to invite someone.
        """
        # Create two agents
        owner = create_test_agent(user_id="owner_002", name="主责坐席", status="online")
        other = create_test_agent(user_id="other_002", name="其他坐席", status="online")
        db_session.add_all([owner, other])
        await db_session.flush()

        # The conversation is owned by `owner`
        conv = await create_serving_conversation_with_participants(
            db_session, employee_id="emp_002", agent_user_id="owner_002"
        )

        # The other agent logs in and tries to invite
        headers = await login_agent(client, "other_002", "其他坐席")

        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_wang", "name": "王五", "type": "employee"},
                    ],
                },
                headers=headers,
            )

        # Expect error code 3030 (the backend returns HTTP 200 plus a business
        # error code for every AppException)
        data = response.json()
        assert data["code"] == 3030

    @pytest.mark.asyncio
    async def test_invite_non_serving_conversation_rejected(
        self, client, db_session, mock_redis
    ):
        """Inviting on a conversation that is not being served → error code 3031.

        Scenario: an invite is attempted on a closed conversation.
        """
        owner = create_test_agent(user_id="owner_003", name="坐席C", status="online")
        db_session.add(owner)
        await db_session.flush()

        # Create a conversation in "closed" status
        conv = create_test_conversation(
            employee_id="emp_003",
            status="closed",
        )
        conv.assigned_agent_id = "owner_003"
        conv.participants = []
        db_session.add(conv)
        await db_session.flush()

        headers = await login_agent(client, "owner_003", "坐席C")

        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_zhao", "name": "赵六", "type": "employee"},
                    ],
                },
                headers=headers,
            )

        data = response.json()
        assert data["code"] == 3031

    @pytest.mark.asyncio
    async def test_invite_duplicate_participants_rejected(
        self, client, db_session, mock_redis
    ):
        """Re-inviting the same people → error code 3032.

        Scenario: the participant is already in the list and is invited again.
        """
        owner = create_test_agent(user_id="owner_004", name="坐席D", status="online")
        db_session.add(owner)
        await db_session.flush()

        # Create a conversation that already has the participant
        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_004",
            agent_user_id="owner_004",
            participants=[
                {"id": "emp_dup", "name": "重复人", "department": "技术部", "type": "employee"},
            ],
        )

        headers = await login_agent(client, "owner_004", "坐席D")

        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_dup", "name": "重复人", "type": "employee"},
                    ],
                },
                headers=headers,
            )

        data = response.json()
        assert data["code"] == 3032

    @pytest.mark.asyncio
    async def test_invite_nonexistent_conversation(
        self, client, db_session, mock_redis
    ):
        """Inviting on a nonexistent conversation → error code 3003."""
        owner = create_test_agent(user_id="owner_005", name="坐席E", status="online")
        db_session.add(owner)
        await db_session.flush()

        headers = await login_agent(client, "owner_005", "坐席E")
        fake_id = str(uuid.uuid4())

        response = await client.post(
            f"/conversations/{fake_id}/invite-participant",
            json={
                "participants": [
                    {"id": "emp_x", "name": "某人", "type": "employee"},
                ],
            },
            headers=headers,
        )

        data = response.json()
        assert data["code"] == 3003

    @pytest.mark.asyncio
    async def test_invite_without_auth_rejected(
        self, client, db_session, mock_redis
    ):
        """Inviting without authentication → error code 1002."""
        conv = await create_serving_conversation_with_participants(
            db_session, employee_id="emp_noauth", agent_user_id="owner_noauth"
        )

        # No Authorization header is sent on purpose
        response = await client.post(
            f"/conversations/{conv.id}/invite-participant",
            json={
                "participants": [
                    {"id": "emp_y", "name": "某人", "type": "employee"},
                ],
            },
        )

        data = response.json()
        assert data["code"] == 1002


# =============================================================================
# 2. Join-conversation tests
# =============================================================================

class TestJoinConversation:
    """Tests for POST /api/conversations/{id}/join."""

    @pytest.mark.asyncio
    async def test_join_success_updates_joined_status(
        self, client, db_session, mock_redis
    ):
        """A successful join flips the participant's joined flag to True.

        Scenario: an invited employee joins the conversation through the link.
        """
        # Conversation with a participant who is invited but has not joined yet
        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_join_001",
            agent_user_id="agent_join",
            participants=[
                {"id": "emp_zhang", "name": "张三", "department": "技术部", "type": "employee", "joined": False},
            ],
        )

        # Mock the WebSocket broadcast
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/join",
                json={"employee_id": "emp_zhang"},
            )

        # Check the response envelope
        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0

        # Check the joined state
        participants = data["data"]["participants"]
        zhang = next(p for p in participants if p["id"] == "emp_zhang")
        assert zhang["joined"] is True
        assert "joined_at" in zhang

    @pytest.mark.asyncio
    async def test_join_not_invited_rejected(
        self, client, db_session, mock_redis
    ):
        """Someone who was not invited cannot join → error code 3034.

        Scenario: an employee who was never invited tries to join.
        """
        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_join_002",
            agent_user_id="agent_join_002",
        )

        response = await client.post(
            f"/conversations/{conv.id}/join",
            json={"employee_id": "emp_hacker"},
        )

        data = response.json()
        assert data["code"] == 3034

    @pytest.mark.asyncio
    async def test_join_closed_conversation_rejected(
        self, client, db_session, mock_redis
    ):
        """A closed conversation cannot be joined → error code 3033.

        Scenario: an invitee tries to join after the conversation has ended.
        """
        conv = create_test_conversation(
            employee_id="emp_join_003",
            status="closed",
        )
        conv.assigned_agent_id = "agent_join_003"
        conv.participants = [
            {"id": "emp_late", "name": "迟到者", "type": "employee", "joined": False},
        ]
        db_session.add(conv)
        await db_session.flush()

        response = await client.post(
            f"/conversations/{conv.id}/join",
            json={"employee_id": "emp_late"},
        )

        data = response.json()
        assert data["code"] == 3033

    @pytest.mark.asyncio
    async def test_join_nonexistent_conversation(
        self, client, db_session, mock_redis
    ):
        """Joining a nonexistent conversation → error code 3003."""
        fake_id = str(uuid.uuid4())

        response = await client.post(
            f"/conversations/{fake_id}/join",
            json={"employee_id": "emp_ghost"},
        )

        data = response.json()
        assert data["code"] == 3003


# =============================================================================
# 3. Remove-participant tests
# =============================================================================

class TestRemoveParticipant:
    """Tests for DELETE /api/conversations/{id}/participants/{user_id}."""

    @pytest.mark.asyncio
    async def test_remove_success(
        self, client, db_session, mock_redis
    ):
        """A successful removal drops the target from the participants list.

        Scenario: the owner agent removes one participant.
        """
        owner = create_test_agent(user_id="owner_rm", name="坐席RM", status="online")
        db_session.add(owner)
        await db_session.flush()

        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_rm_001",
            agent_user_id="owner_rm",
            participants=[
                {"id": "emp_target", "name": "被移除人", "type": "employee", "joined": True},
                {"id": "emp_keep", "name": "保留人", "type": "employee", "joined": True},
            ],
        )

        headers = await login_agent(client, "owner_rm", "坐席RM")

        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.delete(
                f"/conversations/{conv.id}/participants/emp_target",
                headers=headers,
            )

        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0

        # The removed person is gone; the other one stays
        participants = data["data"]["participants"]
        participant_ids = [p["id"] for p in participants]
        assert "emp_target" not in participant_ids
        assert "emp_keep" in participant_ids

    @pytest.mark.asyncio
    async def test_remove_non_owner_rejected(
        self, client, db_session, mock_redis
    ):
        """A non-owner agent cannot remove a participant → error code 3035."""
        owner = create_test_agent(user_id="owner_rm2", name="主责", status="online")
        other = create_test_agent(user_id="other_rm2", name="其他坐席", status="online")
        db_session.add_all([owner, other])
        await db_session.flush()

        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_rm_002",
            agent_user_id="owner_rm2",
            participants=[
                {"id": "emp_victim", "name": "被移除人", "type": "employee"},
            ],
        )

        headers = await login_agent(client, "other_rm2", "其他坐席")

        response = await client.delete(
            f"/conversations/{conv.id}/participants/emp_victim",
            headers=headers,
        )

        data = response.json()
        assert data["code"] == 3035

    @pytest.mark.asyncio
    async def test_remove_nonexistent_participant(
        self, client, db_session, mock_redis
    ):
        """Removing someone who is not in the list → error code 3036."""
        owner = create_test_agent(user_id="owner_rm3", name="坐席", status="online")
        db_session.add(owner)
        await db_session.flush()

        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_rm_003",
            agent_user_id="owner_rm3",
        )

        headers = await login_agent(client, "owner_rm3", "坐席")

        response = await client.delete(
            f"/conversations/{conv.id}/participants/emp_ghost",
            headers=headers,
        )

        data = response.json()
        assert data["code"] == 3036

    @pytest.mark.asyncio
    async def test_remove_without_auth_rejected(
        self, client, db_session, mock_redis
    ):
        """Removing without authentication → error code 1002."""
        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_rm_noauth",
            agent_user_id="owner_noauth",
            participants=[
                {"id": "emp_target", "name": "被移除人", "type": "employee"},
            ],
        )

        # No Authorization header is sent on purpose
        response = await client.delete(
            f"/conversations/{conv.id}/participants/emp_target",
        )

        data = response.json()
        assert data["code"] == 1002


# =============================================================================
# 4. Participant-leave tests
# =============================================================================

class TestLeaveAsParticipant:
    """Tests for POST /api/conversations/{id}/leave-participant."""

    @pytest.mark.asyncio
    async def test_leave_success(
        self, client, db_session, mock_redis
    ):
        """A successful leave removes the caller from the participants list.

        Scenario: an invitee leaves the conversation on their own.
        """
        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_leave_001",
            agent_user_id="agent_leave",
            participants=[
                {"id": "emp_leaver", "name": "退出者", "type": "employee", "joined": True},
                {"id": "emp_stayer", "name": "留守者", "type": "employee", "joined": True},
            ],
        )

        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/leave-participant",
                json={"employee_id": "emp_leaver"},
            )

        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0

        # The leaver is gone; the other participant stays
        participants = data["data"]["participants"]
        participant_ids = [p["id"] for p in participants]
        assert "emp_leaver" not in participant_ids
        assert "emp_stayer" in participant_ids

    @pytest.mark.asyncio
    async def test_leave_not_participant_rejected(
        self, client, db_session, mock_redis
    ):
        """Leaving as a non-participant → error code 3037.

        Scenario: someone who was never invited tries to leave.
        """
        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_leave_002",
            agent_user_id="agent_leave_002",
        )

        response = await client.post(
            f"/conversations/{conv.id}/leave-participant",
            json={"employee_id": "emp_stranger"},
        )

        data = response.json()
        assert data["code"] == 3037

    @pytest.mark.asyncio
    async def test_leave_nonexistent_conversation(
        self, client, db_session, mock_redis
    ):
        """Leaving a nonexistent conversation → error code 3003."""
        fake_id = str(uuid.uuid4())

        response = await client.post(
            f"/conversations/{fake_id}/leave-participant",
            json={"employee_id": "emp_ghost"},
        )

        data = response.json()
        assert data["code"] == 3003


# =============================================================================
# 5. End-to-end tests
# =============================================================================

class TestInviteEndToEnd:
    """End-to-end tests for the invitation feature.

    Every step asserts the business `code` as well as the HTTP status: this
    file's own notes say AppException responses come back as HTTP 200 with a
    business error code, so a bare status check would not detect a rejected
    step.
    """

    @pytest.mark.asyncio
    async def test_full_lifecycle_invite_join_leave(
        self, client, db_session, mock_redis
    ):
        """Full lifecycle: invite → join → leave.

        Scenario:
        1. The agent invites 张三
        2. 张三 joins
        3. 张三 leaves
        """
        owner = create_test_agent(user_id="owner_e2e", name="坐席E2E", status="online")
        db_session.add(owner)
        await db_session.flush()

        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_e2e_001",
            agent_user_id="owner_e2e",
        )

        headers = await login_agent(client, "owner_e2e", "坐席E2E")

        # Step 1: invite
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            invite_resp = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_e2e_zhang", "name": "张三", "department": "技术部", "type": "employee"},
                    ],
                },
                headers=headers,
            )
        assert invite_resp.status_code == 200
        invite_data = invite_resp.json()
        assert invite_data["code"] == 0
        participants_after_invite = invite_data["data"]["participants"]
        assert any(p["id"] == "emp_e2e_zhang" for p in participants_after_invite)

        # Step 2: join
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            join_resp = await client.post(
                f"/conversations/{conv.id}/join",
                json={"employee_id": "emp_e2e_zhang"},
            )
        assert join_resp.status_code == 200
        join_data = join_resp.json()
        assert join_data["code"] == 0
        participants_after_join = join_data["data"]["participants"]
        zhang = next(p for p in participants_after_join if p["id"] == "emp_e2e_zhang")
        assert zhang["joined"] is True

        # Step 3: leave
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            leave_resp = await client.post(
                f"/conversations/{conv.id}/leave-participant",
                json={"employee_id": "emp_e2e_zhang"},
            )
        assert leave_resp.status_code == 200
        leave_data = leave_resp.json()
        assert leave_data["code"] == 0
        participants_after_leave = leave_data["data"]["participants"]
        assert not any(p["id"] == "emp_e2e_zhang" for p in participants_after_leave)

    @pytest.mark.asyncio
    async def test_full_lifecycle_invite_join_remove(
        self, client, db_session, mock_redis
    ):
        """Full lifecycle: invite → join → removed by the agent.

        Scenario:
        1. The agent invites 李四
        2. 李四 joins
        3. The agent removes 李四
        """
        owner = create_test_agent(user_id="owner_e2e2", name="坐席E2E2", status="online")
        db_session.add(owner)
        await db_session.flush()

        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_e2e_002",
            agent_user_id="owner_e2e2",
        )

        headers = await login_agent(client, "owner_e2e2", "坐席E2E2")

        # Step 1: invite
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            invite_resp = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_e2e_li", "name": "李四", "department": "财务部", "type": "employee"},
                    ],
                },
                headers=headers,
            )
        assert invite_resp.status_code == 200
        assert invite_resp.json()["code"] == 0

        # Step 2: join
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            join_resp = await client.post(
                f"/conversations/{conv.id}/join",
                json={"employee_id": "emp_e2e_li"},
            )
        assert join_resp.status_code == 200
        assert join_resp.json()["code"] == 0

        # Step 3: the agent removes the participant
        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            remove_resp = await client.delete(
                f"/conversations/{conv.id}/participants/emp_e2e_li",
                headers=headers,
            )
        assert remove_resp.status_code == 200
        remove_data = remove_resp.json()
        assert remove_data["code"] == 0
        participants_after_remove = remove_data["data"]["participants"]
        assert not any(p["id"] == "emp_e2e_li" for p in participants_after_remove)

    @pytest.mark.asyncio
    async def test_invite_partial_duplicate_merges(
        self, client, db_session, mock_redis
    ):
        """Inviting a mix of new and existing people adds only the new ones.

        Scenario: the conversation already has emp_existing; inviting
        emp_existing together with emp_new must add only emp_new.
        """
        owner = create_test_agent(user_id="owner_merge", name="坐席合并", status="online")
        db_session.add(owner)
        await db_session.flush()

        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_merge",
            agent_user_id="owner_merge",
            participants=[
                {"id": "emp_existing", "name": "已有张三", "department": "技术部", "type": "employee"},
            ],
        )

        headers = await login_agent(client, "owner_merge", "坐席合并")

        with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_existing", "name": "已有张三", "type": "employee"},
                        {"id": "emp_new", "name": "新人王五", "department": "市场部", "type": "employee"},
                    ],
                },
                headers=headers,
            )

        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0
        participants = data["data"]["participants"]
        participant_ids = [p["id"] for p in participants]

        # Exactly-once checks: a plain membership test would still pass if the
        # existing member were appended a second time.
        assert participant_ids.count("emp_existing") == 1  # existing one kept, not duplicated
        assert participant_ids.count("emp_new") == 1  # new one added once