# =============================================================================
# WeCom IT Smart Service Desk — message deduplication tests
# =============================================================================
# Coverage:
#   1. Messages with a repeated MsgId are filtered
#   2. Repeated content from the same user is filtered
#   3. Distinct messages pass through
#   4. Messages can be processed again once the dedup key has expired
#   5. Degrades to pass-through when Redis is unavailable
#   6. CacheService methods tested in isolation
#   7. MessageRouter dedup integration tests
# =============================================================================

import asyncio
import hashlib
import time
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.conversation import Conversation
from app.models.message import Message
from app.models.system_config import SystemConfig
from app.services.cache_service import (
    CacheService,
    MSG_DEDUP_PREFIX,
    CONTENT_DEDUP_PREFIX,
    DEFAULT_MSG_DEDUP_TTL,
    DEFAULT_CONTENT_DEDUP_TTL,
)
from app.services.message_router import MessageRouter
from app.services.scoring_service import ScoringService
from app.services.wecom_service import WecomService
from tests.conftest import MockRedis, create_test_conversation


# =============================================================================
# Helpers
# =============================================================================


def _content_dedup_key(user_id, content):
    """Build the Redis key these tests expect for a (user, content) pair.

    The key is ``CONTENT_DEDUP_PREFIX``, the user id, and the first 16 hex
    characters of the SHA-256 digest of ``"{user_id}:{content}"`` (UTF-8),
    joined with colons.

    Args:
        user_id: Sender id used in the key and in the hashed payload.
        content: Message text used in the hashed payload.

    Returns:
        The expected content-dedup key as a string.
    """
    digest = hashlib.sha256(f"{user_id}:{content}".encode("utf-8")).hexdigest()[:16]
    return f"{CONTENT_DEDUP_PREFIX}:{user_id}:{digest}"


# =============================================================================
# Fixtures
# =============================================================================


@pytest_asyncio.fixture
async def setup_router_db(db_session):
    """Seed the database with the SystemConfig rows the router tests rely on."""
    configs = [
        SystemConfig(config_key="hand_raise_keywords", config_value='["转人工","人工","真人"]'),
        SystemConfig(config_key="emotion_keywords_angry", config_value='["崩溃","愤怒","投诉"]'),
        SystemConfig(config_key="emotion_keywords_urgent", config_value='["急","紧急","马上"]'),
        SystemConfig(config_key="emotion_keywords_worried", config_value='["担心","害怕"]'),
        SystemConfig(config_key="intervene_round_threshold", config_value="3"),
        SystemConfig(config_key="urgency_base_keyword_score", config_value="1"),
        SystemConfig(config_key="urgency_emotion_bonus", config_value="1"),
        SystemConfig(config_key="urgency_vip_bonus", config_value="1"),
        SystemConfig(config_key="urgency_repeat_bonus", config_value="1"),
    ]
    db_session.add_all(configs)
    # Flush (not commit) so the rows are visible within the test's session.
    await db_session.flush()


def _create_mock_wecom_service():
    """Create an AsyncMock stand-in for WecomService with canned responses."""
    mock = AsyncMock(spec=WecomService)
    mock.get_user_info = AsyncMock(return_value={
        "name": "张三",
        "department": "[1, 2]",
        "position": "工程师",
    })
    mock.send_text_message = AsyncMock(return_value={"errcode": 0})
    mock.close = AsyncMock()
    return mock


@pytest.fixture
def mock_wecom_service():
    """Provide a mocked WecomService."""
    return _create_mock_wecom_service()


@pytest.fixture
def mock_redis_client():
    """Provide a fresh MockRedis instance."""
    return MockRedis()


@pytest.fixture
def cache_service(mock_redis_client):
    """Provide a CacheService backed by MockRedis."""
    return CacheService(mock_redis_client)


@pytest.fixture
def cache_service_no_redis():
    """Provide a CacheService without Redis (degraded mode)."""
    return CacheService(None)


