Files

794 lines
28 KiB
Python
Raw Permalink Normal View History

# =============================================================================
# 企微IT智能服务台 — 邀请功能(Participant)单元测试
# =============================================================================
# 测试覆盖:
# 一、邀请参与者(POST /api/conversations/{id}/invite-participant
# 1. 成功邀请:participants 更新,系统消息创建
# 2. 非主责坐席邀请 → 3030
# 3. 非服务中会话邀请 → 3031
# 4. 重复邀请(所有被邀请人已在) → 3032
# 5. 邀请不存在的会话 → 3003
# 6. 未认证邀请 → 401
#
# 二、加入会话(POST /api/conversations/{id}/join
# 1. 成功加入:joined 状态更新,系统消息创建
# 2. 未被邀请者加入 → 3034
# 3. 已结束会话加入 → 3033
# 4. 不存在的会话加入 → 3003
#
# 三、移除参与者(DELETE /api/conversations/{id}/participants/{user_id}
# 1. 成功移除:从 participants 中移除,系统消息创建
# 2. 非主责坐席移除 → 3035
# 3. 移除不在列表中的人员 → 3036
# 4. 未认证移除 → 401
#
# 四、参与者退出(POST /api/conversations/{id}/leave-participant
# 1. 成功退出:从 participants 中移除,系统消息创建
# 2. 非参与者退出 → 3037
# 3. 不存在的会话退出 → 3003
#
# 五、端到端闭环
# 1. 邀请 → 加入 → 退出(完整生命周期)
# 2. 邀请 → 加入 → 坐席移除(管理员操作)
# =============================================================================
import uuid
from datetime import datetime
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.agent import Agent
from app.models.conversation import Conversation
from tests.conftest import create_test_conversation, create_test_agent, MockRedis
# =============================================================================
# 辅助函数
# =============================================================================
async def login_agent(client, user_id: str, name: str) -> dict:
"""登录坐席并返回认证头字典。
做什么:调用登录 API 获取 token,组装 Authorization 头
为什么:invite-participant 和 remove-participant 端点需要坐席认证
Args:
client: httpx 异步测试客户端
user_id: 坐席ID
name: 坐席名称
Returns:
dict: {"Authorization": "Bearer xxx"}
"""
response = await client.post(
"/agents/login",
json={"user_id": user_id, "name": name},
)
data = response.json()
token = data["data"]["token"]
return {"Authorization": f"Bearer {token}"}
async def create_serving_conversation_with_participants(
db_session: AsyncSession,
employee_id: str = "emp_001",
agent_user_id: str = "agent_owner",
participants: list = None,
) -> Conversation:
"""创建一个 serving 状态且有主责坐席的会话(可选已有参与者)。
做什么:创建测试会话,设置 assigned_agent_id 和 participants
为什么:邀请功能测试需要 serving 状态的会话作为前提
Args:
db_session: 数据库会话
employee_id: 员工ID
agent_user_id: 主责坐席ID
participants: 已有参与者列表
Returns:
Conversation: 创建的会话对象
"""
conv = create_test_conversation(
employee_id=employee_id,
status="serving",
)
conv.assigned_agent_id = agent_user_id
conv.participants = participants or []
db_session.add(conv)
await db_session.flush()
return conv
# =============================================================================
# 一、邀请参与者测试
# =============================================================================
class TestInviteParticipant:
"""测试邀请参与者接口 POST /api/conversations/{id}/invite-participant。"""
@pytest.mark.asyncio
async def test_invite_success_updates_participants(
self, client, db_session, mock_redis
):
"""验证成功邀请:participants 列表更新,返回包含新参与者。
场景:主责坐席邀请2名员工加入会话。
"""
# 创建坐席
owner = create_test_agent(user_id="owner_001", name="坐席A", status="online")
db_session.add(owner)
await db_session.flush()
# 创建 serving 会话,分配给 owner
conv = await create_serving_conversation_with_participants(
db_session, employee_id="emp_invite", agent_user_id="owner_001"
)
# 坐席A 登录并发起邀请
headers = await login_agent(client, "owner_001", "坐席A")
# Mock WebSocket 广播(避免真实 WS 连接)
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
response = await client.post(
f"/conversations/{conv.id}/invite-participant",
json={
"participants": [
{"id": "emp_zhang", "name": "张三", "department": "技术部", "type": "employee"},
{"id": "emp_li", "name": "李四", "department": "财务部", "type": "employee"},
],
"history_mode": "recent10",
},
headers=headers,
)
# 验证响应
assert response.status_code == 200
data = response.json()
assert data["code"] == 0
participants = data["data"]["participants"]
# 验证 participants 包含新添加的两人
participant_ids = [p["id"] for p in participants]
assert "emp_zhang" in participant_ids
assert "emp_li" in participant_ids
@pytest.mark.asyncio
async def test_invite_non_owner_agent_rejected(
self, client, db_session, mock_redis
):
"""验证非主责坐席无法邀请 → 错误码 3030。
场景:协作坐席(非主责)尝试邀请他人。
"""
# 创建两个坐席
owner = create_test_agent(user_id="owner_002", name="主责坐席", status="online")
other = create_test_agent(user_id="other_002", name="其他坐席", status="online")
db_session.add_all([owner, other])
await db_session.flush()
# 创建会话,主责是 owner
conv = await create_serving_conversation_with_participants(
db_session, employee_id="emp_002", agent_user_id="owner_002"
)
# 其他坐席登录并尝试邀请
headers = await login_agent(client, "other_002", "其他坐席")
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
response = await client.post(
f"/conversations/{conv.id}/invite-participant",
json={
"participants": [
{"id": "emp_wang", "name": "王五", "type": "employee"},
],
},
headers=headers,
)
# 验证:返回错误码 3030(后端所有 AppException 返回 HTTP 200 + 业务错误码)
data = response.json()
assert data["code"] == 3030
@pytest.mark.asyncio
async def test_invite_non_serving_conversation_rejected(
self, client, db_session, mock_redis
):
"""验证非服务中会话无法邀请 → 错误码 3031。
场景:对已结单(closed)的会话尝试邀请。
"""
owner = create_test_agent(user_id="owner_003", name="坐席C", status="online")
db_session.add(owner)
await db_session.flush()
# 创建 closed 状态的会话
conv = create_test_conversation(
employee_id="emp_003",
status="closed",
)
conv.assigned_agent_id = "owner_003"
conv.participants = []
db_session.add(conv)
await db_session.flush()
headers = await login_agent(client, "owner_003", "坐席C")
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
response = await client.post(
f"/conversations/{conv.id}/invite-participant",
json={
"participants": [
{"id": "emp_zhao", "name": "赵六", "type": "employee"},
],
},
headers=headers,
)
data = response.json()
assert data["code"] == 3031
@pytest.mark.asyncio
async def test_invite_duplicate_participants_rejected(
self, client, db_session, mock_redis
):
"""验证重复邀请同一批人 → 错误码 3032。
场景:参与者已在列表中,再次邀请相同的人。
"""
owner = create_test_agent(user_id="owner_004", name="坐席D", status="online")
db_session.add(owner)
await db_session.flush()
# 创建已有参与者的会话
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_004",
agent_user_id="owner_004",
participants=[
{"id": "emp_dup", "name": "重复人", "department": "技术部", "type": "employee"},
],
)
headers = await login_agent(client, "owner_004", "坐席D")
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
response = await client.post(
f"/conversations/{conv.id}/invite-participant",
json={
"participants": [
{"id": "emp_dup", "name": "重复人", "type": "employee"},
],
},
headers=headers,
)
data = response.json()
assert data["code"] == 3032
@pytest.mark.asyncio
async def test_invite_nonexistent_conversation(
self, client, db_session, mock_redis
):
"""验证邀请不存在的会话 → 错误码 3003。"""
owner = create_test_agent(user_id="owner_005", name="坐席E", status="online")
db_session.add(owner)
await db_session.flush()
headers = await login_agent(client, "owner_005", "坐席E")
fake_id = str(uuid.uuid4())
response = await client.post(
f"/conversations/{fake_id}/invite-participant",
json={
"participants": [
{"id": "emp_x", "name": "某人", "type": "employee"},
],
},
headers=headers,
)
data = response.json()
assert data["code"] == 3003
@pytest.mark.asyncio
async def test_invite_without_auth_rejected(
self, client, db_session, mock_redis
):
"""验证未认证邀请 → 错误码 1002。"""
conv = await create_serving_conversation_with_participants(
db_session, employee_id="emp_noauth", agent_user_id="owner_noauth"
)
response = await client.post(
f"/conversations/{conv.id}/invite-participant",
json={
"participants": [
{"id": "emp_y", "name": "某人", "type": "employee"},
],
},
)
data = response.json()
assert data["code"] == 1002
# =============================================================================
# 二、加入会话测试
# =============================================================================
class TestJoinConversation:
"""测试加入会话接口 POST /api/conversations/{id}/join。"""
@pytest.mark.asyncio
async def test_join_success_updates_joined_status(
self, client, db_session, mock_redis
):
"""验证成功加入:joined 状态更新为 True。
场景:被邀请员工通过链接加入会话。
"""
# 创建会话,已有被邀请但未加入的参与者
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_join_001",
agent_user_id="agent_join",
participants=[
{"id": "emp_zhang", "name": "张三", "department": "技术部", "type": "employee", "joined": False},
],
)
# Mock WebSocket 广播
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
response = await client.post(
f"/conversations/{conv.id}/join",
json={"employee_id": "emp_zhang"},
)
# 验证响应
assert response.status_code == 200
data = response.json()
assert data["code"] == 0
# 验证 joined 状态
participants = data["data"]["participants"]
zhang = next(p for p in participants if p["id"] == "emp_zhang")
assert zhang["joined"] is True
assert "joined_at" in zhang
@pytest.mark.asyncio
async def test_join_not_invited_rejected(
self, client, db_session, mock_redis
):
"""验证未被邀请者无法加入 → 错误码 3034。
场景:未被邀请的员工尝试加入会话。
"""
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_join_002",
agent_user_id="agent_join_002",
)
response = await client.post(
f"/conversations/{conv.id}/join",
json={"employee_id": "emp_hacker"},
)
data = response.json()
assert data["code"] == 3034
@pytest.mark.asyncio
async def test_join_closed_conversation_rejected(
self, client, db_session, mock_redis
):
"""验证已结单会话无法加入 → 错误码 3033。
场景:被邀请人尝试加入已结束的会话。
"""
conv = create_test_conversation(
employee_id="emp_join_003",
status="closed",
)
conv.assigned_agent_id = "agent_join_003"
conv.participants = [
{"id": "emp_late", "name": "迟到者", "type": "employee", "joined": False},
]
db_session.add(conv)
await db_session.flush()
response = await client.post(
f"/conversations/{conv.id}/join",
json={"employee_id": "emp_late"},
)
data = response.json()
assert data["code"] == 3033
@pytest.mark.asyncio
async def test_join_nonexistent_conversation(
self, client, db_session, mock_redis
):
"""验证加入不存在的会话 → 错误码 3003。"""
fake_id = str(uuid.uuid4())
response = await client.post(
f"/conversations/{fake_id}/join",
json={"employee_id": "emp_ghost"},
)
data = response.json()
assert data["code"] == 3003
# =============================================================================
# 三、移除参与者测试
# =============================================================================
class TestRemoveParticipant:
"""测试移除参与者接口 DELETE /api/conversations/{id}/participants/{user_id}"""
@pytest.mark.asyncio
async def test_remove_success(
self, client, db_session, mock_redis
):
"""验证成功移除:从 participants 列表中移除目标。
场景:主责坐席移除一名参与者。
"""
owner = create_test_agent(user_id="owner_rm", name="坐席RM", status="online")
db_session.add(owner)
await db_session.flush()
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_rm_001",
agent_user_id="owner_rm",
participants=[
{"id": "emp_target", "name": "被移除人", "type": "employee", "joined": True},
{"id": "emp_keep", "name": "保留人", "type": "employee", "joined": True},
],
)
headers = await login_agent(client, "owner_rm", "坐席RM")
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
response = await client.delete(
f"/conversations/{conv.id}/participants/emp_target",
headers=headers,
)
assert response.status_code == 200
data = response.json()
assert data["code"] == 0
# 验证被移除人不在列表中
participants = data["data"]["participants"]
participant_ids = [p["id"] for p in participants]
assert "emp_target" not in participant_ids
assert "emp_keep" in participant_ids
@pytest.mark.asyncio
async def test_remove_non_owner_rejected(
self, client, db_session, mock_redis
):
"""验证非主责坐席无法移除 → 错误码 3035。"""
owner = create_test_agent(user_id="owner_rm2", name="主责", status="online")
other = create_test_agent(user_id="other_rm2", name="其他坐席", status="online")
db_session.add_all([owner, other])
await db_session.flush()
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_rm_002",
agent_user_id="owner_rm2",
participants=[
{"id": "emp_victim", "name": "被移除人", "type": "employee"},
],
)
headers = await login_agent(client, "other_rm2", "其他坐席")
response = await client.delete(
f"/conversations/{conv.id}/participants/emp_victim",
headers=headers,
)
data = response.json()
assert data["code"] == 3035
@pytest.mark.asyncio
async def test_remove_nonexistent_participant(
self, client, db_session, mock_redis
):
"""验证移除不在列表中的人员 → 错误码 3036。"""
owner = create_test_agent(user_id="owner_rm3", name="坐席", status="online")
db_session.add(owner)
await db_session.flush()
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_rm_003",
agent_user_id="owner_rm3",
)
headers = await login_agent(client, "owner_rm3", "坐席")
response = await client.delete(
f"/conversations/{conv.id}/participants/emp_ghost",
headers=headers,
)
data = response.json()
assert data["code"] == 3036
@pytest.mark.asyncio
async def test_remove_without_auth_rejected(
self, client, db_session, mock_redis
):
"""验证未认证移除 → 错误码 1002。"""
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_rm_noauth",
agent_user_id="owner_noauth",
participants=[
{"id": "emp_target", "name": "被移除人", "type": "employee"},
],
)
response = await client.delete(
f"/conversations/{conv.id}/participants/emp_target",
)
data = response.json()
assert data["code"] == 1002
# =============================================================================
# 四、参与者退出测试
# =============================================================================
class TestLeaveAsParticipant:
"""测试参与者退出接口 POST /api/conversations/{id}/leave-participant。"""
@pytest.mark.asyncio
async def test_leave_success(
self, client, db_session, mock_redis
):
"""验证成功退出:从 participants 列表中移除自己。
场景:被邀请人主动退出会话。
"""
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_leave_001",
agent_user_id="agent_leave",
participants=[
{"id": "emp_leaver", "name": "退出者", "type": "employee", "joined": True},
{"id": "emp_stayer", "name": "留守者", "type": "employee", "joined": True},
],
)
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
response = await client.post(
f"/conversations/{conv.id}/leave-participant",
json={"employee_id": "emp_leaver"},
)
assert response.status_code == 200
data = response.json()
assert data["code"] == 0
# 验证退出者不在列表中
participants = data["data"]["participants"]
participant_ids = [p["id"] for p in participants]
assert "emp_leaver" not in participant_ids
assert "emp_stayer" in participant_ids
@pytest.mark.asyncio
async def test_leave_not_participant_rejected(
self, client, db_session, mock_redis
):
"""验证非参与者退出 → 错误码 3037。
场景:未被邀请的人尝试退出会话。
"""
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_leave_002",
agent_user_id="agent_leave_002",
)
response = await client.post(
f"/conversations/{conv.id}/leave-participant",
json={"employee_id": "emp_stranger"},
)
data = response.json()
assert data["code"] == 3037
@pytest.mark.asyncio
async def test_leave_nonexistent_conversation(
self, client, db_session, mock_redis
):
"""验证退出不存在的会话 → 错误码 3003。"""
fake_id = str(uuid.uuid4())
response = await client.post(
f"/conversations/{fake_id}/leave-participant",
json={"employee_id": "emp_ghost"},
)
data = response.json()
assert data["code"] == 3003
# =============================================================================
# 五、端到端闭环测试
# =============================================================================
class TestInviteEndToEnd:
"""邀请功能端到端闭环测试。"""
@pytest.mark.asyncio
async def test_full_lifecycle_invite_join_leave(
self, client, db_session, mock_redis
):
"""验证完整生命周期:邀请 → 加入 → 退出。
场景:
1. 坐席邀请张三
2. 张三加入
3. 张三退出
"""
owner = create_test_agent(user_id="owner_e2e", name="坐席E2E", status="online")
db_session.add(owner)
await db_session.flush()
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_e2e_001",
agent_user_id="owner_e2e",
)
headers = await login_agent(client, "owner_e2e", "坐席E2E")
# Step 1: 邀请
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
invite_resp = await client.post(
f"/conversations/{conv.id}/invite-participant",
json={
"participants": [
{"id": "emp_e2e_zhang", "name": "张三", "department": "技术部", "type": "employee"},
],
},
headers=headers,
)
assert invite_resp.status_code == 200
invite_data = invite_resp.json()
participants_after_invite = invite_data["data"]["participants"]
assert any(p["id"] == "emp_e2e_zhang" for p in participants_after_invite)
# Step 2: 加入
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
join_resp = await client.post(
f"/conversations/{conv.id}/join",
json={"employee_id": "emp_e2e_zhang"},
)
assert join_resp.status_code == 200
join_data = join_resp.json()
participants_after_join = join_data["data"]["participants"]
zhang = next(p for p in participants_after_join if p["id"] == "emp_e2e_zhang")
assert zhang["joined"] is True
# Step 3: 退出
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
leave_resp = await client.post(
f"/conversations/{conv.id}/leave-participant",
json={"employee_id": "emp_e2e_zhang"},
)
assert leave_resp.status_code == 200
leave_data = leave_resp.json()
participants_after_leave = leave_data["data"]["participants"]
assert not any(p["id"] == "emp_e2e_zhang" for p in participants_after_leave)
@pytest.mark.asyncio
async def test_full_lifecycle_invite_join_remove(
self, client, db_session, mock_redis
):
"""验证完整生命周期:邀请 → 加入 → 坐席移除。
场景:
1. 坐席邀请李四
2. 李四加入
3. 坐席移除李四
"""
owner = create_test_agent(user_id="owner_e2e2", name="坐席E2E2", status="online")
db_session.add(owner)
await db_session.flush()
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_e2e_002",
agent_user_id="owner_e2e2",
)
headers = await login_agent(client, "owner_e2e2", "坐席E2E2")
# Step 1: 邀请
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
invite_resp = await client.post(
f"/conversations/{conv.id}/invite-participant",
json={
"participants": [
{"id": "emp_e2e_li", "name": "李四", "department": "财务部", "type": "employee"},
],
},
headers=headers,
)
assert invite_resp.status_code == 200
# Step 2: 加入
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
join_resp = await client.post(
f"/conversations/{conv.id}/join",
json={"employee_id": "emp_e2e_li"},
)
assert join_resp.status_code == 200
# Step 3: 坐席移除
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
remove_resp = await client.delete(
f"/conversations/{conv.id}/participants/emp_e2e_li",
headers=headers,
)
assert remove_resp.status_code == 200
remove_data = remove_resp.json()
participants_after_remove = remove_data["data"]["participants"]
assert not any(p["id"] == "emp_e2e_li" for p in participants_after_remove)
@pytest.mark.asyncio
async def test_invite_partial_duplicate_merges(
self, client, db_session, mock_redis
):
"""验证邀请部分新人 + 部分已在人员:只添加新人,忽略已有人。
场景:会话已有张三,再邀请张三和王五,只有王五被添加。
"""
owner = create_test_agent(user_id="owner_merge", name="坐席合并", status="online")
db_session.add(owner)
await db_session.flush()
conv = await create_serving_conversation_with_participants(
db_session,
employee_id="emp_merge",
agent_user_id="owner_merge",
participants=[
{"id": "emp_existing", "name": "已有张三", "department": "技术部", "type": "employee"},
],
)
headers = await login_agent(client, "owner_merge", "坐席合并")
with patch("app.services.ws_manager.manager.broadcast", new_callable=AsyncMock):
response = await client.post(
f"/conversations/{conv.id}/invite-participant",
json={
"participants": [
{"id": "emp_existing", "name": "已有张三", "type": "employee"},
{"id": "emp_new", "name": "新人王五", "department": "市场部", "type": "employee"},
],
},
headers=headers,
)
assert response.status_code == 200
data = response.json()
participants = data["data"]["participants"]
participant_ids = [p["id"] for p in participants]
assert "emp_existing" in participant_ids # 已有的仍在
assert "emp_new" in participant_ids # 新人被添加