Files

513 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 增强版本地回调模拟器
# =============================================================================
# 覆盖场景:
# 场景 1: 普通咨询(单条消息,中性情绪)
# 场景 2: 多轮对话 + 举手标记(同一员工连续追问后要求转人工)
# 场景 3: 愤怒情绪 + 需介入标记(连发 4+ 条含愤怒关键词的消息)
# 场景 4: 担忧情绪(含"担心"、"出错"等关键词)
# 场景 5: 紧急 + 举手双重标记
# 场景 6: 已结单后重新咨询(验证新会话创建)
#
# 使用方式:
# 1. 先启动后端(端口 8000)
# 2. 运行本脚本:python simulate_wecom_enhanced.py
#
# 期望结果:前端坐席工作台能看到 6 种不同标签组合的会话
# =============================================================================
import hashlib
import json
import secrets
import string
import struct
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
# ---- Windows 终端编码兼容 ----
if sys.platform == "win32":
try:
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
# ---- 配置 ----
# 与 backend/.env 中的配置保持一致
CORP_ID = "wwa8c87970b2011f41"
TOKEN = "pjJquWIacCdCh9NeQ5axMrTPtQPk"
ENCODING_AES_KEY = "42k7Ty5qCQHMwsAlzQdZ9tw87rPxUy0IgqcQgktUjJu"
AGENT_ID = "1000133"
# 后端地址
BACKEND_URL = "http://localhost:8000"
# ---- 场景定义 ----
# 每个场景 = (场景名称, 消息列表)
# 每条消息 = (employee_id, 消息内容, 预期触发的标记)
SCENARIOS = [
# ------------------------------------------------------------------
# 场景 1: 普通咨询
# 预期:无特殊标记,紧急度 1,情绪 neutral
# ------------------------------------------------------------------
(
"场景1: 普通咨询",
[
("chenwei", "打印机无法连接", "neutral"),
],
),
# ------------------------------------------------------------------
# 场景 2: 多轮对话 + 举手标记
# 同一员工连续 3 条消息后触发"转人工"
# 预期:hand_raise=True, urgency >= 2
# ------------------------------------------------------------------
(
"场景2: 多轮对话+举手",
[
("liuna", "我的OA系统登录不了", "neutral"),
("liuna", "试了好几次都不行", "neutral"),
("liuna", "转人工!我要找真人客服", "hand_raise"),
],
),
# ------------------------------------------------------------------
# 场景 3: 愤怒情绪 + 需介入标记
# 同一员工连发 4 条含愤怒关键词的消息
# 预期:emotion=angry, need_intervene=True(>3阈值), urgency >= 3
# ------------------------------------------------------------------
(
"场景3: 愤怒+需介入",
[
("zhaoqiang", "网络又断了", "neutral"),
("zhaoqiang", "这周已经断3次了,崩溃", "angry"),
("zhaoqiang", "你们IT到底行不行?投诉", "angry"),
("zhaoqiang", "再没人理我我真的要投诉了!!!", "angry"),
],
),
# ------------------------------------------------------------------
# 场景 4: 担忧情绪
# 预期:emotion=worried, urgency >= 2
# ------------------------------------------------------------------
(
"场景4: 担忧情绪",
[
("sunli", "担心我的邮件数据丢失了", "worried"),
],
),
# ------------------------------------------------------------------
# 场景 5: 紧急 + 举手双重标记
# 预期:emotion=urgent, hand_raise=True, urgency >= 3
# ------------------------------------------------------------------
(
"场景5: 紧急+举手",
[
("wanghai", "服务器马上要宕机了,紧急!转人工!", "urgent+hand_raise"),
],
),
# ------------------------------------------------------------------
# 场景 6: 已结单后重新咨询
# 注意:此场景需要先有一个已结束的会话
# 这里先发一条普通消息创建会话,脚本结束后可在前端手动结单
# 然后用同一员工ID再发一条消息,验证新会话创建
# 为了自动化测试,这里直接模拟两个阶段
# ------------------------------------------------------------------
(
"场景6: 结单后重新咨询",
[
("zhoumei", "我的邮箱收不到邮件", "neutral"),
# 中间需要前端手动结单,这里暂只发消息
# 结单后可再运行一次此脚本验证新会话创建
],
),
]
# ---- AES 加解密(从项目 WecomCrypto 类提取的核心逻辑)----
import base64
class SimpleWecomCrypto:
"""简化版企微加解密工具,用于本地测试。"""
def __init__(self, token: str, encoding_aes_key: str, corp_id: str):
self.token = token
self.aes_key = base64.b64decode(encoding_aes_key + "=") # 补位 "="
self.iv = self.aes_key[:16]
self.corp_id = corp_id
def generate_signature(self, timestamp: str, nonce: str, encrypt: str) -> str:
"""生成签名:SHA1(sort([token, timestamp, nonce, encrypt]))"""
sort_list = sorted([self.token, timestamp, nonce, encrypt])
concat_str = "".join(sort_list)
return hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
def encrypt(self, plaintext: str) -> str:
"""AES-CBC 加密(模拟企微加密消息)"""
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# 构造明文:random(16) + msg_len(4) + msg + corp_id
random_str = secrets.token_bytes(16)
msg_bytes = plaintext.encode("utf-8")
msg_len = struct.pack("!I", len(msg_bytes))
corp_id_bytes = self.corp_id.encode("utf-8")
plaintext_data = random_str + msg_len + msg_bytes + corp_id_bytes
# PKCS7 填充(块大小 32
block_size = 32
pad_len = block_size - (len(plaintext_data) % block_size)
plaintext_data += bytes([pad_len] * pad_len)
# AES-CBC 加密
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(self.iv), backend=default_backend())
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(plaintext_data) + encryptor.finalize()
return base64.b64encode(encrypted_data).decode("utf-8")
def build_encrypted_xml(self, reply_content: str) -> str:
"""构造企微回调的加密 XML 消息体"""
timestamp = str(int(time.time()))
nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10))
# 加密消息内容
encrypt = self.encrypt(reply_content)
# 生成签名
signature = self.generate_signature(timestamp, nonce, encrypt)
# 构造 XML(与企微推送格式一致)
xml = (
f"<xml>"
f"<ToUserName><![CDATA[{self.corp_id}]]></ToUserName>"
f"<AgentID>{AGENT_ID}</AgentID>"
f"<Encrypt><![CDATA[{encrypt}]]></Encrypt>"
f"</xml>"
)
return xml, signature, timestamp, nonce
# ---- 测试函数 ----
def test_health():
"""测试 0:后端健康检查"""
print("=" * 60)
print("测试 0: 后端健康检查")
print("=" * 60)
try:
req = urllib.request.Request(f"{BACKEND_URL}/health", method="GET")
with urllib.request.urlopen(req, timeout=3) as resp:
body = json.loads(resp.read().decode("utf-8"))
if body.get("status") == "ok":
print(" [OK] 后端正常运行")
else:
print(f" [WARN] 后端响应异常: {body}")
except Exception as e:
print(f" [FAIL] 后端不可达: {e}")
print(" 请先启动后端:cd backend && python -m uvicorn app.main:app --host 0.0.0.0 --port 8000")
return False
print()
return True
def test_verify_url(crypto: SimpleWecomCrypto):
"""测试 1:验证回调 URL(模拟企微 GET 请求)"""
print("=" * 60)
print("测试 1: 验证回调 URLGET /api/wecom/callback")
print("=" * 60)
timestamp = str(int(time.time()))
nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10))
echostr_plaintext = str(secrets.randbelow(10**10))
echostr_encrypted = crypto.encrypt(echostr_plaintext)
signature = crypto.generate_signature(timestamp, nonce, echostr_encrypted)
url = (
f"{BACKEND_URL}/api/wecom/callback?"
f"msg_signature={urllib.parse.quote(signature)}&"
f"timestamp={urllib.parse.quote(timestamp)}&"
f"nonce={urllib.parse.quote(nonce)}&"
f"echostr={urllib.parse.quote(echostr_encrypted)}"
)
try:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=5) as resp:
status = resp.status
body = resp.read().decode("utf-8")
if body.strip() == echostr_plaintext:
print(f" [OK] 验证成功! 状态码={status}")
else:
print(f" [FAIL] 验证失败! 期望={echostr_plaintext}, 实际={body[:200]}")
except urllib.error.HTTPError as e:
print(f" [FAIL] HTTP 错误: {e.code}")
except Exception as e:
print(f" [FAIL] 请求失败: {e}")
print()
def send_message(crypto: SimpleWecomCrypto, employee_id: str, content: str) -> bool:
"""发送单条员工消息(模拟企微 POST 回调)。
Args:
crypto: 加密工具实例
employee_id: 员工企微 UserID
content: 消息内容
Returns:
bool: 是否成功(后端返回 "success"
"""
# 构造企微消息 XML(明文)
message_xml = (
f"<xml>"
f"<ToUserName><![CDATA[{CORP_ID}]]></ToUserName>"
f"<FromUserName><![CDATA[{employee_id}]]></FromUserName>"
f"<CreateTime>{int(time.time())}</CreateTime>"
f"<MsgType><![CDATA[text]]></MsgType>"
f"<Content><![CDATA[{content}]]></Content>"
f"<MsgId>{secrets.randbelow(10**18)}</MsgId>"
f"<AgentID>{AGENT_ID}</AgentID>"
f"</xml>"
)
# 加密消息 XML
encrypted_xml, signature, timestamp, nonce = crypto.build_encrypted_xml(message_xml)
# 构造 POST 请求 URL(带签名参数)
url = (
f"{BACKEND_URL}/api/wecom/callback?"
f"msg_signature={urllib.parse.quote(signature)}&"
f"timestamp={urllib.parse.quote(timestamp)}&"
f"nonce={urllib.parse.quote(nonce)}"
)
try:
req = urllib.request.Request(
url,
data=encrypted_xml.encode("utf-8"),
headers={"Content-Type": "application/xml"},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
body = resp.read().decode("utf-8")
if body.strip() == "success":
print(f" [OK] {employee_id}: \"{content[:30]}{'...' if len(content) > 30 else ''}\"")
return True
else:
print(f" [WARN] 返回非success: {body[:100]}")
return False
except urllib.error.HTTPError as e:
err_body = e.read().decode("utf-8")[:200]
print(f" [FAIL] HTTP {e.code}: {err_body}")
return False
except Exception as e:
print(f" [FAIL] {e}")
return False
def run_scenario(crypto: SimpleWecomCrypto, scenario_name: str, messages: list):
"""运行一个测试场景。
Args:
crypto: 加密工具实例
scenario_name: 场景名称
messages: 消息列表 [(employee_id, content, expected_tag), ...]
"""
print("=" * 60)
print(f" {scenario_name}")
print("=" * 60)
print(f" 消息数: {len(messages)}")
print(f" 预期标记: {', '.join(set(m[2] for m in messages if m[2] != 'neutral'))}")
print()
success_count = 0
for employee_id, content, expected_tag in messages:
ok = send_message(crypto, employee_id, content)
if ok:
success_count += 1
# 消息间稍作延迟,模拟真实场景节奏
time.sleep(0.3)
total = len(messages)
status = "[OK]" if success_count == total else "[WARN]"
print(f"\n {status} 场景结果: {success_count}/{total} 条消息成功")
print()
def verify_conversations():
"""验证后端会话列表,展示各会话的标记和紧急度。"""
print("=" * 60)
print(" 验证: 查询会话列表(GET /api/conversations")
print("=" * 60)
try:
req = urllib.request.Request(f"{BACKEND_URL}/api/conversations", method="GET")
with urllib.request.urlopen(req, timeout=5) as resp:
body = json.loads(resp.read().decode("utf-8"))
items = body.get("data", {}).get("items", [])
if not items:
print(" [WARN] 无会话数据")
print()
return
print(f"{len(items)} 个会话:\n")
# 表头
print(f" {'员工ID':<15} {'状态':<10} {'紧急度':<8} {'标记'}")
print(f" {'-'*15} {'-'*10} {'-'*8} {'-'*30}")
for conv in items:
emp_id = conv.get("employee_id", "?")
status = conv.get("status", "?")
urgency = conv.get("urgency_score", "?")
tags = conv.get("tags", {})
# 解析标记为可读字符串
tag_parts = []
if tags.get("hand_raise"):
tag_parts.append("[举手]")
if tags.get("need_intervene"):
tag_parts.append("[需介入]")
emotion = tags.get("emotion", "")
if emotion and emotion != "neutral":
tag_parts.append(f"[情绪:{emotion}]")
repeat = tags.get("repeat_count", 0)
if repeat > 1:
tag_parts.append(f"[追问x{repeat}]")
tag_str = " ".join(tag_parts) if tag_parts else "-"
print(f" {emp_id:<15} {status:<10} {urgency:<8} {tag_str}")
print()
# 验证各场景的预期标记
print(" 场景验证结果:")
checks = {
"chenwei": {"expected_tags": [], "desc": "场景1-普通咨询"},
"liuna": {"expected_tags": ["hand_raise"], "desc": "场景2-举手"},
"zhaoqiang": {"expected_tags": ["angry", "need_intervene"], "desc": "场景3-愤怒+介入"},
"sunli": {"expected_tags": ["worried"], "desc": "场景4-担忧"},
"wanghai": {"expected_tags": ["urgent", "hand_raise"], "desc": "场景5-紧急+举手"},
"zhoumei": {"expected_tags": [], "desc": "场景6-结单重开"},
}
for conv in items:
emp_id = conv.get("employee_id", "")
if emp_id not in checks:
continue
check = checks[emp_id]
tags = conv.get("tags", {})
# 收集实际标记
actual_tags = []
if tags.get("hand_raise"):
actual_tags.append("hand_raise")
if tags.get("need_intervene"):
actual_tags.append("need_intervene")
emotion = tags.get("emotion", "")
if emotion and emotion != "neutral":
actual_tags.append(emotion)
# 对比
expected = set(check["expected_tags"])
actual = set(actual_tags)
matched = expected & actual
missing = expected - actual
extra = actual - expected
if not expected and not actual:
result = "[OK] 无特殊标记(符合预期)"
elif missing:
result = f"[WARN] 缺少标记: {', '.join(missing)}"
else:
result = f"[OK] 预期标记全部命中"
print(f" {check['desc']}: {result}")
print()
except Exception as e:
print(f" [FAIL] 查询失败: {e}")
print()
# ---- 主程序 ----
def main():
print()
print("[工具] 企微回调增强版本地模拟器")
print(f" 后端地址: {BACKEND_URL}")
print(f" Corp ID: {CORP_ID}")
print(f" Agent ID: {AGENT_ID}")
print(f" 场景数量: {len(SCENARIOS)}")
print()
# 初始化加解密工具
try:
crypto = SimpleWecomCrypto(
token=TOKEN,
encoding_aes_key=ENCODING_AES_KEY,
corp_id=CORP_ID,
)
print("[OK] 加解密工具初始化成功")
except Exception as e:
print(f"[FAIL] 加解密工具初始化失败: {e}")
return
print()
# 测试 0:健康检查
if not test_health():
return
# 测试 1:验证回调 URL
test_verify_url(crypto)
# 逐场景发送消息
print("=" * 60)
print(" 开始发送场景消息")
print("=" * 60)
print()
total_messages = sum(len(msgs) for _, msgs in SCENARIOS)
print(f"{len(SCENARIOS)} 个场景, {total_messages} 条消息")
print()
for scenario_name, messages in SCENARIOS:
run_scenario(crypto, scenario_name, messages)
# 场景间稍长延迟,确保数据库提交完成
time.sleep(1)
# 验证结果
verify_conversations()
print("=" * 60)
print("[完成] 增强版模拟测试完成!")
print()
print("后续步骤:")
print(" 1. 在坐席端 (http://localhost:5173) 查看新会话")
print(" 2. 观察不同标签组合在前端的展示效果")
print(" 3. 测试接单/回复/转交/结单等操作")
print(" 4. 结单 zhoumei 的会话后,再运行一次脚本验证场景6")
print("=" * 60)
if __name__ == "__main__":
main()