# ============================================================================= # 企微IT智能服务台 — 高危操作守卫测试 # ============================================================================= # Phase 1.3 task #19 # 测试覆盖(对应需求文档的 5 条测试用例): # 1. admin 角色,30 分钟内没验 OTP → 调 high-risk 端点 → 失败(2001) # 2. admin 角色,30 分钟内验过 OTP → 调 high-risk 端点 → 成功 # 3. agent 角色(不是 admin) → 调 high-risk 端点 → 失败(4003) # 4. 错误类别参数 → 失败(4000) # 5. 5 个高危类别各调一次 → 全部成功 # # 关键设计: # - 用 TokenService 直接创建测试 token(不走企微回调) # - 用 mock_redis fixture(已在 conftest 提供) # - 直接操作 mock_redis 模拟 mfa:verified:{employee_id} key # # autouse fixture reset_redis_pool 说明: # app.dependencies._redis_pool 是模块级单例,会在第一次 get_redis() 后缓存。 # 跨测试运行时,第 2 个测试的 mock_redis 跟 app 用的是不同实例 → # token 写在 test 的 mock_redis,app 读的是上一个 test 的 mock_redis → 401。 # 解决:每个 test 跑前清空 _redis_pool,强制下次 get_redis() 用新 mock_redis。 # ============================================================================= import json import pytest import pytest_asyncio import app.dependencies as _deps from app.dependencies import HIGH_RISK_OPERATIONS, MFA_VERIFIED_KEY_PREFIX from app.services.high_risk_guard import ( HIGH_RISK_OPERATIONS_WHITELIST, HighRiskGuard, ) from app.services.token_service import TokenService, UNIFIED_TOKEN_PREFIX # ============================================================================= # autouse fixture: 每个测试前重置 app.dependencies._redis_pool # ============================================================================= @pytest.fixture(autouse=True) def reset_redis_pool(): """每个测试前重置 app.dependencies._redis_pool 单例。 原因: conftest 的 client fixture patch redis.asyncio.from_url, 但 app.dependencies._redis_pool 会缓存第一次的返回值,跨测试会错位。 重置后下次 get_redis() 重新走 from_url 拿当前 test 的 mock_redis。 """ _deps._redis_pool = None yield _deps._redis_pool = None # ============================================================================= # 测试辅助函数 # ============================================================================= async def create_admin_token(mock_redis, employee_id: str = "admin_test_001") -> str: """创建 admin 角色的测试 token(不走企微回调)。 Args: mock_redis: conftest 提供的 MockRedis 实例 employee_id: 企微 UserID Returns: str: token 字符串 """ token_service = TokenService(mock_redis) token = await token_service.create_token( employee_id=employee_id, name=f"管理员{employee_id}", roles=["user", "admin"], department="技术部", login_source="agent", ) return token async def create_agent_token(mock_redis, employee_id: str = "agent_test_001") -> str: """创建 agent 角色的测试 token(不走企微回调)。 Args: mock_redis: conftest 提供的 MockRedis 实例 employee_id: 企微 UserID Returns: str: token 字符串 """ token_service = TokenService(mock_redis) token = await token_service.create_token( employee_id=employee_id, name=f"坐席{employee_id}", roles=["user", "agent"], department="技术部", login_source="agent", ) return token async def mark_otp_verified(mock_redis, employee_id: str) -> None: """模拟管理员通过 OTP 验证(直接写 Redis key)。 Args: mock_redis: MockRedis 实例 employee_id: 企微 UserID """ key = f"{MFA_VERIFIED_KEY_PREFIX}{employee_id}" value = json.dumps({"method": "totp", "verified_at": "2026-06-21T15:30:00"}) await mock_redis.setex(key, 1800, value) # ============================================================================= # 测试类 # ============================================================================= class TestHighRiskGuardRequireOTP: """测试 require_high_risk_otp 守卫依赖。""" @pytest.mark.asyncio async def test_admin_without_otp_returns_2001( self, client, db_session, mock_redis ): """用例 1:admin 角色,30 分钟内没验 OTP → 调 high-risk 端点 → 失败(2001)。 验证点: - HTTP 200(业务错误通过 code 区分) - code == 2001 - message 含 "OTP" """ # 准备:admin token,但 Redis 没有 mfa:verified key token = await create_admin_token(mock_redis, "admin_no_otp") # 显式确保没有 OTP key await mock_redis.delete(f"{MFA_VERIFIED_KEY_PREFIX}admin_no_otp") response = await client.post( "/admin/high-risk/demo/role_change", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 data = response.json() assert data["code"] == 2001, f"预期 2001 实际 {data['code']}: {data}" assert "OTP" in data["message"] or "otp" in data["message"].lower() @pytest.mark.asyncio async def test_admin_with_otp_returns_success( self, client, db_session, mock_redis ): """用例 2:admin 角色,30 分钟内验过 OTP → 调 high-risk 端点 → 成功。 验证点: - code == 0 - data.category == "role_change" - data.executed_by == "admin_with_otp" """ # 准备:admin token + 标记 OTP 验证通过 token = await create_admin_token(mock_redis, "admin_with_otp") await mark_otp_verified(mock_redis, "admin_with_otp") response = await client.post( "/admin/high-risk/demo/role_change", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 data = response.json() assert data["code"] == 0, f"预期 0 实际 {data['code']}: {data}" assert data["data"]["category"] == "role_change" assert data["data"]["executed_by"] == "admin_with_otp" assert data["data"]["operation"]["category"] == "改权限" @pytest.mark.asyncio async def test_agent_role_returns_4003( self, client, db_session, mock_redis ): """用例 3:agent 角色(不是 admin) → 调 high-risk 端点 → 失败(4003)。 验证点: - 即便有 OTP key,agent 角色也会被拒 - code == 4003 """ # 准备:agent token + 即便 mark 了 OTP 也应被拒 token = await create_agent_token(mock_redis, "agent_no_admin") await mark_otp_verified(mock_redis, "agent_no_admin") response = await client.post( "/admin/high-risk/demo/role_change", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 data = response.json() assert data["code"] == 4003, f"预期 4003 实际 {data['code']}: {data}" assert "管理员" in data["message"] or "admin" in data["message"].lower() @pytest.mark.asyncio async def test_invalid_category_returns_4000( self, client, db_session, mock_redis ): """用例 4:错误类别参数 → 失败(4000)。 验证点: - 即使 admin + OTP 通过守卫,错误 category 仍然 4000 - 验证顺序:守卫通过 → 然后才是 category 校验 """ # 准备:admin token + OTP token = await create_admin_token(mock_redis, "admin_bad_cat") await mark_otp_verified(mock_redis, "admin_bad_cat") response = await client.post( "/admin/high-risk/demo/invalid_category_xyz", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 data = response.json() assert data["code"] == 4000, f"预期 4000 实际 {data['code']}: {data}" assert "未知" in data["message"] or "invalid" in data["message"].lower() @pytest.mark.asyncio @pytest.mark.parametrize( "category", [ "role_change", "config_change", "data_export", "account_disable", "account_create_reset", ], ) async def test_all_five_categories_pass( self, client, db_session, mock_redis, category ): """用例 5:5 个高危类别各调一次 → 全部成功。 验证点: - 每个 category 都返回 code == 0 - data.category == 请求的 category - data.operation.category 是中文类目 """ # 准备:admin token + OTP(每个 category 用一个独立 admin,避免 Redis 干扰) employee_id = f"admin_cat_{category}" token = await create_admin_token(mock_redis, employee_id) await mark_otp_verified(mock_redis, employee_id) response = await client.post( f"/admin/high-risk/demo/{category}", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 200 data = response.json() assert data["code"] == 0, ( f"category={category} 预期 0 实际 {data['code']}: {data}" ) assert data["data"]["category"] == category # 中文类目不应为空 assert data["data"]["operation"]["category"] # ============================================================================= # HighRiskGuard service 单元测试 # ============================================================================= class TestHighRiskGuardService: """测试 HighRiskGuard 服务类的读写功能。""" @pytest.mark.asyncio async def test_mark_verified_writes_redis(self, mock_redis): """验证 mark_verified 写入了正确的 Redis key 和 TTL。""" guard = HighRiskGuard(mock_redis, ttl_seconds=1800) result = await guard.mark_verified("user_001", method="totp") assert result is True # 验证 Redis key 存在 stored = await mock_redis.get(guard._key("user_001")) assert stored is not None # 验证 value 是 JSON info = json.loads(stored) assert info["method"] == "totp" assert "verified_at" in info @pytest.mark.asyncio async def test_is_verified_true_when_key_exists(self, mock_redis): """验证 is_verified 在 key 存在时返回 True。""" guard = HighRiskGuard(mock_redis) await guard.mark_verified("user_002") assert await guard.is_verified("user_002") is True @pytest.mark.asyncio async def test_is_verified_false_when_key_missing(self, mock_redis): """验证 is_verified 在 key 不存在时返回 False。""" guard = HighRiskGuard(mock_redis) assert await guard.is_verified("never_verified_user") is False @pytest.mark.asyncio async def test_revoke_removes_key(self, mock_redis): """验证 revoke 删除 Redis key。""" guard = HighRiskGuard(mock_redis) await guard.mark_verified("user_003") # 验证存在 assert await guard.is_verified("user_003") is True # 撤销 result = await guard.revoke("user_003") assert result is True # 验证已删除 assert await guard.is_verified("user_003") is False @pytest.mark.asyncio async def test_get_verification_info_returns_dict(self, mock_redis): """验证 get_verification_info 返回包含 method/verified_at 的 dict。""" guard = HighRiskGuard(mock_redis) await guard.mark_verified("user_004", method="sms_backup") info = await guard.get_verification_info("user_004") assert info is not None assert info["method"] == "sms_backup" assert "verified_at" in info @pytest.mark.asyncio async def test_refresh_ttl_only_when_key_exists(self, mock_redis): """验证 refresh_ttl 在 key 不存在时返回 False(不误创建)。""" guard = HighRiskGuard(mock_redis) # 不存在时刷新应失败 result = await guard.refresh_ttl("never_verified") assert result is False # 存在时刷新应成功 await guard.mark_verified("user_005") result = await guard.refresh_ttl("user_005") assert result is True class TestHighRiskGuardWhitelist: """测试白名单静态方法。""" def test_whitelist_has_5_categories(self): """白名单必须恰好 5 类。""" whitelist = HighRiskGuard.get_whitelist() assert len(whitelist) == 5 def test_whitelist_matches_dependencies(self): """service 白名单必须与 dependencies HIGH_RISK_OPERATIONS 一致。""" assert ( HIGH_RISK_OPERATIONS_WHITELIST.keys() == HIGH_RISK_OPERATIONS.keys() ) @pytest.mark.parametrize( "category", ["role_change", "config_change", "data_export", "account_disable", "account_create_reset"], ) def test_is_valid_category(self, category): """5 类全部合法。""" assert HighRiskGuard.is_valid_category(category) is True def test_invalid_category_rejected(self): """非法 category 被拒。""" assert HighRiskGuard.is_valid_category("random_xyz") is False def test_list_categories_returns_5(self): """list_categories 返回 5 项。""" cats = HighRiskGuard.list_categories() assert len(cats) == 5 assert "role_change" in cats assert "config_change" in cats class TestHighRiskRoutes: """测试 /admin/high-risk/* 演示端点的边界情况。""" @pytest.mark.asyncio async def test_whitelist_endpoint_requires_admin( self, client, db_session, mock_redis ): """whitelist 端点也走 OTP 守卫,agent 角色应被拒(4003)。""" token = await create_agent_token(mock_redis, "agent_list") await mark_otp_verified(mock_redis, "agent_list") response = await client.get( "/admin/high-risk/whitelist", headers={"Authorization": f"Bearer {token}"}, ) data = response.json() assert data["code"] == 4003 @pytest.mark.asyncio async def test_whitelist_endpoint_with_admin_otp( self, client, db_session, mock_redis ): """whitelist 端点在 admin + OTP 情况下返回 5 类清单。""" token = await create_admin_token(mock_redis, "admin_list") await mark_otp_verified(mock_redis, "admin_list") response = await client.get( "/admin/high-risk/whitelist", headers={"Authorization": f"Bearer {token}"}, ) data = response.json() assert data["code"] == 0 assert data["data"]["total_categories"] == 5 assert len(data["data"]["categories"]) == 5 assert data["data"]["ttl_seconds"] == 1800 @pytest.mark.asyncio async def test_no_token_returns_403(self, client, db_session, mock_redis): """无 token 调 high-risk 端点应返回 403(HTTPBearer 自动拒绝)。 注: FastAPI HTTPBearer 在缺少 header 时返回 403 Forbidden, 与无效 token 时的 401 不同。这是 FastAPI/Starlette 默认行为。 """ # 注: HTTPException 由 FastAPI 直接返回,不经过 AppExceptionHandler response = await client.post("/admin/high-risk/demo/role_change") assert response.status_code == 403 @pytest.mark.asyncio async def test_invalid_token_returns_401(self, client, db_session, mock_redis): """无效 token 调 high-risk 端点应返回 401。""" response = await client.post( "/admin/high-risk/demo/role_change", headers={"Authorization": "Bearer invalid_token_xxx"}, ) assert response.status_code == 401