Files

794 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 邀请功能(Participant)单元测试
# =============================================================================
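# Test coverage:
# I. Invite participants (POST /api/conversations/{id}/invite-participant)
# 1. Successful invite: participants updated, system message created
# 2. Invite by a non-owner agent → 3030
# 3. Invite on a conversation that is not in the serving state → 3031
# 4. Duplicate invite (every invitee is already a participant) → 3032
# 5. Invite on a nonexistent conversation → 3003
# 6. Unauthenticated invite → 1002 (the tests assert this business code, not an HTTP 401 status)
#
# II. Join a conversation (POST /api/conversations/{id}/join)
# 1. Successful join: joined state updated, system message created
# 2. Join by someone who was not invited → 3034
# 3. Join on an already finished conversation → 3033
# 4. Join on a nonexistent conversation → 3003
#
# III. Remove a participant (DELETE /api/conversations/{id}/participants/{user_id})
# 1. Successful removal: removed from participants, system message created
# 2. Removal by a non-owner agent → 3035
# 3. Removal of someone who is not in the list → 3036
# 4. Unauthenticated removal → 1002 (the tests assert this business code, not an HTTP 401 status)
#
# IV. Participant leaves (POST /api/conversations/{id}/leave-participant)
# 1. Successful leave: removed from participants, system message created
# 2. Leave by a non-participant → 3037
# 3. Leave on a nonexistent conversation → 3003
#
# V. End-to-end flows
# 1. Invite → join → leave (full lifecycle)
# 2. Invite → join → removed by the agent (administrative action)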
# =============================================================================
import uuid
from datetime import datetime
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.agent import Agent
from app.models.conversation import Conversation
from tests.conftest import create_test_conversation, create_test_agent, MockRedis
# =============================================================================
# 辅助函数
# =============================================================================
async def login_agent(client, user_id: str, name: str) -> dict:
    """Log an agent in through the API and build its Authorization header.

    The invite-participant and remove-participant requests in this module are
    sent with the header returned here.

    Args:
        client: Async test client. Only ``await client.post(url, json=...)``
            is used, and the result must expose ``json()``.
        user_id: Agent ID, sent as ``user_id`` in the login payload.
        name: Agent display name, sent as ``name`` in the login payload.

    Returns:
        dict: ``{"Authorization": "Bearer <token>"}``.

    Raises:
        AssertionError: If the login response does not carry a token under
            ``data.token``. Failing here with the payload in the message is
            far easier to diagnose than a KeyError/TypeError raised from
            inside whichever test happened to call this helper.
    """
    response = await client.post(
        "/agents/login",
        json={"user_id": user_id, "name": name},
    )
    payload = response.json()

    # Walk the envelope defensively: a rejected login may return a payload
    # without "data", or with "data" set to null.
    data = payload.get("data") if isinstance(payload, dict) else None
    token = data.get("token") if isinstance(data, dict) else None
    if not token:
        raise AssertionError(
            f"agent login failed for {user_id!r}: no token in response {payload!r}"
        )
    return {"Authorization": f"Bearer {token}"}
async def create_serving_conversation_with_participants(
    db_session: AsyncSession,
    employee_id: str = "emp_001",
    agent_user_id: str = "agent_owner",
    participants: list = None,
) -> Conversation:
    """Create a conversation in the "serving" state that already has an owner agent.

    The invite tests need a serving conversation as their starting point; this
    helper builds one, assigns the owner, optionally pre-populates the
    participant list, and flushes it through the given session.

    Args:
        db_session: Database session the conversation is added to and flushed on.
        employee_id: Employee ID passed to ``create_test_conversation``.
        agent_user_id: Value stored in ``assigned_agent_id`` (the owner agent).
        participants: Participant entries to start with; any falsy value
            (``None`` or an empty list) results in an empty list.

    Returns:
        Conversation: The conversation object after the flush.
    """
    conversation = create_test_conversation(employee_id=employee_id, status="serving")
    conversation.assigned_agent_id = agent_user_id
    conversation.participants = participants if participants else []

    db_session.add(conversation)
    await db_session.flush()
    return conversation
# =============================================================================
# 一、邀请参与者测试
# =============================================================================
class TestInviteParticipant:
    """Tests for the invite endpoint, POST /api/conversations/{id}/invite-participant."""

    # Patch target for the WebSocket broadcast, stubbed so no real WS
    # connection is needed.
    _BROADCAST = "app.services.ws_manager.manager.broadcast"

    @pytest.mark.asyncio
    async def test_invite_success_updates_participants(
        self, client, db_session, mock_redis
    ):
        """A successful invite returns a participant list containing the new people.

        Scenario: the owner agent invites two employees into the conversation.
        """
        # Owner agent.
        db_session.add(
            create_test_agent(user_id="owner_001", name="坐席A", status="online")
        )
        await db_session.flush()

        # Serving conversation assigned to that owner.
        conversation = await create_serving_conversation_with_participants(
            db_session, employee_id="emp_invite", agent_user_id="owner_001"
        )

        # The owner logs in and sends the invite.
        auth_headers = await login_agent(client, "owner_001", "坐席A")
        invitees = [
            {"id": "emp_zhang", "name": "张三", "department": "技术部", "type": "employee"},
            {"id": "emp_li", "name": "李四", "department": "财务部", "type": "employee"},
        ]
        request_body = {"participants": invitees, "history_mode": "recent10"}
        with patch(self._BROADCAST, new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{conversation.id}/invite-participant",
                json=request_body,
                headers=auth_headers,
            )

        # Response envelope.
        assert resp.status_code == 200
        body = resp.json()
        assert body["code"] == 0

        # Both invitees must be present in the returned list.
        returned_ids = {entry["id"] for entry in body["data"]["participants"]}
        assert "emp_zhang" in returned_ids
        assert "emp_li" in returned_ids

    @pytest.mark.asyncio
    async def test_invite_non_owner_agent_rejected(
        self, client, db_session, mock_redis
    ):
        """An agent who is not the owner cannot invite → error code 3030.

        Scenario: a second agent (not the owner) tries to invite someone.
        """
        # Two agents: the owner and another one.
        db_session.add_all(
            [
                create_test_agent(user_id="owner_002", name="主责坐席", status="online"),
                create_test_agent(user_id="other_002", name="其他坐席", status="online"),
            ]
        )
        await db_session.flush()

        # The conversation belongs to the owner.
        conversation = await create_serving_conversation_with_participants(
            db_session, employee_id="emp_002", agent_user_id="owner_002"
        )

        # The other agent logs in and attempts the invite.
        other_headers = await login_agent(client, "other_002", "其他坐席")
        request_body = {
            "participants": [
                {"id": "emp_wang", "name": "王五", "type": "employee"},
            ],
        }
        with patch(self._BROADCAST, new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{conversation.id}/invite-participant",
                json=request_body,
                headers=other_headers,
            )

        # Per the original author's note, every backend AppException comes back
        # as HTTP 200 with a business error code, so only the code is checked.
        assert resp.json()["code"] == 3030

    @pytest.mark.asyncio
    async def test_invite_non_serving_conversation_rejected(
        self, client, db_session, mock_redis
    ):
        """Inviting into a conversation that is not being served → error code 3031.

        Scenario: the invite targets a conversation whose status is "closed".
        """
        db_session.add(
            create_test_agent(user_id="owner_003", name="坐席C", status="online")
        )
        await db_session.flush()

        # Closed conversation, still assigned to the owner.
        closed_conv = create_test_conversation(employee_id="emp_003", status="closed")
        closed_conv.assigned_agent_id = "owner_003"
        closed_conv.participants = []
        db_session.add(closed_conv)
        await db_session.flush()

        auth_headers = await login_agent(client, "owner_003", "坐席C")
        request_body = {
            "participants": [
                {"id": "emp_zhao", "name": "赵六", "type": "employee"},
            ],
        }
        with patch(self._BROADCAST, new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{closed_conv.id}/invite-participant",
                json=request_body,
                headers=auth_headers,
            )

        assert resp.json()["code"] == 3031

    @pytest.mark.asyncio
    async def test_invite_duplicate_participants_rejected(
        self, client, db_session, mock_redis
    ):
        """Re-inviting people who are all already listed → error code 3032.

        Scenario: the only invitee is already in the participant list.
        """
        db_session.add(
            create_test_agent(user_id="owner_004", name="坐席D", status="online")
        )
        await db_session.flush()

        # Conversation that already contains the participant.
        already_there = [
            {"id": "emp_dup", "name": "重复人", "department": "技术部", "type": "employee"},
        ]
        conversation = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_004",
            agent_user_id="owner_004",
            participants=already_there,
        )

        auth_headers = await login_agent(client, "owner_004", "坐席D")
        request_body = {
            "participants": [
                {"id": "emp_dup", "name": "重复人", "type": "employee"},
            ],
        }
        with patch(self._BROADCAST, new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{conversation.id}/invite-participant",
                json=request_body,
                headers=auth_headers,
            )

        assert resp.json()["code"] == 3032

    @pytest.mark.asyncio
    async def test_invite_nonexistent_conversation(
        self, client, db_session, mock_redis
    ):
        """Inviting into a conversation ID that does not exist → error code 3003."""
        db_session.add(
            create_test_agent(user_id="owner_005", name="坐席E", status="online")
        )
        await db_session.flush()

        auth_headers = await login_agent(client, "owner_005", "坐席E")
        missing_id = str(uuid.uuid4())
        request_body = {
            "participants": [
                {"id": "emp_x", "name": "某人", "type": "employee"},
            ],
        }
        resp = await client.post(
            f"/conversations/{missing_id}/invite-participant",
            json=request_body,
            headers=auth_headers,
        )

        assert resp.json()["code"] == 3003

    @pytest.mark.asyncio
    async def test_invite_without_auth_rejected(
        self, client, db_session, mock_redis
    ):
        """Inviting without an Authorization header → error code 1002."""
        conversation = await create_serving_conversation_with_participants(
            db_session, employee_id="emp_noauth", agent_user_id="owner_noauth"
        )
        request_body = {
            "participants": [
                {"id": "emp_y", "name": "某人", "type": "employee"},
            ],
        }

        # No headers on purpose: the request is unauthenticated.
        resp = await client.post(
            f"/conversations/{conversation.id}/invite-participant",
            json=request_body,
        )

        assert resp.json()["code"] == 1002
# =============================================================================
# 二、加入会话测试
# =============================================================================
class TestJoinConversation:
    """Tests for the join endpoint, POST /api/conversations/{id}/join."""

    # Patch target for the WebSocket broadcast.
    _BROADCAST = "app.services.ws_manager.manager.broadcast"

    @pytest.mark.asyncio
    async def test_join_success_updates_joined_status(
        self, client, db_session, mock_redis
    ):
        """A successful join flips the participant's ``joined`` flag to True.

        Scenario: an invited employee joins the conversation.
        """
        # Conversation with one participant who is invited but has not joined.
        pending = [
            {"id": "emp_zhang", "name": "张三", "department": "技术部", "type": "employee", "joined": False},
        ]
        conversation = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_join_001",
            agent_user_id="agent_join",
            participants=pending,
        )

        # Stub the WebSocket broadcast while the join request runs.
        with patch(self._BROADCAST, new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{conversation.id}/join",
                json={"employee_id": "emp_zhang"},
            )

        # Response envelope.
        assert resp.status_code == 200
        body = resp.json()
        assert body["code"] == 0

        # The joining participant's entry carries the joined state.
        joined_entry = next(
            entry
            for entry in body["data"]["participants"]
            if entry["id"] == "emp_zhang"
        )
        assert joined_entry["joined"] is True
        assert "joined_at" in joined_entry

    @pytest.mark.asyncio
    async def test_join_not_invited_rejected(
        self, client, db_session, mock_redis
    ):
        """Someone who was never invited cannot join → error code 3034.

        Scenario: an employee absent from the participant list tries to join.
        """
        conversation = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_join_002",
            agent_user_id="agent_join_002",
        )

        resp = await client.post(
            f"/conversations/{conversation.id}/join",
            json={"employee_id": "emp_hacker"},
        )

        assert resp.json()["code"] == 3034

    @pytest.mark.asyncio
    async def test_join_closed_conversation_rejected(
        self, client, db_session, mock_redis
    ):
        """Joining a closed conversation is refused → error code 3033.

        Scenario: an invitee tries to join after the conversation has ended.
        """
        closed_conv = create_test_conversation(
            employee_id="emp_join_003", status="closed"
        )
        closed_conv.assigned_agent_id = "agent_join_003"
        closed_conv.participants = [
            {"id": "emp_late", "name": "迟到者", "type": "employee", "joined": False},
        ]
        db_session.add(closed_conv)
        await db_session.flush()

        resp = await client.post(
            f"/conversations/{closed_conv.id}/join",
            json={"employee_id": "emp_late"},
        )

        assert resp.json()["code"] == 3033

    @pytest.mark.asyncio
    async def test_join_nonexistent_conversation(
        self, client, db_session, mock_redis
    ):
        """Joining a conversation ID that does not exist → error code 3003."""
        missing_id = str(uuid.uuid4())

        resp = await client.post(
            f"/conversations/{missing_id}/join",
            json={"employee_id": "emp_ghost"},
        )

        assert resp.json()["code"] == 3003
# =============================================================================
# 三、移除参与者测试
# =============================================================================
class TestRemoveParticipant:
    """Tests for DELETE /api/conversations/{id}/participants/{user_id}."""

    # Patch target for the WebSocket broadcast.
    _BROADCAST = "app.services.ws_manager.manager.broadcast"

    @pytest.mark.asyncio
    async def test_remove_success(
        self, client, db_session, mock_redis
    ):
        """A successful removal drops the target from the participant list.

        Scenario: the owner agent removes one of two participants.
        """
        db_session.add(
            create_test_agent(user_id="owner_rm", name="坐席RM", status="online")
        )
        await db_session.flush()

        starting_participants = [
            {"id": "emp_target", "name": "被移除人", "type": "employee", "joined": True},
            {"id": "emp_keep", "name": "保留人", "type": "employee", "joined": True},
        ]
        conversation = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_rm_001",
            agent_user_id="owner_rm",
            participants=starting_participants,
        )

        auth_headers = await login_agent(client, "owner_rm", "坐席RM")
        with patch(self._BROADCAST, new_callable=AsyncMock):
            resp = await client.delete(
                f"/conversations/{conversation.id}/participants/emp_target",
                headers=auth_headers,
            )

        assert resp.status_code == 200
        body = resp.json()
        assert body["code"] == 0

        # The removed person is gone; the other participant is untouched.
        remaining_ids = {entry["id"] for entry in body["data"]["participants"]}
        assert "emp_target" not in remaining_ids
        assert "emp_keep" in remaining_ids

    @pytest.mark.asyncio
    async def test_remove_non_owner_rejected(
        self, client, db_session, mock_redis
    ):
        """An agent who is not the owner cannot remove anyone → error code 3035."""
        db_session.add_all(
            [
                create_test_agent(user_id="owner_rm2", name="主责", status="online"),
                create_test_agent(user_id="other_rm2", name="其他坐席", status="online"),
            ]
        )
        await db_session.flush()

        conversation = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_rm_002",
            agent_user_id="owner_rm2",
            participants=[
                {"id": "emp_victim", "name": "被移除人", "type": "employee"},
            ],
        )

        # The request is sent by the non-owner agent.
        other_headers = await login_agent(client, "other_rm2", "其他坐席")
        resp = await client.delete(
            f"/conversations/{conversation.id}/participants/emp_victim",
            headers=other_headers,
        )

        assert resp.json()["code"] == 3035

    @pytest.mark.asyncio
    async def test_remove_nonexistent_participant(
        self, client, db_session, mock_redis
    ):
        """Removing someone who is not in the list → error code 3036."""
        db_session.add(
            create_test_agent(user_id="owner_rm3", name="坐席", status="online")
        )
        await db_session.flush()

        # No participants at all, so the target cannot be found.
        conversation = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_rm_003",
            agent_user_id="owner_rm3",
        )

        auth_headers = await login_agent(client, "owner_rm3", "坐席")
        resp = await client.delete(
            f"/conversations/{conversation.id}/participants/emp_ghost",
            headers=auth_headers,
        )

        assert resp.json()["code"] == 3036

    @pytest.mark.asyncio
    async def test_remove_without_auth_rejected(
        self, client, db_session, mock_redis
    ):
        """Removing without an Authorization header → error code 1002."""
        conversation = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_rm_noauth",
            agent_user_id="owner_noauth",
            participants=[
                {"id": "emp_target", "name": "被移除人", "type": "employee"},
            ],
        )

        # No headers on purpose: the request is unauthenticated.
        resp = await client.delete(
            f"/conversations/{conversation.id}/participants/emp_target",
        )

        assert resp.json()["code"] == 1002
# =============================================================================
# 四、参与者退出测试
# =============================================================================
class TestLeaveAsParticipant:
    """Tests for the leave endpoint, POST /api/conversations/{id}/leave-participant."""

    # Patch target for the WebSocket broadcast.
    _BROADCAST = "app.services.ws_manager.manager.broadcast"

    @pytest.mark.asyncio
    async def test_leave_success(
        self, client, db_session, mock_redis
    ):
        """A successful leave removes the caller from the participant list.

        Scenario: an invited participant leaves the conversation voluntarily.
        """
        starting_participants = [
            {"id": "emp_leaver", "name": "退出者", "type": "employee", "joined": True},
            {"id": "emp_stayer", "name": "留守者", "type": "employee", "joined": True},
        ]
        conversation = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_leave_001",
            agent_user_id="agent_leave",
            participants=starting_participants,
        )

        with patch(self._BROADCAST, new_callable=AsyncMock):
            resp = await client.post(
                f"/conversations/{conversation.id}/leave-participant",
                json={"employee_id": "emp_leaver"},
            )

        assert resp.status_code == 200
        body = resp.json()
        assert body["code"] == 0

        # The leaver is gone; the other participant stays.
        remaining_ids = {entry["id"] for entry in body["data"]["participants"]}
        assert "emp_leaver" not in remaining_ids
        assert "emp_stayer" in remaining_ids

    @pytest.mark.asyncio
    async def test_leave_not_participant_rejected(
        self, client, db_session, mock_redis
    ):
        """A non-participant cannot leave → error code 3037.

        Scenario: someone who was never invited calls the leave endpoint.
        """
        conversation = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_leave_002",
            agent_user_id="agent_leave_002",
        )

        resp = await client.post(
            f"/conversations/{conversation.id}/leave-participant",
            json={"employee_id": "emp_stranger"},
        )

        assert resp.json()["code"] == 3037

    @pytest.mark.asyncio
    async def test_leave_nonexistent_conversation(
        self, client, db_session, mock_redis
    ):
        """Leaving a conversation ID that does not exist → error code 3003."""
        missing_id = str(uuid.uuid4())

        resp = await client.post(
            f"/conversations/{missing_id}/leave-participant",
            json={"employee_id": "emp_ghost"},
        )

        assert resp.json()["code"] == 3003
# =============================================================================
# 五、端到端闭环测试
# =============================================================================
class TestInviteEndToEnd:
    """End-to-end scenarios chaining the invite, join, leave and remove endpoints.

    Every successful step asserts the business code as well as the HTTP
    status. An earlier note in this file states that rejected requests also
    come back as HTTP 200 with a business error code, so ``status_code == 200``
    alone does not prove that a step succeeded.
    """

    # Patch target for the WebSocket broadcast, stubbed so no real WS
    # connection is needed.
    _BROADCAST = "app.services.ws_manager.manager.broadcast"

    @pytest.mark.asyncio
    async def test_full_lifecycle_invite_join_leave(
        self, client, db_session, mock_redis
    ):
        """Full lifecycle: invite → join → leave.

        Scenario:
            1. The agent invites Zhang San.
            2. Zhang San joins.
            3. Zhang San leaves.
        """
        owner = create_test_agent(user_id="owner_e2e", name="坐席E2E", status="online")
        db_session.add(owner)
        await db_session.flush()
        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_e2e_001",
            agent_user_id="owner_e2e",
        )
        headers = await login_agent(client, "owner_e2e", "坐席E2E")

        # Step 1: invite.
        with patch(self._BROADCAST, new_callable=AsyncMock):
            invite_resp = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_e2e_zhang", "name": "张三", "department": "技术部", "type": "employee"},
                    ],
                },
                headers=headers,
            )
        assert invite_resp.status_code == 200
        invite_data = invite_resp.json()
        assert invite_data["code"] == 0
        participants_after_invite = invite_data["data"]["participants"]
        assert any(p["id"] == "emp_e2e_zhang" for p in participants_after_invite)

        # Step 2: join.
        with patch(self._BROADCAST, new_callable=AsyncMock):
            join_resp = await client.post(
                f"/conversations/{conv.id}/join",
                json={"employee_id": "emp_e2e_zhang"},
            )
        assert join_resp.status_code == 200
        join_data = join_resp.json()
        assert join_data["code"] == 0
        participants_after_join = join_data["data"]["participants"]
        zhang = next(p for p in participants_after_join if p["id"] == "emp_e2e_zhang")
        assert zhang["joined"] is True

        # Step 3: leave.
        with patch(self._BROADCAST, new_callable=AsyncMock):
            leave_resp = await client.post(
                f"/conversations/{conv.id}/leave-participant",
                json={"employee_id": "emp_e2e_zhang"},
            )
        assert leave_resp.status_code == 200
        leave_data = leave_resp.json()
        assert leave_data["code"] == 0
        participants_after_leave = leave_data["data"]["participants"]
        assert not any(p["id"] == "emp_e2e_zhang" for p in participants_after_leave)

    @pytest.mark.asyncio
    async def test_full_lifecycle_invite_join_remove(
        self, client, db_session, mock_redis
    ):
        """Full lifecycle: invite → join → removed by the agent.

        Scenario:
            1. The agent invites Li Si.
            2. Li Si joins.
            3. The agent removes Li Si.
        """
        owner = create_test_agent(user_id="owner_e2e2", name="坐席E2E2", status="online")
        db_session.add(owner)
        await db_session.flush()
        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_e2e_002",
            agent_user_id="owner_e2e2",
        )
        headers = await login_agent(client, "owner_e2e2", "坐席E2E2")

        # Step 1: invite.
        with patch(self._BROADCAST, new_callable=AsyncMock):
            invite_resp = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_e2e_li", "name": "李四", "department": "财务部", "type": "employee"},
                    ],
                },
                headers=headers,
            )
        assert invite_resp.status_code == 200
        assert invite_resp.json()["code"] == 0

        # Step 2: join.
        with patch(self._BROADCAST, new_callable=AsyncMock):
            join_resp = await client.post(
                f"/conversations/{conv.id}/join",
                json={"employee_id": "emp_e2e_li"},
            )
        assert join_resp.status_code == 200
        assert join_resp.json()["code"] == 0

        # Step 3: the agent removes the participant.
        with patch(self._BROADCAST, new_callable=AsyncMock):
            remove_resp = await client.delete(
                f"/conversations/{conv.id}/participants/emp_e2e_li",
                headers=headers,
            )
        assert remove_resp.status_code == 200
        remove_data = remove_resp.json()
        assert remove_data["code"] == 0
        participants_after_remove = remove_data["data"]["participants"]
        assert not any(p["id"] == "emp_e2e_li" for p in participants_after_remove)

    @pytest.mark.asyncio
    async def test_invite_partial_duplicate_merges(
        self, client, db_session, mock_redis
    ):
        """Inviting a mix of existing and new people adds only the new ones.

        Scenario: the conversation already contains one participant; the
        invite lists that same person plus one new person. Both must be in
        the result, each exactly once.
        """
        owner = create_test_agent(user_id="owner_merge", name="坐席合并", status="online")
        db_session.add(owner)
        await db_session.flush()
        conv = await create_serving_conversation_with_participants(
            db_session,
            employee_id="emp_merge",
            agent_user_id="owner_merge",
            participants=[
                {"id": "emp_existing", "name": "已有张三", "department": "技术部", "type": "employee"},
            ],
        )
        headers = await login_agent(client, "owner_merge", "坐席合并")

        with patch(self._BROADCAST, new_callable=AsyncMock):
            response = await client.post(
                f"/conversations/{conv.id}/invite-participant",
                json={
                    "participants": [
                        {"id": "emp_existing", "name": "已有张三", "type": "employee"},
                        {"id": "emp_new", "name": "新人王五", "department": "市场部", "type": "employee"},
                    ],
                },
                headers=headers,
            )

        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0
        participant_ids = [p["id"] for p in data["data"]["participants"]]

        # Membership alone would still pass if the existing participant were
        # appended a second time, so pin the count of each ID to exactly one.
        assert participant_ids.count("emp_existing") == 1  # kept, not duplicated
        assert participant_ids.count("emp_new") == 1  # newly added