377 lines
14 KiB
Python
377 lines
14 KiB
Python
|
|
# =============================================================================
|
|||
|
|
# 企微IT智能服务台 — 企微消息 AES 加解密工具
|
|||
|
|
# =============================================================================
|
|||
|
|
# 说明:实现企微回调消息的加解密和签名验证
|
|||
|
|
# 参考企微官方加解密库逻辑,使用 Python cryptography 库重写
|
|||
|
|
# 加密模式:AES-CBC-256,PKCS7 填充
|
|||
|
|
# 签名算法:SHA1(sort(token, timestamp, nonce, encrypt))
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
import base64
|
|||
|
|
import hashlib
|
|||
|
|
import json
|
|||
|
|
import logging
|
|||
|
|
import secrets
|
|||
|
|
import string
|
|||
|
|
import struct
|
|||
|
|
import time
|
|||
|
|
import xml.etree.ElementTree as ET
|
|||
|
|
from typing import Dict, Optional, Tuple
|
|||
|
|
|
|||
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|||
|
|
from cryptography.hazmat.backends import default_backend
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class WecomCrypto:
|
|||
|
|
"""企微消息加解密工具类。
|
|||
|
|
|
|||
|
|
实现企微回调消息的完整加解密流程:
|
|||
|
|
1. 签名验证:SHA1(sort(token, timestamp, nonce, encrypt))
|
|||
|
|
2. AES 解密:CBC 模式,PKCS7 去填充
|
|||
|
|
3. AES 加密:CBC 模式,PKCS7 填充
|
|||
|
|
|
|||
|
|
Attributes:
|
|||
|
|
token: 企微回调配置的 Token
|
|||
|
|
aes_key: 解码后的 AES 密钥(32 字节)
|
|||
|
|
corp_id: 企业ID
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, token: str, encoding_aes_key: str, corp_id: str):
|
|||
|
|
"""初始化加解密工具。
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
token: 企微回调配置的 Token(用于签名验证)
|
|||
|
|
encoding_aes_key: 企微回调配置的 EncodingAESKey(43 位 Base64 字符串)
|
|||
|
|
corp_id: 企业ID(用于消息体中的校验)
|
|||
|
|
"""
|
|||
|
|
self.token = token
|
|||
|
|
# EncodingAESKey 是 43 位 Base64 编码字符串
|
|||
|
|
# 解码时需要先补上 1 个 "=" 使其成为合法的 Base64 字符串
|
|||
|
|
# 解码后得到 32 字节的 AES 密钥(AES-256)
|
|||
|
|
self.aes_key = base64.b64decode(encoding_aes_key + "=")
|
|||
|
|
# AES CBC 模式的 IV(初始化向量)取密钥的前 16 字节
|
|||
|
|
self.iv = self.aes_key[:16]
|
|||
|
|
self.corp_id = corp_id
|
|||
|
|
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
# 签名验证
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
def verify_signature(
|
|||
|
|
self, signature: str, timestamp: str, nonce: str, encrypt: str
|
|||
|
|
) -> bool:
|
|||
|
|
"""验证企微回调签名。
|
|||
|
|
|
|||
|
|
企微的签名算法:SHA1(sort([token, timestamp, nonce, encrypt]))
|
|||
|
|
将 token、timestamp、nonce、encrypt 四个字符串字典序排列后拼接,计算 SHA1。
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
signature: 企微传来的签名(msg_signature 参数)
|
|||
|
|
timestamp: 时间戳
|
|||
|
|
nonce: 随机数
|
|||
|
|
encrypt: 加密的消息内容
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
bool: 签名是否验证通过
|
|||
|
|
"""
|
|||
|
|
# 将四个参数按字典序排列
|
|||
|
|
sort_list = sorted([self.token, timestamp, nonce, encrypt])
|
|||
|
|
# 拼接为一个字符串
|
|||
|
|
concat_str = "".join(sort_list)
|
|||
|
|
# 计算 SHA1 哈希值
|
|||
|
|
computed = hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
|
|||
|
|
# 与传入的签名比较
|
|||
|
|
is_valid = computed == signature
|
|||
|
|
if not is_valid:
|
|||
|
|
logger.warning(
|
|||
|
|
f"签名验证失败: computed={computed}, received={signature}"
|
|||
|
|
)
|
|||
|
|
return is_valid
|
|||
|
|
|
|||
|
|
def generate_signature(
|
|||
|
|
self, timestamp: str, nonce: str, encrypt: str
|
|||
|
|
) -> str:
|
|||
|
|
"""生成签名(用于加密响应消息时)。
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
timestamp: 时间戳
|
|||
|
|
nonce: 随机数
|
|||
|
|
encrypt: 加密后的消息内容
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
str: SHA1 签名字符串
|
|||
|
|
"""
|
|||
|
|
sort_list = sorted([self.token, timestamp, nonce, encrypt])
|
|||
|
|
concat_str = "".join(sort_list)
|
|||
|
|
return hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
|
|||
|
|
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
# AES 解密
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
def decrypt(self, encrypted_text: str) -> str:
|
|||
|
|
"""AES-CBC 解密企微加密消息。
|
|||
|
|
|
|||
|
|
解密流程:
|
|||
|
|
1. Base64 解码得到密文
|
|||
|
|
2. AES-CBC 解密
|
|||
|
|
3. 去除 PKCS7 填充
|
|||
|
|
4. 提取明文内容(格式:random(16) + msg_len(4) + msg + corp_id)
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
encrypted_text: Base64 编码的加密消息
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
str: 解密后的明文 XML 字符串
|
|||
|
|
|
|||
|
|
Raises:
|
|||
|
|
ValueError: 解密失败时抛出
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
# 1. Base64 解码得到密文字节
|
|||
|
|
encrypted_data = base64.b64decode(encrypted_text)
|
|||
|
|
|
|||
|
|
# 2. AES-CBC 解密
|
|||
|
|
cipher = Cipher(
|
|||
|
|
algorithms.AES(self.aes_key),
|
|||
|
|
modes.CBC(self.iv),
|
|||
|
|
backend=default_backend(),
|
|||
|
|
)
|
|||
|
|
decryptor = cipher.decryptor()
|
|||
|
|
decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()
|
|||
|
|
|
|||
|
|
# 3. 去除 PKCS7 填充
|
|||
|
|
# PKCS7: 填充字节的值等于填充的字节数
|
|||
|
|
# 例如填充了 5 个字节,则每个字节的值都是 0x05
|
|||
|
|
pad_len = decrypted_data[-1]
|
|||
|
|
# 校验填充是否合法(填充值应等于填充长度)
|
|||
|
|
if pad_len < 1 or pad_len > 32:
|
|||
|
|
raise ValueError(f"无效的 PKCS7 填充值: {pad_len}")
|
|||
|
|
# 检查所有填充字节是否一致
|
|||
|
|
for i in range(pad_len):
|
|||
|
|
if decrypted_data[-(i + 1)] != pad_len:
|
|||
|
|
raise ValueError("PKCS7 填充校验失败")
|
|||
|
|
# 去除填充部分
|
|||
|
|
plaintext_data = decrypted_data[:-pad_len]
|
|||
|
|
|
|||
|
|
# 4. 提取明文内容
|
|||
|
|
# 企微加密格式:random(16字节) + msg_len(4字节网络序) + msg + corp_id
|
|||
|
|
# 跳过前 16 字节随机串
|
|||
|
|
msg_len = struct.unpack("!I", plaintext_data[16:20])[0]
|
|||
|
|
msg_content = plaintext_data[20 : 20 + msg_len].decode("utf-8")
|
|||
|
|
from_corp_id = plaintext_data[20 + msg_len :].decode("utf-8")
|
|||
|
|
|
|||
|
|
# 5. 校验 corp_id 是否匹配
|
|||
|
|
if from_corp_id != self.corp_id:
|
|||
|
|
raise ValueError(
|
|||
|
|
f"corp_id 不匹配: expected={self.corp_id}, got={from_corp_id}"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
logger.debug(f"AES 解密成功, 明文长度: {len(msg_content)}")
|
|||
|
|
return msg_content
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"AES 解密失败: {e}")
|
|||
|
|
raise ValueError(f"消息解密失败: {e}") from e
|
|||
|
|
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
# AES 加密
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
def encrypt(self, plaintext: str) -> str:
|
|||
|
|
"""AES-CBC 加密消息(用于被动回复)。
|
|||
|
|
|
|||
|
|
加密流程:
|
|||
|
|
1. 构造明文:random(16) + msg_len(4) + msg + corp_id
|
|||
|
|
2. PKCS7 填充
|
|||
|
|
3. AES-CBC 加密
|
|||
|
|
4. Base64 编码
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
plaintext: 要加密的明文字符串
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
str: Base64 编码的加密结果
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
# 1. 构造明文字节
|
|||
|
|
# 16 字节随机串(增加密文随机性,防止相同明文产生相同密文)
|
|||
|
|
random_str = secrets.token_bytes(16)
|
|||
|
|
# 消息内容字节
|
|||
|
|
msg_bytes = plaintext.encode("utf-8")
|
|||
|
|
# 消息长度(4 字节网络序,大端)
|
|||
|
|
msg_len = struct.pack("!I", len(msg_bytes))
|
|||
|
|
# corp_id 字节
|
|||
|
|
corp_id_bytes = self.corp_id.encode("utf-8")
|
|||
|
|
# 拼接
|
|||
|
|
plaintext_data = random_str + msg_len + msg_bytes + corp_id_bytes
|
|||
|
|
|
|||
|
|
# 2. PKCS7 填充
|
|||
|
|
# 块大小为 32 字节(AES-256 的块大小实际是 16 字节,
|
|||
|
|
# 但企微官方实现使用 32 字节块做 PKCS7 填充)
|
|||
|
|
block_size = 32
|
|||
|
|
pad_len = block_size - (len(plaintext_data) % block_size)
|
|||
|
|
# 填充字节的值等于填充的字节数
|
|||
|
|
plaintext_data += bytes([pad_len] * pad_len)
|
|||
|
|
|
|||
|
|
# 3. AES-CBC 加密
|
|||
|
|
cipher = Cipher(
|
|||
|
|
algorithms.AES(self.aes_key),
|
|||
|
|
modes.CBC(self.iv),
|
|||
|
|
backend=default_backend(),
|
|||
|
|
)
|
|||
|
|
encryptor = cipher.encryptor()
|
|||
|
|
encrypted_data = encryptor.update(plaintext_data) + encryptor.finalize()
|
|||
|
|
|
|||
|
|
# 4. Base64 编码
|
|||
|
|
result = base64.b64encode(encrypted_data).decode("utf-8")
|
|||
|
|
logger.debug(f"AES 加密成功, 密文长度: {len(result)}")
|
|||
|
|
return result
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"AES 加密失败: {e}")
|
|||
|
|
raise ValueError(f"消息加密失败: {e}") from e
|
|||
|
|
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
# 解密回调消息(完整流程)
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
def decrypt_message(
|
|||
|
|
self,
|
|||
|
|
xml_body: str,
|
|||
|
|
msg_signature: str,
|
|||
|
|
timestamp: str,
|
|||
|
|
nonce: str,
|
|||
|
|
) -> Dict[str, str]:
|
|||
|
|
"""解密企微回调消息的完整流程。
|
|||
|
|
|
|||
|
|
流程:
|
|||
|
|
1. 从 XML 中提取 Encrypt 字段
|
|||
|
|
2. 验证签名
|
|||
|
|
3. AES 解密
|
|||
|
|
4. 解析明文 XML,返回消息内容字典
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
xml_body: 企微 POST 的 XML 请求体
|
|||
|
|
msg_signature: 企微传来的签名
|
|||
|
|
timestamp: 时间戳
|
|||
|
|
nonce: 随机数
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Dict[str, str]: 解密后的消息内容,包含 from_user_name, content, msg_type 等
|
|||
|
|
|
|||
|
|
Raises:
|
|||
|
|
ValueError: 签名验证失败或解密失败
|
|||
|
|
"""
|
|||
|
|
# 1. 解析 XML,提取 Encrypt 字段
|
|||
|
|
try:
|
|||
|
|
root = ET.fromstring(xml_body)
|
|||
|
|
encrypt_node = root.find("Encrypt")
|
|||
|
|
if encrypt_node is None:
|
|||
|
|
raise ValueError("XML 中未找到 Encrypt 字段")
|
|||
|
|
encrypt = encrypt_node.text or ""
|
|||
|
|
except ET.ParseError as e:
|
|||
|
|
raise ValueError(f"XML 解析失败: {e}") from e
|
|||
|
|
|
|||
|
|
# 2. 验证签名
|
|||
|
|
if not self.verify_signature(msg_signature, timestamp, nonce, encrypt):
|
|||
|
|
raise ValueError("签名验证失败")
|
|||
|
|
|
|||
|
|
# 3. AES 解密
|
|||
|
|
plaintext_xml = self.decrypt(encrypt)
|
|||
|
|
|
|||
|
|
# 4. 解析明文 XML
|
|||
|
|
try:
|
|||
|
|
plain_root = ET.fromstring(plaintext_xml)
|
|||
|
|
result = {}
|
|||
|
|
# 提取常见字段
|
|||
|
|
for child in plain_root:
|
|||
|
|
result[child.tag] = child.text or ""
|
|||
|
|
logger.info(f"消息解密成功: from={result.get('FromUserName', 'unknown')}")
|
|||
|
|
return result
|
|||
|
|
except ET.ParseError as e:
|
|||
|
|
raise ValueError(f"明文 XML 解析失败: {e}") from e
|
|||
|
|
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
# 加密响应消息(完整流程)
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
def encrypt_message(
|
|||
|
|
self, reply_content: str, nonce: Optional[str] = None
|
|||
|
|
) -> str:
|
|||
|
|
"""加密响应消息的完整流程(用于被动回复)。
|
|||
|
|
|
|||
|
|
流程:
|
|||
|
|
1. AES 加密消息内容
|
|||
|
|
2. 生成签名
|
|||
|
|
3. 构造响应 XML
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
reply_content: 要回复的消息内容
|
|||
|
|
nonce: 随机数(可选,默认自动生成)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
str: 加密后的 XML 响应字符串
|
|||
|
|
"""
|
|||
|
|
# 生成时间戳和随机数
|
|||
|
|
timestamp = str(int(time.time()))
|
|||
|
|
if nonce is None:
|
|||
|
|
nonce = "".join(
|
|||
|
|
secrets.choice(string.ascii_letters + string.digits)
|
|||
|
|
for _ in range(10)
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# AES 加密
|
|||
|
|
encrypt = self.encrypt(reply_content)
|
|||
|
|
|
|||
|
|
# 生成签名
|
|||
|
|
signature = self.generate_signature(timestamp, nonce, encrypt)
|
|||
|
|
|
|||
|
|
# 构造响应 XML
|
|||
|
|
response_xml = (
|
|||
|
|
f"<xml>"
|
|||
|
|
f"<Encrypt><![CDATA[{encrypt}]]></Encrypt>"
|
|||
|
|
f"<MsgSignature><![CDATA[{signature}]]></MsgSignature>"
|
|||
|
|
f"<TimeStamp>{timestamp}</TimeStamp>"
|
|||
|
|
f"<Nonce><![CDATA[{nonce}]]></Nonce>"
|
|||
|
|
f"</xml>"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
logger.debug("响应消息加密完成")
|
|||
|
|
return response_xml
|
|||
|
|
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
# 验证 URL 有效性(GET 请求解密 echostr)
|
|||
|
|
# --------------------------------------------------------------------------
|
|||
|
|
def decrypt_echostr(
|
|||
|
|
self,
|
|||
|
|
msg_signature: str,
|
|||
|
|
timestamp: str,
|
|||
|
|
nonce: str,
|
|||
|
|
echostr: str,
|
|||
|
|
) -> str:
|
|||
|
|
"""解密企微回调验证的 echostr。
|
|||
|
|
|
|||
|
|
企微配置回调 URL 时会发送 GET 请求,需要:
|
|||
|
|
1. 验证签名
|
|||
|
|
2. 解密 echostr
|
|||
|
|
3. 返回明文 echostr
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
msg_signature: 企微传来的签名
|
|||
|
|
timestamp: 时间戳
|
|||
|
|
nonce: 随机数
|
|||
|
|
echostr: 加密的验证字符串
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
str: 解密后的 echostr 明文
|
|||
|
|
|
|||
|
|
Raises:
|
|||
|
|
ValueError: 签名验证失败或解密失败
|
|||
|
|
"""
|
|||
|
|
# 验证签名
|
|||
|
|
if not self.verify_signature(msg_signature, timestamp, nonce, echostr):
|
|||
|
|
raise ValueError("回调URL验证签名失败")
|
|||
|
|
|
|||
|
|
# 解密 echostr
|
|||
|
|
plaintext = self.decrypt(echostr)
|
|||
|
|
logger.info("回调URL验证成功,echostr 解密完成")
|
|||
|
|
return plaintext
|