Files

334 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 本地回调模拟器
# =============================================================================
# 模拟企微服务器的回调行为,在本地测试消息收发流程:
# 1. GET /api/wecom/callback — 验证 URL 有效性
# 2. POST /api/wecom/callback — 推送加密消息
#
# 使用方式:
# 1. 先启动后端(端口 8001)
# 2. 运行本脚本:python simulate_wecom.py
#
# 脚本会自动:
# - 使用项目的 WecomCrypto 类加密消息
# - 发送 GET 请求验证回调 URL
# - 发送 POST 请求模拟员工消息
# - 检查后端是否正确处理了消息
# =============================================================================
import hashlib
import json
import secrets
import string
import struct
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
# ---- Windows 终端编码兼容 ----
# Windows 默认 GBK 编码无法输出 emoji,强制切换为 UTF-8
if sys.platform == "win32":
try:
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
# ---- 配置 ----
# 与 backend/.env 中的配置保持一致
CORP_ID = "wwa8c87970b2011f41"
TOKEN = "pjJquWIacCdCh9NeQ5axMrTPtQPk"
ENCODING_AES_KEY = "42k7Ty5qCQHMwsAlzQdZ9tw87rPxUy0IgqcQgktUjJu"
AGENT_ID = "1000133"
# 后端地址
BACKEND_URL = "http://localhost:8000"
# ---- AES 加解密(从项目 WecomCrypto 类提取的核心逻辑)----
import base64
class SimpleWecomCrypto:
"""简化版企微加解密工具,用于本地测试。"""
def __init__(self, token: str, encoding_aes_key: str, corp_id: str):
self.token = token
self.aes_key = base64.b64decode(encoding_aes_key + "=") # 补位 "="
self.iv = self.aes_key[:16]
self.corp_id = corp_id
def generate_signature(self, timestamp: str, nonce: str, encrypt: str) -> str:
"""生成签名:SHA1(sort([token, timestamp, nonce, encrypt]))"""
sort_list = sorted([self.token, timestamp, nonce, encrypt])
concat_str = "".join(sort_list)
return hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
def encrypt(self, plaintext: str) -> str:
"""AES-CBC 加密(模拟企微加密消息)"""
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# 构造明文:random(16) + msg_len(4) + msg + corp_id
random_str = secrets.token_bytes(16)
msg_bytes = plaintext.encode("utf-8")
msg_len = struct.pack("!I", len(msg_bytes))
corp_id_bytes = self.corp_id.encode("utf-8")
plaintext_data = random_str + msg_len + msg_bytes + corp_id_bytes
# PKCS7 填充(块大小 32
block_size = 32
pad_len = block_size - (len(plaintext_data) % block_size)
plaintext_data += bytes([pad_len] * pad_len)
# AES-CBC 加密
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(self.iv), backend=default_backend())
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(plaintext_data) + encryptor.finalize()
return base64.b64encode(encrypted_data).decode("utf-8")
def build_encrypted_xml(self, reply_content: str) -> str:
"""构造企微回调的加密 XML 消息体"""
timestamp = str(int(time.time()))
nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10))
# 加密消息内容
encrypt = self.encrypt(reply_content)
# 生成签名
signature = self.generate_signature(timestamp, nonce, encrypt)
# 构造 XML(与企微推送格式一致)
xml = (
f"<xml>"
f"<ToUserName><![CDATA[{self.corp_id}]]></ToUserName>"
f"<AgentID>{AGENT_ID}</AgentID>"
f"<Encrypt><![CDATA[{encrypt}]]></Encrypt>"
f"</xml>"
)
return xml, signature, timestamp, nonce
# ---- 测试函数 ----
def test_verify_url(crypto: SimpleWecomCrypto):
"""测试 1:验证回调 URL(模拟企微 GET 请求)"""
print("=" * 60)
print("测试 1: 验证回调 URLGET /api/wecom/callback")
print("=" * 60)
# 模拟企微验证 URL 时的 GET 参数
# 企微会生成一个随机的 echostr 并加密
timestamp = str(int(time.time()))
nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10))
# 加密一个随机的 echostr 明文
echostr_plaintext = str(secrets.randbelow(10**10)) # 随机数字
echostr_encrypted = crypto.encrypt(echostr_plaintext)
# 生成签名
signature = crypto.generate_signature(timestamp, nonce, echostr_encrypted)
# 构造 GET 请求 URL
url = (
f"{BACKEND_URL}/api/wecom/callback?"
f"msg_signature={urllib.parse.quote(signature)}&"
f"timestamp={urllib.parse.quote(timestamp)}&"
f"nonce={urllib.parse.quote(nonce)}&"
f"echostr={urllib.parse.quote(echostr_encrypted)}"
)
try:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=5) as resp:
status = resp.status
body = resp.read().decode("utf-8")
# 验证:后端应返回解密后的 echostr 明文
if body.strip() == echostr_plaintext:
print(f" [OK] 验证成功! 状态码={status}")
print(f" 返回的 echostr 明文匹配: {body[:50]}")
else:
print(f" [FAIL] 验证失败! 状态码={status}")
print(f" 期望: {echostr_plaintext}")
print(f" 实际: {body[:200]}")
except urllib.error.HTTPError as e:
print(f" [FAIL] HTTP 错误: {e.code}")
print(f" 响应: {e.read().decode('utf-8')[:200]}")
except Exception as e:
print(f" [FAIL] 请求失败: {e}")
print()
def test_receive_message(crypto: SimpleWecomCrypto, employee_id: str, content: str):
"""测试 2:发送员工消息(模拟企微 POST 回调)"""
print("=" * 60)
print(f"测试 2: 接收员工消息(POST /api/wecom/callback")
print(f" 员工: {employee_id}")
print(f" 消息: {content}")
print("=" * 60)
# 构造企微消息 XML(明文)
message_xml = (
f"<xml>"
f"<ToUserName><![CDATA[{CORP_ID}]]></ToUserName>"
f"<FromUserName><![CDATA[{employee_id}]]></FromUserName>"
f"<CreateTime>{int(time.time())}</CreateTime>"
f"<MsgType><![CDATA[text]]></MsgType>"
f"<Content><![CDATA[{content}]]></Content>"
f"<MsgId>{secrets.randbelow(10**18)}</MsgId>"
f"<AgentID>{AGENT_ID}</AgentID>"
f"</xml>"
)
# 加密消息 XML
encrypted_xml, signature, timestamp, nonce = crypto.build_encrypted_xml(message_xml)
# 构造 POST 请求 URL(带签名参数)
url = (
f"{BACKEND_URL}/api/wecom/callback?"
f"msg_signature={urllib.parse.quote(signature)}&"
f"timestamp={urllib.parse.quote(timestamp)}&"
f"nonce={urllib.parse.quote(nonce)}"
)
try:
req = urllib.request.Request(
url,
data=encrypted_xml.encode("utf-8"),
headers={"Content-Type": "application/xml"},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
status = resp.status
body = resp.read().decode("utf-8")
# 企微期望后端返回 "success"
if body.strip() == "success":
print(f" [OK] 消息处理成功! 状态码={status}")
print(f" 后端返回: {body}")
else:
print(f" [WARN] 状态码={status}")
print(f" 返回内容: {body[:200]}")
except urllib.error.HTTPError as e:
print(f" [FAIL] HTTP 错误: {e.code}")
print(f" 响应: {e.read().decode('utf-8')[:200]}")
except Exception as e:
print(f" [FAIL] 请求失败: {e}")
print()
def test_check_conversation(employee_id: str):
"""测试 3:检查消息是否成功创建了会话"""
print("=" * 60)
print(f"测试 3: 检查会话列表(GET /api/agents")
print("=" * 60)
url = f"{BACKEND_URL}/api/agents"
try:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=5) as resp:
body = json.loads(resp.read().decode("utf-8"))
items = body.get("data", {}).get("items", [])
print(f" 当前坐席数量: {len(items)}")
# 查询会话列表(需要 token,暂时跳过详细验证)
print(f" ✅ 后端 API 正常响应")
except Exception as e:
print(f" [FAIL] 检查失败: {e}")
print()
def test_health():
"""测试 0:后端健康检查"""
print("=" * 60)
print("测试 0: 后端健康检查")
print("=" * 60)
try:
req = urllib.request.Request(f"{BACKEND_URL}/health", method="GET")
with urllib.request.urlopen(req, timeout=3) as resp:
body = json.loads(resp.read().decode("utf-8"))
if body.get("status") == "ok":
print(f" [OK] 后端正常运行")
else:
print(f" [WARN] 后端响应异常: {body}")
except Exception as e:
print(f" [FAIL] 后端不可达: {e}")
print(f" 请先启动后端:cd backend && python -m uvicorn app.main:app --host 0.0.0.0 --port 8001")
print()
# ---- 主程序 ----
def main():
print()
print("[工具] 企微回调本地模拟器")
print(f" 后端地址: {BACKEND_URL}")
print(f" Corp ID: {CORP_ID}")
print(f" Agent ID: {AGENT_ID}")
print()
# 初始化加解密工具
try:
crypto = SimpleWecomCrypto(
token=TOKEN,
encoding_aes_key=ENCODING_AES_KEY,
corp_id=CORP_ID,
)
print("[OK] 加解密工具初始化成功")
except Exception as e:
print(f"[FAIL] 加解密工具初始化失败: {e}")
print(" 请检查 ENCODING_AES_KEY 是否为 43 位字符串")
return
print()
# 测试 0:健康检查
test_health()
# 测试 1:验证回调 URL
test_verify_url(crypto)
# 测试 2:发送员工消息(模拟 3 条不同场景的消息)
test_messages = [
("zhangsan", "我的电脑无法连接VPN,急!"),
("lisi", "请帮我重置邮箱密码"),
("wangwu", "我已经问了三次了还没人回复!!!"), # 会触发情绪标记
]
for employee_id, content in test_messages:
test_receive_message(crypto, employee_id, content)
time.sleep(0.5) # 稍等一下避免请求太快
# 测试 3:检查会话
test_check_conversation("zhangsan")
print("=" * 60)
print("[完成] 模拟测试完成!")
print()
print("后续步骤:")
print(" 1. 在坐席端 (http://localhost:5173) 查看是否有新会话")
print(" 2. 在后端日志中查看消息路由和标记检测结果")
print(" 3. 确认企微回调流程正常后,配置公网 URL 对接真实企微")
print("=" * 60)
if __name__ == "__main__":
main()