# ============================================================================= # 企微IT智能服务台 — WecomCrypto 加解密测试 # ============================================================================= # 测试覆盖: # 1. AES 密钥解码(43 位 EncodingAESKey → 32 字节密钥) # 2. 签名生成与验证(SHA1(sort(token, timestamp, nonce, encrypt))) # 3. AES 加密 + 解密往返(encrypt → decrypt 还原原文) # 4. corp_id 不匹配时解密失败 # 5. 完整消息解密流程(decrypt_message) # 6. 完整消息加密流程(encrypt_message) # 7. echostr 解密流程(decrypt_echostr) # 8. 无效签名验证失败 # 9. 无效密文解密失败 # ============================================================================= import hashlib import pytest from app.utils.wecom_crypto import WecomCrypto # 测试用配置(和企微开发文档示例一致) TEST_TOKEN = "test_token_abc" TEST_ENCODING_AES_KEY = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFG" # 43 字符 TEST_CORP_ID = "ww_test_corp_id" @pytest.fixture def crypto(): """创建 WecomCrypto 实例。""" return WecomCrypto( token=TEST_TOKEN, encoding_aes_key=TEST_ENCODING_AES_KEY, corp_id=TEST_CORP_ID, ) class TestWecomCryptoInit: """测试 WecomCrypto 初始化。""" def test_aes_key_decoding(self, crypto): """验证 43 位 EncodingAESKey 正确解码为 32 字节 AES 密钥。""" import base64 expected_key = base64.b64decode(TEST_ENCODING_AES_KEY + "=") assert crypto.aes_key == expected_key assert len(crypto.aes_key) == 32 # AES-256 需要 32 字节密钥 def test_iv_is_first_16_bytes_of_key(self, crypto): """验证 IV 取自 AES 密钥的前 16 字节。""" assert crypto.iv == crypto.aes_key[:16] assert len(crypto.iv) == 16 def test_token_stored(self, crypto): """验证 Token 正确存储。""" assert crypto.token == TEST_TOKEN def test_corp_id_stored(self, crypto): """验证 CorpID 正确存储。""" assert crypto.corp_id == TEST_CORP_ID class TestSignature: """测试签名生成与验证。""" def test_generate_signature(self, crypto): """验证签名生成算法:SHA1(sort([token, timestamp, nonce, encrypt]))。""" timestamp = "1234567890" nonce = "test_nonce" encrypt = "test_encrypt_content" signature = crypto.generate_signature(timestamp, nonce, encrypt) # 手动计算预期签名 sort_list = sorted([TEST_TOKEN, timestamp, nonce, encrypt]) concat_str = "".join(sort_list) expected = hashlib.sha1(concat_str.encode("utf-8")).hexdigest() assert signature == expected def test_verify_signature_valid(self, crypto): """验证正确签名通过校验。""" timestamp = "1234567890" nonce = "test_nonce" encrypt = "test_encrypt_content" signature = crypto.generate_signature(timestamp, nonce, encrypt) assert crypto.verify_signature(signature, timestamp, nonce, encrypt) is True def test_verify_signature_invalid(self, crypto): """验证错误签名不通过校验。""" assert crypto.verify_signature( "invalid_signature", "1234567890", "nonce", "encrypt" ) is False def test_verify_signature_tampered_timestamp(self, crypto): """验证篡改时间戳后签名校验失败。""" signature = crypto.generate_signature("1234567890", "nonce", "encrypt") assert crypto.verify_signature(signature, "9999999999", "nonce", "encrypt") is False class TestEncryptDecrypt: """测试 AES 加密与解密的往返一致性。""" def test_encrypt_decrypt_roundtrip(self, crypto): """验证加密后解密能还原原文。""" plaintext = "你好企微" encrypted = crypto.encrypt(plaintext) decrypted = crypto.decrypt(encrypted) assert decrypted == plaintext def test_encrypt_produces_different_ciphertext(self, crypto): """验证相同明文多次加密产生不同密文(因为 16 字节随机串)。""" plaintext = "测试消息" encrypted1 = crypto.encrypt(plaintext) encrypted2 = crypto.encrypt(plaintext) assert encrypted1 != encrypted2 def test_decrypt_with_wrong_corp_id(self): """验证 corp_id 不匹配时解密抛出 ValueError。""" crypto1 = WecomCrypto(TEST_TOKEN, TEST_ENCODING_AES_KEY, TEST_CORP_ID) crypto2 = WecomCrypto(TEST_TOKEN, TEST_ENCODING_AES_KEY, "wrong_corp_id") encrypted = crypto1.encrypt("测试消息") with pytest.raises(ValueError, match="corp_id 不匹配"): crypto2.decrypt(encrypted) def test_decrypt_invalid_base64(self, crypto): """验证无效 Base64 密文解密抛出 ValueError。""" with pytest.raises(ValueError): crypto.decrypt("这不是有效的base64密文!!!") def test_encrypt_decrypt_empty_string(self, crypto): """验证空字符串加密解密往返。""" encrypted = crypto.encrypt("") decrypted = crypto.decrypt(encrypted) assert decrypted == "" def test_encrypt_decrypt_long_text(self, crypto): """验证长文本加密解密往返。""" long_text = "A" * 10000 encrypted = crypto.encrypt(long_text) decrypted = crypto.decrypt(encrypted) assert decrypted == long_text def test_encrypt_decrypt_chinese_text(self, crypto): """验证中文内容加密解密往返。""" chinese_text = "密码重置、VPN连接、软件安装,请按步骤操作。" encrypted = crypto.encrypt(chinese_text) decrypted = crypto.decrypt(encrypted) assert decrypted == chinese_text class TestDecryptMessage: """测试完整的消息解密流程。""" def test_decrypt_message_full_flow(self, crypto): """验证从 XML 密文到明文的完整解密流程。""" # 先加密一段消息 original_msg = "Hellouser001" encrypted = crypto.encrypt(original_msg) # 构造企微回调的 XML 格式 timestamp = "1234567890" nonce = "test_nonce" signature = crypto.generate_signature(timestamp, nonce, encrypted) xml_body = f"" result = crypto.decrypt_message(xml_body, signature, timestamp, nonce) assert result.get("Content") == "Hello" assert result.get("FromUserName") == "user001" def test_decrypt_message_invalid_signature(self, crypto): """验证签名错误时解密消息抛出 ValueError。""" encrypted = crypto.encrypt("test") xml_body = f"" with pytest.raises(ValueError, match="签名验证失败"): crypto.decrypt_message(xml_body, "invalid_signature", "timestamp", "nonce") def test_decrypt_message_missing_encrypt_field(self, crypto): """验证 XML 缺少 Encrypt 字段时抛出 ValueError。""" xml_body = "text" with pytest.raises(ValueError, match="未找到 Encrypt 字段"): crypto.decrypt_message(xml_body, "sig", "ts", "nonce") def test_decrypt_message_invalid_xml(self, crypto): """验证无效 XML 抛出 ValueError。""" with pytest.raises(ValueError, match="XML 解析失败"): crypto.decrypt_message("not valid xml", "sig", "ts", "nonce") class TestEncryptMessage: """测试完整的消息加密流程。""" def test_encrypt_message_format(self, crypto): """验证加密响应消息的 XML 格式正确。""" result = crypto.encrypt_message("回复消息", nonce="test_nonce") assert "" in result assert "" in result assert "" in result assert "" in result def test_encrypt_message_roundtrip(self, crypto): """验证加密后的消息可以被正确解密。""" original = "测试回复内容" encrypted_xml = crypto.encrypt_message(original, nonce="test_nonce") # 从加密 XML 中提取各字段 import xml.etree.ElementTree as ET root = ET.fromstring(encrypted_xml) encrypt_text = root.find("Encrypt").text msg_signature = root.find("MsgSignature").text timestamp = root.find("TimeStamp").text nonce = root.find("Nonce").text # 解密验证 decrypted = crypto.decrypt(encrypt_text) assert decrypted == original class TestDecryptEchostr: """测试回调 URL 验证的 echostr 解密。""" def test_decrypt_echostr_valid(self, crypto): """验证正确的 echostr 解密。""" echostr = "verify_token_12345" encrypted = crypto.encrypt(echostr) timestamp = "1234567890" nonce = "test_nonce" signature = crypto.generate_signature(timestamp, nonce, encrypted) result = crypto.decrypt_echostr(signature, timestamp, nonce, encrypted) assert result == echostr def test_decrypt_echostr_invalid_signature(self, crypto): """验证签名错误时 echostr 解密失败。""" encrypted = crypto.encrypt("test") with pytest.raises(ValueError, match="回调URL验证签名失败"): crypto.decrypt_echostr("wrong_sig", "ts", "nonce", encrypted)