# ============================================================================= # 企微IT智能服务台 — 增强版本地回调模拟器 # ============================================================================= # 覆盖场景: # 场景 1: 普通咨询(单条消息,中性情绪) # 场景 2: 多轮对话 + 举手标记(同一员工连续追问后要求转人工) # 场景 3: 愤怒情绪 + 需介入标记(连发 4+ 条含愤怒关键词的消息) # 场景 4: 担忧情绪(含"担心"、"出错"等关键词) # 场景 5: 紧急 + 举手双重标记 # 场景 6: 已结单后重新咨询(验证新会话创建) # # 使用方式: # 1. 先启动后端(端口 8000) # 2. 运行本脚本:python simulate_wecom_enhanced.py # # 期望结果:前端坐席工作台能看到 6 种不同标签组合的会话 # ============================================================================= import hashlib import json import secrets import string import struct import sys import time import urllib.error import urllib.parse import urllib.request import xml.etree.ElementTree as ET # ---- Windows 终端编码兼容 ---- if sys.platform == "win32": try: sys.stdout.reconfigure(encoding="utf-8", errors="replace") sys.stderr.reconfigure(encoding="utf-8", errors="replace") except Exception: pass # ---- 配置 ---- # 与 backend/.env 中的配置保持一致 CORP_ID = "wwa8c87970b2011f41" TOKEN = "pjJquWIacCdCh9NeQ5axMrTPtQPk" ENCODING_AES_KEY = "42k7Ty5qCQHMwsAlzQdZ9tw87rPxUy0IgqcQgktUjJu" AGENT_ID = "1000133" # 后端地址 BACKEND_URL = "http://localhost:8000" # ---- 场景定义 ---- # 每个场景 = (场景名称, 消息列表) # 每条消息 = (employee_id, 消息内容, 预期触发的标记) SCENARIOS = [ # ------------------------------------------------------------------ # 场景 1: 普通咨询 # 预期:无特殊标记,紧急度 1,情绪 neutral # ------------------------------------------------------------------ ( "场景1: 普通咨询", [ ("chenwei", "打印机无法连接", "neutral"), ], ), # ------------------------------------------------------------------ # 场景 2: 多轮对话 + 举手标记 # 同一员工连续 3 条消息后触发"转人工" # 预期:hand_raise=True, urgency >= 2 # ------------------------------------------------------------------ ( "场景2: 多轮对话+举手", [ ("liuna", "我的OA系统登录不了", "neutral"), ("liuna", "试了好几次都不行", "neutral"), ("liuna", "转人工!我要找真人客服", "hand_raise"), ], ), # ------------------------------------------------------------------ # 场景 3: 愤怒情绪 + 需介入标记 # 同一员工连发 4 条含愤怒关键词的消息 # 预期:emotion=angry, need_intervene=True(>3阈值), urgency >= 3 # ------------------------------------------------------------------ ( "场景3: 愤怒+需介入", [ ("zhaoqiang", "网络又断了", "neutral"), ("zhaoqiang", "这周已经断3次了,崩溃", "angry"), ("zhaoqiang", "你们IT到底行不行?投诉", "angry"), ("zhaoqiang", "再没人理我我真的要投诉了!!!", "angry"), ], ), # ------------------------------------------------------------------ # 场景 4: 担忧情绪 # 预期:emotion=worried, urgency >= 2 # ------------------------------------------------------------------ ( "场景4: 担忧情绪", [ ("sunli", "担心我的邮件数据丢失了", "worried"), ], ), # ------------------------------------------------------------------ # 场景 5: 紧急 + 举手双重标记 # 预期:emotion=urgent, hand_raise=True, urgency >= 3 # ------------------------------------------------------------------ ( "场景5: 紧急+举手", [ ("wanghai", "服务器马上要宕机了,紧急!转人工!", "urgent+hand_raise"), ], ), # ------------------------------------------------------------------ # 场景 6: 已结单后重新咨询 # 注意:此场景需要先有一个已结束的会话 # 这里先发一条普通消息创建会话,脚本结束后可在前端手动结单 # 然后用同一员工ID再发一条消息,验证新会话创建 # 为了自动化测试,这里直接模拟两个阶段 # ------------------------------------------------------------------ ( "场景6: 结单后重新咨询", [ ("zhoumei", "我的邮箱收不到邮件", "neutral"), # 中间需要前端手动结单,这里暂只发消息 # 结单后可再运行一次此脚本验证新会话创建 ], ), ] # ---- AES 加解密(从项目 WecomCrypto 类提取的核心逻辑)---- import base64 class SimpleWecomCrypto: """简化版企微加解密工具,用于本地测试。""" def __init__(self, token: str, encoding_aes_key: str, corp_id: str): self.token = token self.aes_key = base64.b64decode(encoding_aes_key + "=") # 补位 "=" self.iv = self.aes_key[:16] self.corp_id = corp_id def generate_signature(self, timestamp: str, nonce: str, encrypt: str) -> str: """生成签名:SHA1(sort([token, timestamp, nonce, encrypt]))""" sort_list = sorted([self.token, timestamp, nonce, encrypt]) concat_str = "".join(sort_list) return hashlib.sha1(concat_str.encode("utf-8")).hexdigest() def encrypt(self, plaintext: str) -> str: """AES-CBC 加密(模拟企微加密消息)""" from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend # 构造明文:random(16) + msg_len(4) + msg + corp_id random_str = secrets.token_bytes(16) msg_bytes = plaintext.encode("utf-8") msg_len = struct.pack("!I", len(msg_bytes)) corp_id_bytes = self.corp_id.encode("utf-8") plaintext_data = random_str + msg_len + msg_bytes + corp_id_bytes # PKCS7 填充(块大小 32) block_size = 32 pad_len = block_size - (len(plaintext_data) % block_size) plaintext_data += bytes([pad_len] * pad_len) # AES-CBC 加密 cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(self.iv), backend=default_backend()) encryptor = cipher.encryptor() encrypted_data = encryptor.update(plaintext_data) + encryptor.finalize() return base64.b64encode(encrypted_data).decode("utf-8") def build_encrypted_xml(self, reply_content: str) -> str: """构造企微回调的加密 XML 消息体""" timestamp = str(int(time.time())) nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10)) # 加密消息内容 encrypt = self.encrypt(reply_content) # 生成签名 signature = self.generate_signature(timestamp, nonce, encrypt) # 构造 XML(与企微推送格式一致) xml = ( f"" f"" f"{AGENT_ID}" f"" f"" ) return xml, signature, timestamp, nonce # ---- 测试函数 ---- def test_health(): """测试 0:后端健康检查""" print("=" * 60) print("测试 0: 后端健康检查") print("=" * 60) try: req = urllib.request.Request(f"{BACKEND_URL}/health", method="GET") with urllib.request.urlopen(req, timeout=3) as resp: body = json.loads(resp.read().decode("utf-8")) if body.get("status") == "ok": print(" [OK] 后端正常运行") else: print(f" [WARN] 后端响应异常: {body}") except Exception as e: print(f" [FAIL] 后端不可达: {e}") print(" 请先启动后端:cd backend && python -m uvicorn app.main:app --host 0.0.0.0 --port 8000") return False print() return True def test_verify_url(crypto: SimpleWecomCrypto): """测试 1:验证回调 URL(模拟企微 GET 请求)""" print("=" * 60) print("测试 1: 验证回调 URL(GET /api/wecom/callback)") print("=" * 60) timestamp = str(int(time.time())) nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10)) echostr_plaintext = str(secrets.randbelow(10**10)) echostr_encrypted = crypto.encrypt(echostr_plaintext) signature = crypto.generate_signature(timestamp, nonce, echostr_encrypted) url = ( f"{BACKEND_URL}/api/wecom/callback?" f"msg_signature={urllib.parse.quote(signature)}&" f"timestamp={urllib.parse.quote(timestamp)}&" f"nonce={urllib.parse.quote(nonce)}&" f"echostr={urllib.parse.quote(echostr_encrypted)}" ) try: req = urllib.request.Request(url, method="GET") with urllib.request.urlopen(req, timeout=5) as resp: status = resp.status body = resp.read().decode("utf-8") if body.strip() == echostr_plaintext: print(f" [OK] 验证成功! 状态码={status}") else: print(f" [FAIL] 验证失败! 期望={echostr_plaintext}, 实际={body[:200]}") except urllib.error.HTTPError as e: print(f" [FAIL] HTTP 错误: {e.code}") except Exception as e: print(f" [FAIL] 请求失败: {e}") print() def send_message(crypto: SimpleWecomCrypto, employee_id: str, content: str) -> bool: """发送单条员工消息(模拟企微 POST 回调)。 Args: crypto: 加密工具实例 employee_id: 员工企微 UserID content: 消息内容 Returns: bool: 是否成功(后端返回 "success") """ # 构造企微消息 XML(明文) message_xml = ( f"" f"" f"" f"{int(time.time())}" f"" f"" f"{secrets.randbelow(10**18)}" f"{AGENT_ID}" f"" ) # 加密消息 XML encrypted_xml, signature, timestamp, nonce = crypto.build_encrypted_xml(message_xml) # 构造 POST 请求 URL(带签名参数) url = ( f"{BACKEND_URL}/api/wecom/callback?" f"msg_signature={urllib.parse.quote(signature)}&" f"timestamp={urllib.parse.quote(timestamp)}&" f"nonce={urllib.parse.quote(nonce)}" ) try: req = urllib.request.Request( url, data=encrypted_xml.encode("utf-8"), headers={"Content-Type": "application/xml"}, method="POST", ) with urllib.request.urlopen(req, timeout=10) as resp: body = resp.read().decode("utf-8") if body.strip() == "success": print(f" [OK] {employee_id}: \"{content[:30]}{'...' if len(content) > 30 else ''}\"") return True else: print(f" [WARN] 返回非success: {body[:100]}") return False except urllib.error.HTTPError as e: err_body = e.read().decode("utf-8")[:200] print(f" [FAIL] HTTP {e.code}: {err_body}") return False except Exception as e: print(f" [FAIL] {e}") return False def run_scenario(crypto: SimpleWecomCrypto, scenario_name: str, messages: list): """运行一个测试场景。 Args: crypto: 加密工具实例 scenario_name: 场景名称 messages: 消息列表 [(employee_id, content, expected_tag), ...] """ print("=" * 60) print(f" {scenario_name}") print("=" * 60) print(f" 消息数: {len(messages)}") print(f" 预期标记: {', '.join(set(m[2] for m in messages if m[2] != 'neutral'))}") print() success_count = 0 for employee_id, content, expected_tag in messages: ok = send_message(crypto, employee_id, content) if ok: success_count += 1 # 消息间稍作延迟,模拟真实场景节奏 time.sleep(0.3) total = len(messages) status = "[OK]" if success_count == total else "[WARN]" print(f"\n {status} 场景结果: {success_count}/{total} 条消息成功") print() def verify_conversations(): """验证后端会话列表,展示各会话的标记和紧急度。""" print("=" * 60) print(" 验证: 查询会话列表(GET /api/conversations)") print("=" * 60) try: req = urllib.request.Request(f"{BACKEND_URL}/api/conversations", method="GET") with urllib.request.urlopen(req, timeout=5) as resp: body = json.loads(resp.read().decode("utf-8")) items = body.get("data", {}).get("items", []) if not items: print(" [WARN] 无会话数据") print() return print(f" 共 {len(items)} 个会话:\n") # 表头 print(f" {'员工ID':<15} {'状态':<10} {'紧急度':<8} {'标记'}") print(f" {'-'*15} {'-'*10} {'-'*8} {'-'*30}") for conv in items: emp_id = conv.get("employee_id", "?") status = conv.get("status", "?") urgency = conv.get("urgency_score", "?") tags = conv.get("tags", {}) # 解析标记为可读字符串 tag_parts = [] if tags.get("hand_raise"): tag_parts.append("[举手]") if tags.get("need_intervene"): tag_parts.append("[需介入]") emotion = tags.get("emotion", "") if emotion and emotion != "neutral": tag_parts.append(f"[情绪:{emotion}]") repeat = tags.get("repeat_count", 0) if repeat > 1: tag_parts.append(f"[追问x{repeat}]") tag_str = " ".join(tag_parts) if tag_parts else "-" print(f" {emp_id:<15} {status:<10} {urgency:<8} {tag_str}") print() # 验证各场景的预期标记 print(" 场景验证结果:") checks = { "chenwei": {"expected_tags": [], "desc": "场景1-普通咨询"}, "liuna": {"expected_tags": ["hand_raise"], "desc": "场景2-举手"}, "zhaoqiang": {"expected_tags": ["angry", "need_intervene"], "desc": "场景3-愤怒+介入"}, "sunli": {"expected_tags": ["worried"], "desc": "场景4-担忧"}, "wanghai": {"expected_tags": ["urgent", "hand_raise"], "desc": "场景5-紧急+举手"}, "zhoumei": {"expected_tags": [], "desc": "场景6-结单重开"}, } for conv in items: emp_id = conv.get("employee_id", "") if emp_id not in checks: continue check = checks[emp_id] tags = conv.get("tags", {}) # 收集实际标记 actual_tags = [] if tags.get("hand_raise"): actual_tags.append("hand_raise") if tags.get("need_intervene"): actual_tags.append("need_intervene") emotion = tags.get("emotion", "") if emotion and emotion != "neutral": actual_tags.append(emotion) # 对比 expected = set(check["expected_tags"]) actual = set(actual_tags) matched = expected & actual missing = expected - actual extra = actual - expected if not expected and not actual: result = "[OK] 无特殊标记(符合预期)" elif missing: result = f"[WARN] 缺少标记: {', '.join(missing)}" else: result = f"[OK] 预期标记全部命中" print(f" {check['desc']}: {result}") print() except Exception as e: print(f" [FAIL] 查询失败: {e}") print() # ---- 主程序 ---- def main(): print() print("[工具] 企微回调增强版本地模拟器") print(f" 后端地址: {BACKEND_URL}") print(f" Corp ID: {CORP_ID}") print(f" Agent ID: {AGENT_ID}") print(f" 场景数量: {len(SCENARIOS)}") print() # 初始化加解密工具 try: crypto = SimpleWecomCrypto( token=TOKEN, encoding_aes_key=ENCODING_AES_KEY, corp_id=CORP_ID, ) print("[OK] 加解密工具初始化成功") except Exception as e: print(f"[FAIL] 加解密工具初始化失败: {e}") return print() # 测试 0:健康检查 if not test_health(): return # 测试 1:验证回调 URL test_verify_url(crypto) # 逐场景发送消息 print("=" * 60) print(" 开始发送场景消息") print("=" * 60) print() total_messages = sum(len(msgs) for _, msgs in SCENARIOS) print(f" 共 {len(SCENARIOS)} 个场景, {total_messages} 条消息") print() for scenario_name, messages in SCENARIOS: run_scenario(crypto, scenario_name, messages) # 场景间稍长延迟,确保数据库提交完成 time.sleep(1) # 验证结果 verify_conversations() print("=" * 60) print("[完成] 增强版模拟测试完成!") print() print("后续步骤:") print(" 1. 在坐席端 (http://localhost:5173) 查看新会话") print(" 2. 观察不同标签组合在前端的展示效果") print(" 3. 测试接单/回复/转交/结单等操作") print(" 4. 结单 zhoumei 的会话后,再运行一次脚本验证场景6") print("=" * 60) if __name__ == "__main__": main()