@pytest.fixture
def router_with_dedup(db_session, mock_wecom_service, mock_redis_client, setup_router_db):
    """Create a MessageRouter with deduplication enabled."""
    scoring_service = ScoringService(db_session)
    cache_service = CacheService(mock_redis_client)
    return MessageRouter(
        db=db_session,
        wecom_service=mock_wecom_service,
        scoring_service=scoring_service,
        cache_service=cache_service,
    )


@pytest.fixture
def router_no_dedup(db_session, mock_wecom_service, setup_router_db):
    """Create a MessageRouter without deduplication (cache_service=None)."""
    scoring_service = ScoringService(db_session)
    return MessageRouter(
        db=db_session,
        wecom_service=mock_wecom_service,
        scoring_service=scoring_service,
        cache_service=None,
    )


# =============================================================================
# CacheService unit tests
# =============================================================================


class TestCacheServiceIsDuplicate:
    """Tests for CacheService.is_duplicate()."""

    @pytest.mark.asyncio
    async def test_first_message_not_duplicate(self, cache_service, mock_redis_client):
        """A message seen for the first time is not a duplicate."""
        result = await cache_service.is_duplicate("msg_001")
        assert result is False

    @pytest.mark.asyncio
    async def test_same_msg_id_is_duplicate(self, cache_service, mock_redis_client):
        """The second call with the same MsgId is reported as a duplicate."""
        # First call: not a duplicate.
        result1 = await cache_service.is_duplicate("msg_002")
        assert result1 is False

        # Second call: duplicate.
        result2 = await cache_service.is_duplicate("msg_002")
        assert result2 is True

    @pytest.mark.asyncio
    async def test_different_msg_id_not_duplicate(self, cache_service, mock_redis_client):
        """Different MsgIds do not affect each other."""
        result1 = await cache_service.is_duplicate("msg_003")
        result2 = await cache_service.is_duplicate("msg_004")
        assert result1 is False
        assert result2 is False

    @pytest.mark.asyncio
    async def test_empty_msg_id_not_duplicate(self, cache_service):
        """An empty MsgId is let through (never treated as a duplicate)."""
        result = await cache_service.is_duplicate("")
        assert result is False

    @pytest.mark.asyncio
    async def test_no_redis_graceful_degradation(self, cache_service_no_redis):
        """Without Redis the check degrades to pass-through (returns False)."""
        result = await cache_service_no_redis.is_duplicate("msg_005")
        assert result is False

    @pytest.mark.asyncio
    async def test_redis_key_format(self, cache_service, mock_redis_client):
        """The Redis key is formatted as ``{MSG_DEDUP_PREFIX}:{msg_id}``."""
        await cache_service.is_duplicate("msg_006")
        expected_key = f"{MSG_DEDUP_PREFIX}:msg_006"
        assert expected_key in mock_redis_client._data

    @pytest.mark.asyncio
    async def test_redis_key_ttl(self, cache_service, mock_redis_client):
        """The Redis key is stored with DEFAULT_MSG_DEDUP_TTL."""
        await cache_service.is_duplicate("msg_007")
        expected_key = f"{MSG_DEDUP_PREFIX}:msg_007"
        assert mock_redis_client._ttl.get(expected_key) == DEFAULT_MSG_DEDUP_TTL

    @pytest.mark.asyncio
    async def test_custom_ttl(self, cache_service, mock_redis_client):
        """A caller-supplied TTL overrides the default."""
        custom_ttl = 600
        await cache_service.is_duplicate("msg_008", ttl=custom_ttl)
        expected_key = f"{MSG_DEDUP_PREFIX}:msg_008"
        assert mock_redis_client._ttl.get(expected_key) == custom_ttl


