# =============================================================================
# WeCom IT Smart Service Desk — conversation state-transition tests
# =============================================================================
# Intended coverage:
#   1. A newly created conversation defaults to the "queued" status
#   2. Agent takes a conversation: queued → serving
#   3. Resolving: serving → resolved
#   4. Handling of repeated assignment
#      NOTE(review): no test for this item is visible in this file — confirm.
#   5. Restrictions on operations against already-resolved conversations
#   6. Pin / unpin toggle
#   7. Todo / un-todo toggle
#   8. Conversation list filtering
# =============================================================================

import uuid
from datetime import datetime
from unittest.mock import patch

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.agent import Agent
from app.models.conversation import Conversation
from app.services.session_service import SessionService
from tests.conftest import create_test_conversation, create_test_agent, MockRedis


async def _login_auth_headers(client, user_id, name):
    """Log an agent in and return the Authorization header for later requests.

    Posts ``user_id`` and ``name`` to ``/agents/login`` and builds a bearer
    header from ``data.token`` in the JSON response.
    """
    login_resp = await client.post(
        "/agents/login",
        json={"user_id": user_id, "name": name},
    )
    token = login_resp.json()["data"]["token"]
    return {"Authorization": f"Bearer {token}"}


class TestConversationStateFlow:
    """Tests for conversation status transitions."""

    @pytest.mark.asyncio
    async def test_new_conversation_default_status_queued(self, db_session):
        """A new conversation defaults to the ``queued`` status."""
        conv = create_test_conversation()
        db_session.add(conv)
        await db_session.flush()

        assert conv.status == "queued"

    @pytest.mark.asyncio
    async def test_assign_conversation_to_serving(self, db_session):
        """Assigning an agent moves the conversation to ``serving``."""
        conv = create_test_conversation(status="queued")
        agent = create_test_agent(user_id="agent001", name="坐席小王")
        db_session.add_all([conv, agent])
        await db_session.flush()

        session_service = SessionService(db_session)
        result = await session_service.assign_agent(conv.id, "agent001")

        assert result.status == "serving"
        assert result.assigned_agent_id == "agent001"

    @pytest.mark.asyncio
    async def test_resolve_conversation(self, db_session):
        """Resolving a ``serving`` conversation moves it to ``resolved``."""
        conv = create_test_conversation(status="serving")
        db_session.add(conv)
        await db_session.flush()

        session_service = SessionService(db_session)
        result = await session_service.resolve_conversation(conv.id)

        assert result.status == "resolved"

    @pytest.mark.asyncio
    async def test_resolve_queued_conversation_is_allowed(self, db_session):
        """A ``queued`` conversation can be resolved directly.

        This covers the case where the employee solved the problem themselves.
        """
        conv = create_test_conversation(status="queued")
        db_session.add(conv)
        await db_session.flush()

        session_service = SessionService(db_session)
        # queued → resolved is a legal state transition.
        result = await session_service.resolve_conversation(conv.id)

        assert result.status == "resolved"

    @pytest.mark.asyncio
    async def test_cannot_resolve_already_resolved_conversation(self, db_session):
        """An already-resolved conversation cannot be resolved again."""
        conv = create_test_conversation(status="resolved")
        db_session.add(conv)
        await db_session.flush()

        session_service = SessionService(db_session)

        from app.utils.response import AppException

        with pytest.raises(AppException):
            await session_service.resolve_conversation(conv.id)


