# =============================================================================
# WeCom IT Smart Service Desk — MessageRouter message-routing tests
# =============================================================================
# Coverage:
#   1. Find-or-create conversation (new employee creates / existing is reused)
#   2. VIP detection (director / CEO / regular employee)
#   3. Hand-raise tag detection integration
#   4. Emotion tag detection integration
#   5. Needs-intervention tag detection integration
#   6. Urgency scoring integration (verifies the Bug 1 fix: await calculate_urgency)
#   7. Message record creation
#   8. VIP detection failure does not block the flow
# =============================================================================

import json
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import pytest_asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.conversation import Conversation
from app.models.message import Message
from app.models.system_config import SystemConfig
from app.services.message_router import MessageRouter
from app.services.scoring_service import ScoringService
from app.services.wecom_service import WecomService
from tests.conftest import create_test_conversation, MockRedis


@pytest_asyncio.fixture
async def setup_router_db(db_session):
    """Seed the SystemConfig rows used by the router tests.

    Adds the hand-raise / emotion keyword lists, the intervention round
    threshold and the urgency scoring weights to ``db_session`` and flushes
    them. The fixture yields nothing; it is requested for its side effect.
    """
    configs = [
        SystemConfig(config_key="hand_raise_keywords", config_value='["转人工","人工","真人"]'),
        SystemConfig(config_key="emotion_keywords_angry", config_value='["崩溃","愤怒","投诉"]'),
        SystemConfig(config_key="emotion_keywords_urgent", config_value='["急","紧急","马上"]'),
        SystemConfig(config_key="emotion_keywords_worried", config_value='["担心","害怕"]'),
        SystemConfig(config_key="intervene_round_threshold", config_value="3"),
        SystemConfig(config_key="urgency_base_keyword_score", config_value="1"),
        SystemConfig(config_key="urgency_emotion_bonus", config_value="1"),
        SystemConfig(config_key="urgency_vip_bonus", config_value="1"),
        SystemConfig(config_key="urgency_repeat_bonus", config_value="1"),
    ]
    db_session.add_all(configs)
    await db_session.flush()


def _create_mock_wecom_service():
    """Build an ``AsyncMock`` stand-in for ``WecomService``.

    ``get_user_info`` returns a regular engineer profile by default,
    ``send_text_message`` returns ``{"errcode": 0}``, and ``close`` is a
    no-op. Individual tests override ``get_user_info`` as needed.
    """
    mock = AsyncMock(spec=WecomService)
    mock.get_user_info = AsyncMock(return_value={
        "name": "张三",
        "department": "[1, 2]",
        "position": "工程师",
    })
    mock.send_text_message = AsyncMock(return_value={"errcode": 0})
    mock.close = AsyncMock()
    return mock


# Plain pytest fixture: the function is synchronous, so the pytest-asyncio
# decorator (intended for coroutine / async-generator fixtures) is not needed.
@pytest.fixture
def mock_wecom_service():
    """Provide a fresh mocked ``WecomService`` for each test."""
    return _create_mock_wecom_service()


# Plain pytest fixture for the same reason as ``mock_wecom_service``.
@pytest.fixture
def router(db_session, mock_wecom_service, setup_router_db):
    """Create the ``MessageRouter`` under test.

    ``setup_router_db`` is requested only so the configuration rows exist
    before the router is used; its value is not consumed here.
    """
    scoring_service = ScoringService(db_session)
    return MessageRouter(
        db=db_session,
        wecom_service=mock_wecom_service,
        scoring_service=scoring_service,
    )


class TestFindOrCreateConversation:
    """Tests for ``MessageRouter._find_or_create_conversation``."""

    @pytest.mark.asyncio
    async def test_create_new_conversation(self, router, db_session):
        """A new employee's first message creates a new conversation."""
        conv = await router._find_or_create_conversation("new_employee_001", "帮我重置密码")

        assert conv is not None
        assert conv.employee_id == "new_employee_001"
        assert conv.status == "queued"
        assert conv.urgency_score == 1
        assert conv.last_message_summary == "帮我重置密码"

    @pytest.mark.asyncio
    async def test_reuse_existing_queued_conversation(self, router, db_session):
        """An existing conversation in ``queued`` status is reused."""
        # Create a conversation first.
        existing = create_test_conversation(employee_id="reuse_user", status="queued")
        db_session.add(existing)
        await db_session.flush()
        existing_id = existing.id

        # Looking it up again should reuse the same row.
        conv = await router._find_or_create_conversation("reuse_user", "新消息")

        assert conv.id == existing_id

    @pytest.mark.asyncio
    async def test_reuse_existing_serving_conversation(self, router, db_session):
        """An existing conversation in ``serving`` status is reused."""
        existing = create_test_conversation(employee_id="serving_user", status="serving")
        db_session.add(existing)
        await db_session.flush()
        existing_id = existing.id

        conv = await router._find_or_create_conversation("serving_user", "追加消息")

        assert conv.id == existing_id

    @pytest.mark.asyncio
    async def test_create_new_when_resolved(self, router, db_session):
        """A ``resolved`` conversation is not reused; a new one is created."""
        existing = create_test_conversation(employee_id="resolved_user", status="resolved")
        db_session.add(existing)
        await db_session.flush()
        # Capture the id before routing, as the sibling reuse tests do, so the
        # comparison below does not read an attribute off ``existing`` after
        # the router has worked with the session.
        existing_id = existing.id

        conv = await router._find_or_create_conversation("resolved_user", "新咨询")

        assert conv.id != existing_id
        assert conv.status == "queued"

    @pytest.mark.asyncio
    async def test_summary_truncated_to_256(self, router, db_session):
        """The message summary keeps only the first 256 characters."""
        long_content = "A" * 300
        conv = await router._find_or_create_conversation("trunc_user", long_content)

        assert len(conv.last_message_summary) == 256