class TestCacheServiceIsDuplicateContent:
    """Tests for CacheService.is_duplicate_content()."""

    @pytest.mark.asyncio
    async def test_first_message_not_duplicate(self, cache_service):
        """Content seen for the first time is not a duplicate."""
        result = await cache_service.is_duplicate_content("user_001", "帮我重置密码")
        assert result is False

    @pytest.mark.asyncio
    async def test_same_user_same_content_is_duplicate(self, cache_service):
        """The same user sending the same content twice is a duplicate."""
        # First call: not a duplicate.
        result1 = await cache_service.is_duplicate_content("user_002", "VPN连不上")
        assert result1 is False

        # Second call: duplicate.
        result2 = await cache_service.is_duplicate_content("user_002", "VPN连不上")
        assert result2 is True

    @pytest.mark.asyncio
    async def test_same_user_different_content_not_duplicate(self, cache_service):
        """The same user sending different content is not a duplicate."""
        result1 = await cache_service.is_duplicate_content("user_003", "重置密码")
        result2 = await cache_service.is_duplicate_content("user_003", "安装软件")
        assert result1 is False
        assert result2 is False

    @pytest.mark.asyncio
    async def test_different_user_same_content_not_duplicate(self, cache_service):
        """Different users sending the same content are not duplicates."""
        result1 = await cache_service.is_duplicate_content("user_004", "VPN连不上")
        result2 = await cache_service.is_duplicate_content("user_005", "VPN连不上")
        assert result1 is False
        assert result2 is False

    @pytest.mark.asyncio
    async def test_empty_user_id_not_duplicate(self, cache_service):
        """An empty user_id is let through."""
        result = await cache_service.is_duplicate_content("", "帮我重置密码")
        assert result is False

    @pytest.mark.asyncio
    async def test_empty_content_not_duplicate(self, cache_service):
        """Empty content is let through."""
        result = await cache_service.is_duplicate_content("user_006", "")
        assert result is False

    @pytest.mark.asyncio
    async def test_no_redis_graceful_degradation(self, cache_service_no_redis):
        """Without Redis the check degrades to pass-through."""
        result = await cache_service_no_redis.is_duplicate_content("user_007", "VPN连不上")
        assert result is False

    @pytest.mark.asyncio
    async def test_redis_key_format(self, cache_service, mock_redis_client):
        """The Redis key contains the user id and the content hash."""
        await cache_service.is_duplicate_content("user_008", "帮我重置密码")
        expected_key = _content_dedup_key("user_008", "帮我重置密码")
        assert expected_key in mock_redis_client._data

    @pytest.mark.asyncio
    async def test_content_dedup_ttl(self, cache_service, mock_redis_client):
        """The content-dedup key is stored with DEFAULT_CONTENT_DEDUP_TTL."""
        await cache_service.is_duplicate_content("user_009", "VPN连不上")
        expected_key = _content_dedup_key("user_009", "VPN连不上")
        assert mock_redis_client._ttl.get(expected_key) == DEFAULT_CONTENT_DEDUP_TTL


# =============================================================================
# TTL expiry tests
# =============================================================================


class TestTTLExpiry:
    """Messages can be processed again once their dedup key is gone."""

    @pytest.mark.asyncio
    async def test_msg_id_dedup_key_expires(self, cache_service, mock_redis_client):
        """A MsgId passes again after its key is deleted to simulate expiry."""
        msg_id = "msg_expire_001"

        # First call: not a duplicate.
        result1 = await cache_service.is_duplicate(msg_id)
        assert result1 is False

        # Repeat: duplicate.
        result2 = await cache_service.is_duplicate(msg_id)
        assert result2 is True

        # Simulate TTL expiry by deleting the key by hand.
        key = f"{MSG_DEDUP_PREFIX}:{msg_id}"
        await mock_redis_client.delete(key)

        # After expiry: not a duplicate.
        result3 = await cache_service.is_duplicate(msg_id)
        assert result3 is False

    @pytest.mark.asyncio
    async def test_content_dedup_key_expires(self, cache_service, mock_redis_client):
        """Content passes again after its dedup key is deleted."""
        user_id = "user_expire_001"
        content = "VPN连不上"

        # First call: not a duplicate.
        result1 = await cache_service.is_duplicate_content(user_id, content)
        assert result1 is False

        # Repeat: duplicate.
        result2 = await cache_service.is_duplicate_content(user_id, content)
        assert result2 is True

        # Simulate TTL expiry by deleting the key by hand.
        await mock_redis_client.delete(_content_dedup_key(user_id, content))

        # After expiry: not a duplicate.
        result3 = await cache_service.is_duplicate_content(user_id, content)
        assert result3 is False


