Files

513 lines
18 KiB
Python
Raw Permalink Normal View History

# =============================================================================
# 企微IT智能服务台 — 增强版本地回调模拟器
# =============================================================================
# 覆盖场景:
# 场景 1: 普通咨询(单条消息,中性情绪)
# 场景 2: 多轮对话 + 举手标记(同一员工连续追问后要求转人工)
# 场景 3: 愤怒情绪 + 需介入标记(连发 4+ 条含愤怒关键词的消息)
# 场景 4: 担忧情绪(含"担心"、"出错"等关键词)
# 场景 5: 紧急 + 举手双重标记
# 场景 6: 已结单后重新咨询(验证新会话创建)
#
# 使用方式:
# 1. 先启动后端(端口 8000)
# 2. 运行本脚本:python simulate_wecom_enhanced.py
#
# 期望结果:前端坐席工作台能看到 6 种不同标签组合的会话
# =============================================================================
import hashlib
import json
import secrets
import string
import struct
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
# ---- Windows 终端编码兼容 ----
if sys.platform == "win32":
try:
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
# ---- 配置 ----
# 与 backend/.env 中的配置保持一致
CORP_ID = "wwa8c87970b2011f41"
TOKEN = "pjJquWIacCdCh9NeQ5axMrTPtQPk"
ENCODING_AES_KEY = "42k7Ty5qCQHMwsAlzQdZ9tw87rPxUy0IgqcQgktUjJu"
AGENT_ID = "1000133"
# 后端地址
BACKEND_URL = "http://localhost:8000"
# ---- 场景定义 ----
# 每个场景 = (场景名称, 消息列表)
# 每条消息 = (employee_id, 消息内容, 预期触发的标记)
SCENARIOS = [
# ------------------------------------------------------------------
# 场景 1: 普通咨询
# 预期:无特殊标记,紧急度 1,情绪 neutral
# ------------------------------------------------------------------
(
"场景1: 普通咨询",
[
("chenwei", "打印机无法连接", "neutral"),
],
),
# ------------------------------------------------------------------
# 场景 2: 多轮对话 + 举手标记
# 同一员工连续 3 条消息后触发"转人工"
# 预期:hand_raise=True, urgency >= 2
# ------------------------------------------------------------------
(
"场景2: 多轮对话+举手",
[
("liuna", "我的OA系统登录不了", "neutral"),
("liuna", "试了好几次都不行", "neutral"),
("liuna", "转人工!我要找真人客服", "hand_raise"),
],
),
# ------------------------------------------------------------------
# 场景 3: 愤怒情绪 + 需介入标记
# 同一员工连发 4 条含愤怒关键词的消息
# 预期:emotion=angry, need_intervene=True(>3阈值), urgency >= 3
# ------------------------------------------------------------------
(
"场景3: 愤怒+需介入",
[
("zhaoqiang", "网络又断了", "neutral"),
("zhaoqiang", "这周已经断3次了,崩溃", "angry"),
("zhaoqiang", "你们IT到底行不行?投诉", "angry"),
("zhaoqiang", "再没人理我我真的要投诉了!!!", "angry"),
],
),
# ------------------------------------------------------------------
# 场景 4: 担忧情绪
# 预期:emotion=worried, urgency >= 2
# ------------------------------------------------------------------
(
"场景4: 担忧情绪",
[
("sunli", "担心我的邮件数据丢失了", "worried"),
],
),
# ------------------------------------------------------------------
# 场景 5: 紧急 + 举手双重标记
# 预期:emotion=urgent, hand_raise=True, urgency >= 3
# ------------------------------------------------------------------
(
"场景5: 紧急+举手",
[
("wanghai", "服务器马上要宕机了,紧急!转人工!", "urgent+hand_raise"),
],
),
# ------------------------------------------------------------------
# 场景 6: 已结单后重新咨询
# 注意:此场景需要先有一个已结束的会话
# 这里先发一条普通消息创建会话,脚本结束后可在前端手动结单
# 然后用同一员工ID再发一条消息,验证新会话创建
# 为了自动化测试,这里直接模拟两个阶段
# ------------------------------------------------------------------
(
"场景6: 结单后重新咨询",
[
("zhoumei", "我的邮箱收不到邮件", "neutral"),
# 中间需要前端手动结单,这里暂只发消息
# 结单后可再运行一次此脚本验证新会话创建
],
),
]
# ---- AES 加解密(从项目 WecomCrypto 类提取的核心逻辑)----
import base64
class SimpleWecomCrypto:
"""简化版企微加解密工具,用于本地测试。"""
def __init__(self, token: str, encoding_aes_key: str, corp_id: str):
self.token = token
self.aes_key = base64.b64decode(encoding_aes_key + "=") # 补位 "="
self.iv = self.aes_key[:16]
self.corp_id = corp_id
def generate_signature(self, timestamp: str, nonce: str, encrypt: str) -> str:
"""生成签名:SHA1(sort([token, timestamp, nonce, encrypt]))"""
sort_list = sorted([self.token, timestamp, nonce, encrypt])
concat_str = "".join(sort_list)
return hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
def encrypt(self, plaintext: str) -> str:
"""AES-CBC 加密(模拟企微加密消息)"""
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# 构造明文:random(16) + msg_len(4) + msg + corp_id
random_str = secrets.token_bytes(16)
msg_bytes = plaintext.encode("utf-8")
msg_len = struct.pack("!I", len(msg_bytes))
corp_id_bytes = self.corp_id.encode("utf-8")
plaintext_data = random_str + msg_len + msg_bytes + corp_id_bytes
# PKCS7 填充(块大小 32
block_size = 32
pad_len = block_size - (len(plaintext_data) % block_size)
plaintext_data += bytes([pad_len] * pad_len)
# AES-CBC 加密
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(self.iv), backend=default_backend())
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(plaintext_data) + encryptor.finalize()
return base64.b64encode(encrypted_data).decode("utf-8")
def build_encrypted_xml(self, reply_content: str) -> str:
"""构造企微回调的加密 XML 消息体"""
timestamp = str(int(time.time()))
nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10))
# 加密消息内容
encrypt = self.encrypt(reply_content)
# 生成签名
signature = self.generate_signature(timestamp, nonce, encrypt)
# 构造 XML(与企微推送格式一致)
xml = (
f"<xml>"
f"<ToUserName><![CDATA[{self.corp_id}]]></ToUserName>"
f"<AgentID>{AGENT_ID}</AgentID>"
f"<Encrypt><![CDATA[{encrypt}]]></Encrypt>"
f"</xml>"
)
return xml, signature, timestamp, nonce
# ---- 测试函数 ----
def test_health():
"""测试 0:后端健康检查"""
print("=" * 60)
print("测试 0: 后端健康检查")
print("=" * 60)
try:
req = urllib.request.Request(f"{BACKEND_URL}/health", method="GET")
with urllib.request.urlopen(req, timeout=3) as resp:
body = json.loads(resp.read().decode("utf-8"))
if body.get("status") == "ok":
print(" [OK] 后端正常运行")
else:
print(f" [WARN] 后端响应异常: {body}")
except Exception as e:
print(f" [FAIL] 后端不可达: {e}")
print(" 请先启动后端:cd backend && python -m uvicorn app.main:app --host 0.0.0.0 --port 8000")
return False
print()
return True
def test_verify_url(crypto: SimpleWecomCrypto):
"""测试 1:验证回调 URL(模拟企微 GET 请求)"""
print("=" * 60)
print("测试 1: 验证回调 URLGET /api/wecom/callback")
print("=" * 60)
timestamp = str(int(time.time()))
nonce = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10))
echostr_plaintext = str(secrets.randbelow(10**10))
echostr_encrypted = crypto.encrypt(echostr_plaintext)
signature = crypto.generate_signature(timestamp, nonce, echostr_encrypted)
url = (
f"{BACKEND_URL}/api/wecom/callback?"
f"msg_signature={urllib.parse.quote(signature)}&"
f"timestamp={urllib.parse.quote(timestamp)}&"
f"nonce={urllib.parse.quote(nonce)}&"
f"echostr={urllib.parse.quote(echostr_encrypted)}"
)
try:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=5) as resp:
status = resp.status
body = resp.read().decode("utf-8")
if body.strip() == echostr_plaintext:
print(f" [OK] 验证成功! 状态码={status}")
else:
print(f" [FAIL] 验证失败! 期望={echostr_plaintext}, 实际={body[:200]}")
except urllib.error.HTTPError as e:
print(f" [FAIL] HTTP 错误: {e.code}")
except Exception as e:
print(f" [FAIL] 请求失败: {e}")
print()
def send_message(crypto: SimpleWecomCrypto, employee_id: str, content: str) -> bool:
"""发送单条员工消息(模拟企微 POST 回调)。
Args:
crypto: 加密工具实例
employee_id: 员工企微 UserID
content: 消息内容
Returns:
bool: 是否成功(后端返回 "success"
"""
# 构造企微消息 XML(明文)
message_xml = (
f"<xml>"
f"<ToUserName><![CDATA[{CORP_ID}]]></ToUserName>"
f"<FromUserName><![CDATA[{employee_id}]]></FromUserName>"
f"<CreateTime>{int(time.time())}</CreateTime>"
f"<MsgType><![CDATA[text]]></MsgType>"
f"<Content><![CDATA[{content}]]></Content>"
f"<MsgId>{secrets.randbelow(10**18)}</MsgId>"
f"<AgentID>{AGENT_ID}</AgentID>"
f"</xml>"
)
# 加密消息 XML
encrypted_xml, signature, timestamp, nonce = crypto.build_encrypted_xml(message_xml)
# 构造 POST 请求 URL(带签名参数)
url = (
f"{BACKEND_URL}/api/wecom/callback?"
f"msg_signature={urllib.parse.quote(signature)}&"
f"timestamp={urllib.parse.quote(timestamp)}&"
f"nonce={urllib.parse.quote(nonce)}"
)
try:
req = urllib.request.Request(
url,
data=encrypted_xml.encode("utf-8"),
headers={"Content-Type": "application/xml"},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
body = resp.read().decode("utf-8")
if body.strip() == "success":
print(f" [OK] {employee_id}: \"{content[:30]}{'...' if len(content) > 30 else ''}\"")
return True
else:
print(f" [WARN] 返回非success: {body[:100]}")
return False
except urllib.error.HTTPError as e:
err_body = e.read().decode("utf-8")[:200]
print(f" [FAIL] HTTP {e.code}: {err_body}")
return False
except Exception as e:
print(f" [FAIL] {e}")
return False
def run_scenario(crypto: SimpleWecomCrypto, scenario_name: str, messages: list):
"""运行一个测试场景。
Args:
crypto: 加密工具实例
scenario_name: 场景名称
messages: 消息列表 [(employee_id, content, expected_tag), ...]
"""
print("=" * 60)
print(f" {scenario_name}")
print("=" * 60)
print(f" 消息数: {len(messages)}")
print(f" 预期标记: {', '.join(set(m[2] for m in messages if m[2] != 'neutral'))}")
print()
success_count = 0
for employee_id, content, expected_tag in messages:
ok = send_message(crypto, employee_id, content)
if ok:
success_count += 1
# 消息间稍作延迟,模拟真实场景节奏
time.sleep(0.3)
total = len(messages)
status = "[OK]" if success_count == total else "[WARN]"
print(f"\n {status} 场景结果: {success_count}/{total} 条消息成功")
print()
def verify_conversations():
"""验证后端会话列表,展示各会话的标记和紧急度。"""
print("=" * 60)
print(" 验证: 查询会话列表(GET /api/conversations")
print("=" * 60)
try:
req = urllib.request.Request(f"{BACKEND_URL}/api/conversations", method="GET")
with urllib.request.urlopen(req, timeout=5) as resp:
body = json.loads(resp.read().decode("utf-8"))
items = body.get("data", {}).get("items", [])
if not items:
print(" [WARN] 无会话数据")
print()
return
print(f"{len(items)} 个会话:\n")
# 表头
print(f" {'员工ID':<15} {'状态':<10} {'紧急度':<8} {'标记'}")
print(f" {'-'*15} {'-'*10} {'-'*8} {'-'*30}")
for conv in items:
emp_id = conv.get("employee_id", "?")
status = conv.get("status", "?")
urgency = conv.get("urgency_score", "?")
tags = conv.get("tags", {})
# 解析标记为可读字符串
tag_parts = []
if tags.get("hand_raise"):
tag_parts.append("[举手]")
if tags.get("need_intervene"):
tag_parts.append("[需介入]")
emotion = tags.get("emotion", "")
if emotion and emotion != "neutral":
tag_parts.append(f"[情绪:{emotion}]")
repeat = tags.get("repeat_count", 0)
if repeat > 1:
tag_parts.append(f"[追问x{repeat}]")
tag_str = " ".join(tag_parts) if tag_parts else "-"
print(f" {emp_id:<15} {status:<10} {urgency:<8} {tag_str}")
print()
# 验证各场景的预期标记
print(" 场景验证结果:")
checks = {
"chenwei": {"expected_tags": [], "desc": "场景1-普通咨询"},
"liuna": {"expected_tags": ["hand_raise"], "desc": "场景2-举手"},
"zhaoqiang": {"expected_tags": ["angry", "need_intervene"], "desc": "场景3-愤怒+介入"},
"sunli": {"expected_tags": ["worried"], "desc": "场景4-担忧"},
"wanghai": {"expected_tags": ["urgent", "hand_raise"], "desc": "场景5-紧急+举手"},
"zhoumei": {"expected_tags": [], "desc": "场景6-结单重开"},
}
for conv in items:
emp_id = conv.get("employee_id", "")
if emp_id not in checks:
continue
check = checks[emp_id]
tags = conv.get("tags", {})
# 收集实际标记
actual_tags = []
if tags.get("hand_raise"):
actual_tags.append("hand_raise")
if tags.get("need_intervene"):
actual_tags.append("need_intervene")
emotion = tags.get("emotion", "")
if emotion and emotion != "neutral":
actual_tags.append(emotion)
# 对比
expected = set(check["expected_tags"])
actual = set(actual_tags)
matched = expected & actual
missing = expected - actual
extra = actual - expected
if not expected and not actual:
result = "[OK] 无特殊标记(符合预期)"
elif missing:
result = f"[WARN] 缺少标记: {', '.join(missing)}"
else:
result = f"[OK] 预期标记全部命中"
print(f" {check['desc']}: {result}")
print()
except Exception as e:
print(f" [FAIL] 查询失败: {e}")
print()
# ---- 主程序 ----
def main():
print()
print("[工具] 企微回调增强版本地模拟器")
print(f" 后端地址: {BACKEND_URL}")
print(f" Corp ID: {CORP_ID}")
print(f" Agent ID: {AGENT_ID}")
print(f" 场景数量: {len(SCENARIOS)}")
print()
# 初始化加解密工具
try:
crypto = SimpleWecomCrypto(
token=TOKEN,
encoding_aes_key=ENCODING_AES_KEY,
corp_id=CORP_ID,
)
print("[OK] 加解密工具初始化成功")
except Exception as e:
print(f"[FAIL] 加解密工具初始化失败: {e}")
return
print()
# 测试 0:健康检查
if not test_health():
return
# 测试 1:验证回调 URL
test_verify_url(crypto)
# 逐场景发送消息
print("=" * 60)
print(" 开始发送场景消息")
print("=" * 60)
print()
total_messages = sum(len(msgs) for _, msgs in SCENARIOS)
print(f"{len(SCENARIOS)} 个场景, {total_messages} 条消息")
print()
for scenario_name, messages in SCENARIOS:
run_scenario(crypto, scenario_name, messages)
# 场景间稍长延迟,确保数据库提交完成
time.sleep(1)
# 验证结果
verify_conversations()
print("=" * 60)
print("[完成] 增强版模拟测试完成!")
print()
print("后续步骤:")
print(" 1. 在坐席端 (http://localhost:5173) 查看新会话")
print(" 2. 观察不同标签组合在前端的展示效果")
print(" 3. 测试接单/回复/转交/结单等操作")
print(" 4. 结单 zhoumei 的会话后,再运行一次脚本验证场景6")
print("=" * 60)
if __name__ == "__main__":
main()