Files
wecom_it_smart_desk/backend/tests/test_wecom_crypto.py
T

242 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — WecomCrypto 加解密测试
# =============================================================================
# 测试覆盖:
# 1. AES 密钥解码(43 位 EncodingAESKey → 32 字节密钥)
# 2. 签名生成与验证(SHA1(sort(token, timestamp, nonce, encrypt))
# 3. AES 加密 + 解密往返(encrypt → decrypt 还原原文)
# 4. corp_id 不匹配时解密失败
# 5. 完整消息解密流程(decrypt_message
# 6. 完整消息加密流程(encrypt_message
# 7. echostr 解密流程(decrypt_echostr
# 8. 无效签名验证失败
# 9. 无效密文解密失败
# =============================================================================
import hashlib
import pytest
from app.utils.wecom_crypto import WecomCrypto
# 测试用配置(和企微开发文档示例一致)
TEST_TOKEN = "test_token_abc"
TEST_ENCODING_AES_KEY = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFG" # 43 字符
TEST_CORP_ID = "ww_test_corp_id"
@pytest.fixture
def crypto():
"""创建 WecomCrypto 实例。"""
return WecomCrypto(
token=TEST_TOKEN,
encoding_aes_key=TEST_ENCODING_AES_KEY,
corp_id=TEST_CORP_ID,
)
class TestWecomCryptoInit:
"""测试 WecomCrypto 初始化。"""
def test_aes_key_decoding(self, crypto):
"""验证 43 位 EncodingAESKey 正确解码为 32 字节 AES 密钥。"""
import base64
expected_key = base64.b64decode(TEST_ENCODING_AES_KEY + "=")
assert crypto.aes_key == expected_key
assert len(crypto.aes_key) == 32 # AES-256 需要 32 字节密钥
def test_iv_is_first_16_bytes_of_key(self, crypto):
"""验证 IV 取自 AES 密钥的前 16 字节。"""
assert crypto.iv == crypto.aes_key[:16]
assert len(crypto.iv) == 16
def test_token_stored(self, crypto):
"""验证 Token 正确存储。"""
assert crypto.token == TEST_TOKEN
def test_corp_id_stored(self, crypto):
"""验证 CorpID 正确存储。"""
assert crypto.corp_id == TEST_CORP_ID
class TestSignature:
"""测试签名生成与验证。"""
def test_generate_signature(self, crypto):
"""验证签名生成算法:SHA1(sort([token, timestamp, nonce, encrypt]))。"""
timestamp = "1234567890"
nonce = "test_nonce"
encrypt = "test_encrypt_content"
signature = crypto.generate_signature(timestamp, nonce, encrypt)
# 手动计算预期签名
sort_list = sorted([TEST_TOKEN, timestamp, nonce, encrypt])
concat_str = "".join(sort_list)
expected = hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
assert signature == expected
def test_verify_signature_valid(self, crypto):
"""验证正确签名通过校验。"""
timestamp = "1234567890"
nonce = "test_nonce"
encrypt = "test_encrypt_content"
signature = crypto.generate_signature(timestamp, nonce, encrypt)
assert crypto.verify_signature(signature, timestamp, nonce, encrypt) is True
def test_verify_signature_invalid(self, crypto):
"""验证错误签名不通过校验。"""
assert crypto.verify_signature(
"invalid_signature", "1234567890", "nonce", "encrypt"
) is False
def test_verify_signature_tampered_timestamp(self, crypto):
"""验证篡改时间戳后签名校验失败。"""
signature = crypto.generate_signature("1234567890", "nonce", "encrypt")
assert crypto.verify_signature(signature, "9999999999", "nonce", "encrypt") is False
class TestEncryptDecrypt:
"""测试 AES 加密与解密的往返一致性。"""
def test_encrypt_decrypt_roundtrip(self, crypto):
"""验证加密后解密能还原原文。"""
plaintext = "<xml><Content>你好企微</Content></xml>"
encrypted = crypto.encrypt(plaintext)
decrypted = crypto.decrypt(encrypted)
assert decrypted == plaintext
def test_encrypt_produces_different_ciphertext(self, crypto):
"""验证相同明文多次加密产生不同密文(因为 16 字节随机串)。"""
plaintext = "测试消息"
encrypted1 = crypto.encrypt(plaintext)
encrypted2 = crypto.encrypt(plaintext)
assert encrypted1 != encrypted2
def test_decrypt_with_wrong_corp_id(self):
"""验证 corp_id 不匹配时解密抛出 ValueError。"""
crypto1 = WecomCrypto(TEST_TOKEN, TEST_ENCODING_AES_KEY, TEST_CORP_ID)
crypto2 = WecomCrypto(TEST_TOKEN, TEST_ENCODING_AES_KEY, "wrong_corp_id")
encrypted = crypto1.encrypt("测试消息")
with pytest.raises(ValueError, match="corp_id 不匹配"):
crypto2.decrypt(encrypted)
def test_decrypt_invalid_base64(self, crypto):
"""验证无效 Base64 密文解密抛出 ValueError。"""
with pytest.raises(ValueError):
crypto.decrypt("这不是有效的base64密文!!!")
def test_encrypt_decrypt_empty_string(self, crypto):
"""验证空字符串加密解密往返。"""
encrypted = crypto.encrypt("")
decrypted = crypto.decrypt(encrypted)
assert decrypted == ""
def test_encrypt_decrypt_long_text(self, crypto):
"""验证长文本加密解密往返。"""
long_text = "A" * 10000
encrypted = crypto.encrypt(long_text)
decrypted = crypto.decrypt(encrypted)
assert decrypted == long_text
def test_encrypt_decrypt_chinese_text(self, crypto):
"""验证中文内容加密解密往返。"""
chinese_text = "密码重置、VPN连接、软件安装,请按步骤操作。"
encrypted = crypto.encrypt(chinese_text)
decrypted = crypto.decrypt(encrypted)
assert decrypted == chinese_text
class TestDecryptMessage:
"""测试完整的消息解密流程。"""
def test_decrypt_message_full_flow(self, crypto):
"""验证从 XML 密文到明文的完整解密流程。"""
# 先加密一段消息
original_msg = "<xml><Content>Hello</Content><FromUserName>user001</FromUserName></xml>"
encrypted = crypto.encrypt(original_msg)
# 构造企微回调的 XML 格式
timestamp = "1234567890"
nonce = "test_nonce"
signature = crypto.generate_signature(timestamp, nonce, encrypted)
xml_body = f"<xml><Encrypt><![CDATA[{encrypted}]]></Encrypt></xml>"
result = crypto.decrypt_message(xml_body, signature, timestamp, nonce)
assert result.get("Content") == "Hello"
assert result.get("FromUserName") == "user001"
def test_decrypt_message_invalid_signature(self, crypto):
"""验证签名错误时解密消息抛出 ValueError。"""
encrypted = crypto.encrypt("<xml><Content>test</Content></xml>")
xml_body = f"<xml><Encrypt><![CDATA[{encrypted}]]></Encrypt></xml>"
with pytest.raises(ValueError, match="签名验证失败"):
crypto.decrypt_message(xml_body, "invalid_signature", "timestamp", "nonce")
def test_decrypt_message_missing_encrypt_field(self, crypto):
"""验证 XML 缺少 Encrypt 字段时抛出 ValueError。"""
xml_body = "<xml><MsgType>text</MsgType></xml>"
with pytest.raises(ValueError, match="未找到 Encrypt 字段"):
crypto.decrypt_message(xml_body, "sig", "ts", "nonce")
def test_decrypt_message_invalid_xml(self, crypto):
"""验证无效 XML 抛出 ValueError。"""
with pytest.raises(ValueError, match="XML 解析失败"):
crypto.decrypt_message("not valid xml", "sig", "ts", "nonce")
class TestEncryptMessage:
"""测试完整的消息加密流程。"""
def test_encrypt_message_format(self, crypto):
"""验证加密响应消息的 XML 格式正确。"""
result = crypto.encrypt_message("回复消息", nonce="test_nonce")
assert "<Encrypt>" in result
assert "<MsgSignature>" in result
assert "<TimeStamp>" in result
assert "<Nonce>" in result
def test_encrypt_message_roundtrip(self, crypto):
"""验证加密后的消息可以被正确解密。"""
original = "测试回复内容"
encrypted_xml = crypto.encrypt_message(original, nonce="test_nonce")
# 从加密 XML 中提取各字段
import xml.etree.ElementTree as ET
root = ET.fromstring(encrypted_xml)
encrypt_text = root.find("Encrypt").text
msg_signature = root.find("MsgSignature").text
timestamp = root.find("TimeStamp").text
nonce = root.find("Nonce").text
# 解密验证
decrypted = crypto.decrypt(encrypt_text)
assert decrypted == original
class TestDecryptEchostr:
"""测试回调 URL 验证的 echostr 解密。"""
def test_decrypt_echostr_valid(self, crypto):
"""验证正确的 echostr 解密。"""
echostr = "verify_token_12345"
encrypted = crypto.encrypt(echostr)
timestamp = "1234567890"
nonce = "test_nonce"
signature = crypto.generate_signature(timestamp, nonce, encrypted)
result = crypto.decrypt_echostr(signature, timestamp, nonce, encrypted)
assert result == echostr
def test_decrypt_echostr_invalid_signature(self, crypto):
"""验证签名错误时 echostr 解密失败。"""
encrypted = crypto.encrypt("test")
with pytest.raises(ValueError, match="回调URL验证签名失败"):
crypto.decrypt_echostr("wrong_sig", "ts", "nonce", encrypted)