# =============================================================================
# MessageRouter dedup integration tests
# =============================================================================


class TestMessageRouterDedup:
    """Tests for deduplication as wired into MessageRouter.route_message()."""

    @pytest.mark.asyncio
    async def test_duplicate_msg_id_returns_none(self, router_with_dedup, mock_redis_client):
        """A repeated MsgId makes route_message return None (filtered)."""
        # The first message is processed normally.
        result1 = await router_with_dedup.route_message(
            from_user_id="dedup_user_001",
            content="帮我重置密码",
            msg_id="msg_dedup_001",
        )
        assert result1 is not None

        # Same MsgId again: filtered by dedup.
        result2 = await router_with_dedup.route_message(
            from_user_id="dedup_user_001",
            content="帮我重置密码",
            msg_id="msg_dedup_001",
        )
        assert result2 is None

    @pytest.mark.asyncio
    async def test_duplicate_content_returns_none(self, router_with_dedup, mock_redis_client):
        """Same user and content with a different MsgId is filtered."""
        # The first message is processed normally.
        result1 = await router_with_dedup.route_message(
            from_user_id="dedup_user_002",
            content="VPN连不上",
            msg_id="msg_dedup_002a",
        )
        assert result1 is not None

        # Different MsgId but same user + content: filtered by content dedup.
        result2 = await router_with_dedup.route_message(
            from_user_id="dedup_user_002",
            content="VPN连不上",
            msg_id="msg_dedup_002b",
        )
        assert result2 is None

    @pytest.mark.asyncio
    async def test_different_messages_pass_through(self, router_with_dedup, mock_redis_client):
        """Distinct messages are processed normally."""
        result1 = await router_with_dedup.route_message(
            from_user_id="normal_user_001",
            content="帮我重置密码",
            msg_id="msg_normal_001",
        )
        result2 = await router_with_dedup.route_message(
            from_user_id="normal_user_001",
            content="安装Office",
            msg_id="msg_normal_002",
        )
        assert result1 is not None
        assert result2 is not None

    @pytest.mark.asyncio
    async def test_no_cache_service_skips_dedup(self, router_no_dedup):
        """With cache_service=None dedup is skipped and every message is processed."""
        # Same MsgId twice, but no dedup configured: both are processed.
        result1 = await router_no_dedup.route_message(
            from_user_id="no_dedup_user",
            content="帮我重置密码",
            msg_id="msg_no_dedup_001",
        )
        result2 = await router_no_dedup.route_message(
            from_user_id="no_dedup_user",
            content="帮我重置密码",
            msg_id="msg_no_dedup_001",
        )
        assert result1 is not None
        assert result2 is not None

    @pytest.mark.asyncio
    async def test_none_msg_id_skips_msg_id_dedup(self, router_with_dedup):
        """With msg_id=None the MsgId check is skipped but content dedup still applies."""
        # First call: no msg_id, processed normally.
        result1 = await router_with_dedup.route_message(
            from_user_id="no_msgid_user",
            content="WiFi连不上",
            msg_id=None,
        )
        assert result1 is not None

        # Second call: no msg_id, same user + content: content dedup hits.
        result2 = await router_with_dedup.route_message(
            from_user_id="no_msgid_user",
            content="WiFi连不上",
            msg_id=None,
        )
        assert result2 is None

    @pytest.mark.asyncio
    async def test_different_users_same_content_passes(self, router_with_dedup):
        """Different users sending the same content both pass (dedup is per user)."""
        result1 = await router_with_dedup.route_message(
            from_user_id="user_a",
            content="帮我重置密码",
            msg_id="msg_user_a_001",
        )
        result2 = await router_with_dedup.route_message(
            from_user_id="user_b",
            content="帮我重置密码",
            msg_id="msg_user_b_001",
        )
        assert result1 is not None
        assert result2 is not None

    @pytest.mark.asyncio
    async def test_dedup_expired_allows_reprocessing(
        self, router_with_dedup, mock_redis_client
    ):
        """After the dedup keys are gone the same message is processed again."""
        # First call: processed normally.
        result1 = await router_with_dedup.route_message(
            from_user_id="expire_user",
            content="帮我重置密码",
            msg_id="msg_expire_001",
        )
        assert result1 is not None

        # Repeat: filtered.
        result2 = await router_with_dedup.route_message(
            from_user_id="expire_user",
            content="帮我重置密码",
            msg_id="msg_expire_001",
        )
        assert result2 is None

        # Simulate TTL expiry: drop both the MsgId key and the content key,
        # otherwise the content check would still filter the retry.
        key = f"{MSG_DEDUP_PREFIX}:msg_expire_001"
        await mock_redis_client.delete(key)
        await mock_redis_client.delete(_content_dedup_key("expire_user", "帮我重置密码"))

        # After expiry: processed again.
        result3 = await router_with_dedup.route_message(
            from_user_id="expire_user",
            content="帮我重置密码",
            msg_id="msg_expire_001",
        )
        assert result3 is not None

    @pytest.mark.asyncio
    async def test_non_text_message_dedup(self, router_with_dedup, mock_redis_client):
        """Non-text messages also go through the dedup check."""
        # First call: processed normally.
        result1 = await router_with_dedup.route_message(
            from_user_id="nontext_user",
            content="",
            msg_type="image",
            msg_id="msg_nontext_001",
            media_id="media_123",
        )
        assert result1 is not None

        # Repeat: filtered.
        result2 = await router_with_dedup.route_message(
            from_user_id="nontext_user",
            content="",
            msg_type="image",
            msg_id="msg_nontext_001",
            media_id="media_123",
        )
        assert result2 is None


