# ============================================================================= # 企微IT智能服务台 — 本地回调模拟器 # ============================================================================= # 模拟企微服务器的回调行为,在本地测试消息收发流程: # 1. GET /api/wecom/callback — 验证 URL 有效性 # 2. POST /api/wecom/callback — 推送加密消息 # # 使用方式: # 1. 先启动后端(端口 8001) # 2. 运行本脚本:python simulate_wecom.py # # 脚本会自动: # - 使用项目的 WecomCrypto 类加密消息 # - 发送 GET 请求验证回调 URL # - 发送 POST 请求模拟员工消息 # - 检查后端是否正确处理了消息 # ============================================================================= import hashlib import json import secrets import string import struct import sys import time import urllib.error import urllib.parse import urllib.request import xml.etree.ElementTree as ET # ---- Windows 终端编码兼容 ---- # Windows 默认 GBK 编码无法输出 emoji,强制切换为 UTF-8 if sys.platform == "win32": try: sys.stdout.reconfigure(encoding="utf-8", errors="replace") sys.stderr.reconfigure(encoding="utf-8", errors="replace") except Exception: pass # ---- 配置 ---- # 与 backend/.env 中的配置保持一致 CORP_ID = "wwa8c87970b2011f41" TOKEN = "pjJquWIacCdCh9NeQ5axMrTPtQPk" ENCODING_AES_KEY = "42k7Ty5qCQHMwsAlzQdZ9tw87rPxUy0IgqcQgktUjJu" AGENT_ID = "1000133" # 后端地址 BACKEND_URL = "http://localhost:8000" # ---- AES 加解密(从项目 WecomCrypto 类提取的核心逻辑)---- import base64 class SimpleWecomCrypto: """简化版企微加解密工具,用于本地测试。""" def __init__(self, token: str, encoding_aes_key: str, corp_id: str): self.token = token self.aes_key = base64.b64decode(encoding_aes_key + "=") # 补位 "=" self.iv = self.aes_key[:16] self.corp_id = corp_id def generate_signature(self, timestamp: str, nonce: str, encrypt: str) -> str: """生成签名:SHA1(sort([token, timestamp, nonce, encrypt]))""" sort_list = sorted([self.token, timestamp, nonce, encrypt]) concat_str = "".join(sort_list) return hashlib.sha1(concat_str.encode("utf-8")).hexdigest() def encrypt(self, plaintext: str) -> str: """AES-CBC 加密(模拟企微加密消息)""" from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend # 构造明文:random(16) + msg_len(4) + msg + corp_id random_str = secrets.token_bytes(16) msg_bytes = plaintext.encode("utf-8") msg_len = struct.pack("!I", len(msg_bytes)) corp_id_bytes = self.corp_id.encode("utf-8") plaintext_data = random_str + msg_len + msg_bytes + corp_id_bytes # PKCS7 填充(块大小 32) block_size = 32 pad_len = block_size - (len(plaintext_data) % block_size) plaintext_data += bytes([pad_len] * pad_len) # AES-CBC 加密 cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(self.iv), backend=default_backend()) encryptor = cipher.encryptor() encrypted_data = encryptor.update(plaintext_data) + encryptor.finalize() return base64.b64encode(encrypted_data).decode("utf-8") def build_encrypted_xml(self, reply_content: str) -> str: """构造企微回调的加密 XML 消息体""" timestamp = str(int(time.time())) nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10)) # 加密消息内容 encrypt = self.encrypt(reply_content) # 生成签名 signature = self.generate_signature(timestamp, nonce, encrypt) # 构造 XML(与企微推送格式一致) xml = ( f"" f"" f"{AGENT_ID}" f"" f"" ) return xml, signature, timestamp, nonce # ---- 测试函数 ---- def test_verify_url(crypto: SimpleWecomCrypto): """测试 1:验证回调 URL(模拟企微 GET 请求)""" print("=" * 60) print("测试 1: 验证回调 URL(GET /api/wecom/callback)") print("=" * 60) # 模拟企微验证 URL 时的 GET 参数 # 企微会生成一个随机的 echostr 并加密 timestamp = str(int(time.time())) nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10)) # 加密一个随机的 echostr 明文 echostr_plaintext = str(secrets.randbelow(10**10)) # 随机数字 echostr_encrypted = crypto.encrypt(echostr_plaintext) # 生成签名 signature = crypto.generate_signature(timestamp, nonce, echostr_encrypted) # 构造 GET 请求 URL url = ( f"{BACKEND_URL}/api/wecom/callback?" f"msg_signature={urllib.parse.quote(signature)}&" f"timestamp={urllib.parse.quote(timestamp)}&" f"nonce={urllib.parse.quote(nonce)}&" f"echostr={urllib.parse.quote(echostr_encrypted)}" ) try: req = urllib.request.Request(url, method="GET") with urllib.request.urlopen(req, timeout=5) as resp: status = resp.status body = resp.read().decode("utf-8") # 验证:后端应返回解密后的 echostr 明文 if body.strip() == echostr_plaintext: print(f" [OK] 验证成功! 状态码={status}") print(f" 返回的 echostr 明文匹配: {body[:50]}") else: print(f" [FAIL] 验证失败! 状态码={status}") print(f" 期望: {echostr_plaintext}") print(f" 实际: {body[:200]}") except urllib.error.HTTPError as e: print(f" [FAIL] HTTP 错误: {e.code}") print(f" 响应: {e.read().decode('utf-8')[:200]}") except Exception as e: print(f" [FAIL] 请求失败: {e}") print() def test_receive_message(crypto: SimpleWecomCrypto, employee_id: str, content: str): """测试 2:发送员工消息(模拟企微 POST 回调)""" print("=" * 60) print(f"测试 2: 接收员工消息(POST /api/wecom/callback)") print(f" 员工: {employee_id}") print(f" 消息: {content}") print("=" * 60) # 构造企微消息 XML(明文) message_xml = ( f"" f"" f"" f"{int(time.time())}" f"" f"" f"{secrets.randbelow(10**18)}" f"{AGENT_ID}" f"" ) # 加密消息 XML encrypted_xml, signature, timestamp, nonce = crypto.build_encrypted_xml(message_xml) # 构造 POST 请求 URL(带签名参数) url = ( f"{BACKEND_URL}/api/wecom/callback?" f"msg_signature={urllib.parse.quote(signature)}&" f"timestamp={urllib.parse.quote(timestamp)}&" f"nonce={urllib.parse.quote(nonce)}" ) try: req = urllib.request.Request( url, data=encrypted_xml.encode("utf-8"), headers={"Content-Type": "application/xml"}, method="POST", ) with urllib.request.urlopen(req, timeout=10) as resp: status = resp.status body = resp.read().decode("utf-8") # 企微期望后端返回 "success" if body.strip() == "success": print(f" [OK] 消息处理成功! 状态码={status}") print(f" 后端返回: {body}") else: print(f" [WARN] 状态码={status}") print(f" 返回内容: {body[:200]}") except urllib.error.HTTPError as e: print(f" [FAIL] HTTP 错误: {e.code}") print(f" 响应: {e.read().decode('utf-8')[:200]}") except Exception as e: print(f" [FAIL] 请求失败: {e}") print() def test_check_conversation(employee_id: str): """测试 3:检查消息是否成功创建了会话""" print("=" * 60) print(f"测试 3: 检查会话列表(GET /api/agents)") print("=" * 60) url = f"{BACKEND_URL}/api/agents" try: req = urllib.request.Request(url, method="GET") with urllib.request.urlopen(req, timeout=5) as resp: body = json.loads(resp.read().decode("utf-8")) items = body.get("data", {}).get("items", []) print(f" 当前坐席数量: {len(items)}") # 查询会话列表(需要 token,暂时跳过详细验证) print(f" ✅ 后端 API 正常响应") except Exception as e: print(f" [FAIL] 检查失败: {e}") print() def test_health(): """测试 0:后端健康检查""" print("=" * 60) print("测试 0: 后端健康检查") print("=" * 60) try: req = urllib.request.Request(f"{BACKEND_URL}/health", method="GET") with urllib.request.urlopen(req, timeout=3) as resp: body = json.loads(resp.read().decode("utf-8")) if body.get("status") == "ok": print(f" [OK] 后端正常运行") else: print(f" [WARN] 后端响应异常: {body}") except Exception as e: print(f" [FAIL] 后端不可达: {e}") print(f" 请先启动后端:cd backend && python -m uvicorn app.main:app --host 0.0.0.0 --port 8001") print() # ---- 主程序 ---- def main(): print() print("[工具] 企微回调本地模拟器") print(f" 后端地址: {BACKEND_URL}") print(f" Corp ID: {CORP_ID}") print(f" Agent ID: {AGENT_ID}") print() # 初始化加解密工具 try: crypto = SimpleWecomCrypto( token=TOKEN, encoding_aes_key=ENCODING_AES_KEY, corp_id=CORP_ID, ) print("[OK] 加解密工具初始化成功") except Exception as e: print(f"[FAIL] 加解密工具初始化失败: {e}") print(" 请检查 ENCODING_AES_KEY 是否为 43 位字符串") return print() # 测试 0:健康检查 test_health() # 测试 1:验证回调 URL test_verify_url(crypto) # 测试 2:发送员工消息(模拟 3 条不同场景的消息) test_messages = [ ("zhangsan", "我的电脑无法连接VPN,急!"), ("lisi", "请帮我重置邮箱密码"), ("wangwu", "我已经问了三次了还没人回复!!!"), # 会触发情绪标记 ] for employee_id, content in test_messages: test_receive_message(crypto, employee_id, content) time.sleep(0.5) # 稍等一下避免请求太快 # 测试 3:检查会话 test_check_conversation("zhangsan") print("=" * 60) print("[完成] 模拟测试完成!") print() print("后续步骤:") print(" 1. 在坐席端 (http://localhost:5173) 查看是否有新会话") print(" 2. 在后端日志中查看消息路由和标记检测结果") print(" 3. 确认企微回调流程正常后,配置公网 URL 对接真实企微") print("=" * 60) if __name__ == "__main__": main()