377 lines
14 KiB
Python
377 lines
14 KiB
Python
# =============================================================================
|
||
# 企微IT智能服务台 — 企微消息 AES 加解密工具
|
||
# =============================================================================
|
||
# 说明:实现企微回调消息的加解密和签名验证
|
||
# 参考企微官方加解密库逻辑,使用 Python cryptography 库重写
|
||
# 加密模式:AES-CBC-256,PKCS7 填充
|
||
# 签名算法:SHA1(sort(token, timestamp, nonce, encrypt))
|
||
# =============================================================================
|
||
|
||
import base64
|
||
import hashlib
|
||
import json
|
||
import logging
|
||
import secrets
|
||
import string
|
||
import struct
|
||
import time
|
||
import xml.etree.ElementTree as ET
|
||
from typing import Dict, Optional, Tuple
|
||
|
||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||
from cryptography.hazmat.backends import default_backend
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class WecomCrypto:
|
||
"""企微消息加解密工具类。
|
||
|
||
实现企微回调消息的完整加解密流程:
|
||
1. 签名验证:SHA1(sort(token, timestamp, nonce, encrypt))
|
||
2. AES 解密:CBC 模式,PKCS7 去填充
|
||
3. AES 加密:CBC 模式,PKCS7 填充
|
||
|
||
Attributes:
|
||
token: 企微回调配置的 Token
|
||
aes_key: 解码后的 AES 密钥(32 字节)
|
||
corp_id: 企业ID
|
||
"""
|
||
|
||
def __init__(self, token: str, encoding_aes_key: str, corp_id: str):
|
||
"""初始化加解密工具。
|
||
|
||
Args:
|
||
token: 企微回调配置的 Token(用于签名验证)
|
||
encoding_aes_key: 企微回调配置的 EncodingAESKey(43 位 Base64 字符串)
|
||
corp_id: 企业ID(用于消息体中的校验)
|
||
"""
|
||
self.token = token
|
||
# EncodingAESKey 是 43 位 Base64 编码字符串
|
||
# 解码时需要先补上 1 个 "=" 使其成为合法的 Base64 字符串
|
||
# 解码后得到 32 字节的 AES 密钥(AES-256)
|
||
self.aes_key = base64.b64decode(encoding_aes_key + "=")
|
||
# AES CBC 模式的 IV(初始化向量)取密钥的前 16 字节
|
||
self.iv = self.aes_key[:16]
|
||
self.corp_id = corp_id
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 签名验证
|
||
# --------------------------------------------------------------------------
|
||
def verify_signature(
|
||
self, signature: str, timestamp: str, nonce: str, encrypt: str
|
||
) -> bool:
|
||
"""验证企微回调签名。
|
||
|
||
企微的签名算法:SHA1(sort([token, timestamp, nonce, encrypt]))
|
||
将 token、timestamp、nonce、encrypt 四个字符串字典序排列后拼接,计算 SHA1。
|
||
|
||
Args:
|
||
signature: 企微传来的签名(msg_signature 参数)
|
||
timestamp: 时间戳
|
||
nonce: 随机数
|
||
encrypt: 加密的消息内容
|
||
|
||
Returns:
|
||
bool: 签名是否验证通过
|
||
"""
|
||
# 将四个参数按字典序排列
|
||
sort_list = sorted([self.token, timestamp, nonce, encrypt])
|
||
# 拼接为一个字符串
|
||
concat_str = "".join(sort_list)
|
||
# 计算 SHA1 哈希值
|
||
computed = hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
|
||
# 与传入的签名比较
|
||
is_valid = computed == signature
|
||
if not is_valid:
|
||
logger.warning(
|
||
f"签名验证失败: computed={computed}, received={signature}"
|
||
)
|
||
return is_valid
|
||
|
||
def generate_signature(
|
||
self, timestamp: str, nonce: str, encrypt: str
|
||
) -> str:
|
||
"""生成签名(用于加密响应消息时)。
|
||
|
||
Args:
|
||
timestamp: 时间戳
|
||
nonce: 随机数
|
||
encrypt: 加密后的消息内容
|
||
|
||
Returns:
|
||
str: SHA1 签名字符串
|
||
"""
|
||
sort_list = sorted([self.token, timestamp, nonce, encrypt])
|
||
concat_str = "".join(sort_list)
|
||
return hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
|
||
|
||
# --------------------------------------------------------------------------
|
||
# AES 解密
|
||
# --------------------------------------------------------------------------
|
||
def decrypt(self, encrypted_text: str) -> str:
|
||
"""AES-CBC 解密企微加密消息。
|
||
|
||
解密流程:
|
||
1. Base64 解码得到密文
|
||
2. AES-CBC 解密
|
||
3. 去除 PKCS7 填充
|
||
4. 提取明文内容(格式:random(16) + msg_len(4) + msg + corp_id)
|
||
|
||
Args:
|
||
encrypted_text: Base64 编码的加密消息
|
||
|
||
Returns:
|
||
str: 解密后的明文 XML 字符串
|
||
|
||
Raises:
|
||
ValueError: 解密失败时抛出
|
||
"""
|
||
try:
|
||
# 1. Base64 解码得到密文字节
|
||
encrypted_data = base64.b64decode(encrypted_text)
|
||
|
||
# 2. AES-CBC 解密
|
||
cipher = Cipher(
|
||
algorithms.AES(self.aes_key),
|
||
modes.CBC(self.iv),
|
||
backend=default_backend(),
|
||
)
|
||
decryptor = cipher.decryptor()
|
||
decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()
|
||
|
||
# 3. 去除 PKCS7 填充
|
||
# PKCS7: 填充字节的值等于填充的字节数
|
||
# 例如填充了 5 个字节,则每个字节的值都是 0x05
|
||
pad_len = decrypted_data[-1]
|
||
# 校验填充是否合法(填充值应等于填充长度)
|
||
if pad_len < 1 or pad_len > 32:
|
||
raise ValueError(f"无效的 PKCS7 填充值: {pad_len}")
|
||
# 检查所有填充字节是否一致
|
||
for i in range(pad_len):
|
||
if decrypted_data[-(i + 1)] != pad_len:
|
||
raise ValueError("PKCS7 填充校验失败")
|
||
# 去除填充部分
|
||
plaintext_data = decrypted_data[:-pad_len]
|
||
|
||
# 4. 提取明文内容
|
||
# 企微加密格式:random(16字节) + msg_len(4字节网络序) + msg + corp_id
|
||
# 跳过前 16 字节随机串
|
||
msg_len = struct.unpack("!I", plaintext_data[16:20])[0]
|
||
msg_content = plaintext_data[20 : 20 + msg_len].decode("utf-8")
|
||
from_corp_id = plaintext_data[20 + msg_len :].decode("utf-8")
|
||
|
||
# 5. 校验 corp_id 是否匹配
|
||
if from_corp_id != self.corp_id:
|
||
raise ValueError(
|
||
f"corp_id 不匹配: expected={self.corp_id}, got={from_corp_id}"
|
||
)
|
||
|
||
logger.debug(f"AES 解密成功, 明文长度: {len(msg_content)}")
|
||
return msg_content
|
||
|
||
except Exception as e:
|
||
logger.error(f"AES 解密失败: {e}")
|
||
raise ValueError(f"消息解密失败: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# AES 加密
|
||
# --------------------------------------------------------------------------
|
||
def encrypt(self, plaintext: str) -> str:
|
||
"""AES-CBC 加密消息(用于被动回复)。
|
||
|
||
加密流程:
|
||
1. 构造明文:random(16) + msg_len(4) + msg + corp_id
|
||
2. PKCS7 填充
|
||
3. AES-CBC 加密
|
||
4. Base64 编码
|
||
|
||
Args:
|
||
plaintext: 要加密的明文字符串
|
||
|
||
Returns:
|
||
str: Base64 编码的加密结果
|
||
"""
|
||
try:
|
||
# 1. 构造明文字节
|
||
# 16 字节随机串(增加密文随机性,防止相同明文产生相同密文)
|
||
random_str = secrets.token_bytes(16)
|
||
# 消息内容字节
|
||
msg_bytes = plaintext.encode("utf-8")
|
||
# 消息长度(4 字节网络序,大端)
|
||
msg_len = struct.pack("!I", len(msg_bytes))
|
||
# corp_id 字节
|
||
corp_id_bytes = self.corp_id.encode("utf-8")
|
||
# 拼接
|
||
plaintext_data = random_str + msg_len + msg_bytes + corp_id_bytes
|
||
|
||
# 2. PKCS7 填充
|
||
# 块大小为 32 字节(AES-256 的块大小实际是 16 字节,
|
||
# 但企微官方实现使用 32 字节块做 PKCS7 填充)
|
||
block_size = 32
|
||
pad_len = block_size - (len(plaintext_data) % block_size)
|
||
# 填充字节的值等于填充的字节数
|
||
plaintext_data += bytes([pad_len] * pad_len)
|
||
|
||
# 3. AES-CBC 加密
|
||
cipher = Cipher(
|
||
algorithms.AES(self.aes_key),
|
||
modes.CBC(self.iv),
|
||
backend=default_backend(),
|
||
)
|
||
encryptor = cipher.encryptor()
|
||
encrypted_data = encryptor.update(plaintext_data) + encryptor.finalize()
|
||
|
||
# 4. Base64 编码
|
||
result = base64.b64encode(encrypted_data).decode("utf-8")
|
||
logger.debug(f"AES 加密成功, 密文长度: {len(result)}")
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"AES 加密失败: {e}")
|
||
raise ValueError(f"消息加密失败: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 解密回调消息(完整流程)
|
||
# --------------------------------------------------------------------------
|
||
def decrypt_message(
|
||
self,
|
||
xml_body: str,
|
||
msg_signature: str,
|
||
timestamp: str,
|
||
nonce: str,
|
||
) -> Dict[str, str]:
|
||
"""解密企微回调消息的完整流程。
|
||
|
||
流程:
|
||
1. 从 XML 中提取 Encrypt 字段
|
||
2. 验证签名
|
||
3. AES 解密
|
||
4. 解析明文 XML,返回消息内容字典
|
||
|
||
Args:
|
||
xml_body: 企微 POST 的 XML 请求体
|
||
msg_signature: 企微传来的签名
|
||
timestamp: 时间戳
|
||
nonce: 随机数
|
||
|
||
Returns:
|
||
Dict[str, str]: 解密后的消息内容,包含 from_user_name, content, msg_type 等
|
||
|
||
Raises:
|
||
ValueError: 签名验证失败或解密失败
|
||
"""
|
||
# 1. 解析 XML,提取 Encrypt 字段
|
||
try:
|
||
root = ET.fromstring(xml_body)
|
||
encrypt_node = root.find("Encrypt")
|
||
if encrypt_node is None:
|
||
raise ValueError("XML 中未找到 Encrypt 字段")
|
||
encrypt = encrypt_node.text or ""
|
||
except ET.ParseError as e:
|
||
raise ValueError(f"XML 解析失败: {e}") from e
|
||
|
||
# 2. 验证签名
|
||
if not self.verify_signature(msg_signature, timestamp, nonce, encrypt):
|
||
raise ValueError("签名验证失败")
|
||
|
||
# 3. AES 解密
|
||
plaintext_xml = self.decrypt(encrypt)
|
||
|
||
# 4. 解析明文 XML
|
||
try:
|
||
plain_root = ET.fromstring(plaintext_xml)
|
||
result = {}
|
||
# 提取常见字段
|
||
for child in plain_root:
|
||
result[child.tag] = child.text or ""
|
||
logger.info(f"消息解密成功: from={result.get('FromUserName', 'unknown')}")
|
||
return result
|
||
except ET.ParseError as e:
|
||
raise ValueError(f"明文 XML 解析失败: {e}") from e
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 加密响应消息(完整流程)
|
||
# --------------------------------------------------------------------------
|
||
def encrypt_message(
|
||
self, reply_content: str, nonce: Optional[str] = None
|
||
) -> str:
|
||
"""加密响应消息的完整流程(用于被动回复)。
|
||
|
||
流程:
|
||
1. AES 加密消息内容
|
||
2. 生成签名
|
||
3. 构造响应 XML
|
||
|
||
Args:
|
||
reply_content: 要回复的消息内容
|
||
nonce: 随机数(可选,默认自动生成)
|
||
|
||
Returns:
|
||
str: 加密后的 XML 响应字符串
|
||
"""
|
||
# 生成时间戳和随机数
|
||
timestamp = str(int(time.time()))
|
||
if nonce is None:
|
||
nonce = "".join(
|
||
secrets.choice(string.ascii_letters + string.digits)
|
||
for _ in range(10)
|
||
)
|
||
|
||
# AES 加密
|
||
encrypt = self.encrypt(reply_content)
|
||
|
||
# 生成签名
|
||
signature = self.generate_signature(timestamp, nonce, encrypt)
|
||
|
||
# 构造响应 XML
|
||
response_xml = (
|
||
f"<xml>"
|
||
f"<Encrypt><![CDATA[{encrypt}]]></Encrypt>"
|
||
f"<MsgSignature><![CDATA[{signature}]]></MsgSignature>"
|
||
f"<TimeStamp>{timestamp}</TimeStamp>"
|
||
f"<Nonce><![CDATA[{nonce}]]></Nonce>"
|
||
f"</xml>"
|
||
)
|
||
|
||
logger.debug("响应消息加密完成")
|
||
return response_xml
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 验证 URL 有效性(GET 请求解密 echostr)
|
||
# --------------------------------------------------------------------------
|
||
def decrypt_echostr(
|
||
self,
|
||
msg_signature: str,
|
||
timestamp: str,
|
||
nonce: str,
|
||
echostr: str,
|
||
) -> str:
|
||
"""解密企微回调验证的 echostr。
|
||
|
||
企微配置回调 URL 时会发送 GET 请求,需要:
|
||
1. 验证签名
|
||
2. 解密 echostr
|
||
3. 返回明文 echostr
|
||
|
||
Args:
|
||
msg_signature: 企微传来的签名
|
||
timestamp: 时间戳
|
||
nonce: 随机数
|
||
echostr: 加密的验证字符串
|
||
|
||
Returns:
|
||
str: 解密后的 echostr 明文
|
||
|
||
Raises:
|
||
ValueError: 签名验证失败或解密失败
|
||
"""
|
||
# 验证签名
|
||
if not self.verify_signature(msg_signature, timestamp, nonce, echostr):
|
||
raise ValueError("回调URL验证签名失败")
|
||
|
||
# 解密 echostr
|
||
plaintext = self.decrypt(echostr)
|
||
logger.info("回调URL验证成功,echostr 解密完成")
|
||
return plaintext
|