class TestCheckVip:
    """Tests for ``MessageRouter._check_vip``."""

    @pytest.mark.asyncio
    async def test_vip_detection_for_director(self, router, db_session, mock_wecom_service):
        """A director-level position is recognised as VIP."""
        mock_wecom_service.get_user_info.return_value = {
            "name": "王总监",
            "department": "[1]",
            "position": "技术总监",
        }

        conv = create_test_conversation(employee_id="vip_director")
        db_session.add(conv)
        await db_session.flush()

        await router._check_vip(conv)

        assert conv.is_vip is True
        assert conv.employee_name == "王总监"
        assert conv.position == "技术总监"

    @pytest.mark.asyncio
    async def test_vip_detection_for_ceo(self, router, db_session, mock_wecom_service):
        """A CEO position is recognised as VIP."""
        mock_wecom_service.get_user_info.return_value = {
            "name": "李CEO",
            "department": "[1]",
            "position": "CEO",
        }

        conv = create_test_conversation(employee_id="vip_ceo")
        db_session.add(conv)
        await db_session.flush()

        await router._check_vip(conv)

        assert conv.is_vip is True

    @pytest.mark.asyncio
    async def test_no_vip_for_regular_engineer(self, router, db_session, mock_wecom_service):
        """A regular engineer is not recognised as VIP."""
        mock_wecom_service.get_user_info.return_value = {
            "name": "张三",
            "department": "[1]",
            "position": "工程师",
        }

        conv = create_test_conversation(employee_id="regular_engineer")
        db_session.add(conv)
        await db_session.flush()

        await router._check_vip(conv)

        assert conv.is_vip is False

    @pytest.mark.asyncio
    async def test_vip_check_failure_does_not_block(self, router, db_session, mock_wecom_service):
        """A failing user-info API call does not block message routing."""
        mock_wecom_service.get_user_info.side_effect = Exception("API 调用失败")

        conv = create_test_conversation(employee_id="api_fail_user")
        db_session.add(conv)
        await db_session.flush()

        # Must not raise.
        await router._check_vip(conv)

        assert conv.is_vip is False  # stays at its default value

    @pytest.mark.asyncio
    async def test_vip_check_skipped_if_already_detected(self, router, db_session, mock_wecom_service):
        """A conversation already flagged as VIP is not checked again."""
        conv = create_test_conversation(employee_id="already_vip", is_vip=True)
        db_session.add(conv)
        await db_session.flush()

        await router._check_vip(conv)

        # get_user_info must not be called because is_vip is already True.
        mock_wecom_service.get_user_info.assert_not_called()


class TestRouteMessage:
    """Tests for the full ``MessageRouter.route_message`` flow."""

    @pytest.mark.asyncio
    async def test_route_normal_message(self, router, db_session, mock_wecom_service):
        """An ordinary message is routed into a queued conversation."""
        conv = await router.route_message("normal_user", "帮我重置密码")

        assert conv is not None
        assert conv.employee_id == "normal_user"
        assert conv.status == "queued"
        assert conv.urgency_score >= 1

    @pytest.mark.asyncio
    async def test_route_message_with_hand_raise(self, router, db_session, mock_wecom_service):
        """A hand-raise keyword sets the ``hand_raise`` tag."""
        conv = await router.route_message("hand_raise_user", "我要转人工")

        assert conv.tags.get("hand_raise") is True

    @pytest.mark.asyncio
    async def test_route_message_with_emotion(self, router, db_session, mock_wecom_service):
        """An emotion keyword sets the ``emotion`` tag and records the keyword."""
        conv = await router.route_message("angry_user", "太崩溃了!系统太差了")

        assert conv.tags.get("emotion") == "angry"
        assert "崩溃" in conv.tags.get("emotion_keywords", [])

    @pytest.mark.asyncio
    async def test_route_message_creates_message_record(self, router, db_session, mock_wecom_service):
        """Routing a message persists a ``Message`` row for the conversation."""
        conv = await router.route_message("msg_record_user", "测试消息内容")

        # Query the stored message records.
        stmt = select(Message).where(Message.conversation_id == conv.id)
        result = await db_session.execute(stmt)
        messages = list(result.scalars().all())

        assert len(messages) >= 1
        msg = messages[0]
        assert msg.sender_type == "employee"
        assert msg.content == "测试消息内容"

    @pytest.mark.asyncio
    async def test_route_message_urgency_is_int_not_coroutine(self, router, db_session, mock_wecom_service):
        """After the Bug 1 fix, ``urgency_score`` is an int, not a coroutine."""
        conv = await router.route_message("bug1_test_user", "转人工,很急")

        # Before the Bug 1 fix: urgency_score would be a coroutine object.
        # After the fix: urgency_score should be an integer.
        assert isinstance(conv.urgency_score, int)
        assert 1 <= conv.urgency_score <= 5

    @pytest.mark.asyncio
    async def test_route_message_repeat_count_increments(self, router, db_session, mock_wecom_service):
        """The follow-up round counter increments with each message."""
        # First message.
        conv1 = await router.route_message("repeat_user", "第一条消息")
        assert conv1.tags.get("repeat_count") == 1

        # Second message (same conversation).
        conv2 = await router.route_message("repeat_user", "第二条消息")
        assert conv2.tags.get("repeat_count") == 2

    @pytest.mark.asyncio
    async def test_route_message_updates_last_message_summary(self, router, db_session, mock_wecom_service):
        """Routing a message updates the conversation's last-message summary."""
        conv = await router.route_message("summary_user", "VPN连接不上怎么办")

        assert conv.last_message_summary == "VPN连接不上怎么办"