242 lines
9.4 KiB
Python
242 lines
9.4 KiB
Python
# =============================================================================
|
||
# 企微IT智能服务台 — WecomCrypto 加解密测试
|
||
# =============================================================================
|
||
# 测试覆盖:
|
||
# 1. AES 密钥解码(43 位 EncodingAESKey → 32 字节密钥)
|
||
# 2. 签名生成与验证(SHA1(sort(token, timestamp, nonce, encrypt)))
|
||
# 3. AES 加密 + 解密往返(encrypt → decrypt 还原原文)
|
||
# 4. corp_id 不匹配时解密失败
|
||
# 5. 完整消息解密流程(decrypt_message)
|
||
# 6. 完整消息加密流程(encrypt_message)
|
||
# 7. echostr 解密流程(decrypt_echostr)
|
||
# 8. 无效签名验证失败
|
||
# 9. 无效密文解密失败
|
||
# =============================================================================
|
||
|
||
import hashlib
|
||
|
||
import pytest
|
||
|
||
from app.utils.wecom_crypto import WecomCrypto
|
||
|
||
|
||
# 测试用配置(和企微开发文档示例一致)
|
||
TEST_TOKEN = "test_token_abc"
|
||
TEST_ENCODING_AES_KEY = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFG" # 43 字符
|
||
TEST_CORP_ID = "ww_test_corp_id"
|
||
|
||
|
||
@pytest.fixture
|
||
def crypto():
|
||
"""创建 WecomCrypto 实例。"""
|
||
return WecomCrypto(
|
||
token=TEST_TOKEN,
|
||
encoding_aes_key=TEST_ENCODING_AES_KEY,
|
||
corp_id=TEST_CORP_ID,
|
||
)
|
||
|
||
|
||
class TestWecomCryptoInit:
|
||
"""测试 WecomCrypto 初始化。"""
|
||
|
||
def test_aes_key_decoding(self, crypto):
|
||
"""验证 43 位 EncodingAESKey 正确解码为 32 字节 AES 密钥。"""
|
||
import base64
|
||
expected_key = base64.b64decode(TEST_ENCODING_AES_KEY + "=")
|
||
assert crypto.aes_key == expected_key
|
||
assert len(crypto.aes_key) == 32 # AES-256 需要 32 字节密钥
|
||
|
||
def test_iv_is_first_16_bytes_of_key(self, crypto):
|
||
"""验证 IV 取自 AES 密钥的前 16 字节。"""
|
||
assert crypto.iv == crypto.aes_key[:16]
|
||
assert len(crypto.iv) == 16
|
||
|
||
def test_token_stored(self, crypto):
|
||
"""验证 Token 正确存储。"""
|
||
assert crypto.token == TEST_TOKEN
|
||
|
||
def test_corp_id_stored(self, crypto):
|
||
"""验证 CorpID 正确存储。"""
|
||
assert crypto.corp_id == TEST_CORP_ID
|
||
|
||
|
||
class TestSignature:
|
||
"""测试签名生成与验证。"""
|
||
|
||
def test_generate_signature(self, crypto):
|
||
"""验证签名生成算法:SHA1(sort([token, timestamp, nonce, encrypt]))。"""
|
||
timestamp = "1234567890"
|
||
nonce = "test_nonce"
|
||
encrypt = "test_encrypt_content"
|
||
|
||
signature = crypto.generate_signature(timestamp, nonce, encrypt)
|
||
|
||
# 手动计算预期签名
|
||
sort_list = sorted([TEST_TOKEN, timestamp, nonce, encrypt])
|
||
concat_str = "".join(sort_list)
|
||
expected = hashlib.sha1(concat_str.encode("utf-8")).hexdigest()
|
||
|
||
assert signature == expected
|
||
|
||
def test_verify_signature_valid(self, crypto):
|
||
"""验证正确签名通过校验。"""
|
||
timestamp = "1234567890"
|
||
nonce = "test_nonce"
|
||
encrypt = "test_encrypt_content"
|
||
|
||
signature = crypto.generate_signature(timestamp, nonce, encrypt)
|
||
assert crypto.verify_signature(signature, timestamp, nonce, encrypt) is True
|
||
|
||
def test_verify_signature_invalid(self, crypto):
|
||
"""验证错误签名不通过校验。"""
|
||
assert crypto.verify_signature(
|
||
"invalid_signature", "1234567890", "nonce", "encrypt"
|
||
) is False
|
||
|
||
def test_verify_signature_tampered_timestamp(self, crypto):
|
||
"""验证篡改时间戳后签名校验失败。"""
|
||
signature = crypto.generate_signature("1234567890", "nonce", "encrypt")
|
||
assert crypto.verify_signature(signature, "9999999999", "nonce", "encrypt") is False
|
||
|
||
|
||
class TestEncryptDecrypt:
|
||
"""测试 AES 加密与解密的往返一致性。"""
|
||
|
||
def test_encrypt_decrypt_roundtrip(self, crypto):
|
||
"""验证加密后解密能还原原文。"""
|
||
plaintext = "<xml><Content>你好企微</Content></xml>"
|
||
encrypted = crypto.encrypt(plaintext)
|
||
decrypted = crypto.decrypt(encrypted)
|
||
assert decrypted == plaintext
|
||
|
||
def test_encrypt_produces_different_ciphertext(self, crypto):
|
||
"""验证相同明文多次加密产生不同密文(因为 16 字节随机串)。"""
|
||
plaintext = "测试消息"
|
||
encrypted1 = crypto.encrypt(plaintext)
|
||
encrypted2 = crypto.encrypt(plaintext)
|
||
assert encrypted1 != encrypted2
|
||
|
||
def test_decrypt_with_wrong_corp_id(self):
|
||
"""验证 corp_id 不匹配时解密抛出 ValueError。"""
|
||
crypto1 = WecomCrypto(TEST_TOKEN, TEST_ENCODING_AES_KEY, TEST_CORP_ID)
|
||
crypto2 = WecomCrypto(TEST_TOKEN, TEST_ENCODING_AES_KEY, "wrong_corp_id")
|
||
|
||
encrypted = crypto1.encrypt("测试消息")
|
||
with pytest.raises(ValueError, match="corp_id 不匹配"):
|
||
crypto2.decrypt(encrypted)
|
||
|
||
def test_decrypt_invalid_base64(self, crypto):
|
||
"""验证无效 Base64 密文解密抛出 ValueError。"""
|
||
with pytest.raises(ValueError):
|
||
crypto.decrypt("这不是有效的base64密文!!!")
|
||
|
||
def test_encrypt_decrypt_empty_string(self, crypto):
|
||
"""验证空字符串加密解密往返。"""
|
||
encrypted = crypto.encrypt("")
|
||
decrypted = crypto.decrypt(encrypted)
|
||
assert decrypted == ""
|
||
|
||
def test_encrypt_decrypt_long_text(self, crypto):
|
||
"""验证长文本加密解密往返。"""
|
||
long_text = "A" * 10000
|
||
encrypted = crypto.encrypt(long_text)
|
||
decrypted = crypto.decrypt(encrypted)
|
||
assert decrypted == long_text
|
||
|
||
def test_encrypt_decrypt_chinese_text(self, crypto):
|
||
"""验证中文内容加密解密往返。"""
|
||
chinese_text = "密码重置、VPN连接、软件安装,请按步骤操作。"
|
||
encrypted = crypto.encrypt(chinese_text)
|
||
decrypted = crypto.decrypt(encrypted)
|
||
assert decrypted == chinese_text
|
||
|
||
|
||
class TestDecryptMessage:
|
||
"""测试完整的消息解密流程。"""
|
||
|
||
def test_decrypt_message_full_flow(self, crypto):
|
||
"""验证从 XML 密文到明文的完整解密流程。"""
|
||
# 先加密一段消息
|
||
original_msg = "<xml><Content>Hello</Content><FromUserName>user001</FromUserName></xml>"
|
||
encrypted = crypto.encrypt(original_msg)
|
||
|
||
# 构造企微回调的 XML 格式
|
||
timestamp = "1234567890"
|
||
nonce = "test_nonce"
|
||
signature = crypto.generate_signature(timestamp, nonce, encrypted)
|
||
|
||
xml_body = f"<xml><Encrypt><![CDATA[{encrypted}]]></Encrypt></xml>"
|
||
|
||
result = crypto.decrypt_message(xml_body, signature, timestamp, nonce)
|
||
assert result.get("Content") == "Hello"
|
||
assert result.get("FromUserName") == "user001"
|
||
|
||
def test_decrypt_message_invalid_signature(self, crypto):
|
||
"""验证签名错误时解密消息抛出 ValueError。"""
|
||
encrypted = crypto.encrypt("<xml><Content>test</Content></xml>")
|
||
xml_body = f"<xml><Encrypt><![CDATA[{encrypted}]]></Encrypt></xml>"
|
||
|
||
with pytest.raises(ValueError, match="签名验证失败"):
|
||
crypto.decrypt_message(xml_body, "invalid_signature", "timestamp", "nonce")
|
||
|
||
def test_decrypt_message_missing_encrypt_field(self, crypto):
|
||
"""验证 XML 缺少 Encrypt 字段时抛出 ValueError。"""
|
||
xml_body = "<xml><MsgType>text</MsgType></xml>"
|
||
with pytest.raises(ValueError, match="未找到 Encrypt 字段"):
|
||
crypto.decrypt_message(xml_body, "sig", "ts", "nonce")
|
||
|
||
def test_decrypt_message_invalid_xml(self, crypto):
|
||
"""验证无效 XML 抛出 ValueError。"""
|
||
with pytest.raises(ValueError, match="XML 解析失败"):
|
||
crypto.decrypt_message("not valid xml", "sig", "ts", "nonce")
|
||
|
||
|
||
class TestEncryptMessage:
|
||
"""测试完整的消息加密流程。"""
|
||
|
||
def test_encrypt_message_format(self, crypto):
|
||
"""验证加密响应消息的 XML 格式正确。"""
|
||
result = crypto.encrypt_message("回复消息", nonce="test_nonce")
|
||
assert "<Encrypt>" in result
|
||
assert "<MsgSignature>" in result
|
||
assert "<TimeStamp>" in result
|
||
assert "<Nonce>" in result
|
||
|
||
def test_encrypt_message_roundtrip(self, crypto):
|
||
"""验证加密后的消息可以被正确解密。"""
|
||
original = "测试回复内容"
|
||
encrypted_xml = crypto.encrypt_message(original, nonce="test_nonce")
|
||
|
||
# 从加密 XML 中提取各字段
|
||
import xml.etree.ElementTree as ET
|
||
root = ET.fromstring(encrypted_xml)
|
||
encrypt_text = root.find("Encrypt").text
|
||
msg_signature = root.find("MsgSignature").text
|
||
timestamp = root.find("TimeStamp").text
|
||
nonce = root.find("Nonce").text
|
||
|
||
# 解密验证
|
||
decrypted = crypto.decrypt(encrypt_text)
|
||
assert decrypted == original
|
||
|
||
|
||
class TestDecryptEchostr:
|
||
"""测试回调 URL 验证的 echostr 解密。"""
|
||
|
||
def test_decrypt_echostr_valid(self, crypto):
|
||
"""验证正确的 echostr 解密。"""
|
||
echostr = "verify_token_12345"
|
||
encrypted = crypto.encrypt(echostr)
|
||
timestamp = "1234567890"
|
||
nonce = "test_nonce"
|
||
signature = crypto.generate_signature(timestamp, nonce, encrypted)
|
||
|
||
result = crypto.decrypt_echostr(signature, timestamp, nonce, encrypted)
|
||
assert result == echostr
|
||
|
||
def test_decrypt_echostr_invalid_signature(self, crypto):
|
||
"""验证签名错误时 echostr 解密失败。"""
|
||
encrypted = crypto.encrypt("test")
|
||
with pytest.raises(ValueError, match="回调URL验证签名失败"):
|
||
crypto.decrypt_echostr("wrong_sig", "ts", "nonce", encrypted)
|