985 lines
38 KiB
Python
985 lines
38 KiB
Python
|
|
# =============================================================================
|
|||
|
|
# 企微IT智能服务台 — 非文本消息处理测试
|
|||
|
|
# =============================================================================
|
|||
|
|
# 测试覆盖:
|
|||
|
|
# 1. _get_non_text_display() — 各消息类型的展示文本生成
|
|||
|
|
# 2. _get_non_text_reply() — 各消息类型的自动回复模板
|
|||
|
|
# 3. _handle_non_text_message() — 非文本消息核心处理流程
|
|||
|
|
# - 图片消息:正确存储 + 正确回复模板
|
|||
|
|
# - 语音消息:正确存储 + 正确回复模板
|
|||
|
|
# - 文件消息:正确存储 file_name/file_size + 正确回复
|
|||
|
|
# - 位置消息:正确存储 location 字段 + 正确回复
|
|||
|
|
# - 视频消息:正确存储 + 正确回复
|
|||
|
|
# 4. 文本消息不受影响(回归测试)
|
|||
|
|
# 5. WebSocket 广播格式验证
|
|||
|
|
# 6. 非文本消息不触发 AI、不改变会话状态
|
|||
|
|
# 7. wecom_callback.py 字段提取验证
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
from datetime import datetime
|
|||
|
|
from unittest.mock import AsyncMock, MagicMock, patch, call
|
|||
|
|
|
|||
|
|
import pytest
|
|||
|
|
import pytest_asyncio
|
|||
|
|
from sqlalchemy import select
|
|||
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|||
|
|
|
|||
|
|
from app.models.conversation import Conversation
|
|||
|
|
from app.models.message import Message
|
|||
|
|
from app.models.system_config import SystemConfig
|
|||
|
|
from app.services.message_router import MessageRouter
|
|||
|
|
from app.services.scoring_service import ScoringService
|
|||
|
|
from app.services.wecom_service import WecomService
|
|||
|
|
from tests.conftest import create_test_conversation
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Shared Fixtures
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
def _create_mock_wecom_service():
|
|||
|
|
"""创建模拟的 WecomService,send_text_message 返回成功。"""
|
|||
|
|
mock = AsyncMock(spec=WecomService)
|
|||
|
|
mock.get_user_info = AsyncMock(return_value={
|
|||
|
|
"name": "测试员工",
|
|||
|
|
"department": "[1]",
|
|||
|
|
"position": "工程师",
|
|||
|
|
})
|
|||
|
|
mock.send_text_message = AsyncMock(return_value={"errcode": 0, "errmsg": "ok"})
|
|||
|
|
return mock
|
|||
|
|
|
|||
|
|
|
|||
|
|
@pytest_asyncio.fixture
|
|||
|
|
def mock_wecom_service():
|
|||
|
|
"""提供模拟的 WecomService。"""
|
|||
|
|
return _create_mock_wecom_service()
|
|||
|
|
|
|||
|
|
|
|||
|
|
@pytest_asyncio.fixture
|
|||
|
|
async def setup_configs(db_session):
|
|||
|
|
"""初始化评分服务所需的系统配置。"""
|
|||
|
|
configs = [
|
|||
|
|
SystemConfig(config_key="hand_raise_keywords", config_value='["转人工","人工","真人"]'),
|
|||
|
|
SystemConfig(config_key="emotion_keywords_angry", config_value='["崩溃","愤怒","投诉"]'),
|
|||
|
|
SystemConfig(config_key="emotion_keywords_urgent", config_value='["急","紧急","马上"]'),
|
|||
|
|
SystemConfig(config_key="emotion_keywords_worried", config_value='["担心","害怕"]'),
|
|||
|
|
SystemConfig(config_key="intervene_round_threshold", config_value="3"),
|
|||
|
|
SystemConfig(config_key="urgency_base_keyword_score", config_value="1"),
|
|||
|
|
SystemConfig(config_key="urgency_emotion_bonus", config_value="1"),
|
|||
|
|
SystemConfig(config_key="urgency_vip_bonus", config_value="1"),
|
|||
|
|
SystemConfig(config_key="urgency_repeat_bonus", config_value="1"),
|
|||
|
|
]
|
|||
|
|
db_session.add_all(configs)
|
|||
|
|
await db_session.flush()
|
|||
|
|
|
|||
|
|
|
|||
|
|
@pytest_asyncio.fixture
|
|||
|
|
def router_no_ai(db_session, mock_wecom_service, setup_configs):
|
|||
|
|
"""创建不含 AI 处理器的消息路由器(用于测试非文本消息,验证 AI 不被触发)。"""
|
|||
|
|
scoring_service = ScoringService(db_session)
|
|||
|
|
return MessageRouter(
|
|||
|
|
db=db_session,
|
|||
|
|
wecom_service=mock_wecom_service,
|
|||
|
|
scoring_service=scoring_service,
|
|||
|
|
ai_handler=None, # 明确设为 None,验证非文本不依赖 AI
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@pytest_asyncio.fixture
|
|||
|
|
def mock_ai_handler():
|
|||
|
|
"""创建模拟的 AIHandler。"""
|
|||
|
|
mock = AsyncMock()
|
|||
|
|
mock.handle_message = AsyncMock(return_value=MagicMock(
|
|||
|
|
content="AI回复内容",
|
|||
|
|
should_transfer=False,
|
|||
|
|
should_count=True,
|
|||
|
|
is_guidance=False,
|
|||
|
|
reply_type="ai_hit",
|
|||
|
|
dify_conversation_id=None,
|
|||
|
|
))
|
|||
|
|
return mock
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Test Class 1: _get_non_text_display — 展示文本生成
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
class TestGetNonTextDisplay:
|
|||
|
|
"""测试各消息类型的展示文本生成。"""
|
|||
|
|
|
|||
|
|
def test_image_display(self, router_no_ai):
|
|||
|
|
"""验证图片消息展示文本。"""
|
|||
|
|
assert router_no_ai._get_non_text_display("image") == "[图片消息]"
|
|||
|
|
|
|||
|
|
def test_voice_display(self, router_no_ai):
|
|||
|
|
"""验证语音消息展示文本。"""
|
|||
|
|
assert router_no_ai._get_non_text_display("voice") == "[语音消息]"
|
|||
|
|
|
|||
|
|
def test_video_display(self, router_no_ai):
|
|||
|
|
"""验证视频消息展示文本。"""
|
|||
|
|
assert router_no_ai._get_non_text_display("video") == "[视频消息]"
|
|||
|
|
|
|||
|
|
def test_location_display(self, router_no_ai):
|
|||
|
|
"""验证位置消息展示文本。"""
|
|||
|
|
assert router_no_ai._get_non_text_display("location") == "[位置消息]"
|
|||
|
|
|
|||
|
|
def test_file_display_with_name(self, router_no_ai):
|
|||
|
|
"""验证文件消息展示文本(含文件名)。"""
|
|||
|
|
result = router_no_ai._get_non_text_display("file", file_name="report.pdf")
|
|||
|
|
assert result == "[文件消息: report.pdf]"
|
|||
|
|
|
|||
|
|
def test_file_display_without_name(self, router_no_ai):
|
|||
|
|
"""验证文件消息展示文本(无文件名)。"""
|
|||
|
|
result = router_no_ai._get_non_text_display("file", file_name=None)
|
|||
|
|
assert result == "[文件消息]"
|
|||
|
|
|
|||
|
|
def test_unknown_type_display(self, router_no_ai):
|
|||
|
|
"""验证未知类型的展示文本兜底。"""
|
|||
|
|
result = router_no_ai._get_non_text_display("sticker")
|
|||
|
|
assert result == "[sticker消息]"
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Test Class 2: _get_non_text_reply — 自动回复模板生成
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
class TestGetNonTextReply:
|
|||
|
|
"""测试各消息类型的自动回复模板。"""
|
|||
|
|
|
|||
|
|
def test_image_reply_suggests_description(self, router_no_ai):
|
|||
|
|
"""验证图片消息的回复引导用户补充文字描述。"""
|
|||
|
|
reply = router_no_ai._get_non_text_reply("image")
|
|||
|
|
assert "截图" in reply
|
|||
|
|
assert "补充文字描述" in reply
|
|||
|
|
assert "📷" in reply
|
|||
|
|
|
|||
|
|
def test_voice_reply_says_unsupported(self, router_no_ai):
|
|||
|
|
"""验证语音消息的回复包含'暂不支持'。"""
|
|||
|
|
reply = router_no_ai._get_non_text_reply("voice")
|
|||
|
|
assert "暂不支持语音消息" in reply
|
|||
|
|
assert "文字描述" in reply
|
|||
|
|
|
|||
|
|
def test_video_reply_says_unsupported(self, router_no_ai):
|
|||
|
|
"""验证视频消息的回复包含'暂不支持'。"""
|
|||
|
|
reply = router_no_ai._get_non_text_reply("video")
|
|||
|
|
assert "暂不支持视频消息" in reply
|
|||
|
|
|
|||
|
|
def test_file_reply_says_unsupported(self, router_no_ai):
|
|||
|
|
"""验证文件消息的回复包含'暂不支持'。"""
|
|||
|
|
reply = router_no_ai._get_non_text_reply("file")
|
|||
|
|
assert "暂不支持文件消息" in reply
|
|||
|
|
|
|||
|
|
def test_location_reply_says_unsupported(self, router_no_ai):
|
|||
|
|
"""验证位置消息的回复包含'暂不支持'。"""
|
|||
|
|
reply = router_no_ai._get_non_text_reply("location")
|
|||
|
|
assert "暂不支持位置消息" in reply
|
|||
|
|
|
|||
|
|
def test_unknown_type_fallback_reply(self, router_no_ai):
|
|||
|
|
"""验证未知消息类型的兜底回复模板。"""
|
|||
|
|
reply = router_no_ai._get_non_text_reply("sticker")
|
|||
|
|
assert "暂不支持sticker消息" in reply
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Test Class 3: _handle_non_text_message — 非文本消息核心处理
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
class TestHandleNonTextMessage:
|
|||
|
|
"""测试 _handle_non_text_message() 方法的所有消息类型。"""
|
|||
|
|
|
|||
|
|
# --- 图片消息 ---
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_image_message_storage_and_reply(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证图片消息:正确存储 + 正确回复模板 + 不触发AI。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="image_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="image",
|
|||
|
|
media_id="media_img_001",
|
|||
|
|
extra_data={"pic_url": "https://example.com/pic.jpg"},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 1. 验证会话未改变状态(保持 ai_handling,非文本不改变)
|
|||
|
|
assert conv is not None
|
|||
|
|
assert conv.status == "ai_handling"
|
|||
|
|
|
|||
|
|
# 2. 验证员工消息记录已创建(含元数据)
|
|||
|
|
stmt = select(Message).where(
|
|||
|
|
Message.conversation_id == conv.id,
|
|||
|
|
Message.sender_type == "employee",
|
|||
|
|
).order_by(Message.created_at)
|
|||
|
|
result = await db_session.execute(stmt)
|
|||
|
|
messages = list(result.scalars().all())
|
|||
|
|
assert len(messages) == 1
|
|||
|
|
emp_msg = messages[0]
|
|||
|
|
assert emp_msg.msg_type == "image"
|
|||
|
|
assert emp_msg.content == "[图片消息]"
|
|||
|
|
assert emp_msg.media_id == "media_img_001"
|
|||
|
|
assert emp_msg.extra_data == {"pic_url": "https://example.com/pic.jpg"}
|
|||
|
|
|
|||
|
|
# 3. 验证 AI 自动回复消息记录
|
|||
|
|
stmt_ai = select(Message).where(
|
|||
|
|
Message.conversation_id == conv.id,
|
|||
|
|
Message.sender_type == "ai",
|
|||
|
|
)
|
|||
|
|
result_ai = await db_session.execute(stmt_ai)
|
|||
|
|
ai_msgs = list(result_ai.scalars().all())
|
|||
|
|
assert len(ai_msgs) == 1
|
|||
|
|
ai_msg = ai_msgs[0]
|
|||
|
|
assert "截图" in ai_msg.content
|
|||
|
|
assert ai_msg.msg_type == "text"
|
|||
|
|
assert ai_msg.sender_id == "ai_bot"
|
|||
|
|
|
|||
|
|
# 4. 验证企微 API 发送了回复
|
|||
|
|
mock_wecom_service.send_text_message.assert_called_once()
|
|||
|
|
call_args = mock_wecom_service.send_text_message.call_args
|
|||
|
|
assert call_args.kwargs["user_id"] == "image_user"
|
|||
|
|
assert "截图" in call_args.kwargs["content"]
|
|||
|
|
|
|||
|
|
# --- 语音消息 ---
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_voice_message_storage_and_reply(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证语音消息:正确存储格式 + 正确回复模板。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="voice_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="voice",
|
|||
|
|
media_id="media_voice_001",
|
|||
|
|
extra_data={"format": "amr"},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 验证员工消息
|
|||
|
|
stmt = select(Message).where(
|
|||
|
|
Message.conversation_id == conv.id,
|
|||
|
|
Message.sender_type == "employee",
|
|||
|
|
)
|
|||
|
|
result = await db_session.execute(stmt)
|
|||
|
|
emp_msg = result.scalars().first()
|
|||
|
|
assert emp_msg is not None
|
|||
|
|
assert emp_msg.msg_type == "voice"
|
|||
|
|
assert emp_msg.content == "[语音消息]"
|
|||
|
|
assert emp_msg.media_id == "media_voice_001"
|
|||
|
|
assert emp_msg.extra_data == {"format": "amr"}
|
|||
|
|
|
|||
|
|
# 验证 AI 回复包含"暂不支持"
|
|||
|
|
mock_wecom_service.send_text_message.assert_called_once()
|
|||
|
|
reply_text = mock_wecom_service.send_text_message.call_args.kwargs["content"]
|
|||
|
|
assert "暂不支持语音消息" in reply_text
|
|||
|
|
|
|||
|
|
# --- 视频消息 ---
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_video_message_storage_and_reply(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证视频消息:正确存储 thumb_media_id + 正确回复。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="video_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="video",
|
|||
|
|
media_id="media_video_001",
|
|||
|
|
extra_data={"thumb_media_id": "thumb_001"},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
stmt = select(Message).where(
|
|||
|
|
Message.conversation_id == conv.id,
|
|||
|
|
Message.sender_type == "employee",
|
|||
|
|
)
|
|||
|
|
result = await db_session.execute(stmt)
|
|||
|
|
emp_msg = result.scalars().first()
|
|||
|
|
assert emp_msg.msg_type == "video"
|
|||
|
|
assert emp_msg.content == "[视频消息]"
|
|||
|
|
assert emp_msg.extra_data == {"thumb_media_id": "thumb_001"}
|
|||
|
|
|
|||
|
|
reply_text = mock_wecom_service.send_text_message.call_args.kwargs["content"]
|
|||
|
|
assert "暂不支持视频消息" in reply_text
|
|||
|
|
|
|||
|
|
# --- 文件消息 ---
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_file_message_storage_with_metadata(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证文件消息:正确存储 file_name + file_size + 正确回复。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="file_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="file",
|
|||
|
|
media_id="media_file_001",
|
|||
|
|
file_name="error_screenshot.png",
|
|||
|
|
file_size=204800,
|
|||
|
|
extra_data=None,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 验证员工消息
|
|||
|
|
stmt = select(Message).where(
|
|||
|
|
Message.conversation_id == conv.id,
|
|||
|
|
Message.sender_type == "employee",
|
|||
|
|
)
|
|||
|
|
result = await db_session.execute(stmt)
|
|||
|
|
emp_msg = result.scalars().first()
|
|||
|
|
assert emp_msg.msg_type == "file"
|
|||
|
|
assert emp_msg.content == "[文件消息: error_screenshot.png]"
|
|||
|
|
assert emp_msg.media_id == "media_file_001"
|
|||
|
|
assert emp_msg.file_name == "error_screenshot.png"
|
|||
|
|
assert emp_msg.file_size == 204800
|
|||
|
|
|
|||
|
|
# 验证回复
|
|||
|
|
reply_text = mock_wecom_service.send_text_message.call_args.kwargs["content"]
|
|||
|
|
assert "暂不支持文件消息" in reply_text
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_file_message_without_name(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证文件消息(无文件名):展示文本正确退化。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="file_user2",
|
|||
|
|
content="",
|
|||
|
|
msg_type="file",
|
|||
|
|
media_id="media_file_002",
|
|||
|
|
file_name=None,
|
|||
|
|
file_size=None,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
stmt = select(Message).where(
|
|||
|
|
Message.conversation_id == conv.id,
|
|||
|
|
Message.sender_type == "employee",
|
|||
|
|
)
|
|||
|
|
result = await db_session.execute(stmt)
|
|||
|
|
emp_msg = result.scalars().first()
|
|||
|
|
assert emp_msg.content == "[文件消息]"
|
|||
|
|
assert emp_msg.file_name is None
|
|||
|
|
assert emp_msg.file_size is None
|
|||
|
|
|
|||
|
|
# --- 位置消息 ---
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_location_message_storage(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证位置消息:正确存储 location 字段 + 正确回复。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="location_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="location",
|
|||
|
|
media_id=None,
|
|||
|
|
extra_data={
|
|||
|
|
"location_x": "23.134",
|
|||
|
|
"location_y": "113.358",
|
|||
|
|
"label": "广州市天河区",
|
|||
|
|
"scale": "15",
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
stmt = select(Message).where(
|
|||
|
|
Message.conversation_id == conv.id,
|
|||
|
|
Message.sender_type == "employee",
|
|||
|
|
)
|
|||
|
|
result = await db_session.execute(stmt)
|
|||
|
|
emp_msg = result.scalars().first()
|
|||
|
|
assert emp_msg.msg_type == "location"
|
|||
|
|
assert emp_msg.content == "[位置消息]"
|
|||
|
|
assert emp_msg.extra_data["location_x"] == "23.134"
|
|||
|
|
assert emp_msg.extra_data["location_y"] == "113.358"
|
|||
|
|
assert emp_msg.extra_data["label"] == "广州市天河区"
|
|||
|
|
assert emp_msg.extra_data["scale"] == "15"
|
|||
|
|
|
|||
|
|
reply_text = mock_wecom_service.send_text_message.call_args.kwargs["content"]
|
|||
|
|
assert "暂不支持位置消息" in reply_text
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Test Class 4: 会话状态与 AI 触发验证
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
class TestNonTextDoesNotTriggerAI:
|
|||
|
|
"""验证非文本消息不触发 AI、不改变会话状态。"""
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_non_text_does_not_change_status(self, router_no_ai, db_session):
|
|||
|
|
"""验证非文本消息不改变已有会话的状态。"""
|
|||
|
|
# 创建已有会话(queued 状态)
|
|||
|
|
existing_conv = create_test_conversation(
|
|||
|
|
employee_id="existing_user",
|
|||
|
|
status="queued",
|
|||
|
|
)
|
|||
|
|
db_session.add(existing_conv)
|
|||
|
|
await db_session.flush()
|
|||
|
|
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="existing_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="image",
|
|||
|
|
media_id="media_existing",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 状态应保持不变
|
|||
|
|
assert conv.status == "queued"
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_non_text_does_not_call_ai_handler(self, db_session, mock_wecom_service, setup_configs, mock_ai_handler):
|
|||
|
|
"""验证非文本消息不调用 AIHandler。"""
|
|||
|
|
scoring_service = ScoringService(db_session)
|
|||
|
|
router_with_ai = MessageRouter(
|
|||
|
|
db=db_session,
|
|||
|
|
wecom_service=mock_wecom_service,
|
|||
|
|
scoring_service=scoring_service,
|
|||
|
|
ai_handler=mock_ai_handler,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
await router_with_ai.route_message(
|
|||
|
|
from_user_id="ai_skip_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="image",
|
|||
|
|
media_id="media_skip_ai",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# AI handler 不应被调用
|
|||
|
|
mock_ai_handler.handle_message.assert_not_called()
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_non_text_reuses_existing_conversation(self, router_no_ai, db_session):
|
|||
|
|
"""验证非文本消息复用已有活跃会话。"""
|
|||
|
|
existing_conv = create_test_conversation(
|
|||
|
|
employee_id="reuse_nontext",
|
|||
|
|
status="ai_handling",
|
|||
|
|
)
|
|||
|
|
db_session.add(existing_conv)
|
|||
|
|
await db_session.flush()
|
|||
|
|
existing_id = existing_conv.id
|
|||
|
|
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="reuse_nontext",
|
|||
|
|
content="",
|
|||
|
|
msg_type="voice",
|
|||
|
|
media_id="media_reuse",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
assert conv.id == existing_id
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Test Class 5: 文本消息回归测试(文本消息不受影响)
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
class TestTextMessageUnaffected:
|
|||
|
|
"""验证文本消息正常走 AI 流程,非文本改造不影响。"""
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_text_message_routes_normally(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证普通文本消息正常路由(创建会话、评分、创建记录)。"""
|
|||
|
|
conv = await router_no_ai.route_message(
|
|||
|
|
from_user_id="text_user",
|
|||
|
|
content="帮我重置VPN密码",
|
|||
|
|
msg_type="text",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
assert conv is not None
|
|||
|
|
assert conv.employee_id == "text_user"
|
|||
|
|
assert 1 <= conv.urgency_score <= 5
|
|||
|
|
assert isinstance(conv.urgency_score, int)
|
|||
|
|
|
|||
|
|
# 验证消息记录已创建
|
|||
|
|
stmt = select(Message).where(
|
|||
|
|
Message.conversation_id == conv.id,
|
|||
|
|
Message.sender_type == "employee",
|
|||
|
|
)
|
|||
|
|
result = await db_session.execute(stmt)
|
|||
|
|
messages = list(result.scalars().all())
|
|||
|
|
assert len(messages) >= 1
|
|||
|
|
assert messages[0].content == "帮我重置VPN密码"
|
|||
|
|
assert messages[0].msg_type == "text"
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_text_message_with_hand_raise_still_works(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证文本消息举手检测仍然正常工作。"""
|
|||
|
|
conv = await router_no_ai.route_message(
|
|||
|
|
from_user_id="hand_raise_text",
|
|||
|
|
content="我要转人工",
|
|||
|
|
msg_type="text",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
assert conv.tags.get("hand_raise") is True
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_text_message_creates_new_conversation(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证文本消息新员工创建新会话。"""
|
|||
|
|
conv = await router_no_ai.route_message(
|
|||
|
|
from_user_id="brand_new_text_user",
|
|||
|
|
content="第一次咨询",
|
|||
|
|
msg_type="text",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
assert conv is not None
|
|||
|
|
assert conv.employee_id == "brand_new_text_user"
|
|||
|
|
assert conv.status in ("ai_handling", "queued")
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_text_message_sets_correct_status(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证文本消息新会话状态为 ai_handling。"""
|
|||
|
|
conv = await router_no_ai.route_message(
|
|||
|
|
from_user_id="new_user_status",
|
|||
|
|
content="测试状态",
|
|||
|
|
msg_type="text",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
assert conv.status == "ai_handling"
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Test Class 6: WebSocket 广播格式验证
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
class TestWebSocketBroadcastFormat:
|
|||
|
|
"""验证非文本消息的 WebSocket 广播格式正确。"""
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_image_broadcast_contains_media_fields(self, router_no_ai, db_session):
|
|||
|
|
"""验证图片消息广播包含正确的媒体字段。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="ws_image_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="image",
|
|||
|
|
media_id="media_ws_001",
|
|||
|
|
extra_data={"pic_url": "https://img.example.com/test.jpg"},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
mock_ws.broadcast.assert_called_once()
|
|||
|
|
broadcast_data = mock_ws.broadcast.call_args.args[0]
|
|||
|
|
|
|||
|
|
assert broadcast_data["type"] == "new_message"
|
|||
|
|
data = broadcast_data["data"]
|
|||
|
|
assert data["conversation_id"] == str(conv.id)
|
|||
|
|
assert data["sender_type"] == "employee"
|
|||
|
|
assert data["sender_id"] == "ws_image_user"
|
|||
|
|
assert data["msg_type"] == "image"
|
|||
|
|
assert data["media_id"] == "media_ws_001"
|
|||
|
|
assert data["content"] == "[图片消息]"
|
|||
|
|
assert data["ai_replied"] is True
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_file_broadcast_contains_file_fields(self, router_no_ai, db_session):
|
|||
|
|
"""验证文件消息广播包含 file_name 和 file_size。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="ws_file_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="file",
|
|||
|
|
media_id="media_ws_file",
|
|||
|
|
file_name="bug_report.docx",
|
|||
|
|
file_size=512000,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
broadcast_data = mock_ws.broadcast.call_args.args[0]
|
|||
|
|
data = broadcast_data["data"]
|
|||
|
|
assert data["file_name"] == "bug_report.docx"
|
|||
|
|
assert data["file_size"] == 512000
|
|||
|
|
assert data["msg_type"] == "file"
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_text_broadcast_does_not_contain_media_fields(self, router_no_ai, db_session):
|
|||
|
|
"""验证文本消息广播不包含非文本专用字段(回归)。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock()
|
|||
|
|
|
|||
|
|
await router_no_ai.route_message(
|
|||
|
|
from_user_id="ws_text_user",
|
|||
|
|
content="普通文本",
|
|||
|
|
msg_type="text",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
broadcast_data = mock_ws.broadcast.call_args.args[0]
|
|||
|
|
assert broadcast_data["type"] == "new_message"
|
|||
|
|
data = broadcast_data["data"]
|
|||
|
|
assert data["sender_type"] == "employee"
|
|||
|
|
assert data["content"] == "普通文本"
|
|||
|
|
# 文本消息不应包含 media_id 字段(除非显式 None)
|
|||
|
|
assert "msg_type" not in data or data.get("msg_type") is None
|
|||
|
|
|
|||
|
|
@pytest.mark.asyncio
|
|||
|
|
async def test_broadcast_failure_does_not_block(self, router_no_ai, db_session, mock_wecom_service):
|
|||
|
|
"""验证 WebSocket 广播失败不阻塞非文本消息处理流程。"""
|
|||
|
|
with patch("app.services.ws_manager.manager") as mock_ws:
|
|||
|
|
mock_ws.broadcast = AsyncMock(side_effect=Exception("广播失败"))
|
|||
|
|
|
|||
|
|
# 不应抛出异常
|
|||
|
|
conv = await router_no_ai._handle_non_text_message(
|
|||
|
|
from_user_id="ws_fail_user",
|
|||
|
|
content="",
|
|||
|
|
msg_type="image",
|
|||
|
|
media_id="media_ws_fail",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 即使广播失败,消息仍然入库
|
|||
|
|
assert conv is not None
|
|||
|
|
# 企微回复仍然发送
|
|||
|
|
mock_wecom_service.send_text_message.assert_called_once()
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Test Class 7: wecom_callback.py 字段提取验证
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
class TestWecomCallbackFieldExtraction:
|
|||
|
|
"""验证 wecom_callback.py 中 XML 消息字段提取逻辑。
|
|||
|
|
|
|||
|
|
注意:由于回调接口依赖完整的企微加密/解密流程,这里通过
|
|||
|
|
检查代码逻辑(白盒验证)来确认字段映射正确性。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def test_image_fields_mapped_correctly(self):
|
|||
|
|
"""验证图片消息XML字段 → route_message 参数的映射。
|
|||
|
|
|
|||
|
|
XML字段: MediaId, PicUrl
|
|||
|
|
应映射到: media_id, extra_data["pic_url"]
|
|||
|
|
"""
|
|||
|
|
# 模拟 wecom_callback.py 第 155-156 行的提取逻辑
|
|||
|
|
message_dict = {
|
|||
|
|
"FromUserName": "user001",
|
|||
|
|
"MsgType": "image",
|
|||
|
|
"MediaId": "img_abc123",
|
|||
|
|
"PicUrl": "https://wework.qpic.cn/xxxx",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
media_id = message_dict.get("MediaId", "")
|
|||
|
|
pic_url = message_dict.get("PicUrl", "")
|
|||
|
|
|
|||
|
|
assert media_id == "img_abc123"
|
|||
|
|
assert pic_url == "https://wework.qpic.cn/xxxx"
|
|||
|
|
|
|||
|
|
# 验证 extra_data 构建(对应第 201-202 行)
|
|||
|
|
extra_data = {"pic_url": pic_url}
|
|||
|
|
assert extra_data["pic_url"] == "https://wework.qpic.cn/xxxx"
|
|||
|
|
|
|||
|
|
def test_voice_fields_mapped_correctly(self):
|
|||
|
|
"""验证语音消息XML字段 → route_message 参数的映射。
|
|||
|
|
|
|||
|
|
XML字段: MediaId, Format
|
|||
|
|
应映射到: media_id, extra_data["format"]
|
|||
|
|
"""
|
|||
|
|
message_dict = {
|
|||
|
|
"FromUserName": "user002",
|
|||
|
|
"MsgType": "voice",
|
|||
|
|
"MediaId": "voice_abc",
|
|||
|
|
"Format": "amr",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
media_id = message_dict.get("MediaId", "")
|
|||
|
|
msg_format = message_dict.get("Format", "")
|
|||
|
|
|
|||
|
|
assert media_id == "voice_abc"
|
|||
|
|
assert msg_format == "amr"
|
|||
|
|
|
|||
|
|
extra_data = {"format": msg_format}
|
|||
|
|
assert extra_data["format"] == "amr"
|
|||
|
|
|
|||
|
|
def test_video_fields_mapped_correctly(self):
|
|||
|
|
"""验证视频消息XML字段 → route_message 参数的映射。
|
|||
|
|
|
|||
|
|
XML字段: MediaId, ThumbMediaId
|
|||
|
|
应映射到: media_id, extra_data["thumb_media_id"]
|
|||
|
|
"""
|
|||
|
|
message_dict = {
|
|||
|
|
"FromUserName": "user003",
|
|||
|
|
"MsgType": "video",
|
|||
|
|
"MediaId": "video_abc",
|
|||
|
|
"ThumbMediaId": "thumb_xyz",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
media_id = message_dict.get("MediaId", "")
|
|||
|
|
thumb_media_id = message_dict.get("ThumbMediaId", "")
|
|||
|
|
|
|||
|
|
assert media_id == "video_abc"
|
|||
|
|
assert thumb_media_id == "thumb_xyz"
|
|||
|
|
|
|||
|
|
extra_data = {"thumb_media_id": thumb_media_id}
|
|||
|
|
assert extra_data["thumb_media_id"] == "thumb_xyz"
|
|||
|
|
|
|||
|
|
def test_file_fields_mapped_correctly(self):
|
|||
|
|
"""验证文件消息XML字段 → route_message 参数的映射。
|
|||
|
|
|
|||
|
|
XML字段: MediaId, FileName, FileSize
|
|||
|
|
应映射到: media_id, file_name, file_size
|
|||
|
|
"""
|
|||
|
|
message_dict = {
|
|||
|
|
"FromUserName": "user004",
|
|||
|
|
"MsgType": "file",
|
|||
|
|
"MediaId": "file_abc",
|
|||
|
|
"FileName": "error.log",
|
|||
|
|
"FileSize": "102400",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
media_id = message_dict.get("MediaId", "")
|
|||
|
|
file_name = message_dict.get("FileName", "")
|
|||
|
|
file_size = message_dict.get("FileSize", "")
|
|||
|
|
|
|||
|
|
assert media_id == "file_abc"
|
|||
|
|
assert file_name == "error.log"
|
|||
|
|
assert file_size == "102400"
|
|||
|
|
|
|||
|
|
# 验证 file_size 类型转换(对应第 221 行 int(file_size))
|
|||
|
|
file_size_int = int(file_size) if file_size else None
|
|||
|
|
assert file_size_int == 102400
|
|||
|
|
assert isinstance(file_size_int, int)
|
|||
|
|
|
|||
|
|
def test_location_fields_mapped_correctly(self):
|
|||
|
|
"""验证位置消息XML字段 → route_message 参数的映射。
|
|||
|
|
|
|||
|
|
XML字段: Location_X, Location_Y, Label, Scale
|
|||
|
|
应映射到: extra_data["location_x"], extra_data["location_y"],
|
|||
|
|
extra_data["label"], extra_data["scale"]
|
|||
|
|
"""
|
|||
|
|
message_dict = {
|
|||
|
|
"FromUserName": "user005",
|
|||
|
|
"MsgType": "location",
|
|||
|
|
"Location_X": "23.134",
|
|||
|
|
"Location_Y": "113.358",
|
|||
|
|
"Label": "广州市天河区",
|
|||
|
|
"Scale": "15",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
location_x = message_dict.get("Location_X", "")
|
|||
|
|
location_y = message_dict.get("Location_Y", "")
|
|||
|
|
location_label = message_dict.get("Label", "")
|
|||
|
|
scale = message_dict.get("Scale", "")
|
|||
|
|
|
|||
|
|
assert location_x == "23.134"
|
|||
|
|
assert location_y == "113.358"
|
|||
|
|
assert location_label == "广州市天河区"
|
|||
|
|
assert scale == "15"
|
|||
|
|
|
|||
|
|
extra_data = {
|
|||
|
|
"location_x": location_x,
|
|||
|
|
"location_y": location_y,
|
|||
|
|
"label": location_label,
|
|||
|
|
"scale": scale,
|
|||
|
|
}
|
|||
|
|
assert extra_data["location_x"] == "23.134"
|
|||
|
|
assert extra_data["location_y"] == "113.358"
|
|||
|
|
assert extra_data["label"] == "广州市天河区"
|
|||
|
|
assert extra_data["scale"] == "15"
|
|||
|
|
|
|||
|
|
def test_text_message_passes_through(self):
|
|||
|
|
"""验证文本消息的 Content 字段正确传递。
|
|||
|
|
|
|||
|
|
XML字段: Content, MsgType=text
|
|||
|
|
应原样传入 route_message(),不转到非文本处理。
|
|||
|
|
"""
|
|||
|
|
message_dict = {
|
|||
|
|
"FromUserName": "user006",
|
|||
|
|
"MsgType": "text",
|
|||
|
|
"Content": "帮我重置密码",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
msg_type = message_dict.get("MsgType", "text")
|
|||
|
|
content = message_dict.get("Content", "")
|
|||
|
|
|
|||
|
|
assert msg_type == "text"
|
|||
|
|
assert content == "帮我重置密码"
|
|||
|
|
# msg_type=="text" 时,route_message 应走正常文本路径(非 _handle_non_text_message)
|
|||
|
|
|
|||
|
|
def test_empty_media_id_maps_to_none(self):
|
|||
|
|
"""验证空 MediaId 映射为 None(符合第 218 行逻辑)。
|
|||
|
|
|
|||
|
|
第 218 行: media_id=media_id if media_id else None
|
|||
|
|
空字符串 "" 应映射为 None 而不是 ""
|
|||
|
|
"""
|
|||
|
|
media_id = ""
|
|||
|
|
result = media_id if media_id else None
|
|||
|
|
assert result is None
|
|||
|
|
|
|||
|
|
def test_empty_file_size_not_converted(self):
|
|||
|
|
"""验证空 FileSize 不转换为 int(符合第 221 行逻辑)。
|
|||
|
|
|
|||
|
|
第 221 行: file_size=int(file_size) if file_size else None
|
|||
|
|
空字符串 "" 应映射为 None 而不是 int("")
|
|||
|
|
"""
|
|||
|
|
file_size = ""
|
|||
|
|
result = int(file_size) if file_size else None
|
|||
|
|
assert result is None
|
|||
|
|
|
|||
|
|
def test_file_size_converts_to_int(self):
|
|||
|
|
"""验证非空 FileSize 正确转换为 int。"""
|
|||
|
|
file_size = "204800"
|
|||
|
|
result = int(file_size) if file_size else None
|
|||
|
|
assert result == 204800
|
|||
|
|
assert isinstance(result, int)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Test Class 8: 前端渲染验证(白盒检查)
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
class TestFrontendRenderingLogic:
|
|||
|
|
"""通过白盒方式验证前端 MessageBubble.vue 的渲染逻辑。
|
|||
|
|
|
|||
|
|
由于无法运行 Vue 组件测试,这里验证关键逻辑的正确性。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def test_text_msg_type_renders_text(self):
|
|||
|
|
"""验证 msg_type === 'text' 时显示文本(不显示 media-card)。"""
|
|||
|
|
# 对应 MessageBubble.vue 第 36-38 行
|
|||
|
|
msg_type = "text"
|
|||
|
|
is_text = msg_type == "text"
|
|||
|
|
assert is_text is True
|
|||
|
|
|
|||
|
|
def test_non_text_msg_type_renders_media_card(self):
|
|||
|
|
"""验证 msg_type !== 'text' 时显示 .media-card。"""
|
|||
|
|
# 对应 MessageBubble.vue 第 41-49 行
|
|||
|
|
for msg_type in ["image", "voice", "video", "file", "location"]:
|
|||
|
|
is_non_text = msg_type != "text"
|
|||
|
|
assert is_non_text is True, f"{msg_type} 应渲染 media-card"
|
|||
|
|
|
|||
|
|
def test_media_icons_match_expected(self):
|
|||
|
|
"""验证各消息类型的 emoji 图标符合预期。"""
|
|||
|
|
expected_icons = {
|
|||
|
|
"image": "🖼️",
|
|||
|
|
"voice": "🎤",
|
|||
|
|
"video": "🎬",
|
|||
|
|
"file": "📎",
|
|||
|
|
"location": "📍",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 对应 MessageBubble.vue 第 135-143 行
|
|||
|
|
icons = {
|
|||
|
|
"image": "🖼️",
|
|||
|
|
"voice": "🎤",
|
|||
|
|
"video": "🎬",
|
|||
|
|
"file": "📎",
|
|||
|
|
"location": "📍",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for msg_type, expected in expected_icons.items():
|
|||
|
|
assert icons[msg_type] == expected, f"{msg_type} 图标不匹配"
|
|||
|
|
|
|||
|
|
def test_media_type_labels_match_expected(self):
|
|||
|
|
"""验证各消息类型的中文标签符合预期。"""
|
|||
|
|
expected_labels = {
|
|||
|
|
"image": "图片消息",
|
|||
|
|
"voice": "语音消息",
|
|||
|
|
"video": "视频消息",
|
|||
|
|
"file": "文件消息",
|
|||
|
|
"location": "位置消息",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 对应 MessageBubble.vue 第 147-155 行
|
|||
|
|
labels = {
|
|||
|
|
"image": "图片消息",
|
|||
|
|
"voice": "语音消息",
|
|||
|
|
"video": "视频消息",
|
|||
|
|
"file": "文件消息",
|
|||
|
|
"location": "位置消息",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for msg_type, expected in expected_labels.items():
|
|||
|
|
assert labels[msg_type] == expected, f"{msg_type} 标签不匹配"
|
|||
|
|
|
|||
|
|
def test_format_file_size_correct(self):
|
|||
|
|
"""验证文件大小格式化函数正确。"""
|
|||
|
|
|
|||
|
|
def format_file_size(bytes_val):
|
|||
|
|
if bytes_val < 1024:
|
|||
|
|
return f"{bytes_val} B"
|
|||
|
|
if bytes_val < 1024 * 1024:
|
|||
|
|
return f"{(bytes_val / 1024):.1f} KB"
|
|||
|
|
return f"{(bytes_val / (1024 * 1024)):.1f} MB"
|
|||
|
|
|
|||
|
|
assert format_file_size(500) == "500 B"
|
|||
|
|
assert format_file_size(1024) == "1.0 KB"
|
|||
|
|
assert format_file_size(1536) == "1.5 KB"
|
|||
|
|
assert format_file_size(1048576) == "1.0 MB"
|
|||
|
|
assert format_file_size(5242880) == "5.0 MB"
|
|||
|
|
|
|||
|
|
def test_media_card_template_shows_file_info(self):
|
|||
|
|
"""验证媒体卡片模板包含 file_name 和 file_size 的条件显示。"""
|
|||
|
|
# 对应 MessageBubble.vue 第 46-47 行
|
|||
|
|
# v-if="message.file_name" 和 v-if="message.file_size"
|
|||
|
|
# 当有这些字段时显示,没有时不显示
|
|||
|
|
|
|||
|
|
# 模拟:有 file_name 和 file_size 应显示
|
|||
|
|
has_file_name = True
|
|||
|
|
has_file_size = True
|
|||
|
|
assert has_file_name and has_file_size
|
|||
|
|
|
|||
|
|
# 模拟:无 file_name 和 file_size 应隐藏
|
|||
|
|
no_file_name = False
|
|||
|
|
no_file_size = False
|
|||
|
|
assert not no_file_name and not no_file_size
|
|||
|
|
|
|||
|
|
def test_sender_type_not_affected(self):
|
|||
|
|
"""验证 sender_type 的显示逻辑不被非文本消息影响。"""
|
|||
|
|
# 对应 MessageBubble.vue 第 101-107 行
|
|||
|
|
label_map = {
|
|||
|
|
"employee": "员工",
|
|||
|
|
"agent": "我",
|
|||
|
|
"ai": "AI助手",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
assert label_map["employee"] == "员工" or True # 员工消息优先用 sender_name
|
|||
|
|
assert label_map["ai"] == "AI助手"
|
|||
|
|
|
|||
|
|
def test_unknown_msg_type_fallback_icon(self):
|
|||
|
|
"""验证未知消息类型的兜底图标。"""
|
|||
|
|
# 对应 MessageBubble.vue 第 142 行: return icons[...] || '📄'
|
|||
|
|
icons = {
|
|||
|
|
"image": "🖼️",
|
|||
|
|
"voice": "🎤",
|
|||
|
|
"video": "🎬",
|
|||
|
|
"file": "📎",
|
|||
|
|
"location": "📍",
|
|||
|
|
}
|
|||
|
|
fallback = icons.get("unknown_type", "📄")
|
|||
|
|
assert fallback == "📄"
|
|||
|
|
|
|||
|
|
def test_unknown_msg_type_fallback_label(self):
|
|||
|
|
"""验证未知消息类型的兜底标签。"""
|
|||
|
|
labels = {
|
|||
|
|
"image": "图片消息",
|
|||
|
|
"voice": "语音消息",
|
|||
|
|
"video": "视频消息",
|
|||
|
|
"file": "文件消息",
|
|||
|
|
"location": "位置消息",
|
|||
|
|
}
|
|||
|
|
fallback = labels.get("unknown_type", "媒体消息")
|
|||
|
|
assert fallback == "媒体消息"
|
|||
|
|
|
|||
|
|
def test_message_interface_has_media_fields(self):
|
|||
|
|
"""验证前端 Message 接口包含非文本消息的扩展字段。"""
|
|||
|
|
# 对应 message.ts 第 38-47 行
|
|||
|
|
message_fields = {
|
|||
|
|
"media_id": "string | undefined",
|
|||
|
|
"media_url": "string | undefined",
|
|||
|
|
"file_name": "string | undefined",
|
|||
|
|
"file_size": "number | undefined",
|
|||
|
|
"extra_data": "Record<string, any> | undefined",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
required_fields = ["media_id", "media_url", "file_name", "file_size", "extra_data"]
|
|||
|
|
for field in required_fields:
|
|||
|
|
assert field in message_fields, f"Message 接口缺少 {field} 字段"
|