class TestConversationToggle:
    """Tests for toggling conversation flags."""

    @pytest.mark.asyncio
    async def test_toggle_pin(self, db_session):
        """Pin toggle: not pinned → pinned."""
        conv = create_test_conversation(is_pinned=False)
        db_session.add(conv)
        await db_session.flush()

        session_service = SessionService(db_session)
        result = await session_service.toggle_pin(conv.id)

        assert result.is_pinned is True

    @pytest.mark.asyncio
    async def test_toggle_pin_off(self, db_session):
        """Pin toggle: pinned → not pinned."""
        conv = create_test_conversation(is_pinned=True)
        db_session.add(conv)
        await db_session.flush()

        session_service = SessionService(db_session)
        result = await session_service.toggle_pin(conv.id)

        assert result.is_pinned is False

    @pytest.mark.asyncio
    async def test_toggle_todo(self, db_session):
        """Todo toggle: not todo → todo."""
        conv = create_test_conversation(is_todo=False)
        db_session.add(conv)
        await db_session.flush()

        session_service = SessionService(db_session)
        result = await session_service.toggle_todo(conv.id)

        assert result.is_todo is True

    @pytest.mark.asyncio
    async def test_toggle_todo_off(self, db_session):
        """Todo toggle: todo → not todo."""
        conv = create_test_conversation(is_todo=True)
        db_session.add(conv)
        await db_session.flush()

        session_service = SessionService(db_session)
        result = await session_service.toggle_todo(conv.id)

        assert result.is_todo is False


class TestConversationList:
    """Tests for conversation list queries."""

    @pytest.mark.asyncio
    async def test_list_all_conversations(self, db_session):
        """Listing without filters reports at least the conversations added here."""
        convs = [
            create_test_conversation(employee_id=f"list_user_{i}", status="queued")
            for i in range(3)
        ]
        db_session.add_all(convs)
        await db_session.flush()

        session_service = SessionService(db_session)
        result, total = await session_service.get_conversations()

        assert total >= 3

    @pytest.mark.asyncio
    async def test_list_conversations_by_status(self, db_session):
        """Filtering by status returns only conversations in that status."""
        db_session.add(create_test_conversation(employee_id="filter_queued", status="queued"))
        db_session.add(create_test_conversation(employee_id="filter_serving", status="serving"))
        await db_session.flush()

        session_service = SessionService(db_session)
        result, total = await session_service.get_conversations(status="queued")

        # Guard against a vacuous pass: a queued conversation was just added,
        # so an empty result would mean the filter is broken, not correct.
        assert total >= 1
        matched = list(result)
        assert matched, "status filter returned no conversations"
        for conv in matched:
            assert conv.status == "queued"


class TestConversationAPI:
    """Tests for the conversation management API endpoints."""

    @pytest.mark.asyncio
    async def test_get_conversations_endpoint(self, client, db_session, mock_redis):
        """GET /api/conversations returns the expected response shape."""
        conv = create_test_conversation(employee_id="api_list_user")
        db_session.add(conv)
        await db_session.flush()

        # Log in as an agent to obtain a token
        # (/api/conversations requires get_current_agent authentication).
        headers = await _login_auth_headers(client, "conv_list_agent", "会话列表坐席")

        response = await client.get("/conversations", headers=headers)

        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0
        assert "items" in data["data"]

    @pytest.mark.asyncio
    async def test_resolve_conversation_endpoint(self, client, db_session, mock_redis):
        """POST /api/conversations/{id}/resolve resolves the conversation.

        Permission: only the owning agent (assigned_agent_id) may resolve it.
        """
        # Create the agent first.
        agent = Agent(
            user_id="resolve_test_agent",
            name="结单测试坐席",
            status="online",
            current_load=0,
            max_load=5,
        )
        db_session.add(agent)
        await db_session.flush()

        # Create a conversation assigned to this agent.
        conv = create_test_conversation(employee_id="api_resolve_user", status="serving")
        conv.assigned_agent_id = "resolve_test_agent"
        db_session.add(conv)
        await db_session.flush()

        # Log in as that agent to obtain a token.
        headers = await _login_auth_headers(client, "resolve_test_agent", "结单测试坐席")

        response = await client.post(
            f"/conversations/{conv.id}/resolve",
            headers=headers,
        )

        assert response.status_code == 200
        data = response.json()
        assert data["code"] == 0
        assert data["data"]["status"] == "resolved"