# =============================================================================
# CacheService general cache tests
# =============================================================================


class TestCacheServiceGeneral:
    """Tests for CacheService's generic get/set/delete operations."""

    @pytest.mark.asyncio
    async def test_get_existing_key(self, cache_service, mock_redis_client):
        """Reading a key that was set returns its value."""
        await cache_service.set("test_key", "test_value")
        result = await cache_service.get("test_key")
        assert result == "test_value"

    @pytest.mark.asyncio
    async def test_get_nonexistent_key(self, cache_service):
        """Reading a missing key returns None."""
        result = await cache_service.get("nonexistent_key")
        assert result is None

    @pytest.mark.asyncio
    async def test_set_with_ttl(self, cache_service, mock_redis_client):
        """Setting a value with a TTL stores both the value and the TTL."""
        result = await cache_service.set("ttl_key", "ttl_value", ttl=3600)
        assert result is True
        assert mock_redis_client._data.get("ttl_key") == "ttl_value"
        assert mock_redis_client._ttl.get("ttl_key") == 3600

    @pytest.mark.asyncio
    async def test_set_without_ttl(self, cache_service, mock_redis_client):
        """Setting a value without a TTL succeeds and stores the value."""
        result = await cache_service.set("no_ttl_key", "no_ttl_value")
        assert result is True
        # Check the write itself, not only the success flag.
        assert mock_redis_client._data.get("no_ttl_key") == "no_ttl_value"

    @pytest.mark.asyncio
    async def test_delete_existing_key(self, cache_service, mock_redis_client):
        """Deleting an existing key removes it from the store."""
        await cache_service.set("delete_key", "delete_value")
        result = await cache_service.delete("delete_key")
        assert result is True
        assert "delete_key" not in mock_redis_client._data

    @pytest.mark.asyncio
    async def test_delete_nonexistent_key(self, cache_service):
        """Deleting a missing key still returns True."""
        result = await cache_service.delete("nonexistent_delete")
        assert result is True

    @pytest.mark.asyncio
    async def test_no_redis_operations_return_defaults(self, cache_service_no_redis):
        """Without Redis, get returns None and set/delete return False."""
        assert await cache_service_no_redis.get("any_key") is None
        assert await cache_service_no_redis.set("any_key", "any_value") is False
        assert await cache_service_no_redis.delete("any_key") is False