Files

377 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 企微消息 AES 加解密工具
# =============================================================================
# 说明:实现企微回调消息的加解密和签名验证
# 参考企微官方加解密库逻辑,使用 Python cryptography 库重写
# 加密模式:AES-CBC-256PKCS7 填充
# 签名算法:SHA1(sort(token, timestamp, nonce, encrypt))
# =============================================================================
import base64
import hashlib
import json
import logging
import secrets
import string
import struct
import time
import xml.etree.ElementTree as ET
from typing import Dict, Optional, Tuple
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
logger = logging.getLogger(__name__)
class WecomCrypto:
"""企微消息加解密工具类。
实现企微回调消息的完整加解密流程:
1. 签名验证:SHA1(sort(token, timestamp, nonce, encrypt))
2. AES 解密:CBC 模式,PKCS7 去填充
3. AES 加密:CBC 模式,PKCS7 填充
Attributes:
token: 企微回调配置的 Token
aes_key: 解码后的 AES 密钥(32 字节)
corp_id: 企业ID
"""
def __init__(self, token: str, encoding_aes_key: str, corp_id: str):
"""初始化加解密工具。
Args:
token: 企微回调配置的 Token(用于签名验证)
encoding_aes_key: 企微回调配置的 EncodingAESKey43 位 Base64 字符串)
corp_id: 企业ID(用于消息体中的校验)
"""
self.token = token
# EncodingAESKey 是 43 位 Base64 编码字符串
# 解码时需要先补上 1 个 "=" 使其成为合法的 Base64 字符串
# 解码后得到 32 字节的 AES 密钥(AES-256
self.aes_key = base64.b64decode(encoding_aes_key + "=")
# AES CBC 模式的 IV(初始化向量)取密钥的前 16 字节
self.iv = self.aes_key[:16]
self.corp_id = corp_id
# --------------------------------------------------------------------------
# 签名验证
# --------------------------------------------------------------------------
def verify_signature(
self, signature: str, timestamp: str, nonce: str, encrypt: str
) -> bool:
"""验证企微回调签名。
企微的签名算法:SHA1(sort([token, timestamp, nonce, encrypt]))
将 token、timestamp、nonce、encrypt 四个字符串字典序排列后拼接,计算 SHA1。
Args:
signature: 企微传来的签名(msg_signature 参数)
timestamp: 时间戳
nonce: 随机数
encrypt: 加密的消息内容
Returns:
bool: 签名是否验证通过
"""
# 将四个参数按字典序排列
sort_list = sorted([self.token, timestamp, nonce, encrypt])
# 拼接为一个字符串
concat_str = "".join(sort_list)
# 计算 SHA1 哈希值
computed = hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
# 与传入的签名比较
is_valid = computed == signature
if not is_valid:
logger.warning(
f"签名验证失败: computed={computed}, received={signature}"
)
return is_valid
def generate_signature(
self, timestamp: str, nonce: str, encrypt: str
) -> str:
"""生成签名(用于加密响应消息时)。
Args:
timestamp: 时间戳
nonce: 随机数
encrypt: 加密后的消息内容
Returns:
str: SHA1 签名字符串
"""
sort_list = sorted([self.token, timestamp, nonce, encrypt])
concat_str = "".join(sort_list)
return hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
# --------------------------------------------------------------------------
# AES 解密
# --------------------------------------------------------------------------
def decrypt(self, encrypted_text: str) -> str:
"""AES-CBC 解密企微加密消息。
解密流程:
1. Base64 解码得到密文
2. AES-CBC 解密
3. 去除 PKCS7 填充
4. 提取明文内容(格式:random(16) + msg_len(4) + msg + corp_id
Args:
encrypted_text: Base64 编码的加密消息
Returns:
str: 解密后的明文 XML 字符串
Raises:
ValueError: 解密失败时抛出
"""
try:
# 1. Base64 解码得到密文字节
encrypted_data = base64.b64decode(encrypted_text)
# 2. AES-CBC 解密
cipher = Cipher(
algorithms.AES(self.aes_key),
modes.CBC(self.iv),
backend=default_backend(),
)
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()
# 3. 去除 PKCS7 填充
# PKCS7: 填充字节的值等于填充的字节数
# 例如填充了 5 个字节,则每个字节的值都是 0x05
pad_len = decrypted_data[-1]
# 校验填充是否合法(填充值应等于填充长度)
if pad_len < 1 or pad_len > 32:
raise ValueError(f"无效的 PKCS7 填充值: {pad_len}")
# 检查所有填充字节是否一致
for i in range(pad_len):
if decrypted_data[-(i + 1)] != pad_len:
raise ValueError("PKCS7 填充校验失败")
# 去除填充部分
plaintext_data = decrypted_data[:-pad_len]
# 4. 提取明文内容
# 企微加密格式:random(16字节) + msg_len(4字节网络序) + msg + corp_id
# 跳过前 16 字节随机串
msg_len = struct.unpack("!I", plaintext_data[16:20])[0]
msg_content = plaintext_data[20 : 20 + msg_len].decode("utf-8")
from_corp_id = plaintext_data[20 + msg_len :].decode("utf-8")
# 5. 校验 corp_id 是否匹配
if from_corp_id != self.corp_id:
raise ValueError(
f"corp_id 不匹配: expected={self.corp_id}, got={from_corp_id}"
)
logger.debug(f"AES 解密成功, 明文长度: {len(msg_content)}")
return msg_content
except Exception as e:
logger.error(f"AES 解密失败: {e}")
raise ValueError(f"消息解密失败: {e}") from e
# --------------------------------------------------------------------------
# AES 加密
# --------------------------------------------------------------------------
def encrypt(self, plaintext: str) -> str:
"""AES-CBC 加密消息(用于被动回复)。
加密流程:
1. 构造明文:random(16) + msg_len(4) + msg + corp_id
2. PKCS7 填充
3. AES-CBC 加密
4. Base64 编码
Args:
plaintext: 要加密的明文字符串
Returns:
str: Base64 编码的加密结果
"""
try:
# 1. 构造明文字节
# 16 字节随机串(增加密文随机性,防止相同明文产生相同密文)
random_str = secrets.token_bytes(16)
# 消息内容字节
msg_bytes = plaintext.encode("utf-8")
# 消息长度(4 字节网络序,大端)
msg_len = struct.pack("!I", len(msg_bytes))
# corp_id 字节
corp_id_bytes = self.corp_id.encode("utf-8")
# 拼接
plaintext_data = random_str + msg_len + msg_bytes + corp_id_bytes
# 2. PKCS7 填充
# 块大小为 32 字节(AES-256 的块大小实际是 16 字节,
# 但企微官方实现使用 32 字节块做 PKCS7 填充)
block_size = 32
pad_len = block_size - (len(plaintext_data) % block_size)
# 填充字节的值等于填充的字节数
plaintext_data += bytes([pad_len] * pad_len)
# 3. AES-CBC 加密
cipher = Cipher(
algorithms.AES(self.aes_key),
modes.CBC(self.iv),
backend=default_backend(),
)
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(plaintext_data) + encryptor.finalize()
# 4. Base64 编码
result = base64.b64encode(encrypted_data).decode("utf-8")
logger.debug(f"AES 加密成功, 密文长度: {len(result)}")
return result
except Exception as e:
logger.error(f"AES 加密失败: {e}")
raise ValueError(f"消息加密失败: {e}") from e
# --------------------------------------------------------------------------
# 解密回调消息(完整流程)
# --------------------------------------------------------------------------
def decrypt_message(
self,
xml_body: str,
msg_signature: str,
timestamp: str,
nonce: str,
) -> Dict[str, str]:
"""解密企微回调消息的完整流程。
流程:
1. 从 XML 中提取 Encrypt 字段
2. 验证签名
3. AES 解密
4. 解析明文 XML,返回消息内容字典
Args:
xml_body: 企微 POST 的 XML 请求体
msg_signature: 企微传来的签名
timestamp: 时间戳
nonce: 随机数
Returns:
Dict[str, str]: 解密后的消息内容,包含 from_user_name, content, msg_type 等
Raises:
ValueError: 签名验证失败或解密失败
"""
# 1. 解析 XML,提取 Encrypt 字段
try:
root = ET.fromstring(xml_body)
encrypt_node = root.find("Encrypt")
if encrypt_node is None:
raise ValueError("XML 中未找到 Encrypt 字段")
encrypt = encrypt_node.text or ""
except ET.ParseError as e:
raise ValueError(f"XML 解析失败: {e}") from e
# 2. 验证签名
if not self.verify_signature(msg_signature, timestamp, nonce, encrypt):
raise ValueError("签名验证失败")
# 3. AES 解密
plaintext_xml = self.decrypt(encrypt)
# 4. 解析明文 XML
try:
plain_root = ET.fromstring(plaintext_xml)
result = {}
# 提取常见字段
for child in plain_root:
result[child.tag] = child.text or ""
logger.info(f"消息解密成功: from={result.get('FromUserName', 'unknown')}")
return result
except ET.ParseError as e:
raise ValueError(f"明文 XML 解析失败: {e}") from e
# --------------------------------------------------------------------------
# 加密响应消息(完整流程)
# --------------------------------------------------------------------------
def encrypt_message(
self, reply_content: str, nonce: Optional[str] = None
) -> str:
"""加密响应消息的完整流程(用于被动回复)。
流程:
1. AES 加密消息内容
2. 生成签名
3. 构造响应 XML
Args:
reply_content: 要回复的消息内容
nonce: 随机数(可选,默认自动生成)
Returns:
str: 加密后的 XML 响应字符串
"""
# 生成时间戳和随机数
timestamp = str(int(time.time()))
if nonce is None:
nonce = "".join(
secrets.choice(string.ascii_letters + string.digits)
for _ in range(10)
)
# AES 加密
encrypt = self.encrypt(reply_content)
# 生成签名
signature = self.generate_signature(timestamp, nonce, encrypt)
# 构造响应 XML
response_xml = (
f"<xml>"
f"<Encrypt><![CDATA[{encrypt}]]></Encrypt>"
f"<MsgSignature><![CDATA[{signature}]]></MsgSignature>"
f"<TimeStamp>{timestamp}</TimeStamp>"
f"<Nonce><![CDATA[{nonce}]]></Nonce>"
f"</xml>"
)
logger.debug("响应消息加密完成")
return response_xml
# --------------------------------------------------------------------------
# 验证 URL 有效性(GET 请求解密 echostr
# --------------------------------------------------------------------------
def decrypt_echostr(
self,
msg_signature: str,
timestamp: str,
nonce: str,
echostr: str,
) -> str:
"""解密企微回调验证的 echostr。
企微配置回调 URL 时会发送 GET 请求,需要:
1. 验证签名
2. 解密 echostr
3. 返回明文 echostr
Args:
msg_signature: 企微传来的签名
timestamp: 时间戳
nonce: 随机数
echostr: 加密的验证字符串
Returns:
str: 解密后的 echostr 明文
Raises:
ValueError: 签名验证失败或解密失败
"""
# 验证签名
if not self.verify_signature(msg_signature, timestamp, nonce, echostr):
raise ValueError("回调URL验证签名失败")
# 解密 echostr
plaintext = self.decrypt(echostr)
logger.info("回调URL验证成功,echostr 解密完成")
return plaintext