# =============================================================================
# 企微IT智能服务台 — 本地回调模拟器
# =============================================================================
# 模拟企微服务器的回调行为,在本地测试消息收发流程:
# 1. GET /api/wecom/callback — 验证 URL 有效性
# 2. POST /api/wecom/callback — 推送加密消息
#
# 使用方式:
# 1. 先启动后端(端口 8001)
# 2. 运行本脚本:python simulate_wecom.py
#
# 脚本会自动:
# - 使用项目的 WecomCrypto 类加密消息
# - 发送 GET 请求验证回调 URL
# - 发送 POST 请求模拟员工消息
# - 检查后端是否正确处理了消息
# =============================================================================
import hashlib
import json
import secrets
import string
import struct
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
# ---- Windows 终端编码兼容 ----
# Windows 默认 GBK 编码无法输出 emoji,强制切换为 UTF-8
if sys.platform == "win32":
try:
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
# ---- 配置 ----
# 与 backend/.env 中的配置保持一致
CORP_ID = "wwa8c87970b2011f41"
TOKEN = "pjJquWIacCdCh9NeQ5axMrTPtQPk"
ENCODING_AES_KEY = "42k7Ty5qCQHMwsAlzQdZ9tw87rPxUy0IgqcQgktUjJu"
AGENT_ID = "1000133"
# 后端地址
BACKEND_URL = "http://localhost:8000"
# ---- AES 加解密(从项目 WecomCrypto 类提取的核心逻辑)----
import base64
class SimpleWecomCrypto:
"""简化版企微加解密工具,用于本地测试。"""
def __init__(self, token: str, encoding_aes_key: str, corp_id: str):
self.token = token
self.aes_key = base64.b64decode(encoding_aes_key + "=") # 补位 "="
self.iv = self.aes_key[:16]
self.corp_id = corp_id
def generate_signature(self, timestamp: str, nonce: str, encrypt: str) -> str:
"""生成签名:SHA1(sort([token, timestamp, nonce, encrypt]))"""
sort_list = sorted([self.token, timestamp, nonce, encrypt])
concat_str = "".join(sort_list)
return hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
def encrypt(self, plaintext: str) -> str:
"""AES-CBC 加密(模拟企微加密消息)"""
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# 构造明文:random(16) + msg_len(4) + msg + corp_id
random_str = secrets.token_bytes(16)
msg_bytes = plaintext.encode("utf-8")
msg_len = struct.pack("!I", len(msg_bytes))
corp_id_bytes = self.corp_id.encode("utf-8")
plaintext_data = random_str + msg_len + msg_bytes + corp_id_bytes
# PKCS7 填充(块大小 32)
block_size = 32
pad_len = block_size - (len(plaintext_data) % block_size)
plaintext_data += bytes([pad_len] * pad_len)
# AES-CBC 加密
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(self.iv), backend=default_backend())
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(plaintext_data) + encryptor.finalize()
return base64.b64encode(encrypted_data).decode("utf-8")
def build_encrypted_xml(self, reply_content: str) -> str:
"""构造企微回调的加密 XML 消息体"""
timestamp = str(int(time.time()))
nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10))
# 加密消息内容
encrypt = self.encrypt(reply_content)
# 生成签名
signature = self.generate_signature(timestamp, nonce, encrypt)
# 构造 XML(与企微推送格式一致)
xml = (
f""
f""
f"{AGENT_ID}"
f""
f""
)
return xml, signature, timestamp, nonce
# ---- 测试函数 ----
def test_verify_url(crypto: SimpleWecomCrypto):
"""测试 1:验证回调 URL(模拟企微 GET 请求)"""
print("=" * 60)
print("测试 1: 验证回调 URL(GET /api/wecom/callback)")
print("=" * 60)
# 模拟企微验证 URL 时的 GET 参数
# 企微会生成一个随机的 echostr 并加密
timestamp = str(int(time.time()))
nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10))
# 加密一个随机的 echostr 明文
echostr_plaintext = str(secrets.randbelow(10**10)) # 随机数字
echostr_encrypted = crypto.encrypt(echostr_plaintext)
# 生成签名
signature = crypto.generate_signature(timestamp, nonce, echostr_encrypted)
# 构造 GET 请求 URL
url = (
f"{BACKEND_URL}/api/wecom/callback?"
f"msg_signature={urllib.parse.quote(signature)}&"
f"timestamp={urllib.parse.quote(timestamp)}&"
f"nonce={urllib.parse.quote(nonce)}&"
f"echostr={urllib.parse.quote(echostr_encrypted)}"
)
try:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=5) as resp:
status = resp.status
body = resp.read().decode("utf-8")
# 验证:后端应返回解密后的 echostr 明文
if body.strip() == echostr_plaintext:
print(f" [OK] 验证成功! 状态码={status}")
print(f" 返回的 echostr 明文匹配: {body[:50]}")
else:
print(f" [FAIL] 验证失败! 状态码={status}")
print(f" 期望: {echostr_plaintext}")
print(f" 实际: {body[:200]}")
except urllib.error.HTTPError as e:
print(f" [FAIL] HTTP 错误: {e.code}")
print(f" 响应: {e.read().decode('utf-8')[:200]}")
except Exception as e:
print(f" [FAIL] 请求失败: {e}")
print()
def test_receive_message(crypto: SimpleWecomCrypto, employee_id: str, content: str):
"""测试 2:发送员工消息(模拟企微 POST 回调)"""
print("=" * 60)
print(f"测试 2: 接收员工消息(POST /api/wecom/callback)")
print(f" 员工: {employee_id}")
print(f" 消息: {content}")
print("=" * 60)
# 构造企微消息 XML(明文)
message_xml = (
f""
f""
f""
f"{int(time.time())}"
f""
f""
f"{secrets.randbelow(10**18)}"
f"{AGENT_ID}"
f""
)
# 加密消息 XML
encrypted_xml, signature, timestamp, nonce = crypto.build_encrypted_xml(message_xml)
# 构造 POST 请求 URL(带签名参数)
url = (
f"{BACKEND_URL}/api/wecom/callback?"
f"msg_signature={urllib.parse.quote(signature)}&"
f"timestamp={urllib.parse.quote(timestamp)}&"
f"nonce={urllib.parse.quote(nonce)}"
)
try:
req = urllib.request.Request(
url,
data=encrypted_xml.encode("utf-8"),
headers={"Content-Type": "application/xml"},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
status = resp.status
body = resp.read().decode("utf-8")
# 企微期望后端返回 "success"
if body.strip() == "success":
print(f" [OK] 消息处理成功! 状态码={status}")
print(f" 后端返回: {body}")
else:
print(f" [WARN] 状态码={status}")
print(f" 返回内容: {body[:200]}")
except urllib.error.HTTPError as e:
print(f" [FAIL] HTTP 错误: {e.code}")
print(f" 响应: {e.read().decode('utf-8')[:200]}")
except Exception as e:
print(f" [FAIL] 请求失败: {e}")
print()
def test_check_conversation(employee_id: str):
"""测试 3:检查消息是否成功创建了会话"""
print("=" * 60)
print(f"测试 3: 检查会话列表(GET /api/agents)")
print("=" * 60)
url = f"{BACKEND_URL}/api/agents"
try:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=5) as resp:
body = json.loads(resp.read().decode("utf-8"))
items = body.get("data", {}).get("items", [])
print(f" 当前坐席数量: {len(items)}")
# 查询会话列表(需要 token,暂时跳过详细验证)
print(f" ✅ 后端 API 正常响应")
except Exception as e:
print(f" [FAIL] 检查失败: {e}")
print()
def test_health():
"""测试 0:后端健康检查"""
print("=" * 60)
print("测试 0: 后端健康检查")
print("=" * 60)
try:
req = urllib.request.Request(f"{BACKEND_URL}/health", method="GET")
with urllib.request.urlopen(req, timeout=3) as resp:
body = json.loads(resp.read().decode("utf-8"))
if body.get("status") == "ok":
print(f" [OK] 后端正常运行")
else:
print(f" [WARN] 后端响应异常: {body}")
except Exception as e:
print(f" [FAIL] 后端不可达: {e}")
print(f" 请先启动后端:cd backend && python -m uvicorn app.main:app --host 0.0.0.0 --port 8001")
print()
# ---- 主程序 ----
def main():
print()
print("[工具] 企微回调本地模拟器")
print(f" 后端地址: {BACKEND_URL}")
print(f" Corp ID: {CORP_ID}")
print(f" Agent ID: {AGENT_ID}")
print()
# 初始化加解密工具
try:
crypto = SimpleWecomCrypto(
token=TOKEN,
encoding_aes_key=ENCODING_AES_KEY,
corp_id=CORP_ID,
)
print("[OK] 加解密工具初始化成功")
except Exception as e:
print(f"[FAIL] 加解密工具初始化失败: {e}")
print(" 请检查 ENCODING_AES_KEY 是否为 43 位字符串")
return
print()
# 测试 0:健康检查
test_health()
# 测试 1:验证回调 URL
test_verify_url(crypto)
# 测试 2:发送员工消息(模拟 3 条不同场景的消息)
test_messages = [
("zhangsan", "我的电脑无法连接VPN,急!"),
("lisi", "请帮我重置邮箱密码"),
("wangwu", "我已经问了三次了还没人回复!!!"), # 会触发情绪标记
]
for employee_id, content in test_messages:
test_receive_message(crypto, employee_id, content)
time.sleep(0.5) # 稍等一下避免请求太快
# 测试 3:检查会话
test_check_conversation("zhangsan")
print("=" * 60)
print("[完成] 模拟测试完成!")
print()
print("后续步骤:")
print(" 1. 在坐席端 (http://localhost:5173) 查看是否有新会话")
print(" 2. 在后端日志中查看消息路由和标记检测结果")
print(" 3. 确认企微回调流程正常后,配置公网 URL 对接真实企微")
print("=" * 60)
if __name__ == "__main__":
main()