# Source file: wecom_it_smart_desk/backend/tests/test_high_risk_guard.py
# (Repository viewer header: 435 lines, 16 KiB, Python.)

# =============================================================================
# 企微IT智能服务台 — 高危操作守卫测试
# =============================================================================
# Phase 1.3 task #19
# 测试覆盖(对应需求文档的 5 条测试用例):
# 1. admin 角色,30 分钟内没验 OTP → 调 high-risk 端点 → 失败(2001)
# 2. admin 角色,30 分钟内验过 OTP → 调 high-risk 端点 → 成功
# 3. agent 角色(不是 admin) → 调 high-risk 端点 → 失败(4003)
# 4. 错误类别参数 → 失败(4000)
# 5. 5 个高危类别各调一次 → 全部成功
#
# 关键设计:
# - 用 TokenService 直接创建测试 token(不走企微回调)
# - 用 mock_redis fixture(已在 conftest 提供)
# - 直接操作 mock_redis 模拟 mfa:verified:{employee_id} key
#
# autouse fixture reset_redis_pool 说明:
# app.dependencies._redis_pool 是模块级单例,会在第一次 get_redis() 后缓存。
# 跨测试运行时,第 2 个测试的 mock_redis 跟 app 用的是不同实例 →
# token 写在 test 的 mock_redis,app 读的是上一个 test 的 mock_redis → 401。
# 解决:每个 test 跑前清空 _redis_pool,强制下次 get_redis() 用新 mock_redis。
# =============================================================================
import json
import pytest
import pytest_asyncio
import app.dependencies as _deps
from app.dependencies import HIGH_RISK_OPERATIONS, MFA_VERIFIED_KEY_PREFIX
from app.services.high_risk_guard import (
HIGH_RISK_OPERATIONS_WHITELIST,
HighRiskGuard,
)
from app.services.token_service import TokenService, UNIFIED_TOKEN_PREFIX
# =============================================================================
# autouse fixture: 每个测试前重置 app.dependencies._redis_pool
# =============================================================================
@pytest.fixture(autouse=True)
def reset_redis_pool():
    """Clear the ``app.dependencies._redis_pool`` singleton around every test.

    Per the notes in this module, the ``client`` fixture in conftest patches
    ``redis.asyncio.from_url`` while ``app.dependencies`` keeps the first pool
    it obtained in a module-level variable. Without a reset the application
    could keep using a mock Redis that belongs to an earlier test. Setting the
    cached pool to ``None`` makes the next ``get_redis()`` call build a new one.
    """

    def _drop_cached_pool() -> None:
        _deps._redis_pool = None

    # Reset before the test body runs ...
    _drop_cached_pool()
    yield
    # ... and once more afterwards so nothing leaks into the next test.
    _drop_cached_pool()
# =============================================================================
# 测试辅助函数
# =============================================================================
async def _create_test_token(
    mock_redis, employee_id: str, *, display_prefix: str, roles: list
) -> str:
    """Issue a test token through ``TokenService`` for the given roles.

    Shared implementation for the role-specific helpers below, so the token
    parameters they have in common live in exactly one place.

    Args:
        mock_redis: Redis-like object handed to ``TokenService``.
        employee_id: User identifier stored in the token.
        display_prefix: Text prepended to ``employee_id`` to form the display name.
        roles: Role names to embed in the token.

    Returns:
        str: The token returned by ``TokenService.create_token``.
    """
    token_service = TokenService(mock_redis)
    return await token_service.create_token(
        employee_id=employee_id,
        name=f"{display_prefix}{employee_id}",
        roles=roles,
        department="技术部",
        login_source="agent",
    )


async def create_admin_token(mock_redis, employee_id: str = "admin_test_001") -> str:
    """Create a test token carrying the ``user`` and ``admin`` roles.

    The token is created directly through ``TokenService`` rather than through
    the WeCom callback flow.

    Args:
        mock_redis: MockRedis instance (provided by conftest per the module notes).
        employee_id: WeCom UserID to embed in the token.

    Returns:
        str: The token string.
    """
    return await _create_test_token(
        mock_redis,
        employee_id,
        display_prefix="管理员",
        roles=["user", "admin"],
    )


async def create_agent_token(mock_redis, employee_id: str = "agent_test_001") -> str:
    """Create a test token carrying the ``user`` and ``agent`` roles.

    The token is created directly through ``TokenService`` rather than through
    the WeCom callback flow.

    Args:
        mock_redis: MockRedis instance (provided by conftest per the module notes).
        employee_id: WeCom UserID to embed in the token.

    Returns:
        str: The token string.
    """
    return await _create_test_token(
        mock_redis,
        employee_id,
        display_prefix="坐席",
        roles=["user", "agent"],
    )
async def mark_otp_verified(mock_redis, employee_id: str) -> None:
    """Simulate a passed OTP check by writing the verification key directly.

    A JSON payload with a ``method`` of ``"totp"`` and a fixed ``verified_at``
    timestamp is stored under ``MFA_VERIFIED_KEY_PREFIX + employee_id`` using
    ``setex`` with an expiry of 1800 seconds.

    Args:
        mock_redis: MockRedis instance exposing an awaitable ``setex``.
        employee_id: WeCom UserID whose verification marker is written.
    """
    payload = {"method": "totp", "verified_at": "2026-06-21T15:30:00"}
    await mock_redis.setex(
        f"{MFA_VERIFIED_KEY_PREFIX}{employee_id}",
        1800,
        json.dumps(payload),
    )
# =============================================================================
# 测试类
# =============================================================================
class TestHighRiskGuardRequireOTP:
    """Endpoint tests for the ``require_high_risk_otp`` guard dependency."""

    @staticmethod
    def _bearer(token: str) -> dict:
        """Return the Authorization header dict for ``token``."""
        return {"Authorization": f"Bearer {token}"}

    @pytest.mark.asyncio
    async def test_admin_without_otp_returns_2001(
        self, client, db_session, mock_redis
    ):
        """Case 1: an admin without a verified OTP is rejected with code 2001.

        Checks:
            - HTTP status is 200 (business errors are signalled via ``code``).
            - ``code`` equals 2001.
            - The message mentions OTP.
        """
        admin_id = "admin_no_otp"
        token = await create_admin_token(mock_redis, admin_id)
        # Make sure no verification marker exists for this admin.
        await mock_redis.delete(f"{MFA_VERIFIED_KEY_PREFIX}{admin_id}")

        resp = await client.post(
            "/admin/high-risk/demo/role_change",
            headers=self._bearer(token),
        )

        assert resp.status_code == 200
        data = resp.json()
        assert data["code"] == 2001, f"预期 2001 实际 {data['code']}: {data}"
        message = data["message"]
        assert "OTP" in message or "otp" in message.lower()

    @pytest.mark.asyncio
    async def test_admin_with_otp_returns_success(
        self, client, db_session, mock_redis
    ):
        """Case 2: an admin with a verified OTP may call a high-risk endpoint.

        Checks:
            - ``code`` equals 0.
            - ``data.category`` is ``"role_change"``.
            - ``data.executed_by`` is ``"admin_with_otp"``.
            - ``data.operation.category`` carries the expected label.
        """
        admin_id = "admin_with_otp"
        token = await create_admin_token(mock_redis, admin_id)
        await mark_otp_verified(mock_redis, admin_id)

        resp = await client.post(
            "/admin/high-risk/demo/role_change",
            headers=self._bearer(token),
        )

        assert resp.status_code == 200
        data = resp.json()
        assert data["code"] == 0, f"预期 0 实际 {data['code']}: {data}"
        assert data["data"]["category"] == "role_change"
        assert data["data"]["executed_by"] == admin_id
        assert data["data"]["operation"]["category"] == "改权限"

    @pytest.mark.asyncio
    async def test_agent_role_returns_4003(
        self, client, db_session, mock_redis
    ):
        """Case 3: a non-admin agent is rejected with code 4003.

        Checks:
            - The agent is refused even though an OTP marker exists.
            - ``code`` equals 4003.
        """
        agent_id = "agent_no_admin"
        token = await create_agent_token(mock_redis, agent_id)
        # The OTP marker is present on purpose: the role check must still fail.
        await mark_otp_verified(mock_redis, agent_id)

        resp = await client.post(
            "/admin/high-risk/demo/role_change",
            headers=self._bearer(token),
        )

        assert resp.status_code == 200
        data = resp.json()
        assert data["code"] == 4003, f"预期 4003 实际 {data['code']}: {data}"
        message = data["message"]
        assert "管理员" in message or "admin" in message.lower()

    @pytest.mark.asyncio
    async def test_invalid_category_returns_4000(
        self, client, db_session, mock_redis
    ):
        """Case 4: an unknown category is rejected with code 4000.

        Checks:
            - With admin role and OTP in place, a bad category still yields 4000,
              i.e. category validation happens after the guard has passed.
        """
        admin_id = "admin_bad_cat"
        token = await create_admin_token(mock_redis, admin_id)
        await mark_otp_verified(mock_redis, admin_id)

        resp = await client.post(
            "/admin/high-risk/demo/invalid_category_xyz",
            headers=self._bearer(token),
        )

        assert resp.status_code == 200
        data = resp.json()
        assert data["code"] == 4000, f"预期 4000 实际 {data['code']}: {data}"
        message = data["message"]
        assert "未知" in message or "invalid" in message.lower()

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "category",
        [
            "role_change",
            "config_change",
            "data_export",
            "account_disable",
            "account_create_reset",
        ],
    )
    async def test_all_five_categories_pass(
        self, client, db_session, mock_redis, category
    ):
        """Case 5: each of the five high-risk categories succeeds once.

        Checks:
            - ``code`` equals 0 for every category.
            - ``data.category`` echoes the requested category.
            - ``data.operation.category`` is non-empty.
        """
        # A dedicated admin per category keeps the Redis keys independent.
        admin_id = f"admin_cat_{category}"
        token = await create_admin_token(mock_redis, admin_id)
        await mark_otp_verified(mock_redis, admin_id)

        resp = await client.post(
            f"/admin/high-risk/demo/{category}",
            headers=self._bearer(token),
        )

        assert resp.status_code == 200
        data = resp.json()
        assert data["code"] == 0, (
            f"category={category} 预期 0 实际 {data['code']}: {data}"
        )
        assert data["data"]["category"] == category
        # The human-readable category label must not be empty.
        assert data["data"]["operation"]["category"]
# =============================================================================
# HighRiskGuard service 单元测试
# =============================================================================
class TestHighRiskGuardService:
    """Unit tests for the read/write methods of the ``HighRiskGuard`` service."""

    @pytest.mark.asyncio
    async def test_mark_verified_writes_redis(self, mock_redis):
        """``mark_verified`` stores a JSON record under the guard's key.

        The stored value must decode to a mapping with the requested ``method``
        and a ``verified_at`` entry. The TTL itself is not asserted here.
        """
        service = HighRiskGuard(mock_redis, ttl_seconds=1800)

        outcome = await service.mark_verified("user_001", method="totp")
        assert outcome is True

        # The key must exist after marking.
        raw = await mock_redis.get(service._key("user_001"))
        assert raw is not None

        # The stored value must be JSON with the expected fields.
        record = json.loads(raw)
        assert record["method"] == "totp"
        assert "verified_at" in record

    @pytest.mark.asyncio
    async def test_is_verified_true_when_key_exists(self, mock_redis):
        """``is_verified`` returns True once the user has been marked."""
        service = HighRiskGuard(mock_redis)
        await service.mark_verified("user_002")

        verified = await service.is_verified("user_002")
        assert verified is True

    @pytest.mark.asyncio
    async def test_is_verified_false_when_key_missing(self, mock_redis):
        """``is_verified`` returns False for a user that was never marked."""
        service = HighRiskGuard(mock_redis)

        verified = await service.is_verified("never_verified_user")
        assert verified is False

    @pytest.mark.asyncio
    async def test_revoke_removes_key(self, mock_redis):
        """``revoke`` removes the verification so ``is_verified`` turns False."""
        service = HighRiskGuard(mock_redis)
        user = "user_003"
        await service.mark_verified(user)

        # Present before revocation.
        assert await service.is_verified(user) is True

        # Revoke reports success.
        revoked = await service.revoke(user)
        assert revoked is True

        # Gone after revocation.
        assert await service.is_verified(user) is False

    @pytest.mark.asyncio
    async def test_get_verification_info_returns_dict(self, mock_redis):
        """``get_verification_info`` returns the method and ``verified_at``."""
        service = HighRiskGuard(mock_redis)
        await service.mark_verified("user_004", method="sms_backup")

        details = await service.get_verification_info("user_004")

        assert details is not None
        assert details["method"] == "sms_backup"
        assert "verified_at" in details

    @pytest.mark.asyncio
    async def test_refresh_ttl_only_when_key_exists(self, mock_redis):
        """``refresh_ttl`` returns False without a key and True once one exists."""
        service = HighRiskGuard(mock_redis)

        # Refreshing a missing key must fail.
        refreshed = await service.refresh_ttl("never_verified")
        assert refreshed is False

        # Refreshing an existing key must succeed.
        await service.mark_verified("user_005")
        refreshed = await service.refresh_ttl("user_005")
        assert refreshed is True
class TestHighRiskGuardWhitelist:
    """Tests for the whitelist-related static helpers of ``HighRiskGuard``."""

    def test_whitelist_has_5_categories(self):
        """The whitelist must contain exactly five categories."""
        assert len(HighRiskGuard.get_whitelist()) == 5

    def test_whitelist_matches_dependencies(self):
        """The service whitelist and ``HIGH_RISK_OPERATIONS`` share the same keys."""
        service_keys = HIGH_RISK_OPERATIONS_WHITELIST.keys()
        dependency_keys = HIGH_RISK_OPERATIONS.keys()
        assert service_keys == dependency_keys

    @pytest.mark.parametrize(
        "category",
        [
            "role_change",
            "config_change",
            "data_export",
            "account_disable",
            "account_create_reset",
        ],
    )
    def test_is_valid_category(self, category):
        """Each of the five known categories is accepted."""
        accepted = HighRiskGuard.is_valid_category(category)
        assert accepted is True

    def test_invalid_category_rejected(self):
        """An unknown category name is rejected."""
        accepted = HighRiskGuard.is_valid_category("random_xyz")
        assert accepted is False

    def test_list_categories_returns_5(self):
        """``list_categories`` yields five entries including two known ones."""
        names = HighRiskGuard.list_categories()
        assert len(names) == 5
        for expected in ("role_change", "config_change"):
            assert expected in names
class TestHighRiskRoutes:
    """Edge-case tests for the ``/admin/high-risk/*`` demo endpoints."""

    @pytest.mark.asyncio
    async def test_whitelist_endpoint_requires_admin(
        self, client, db_session, mock_redis
    ):
        """The whitelist endpoint rejects an agent token with code 4003."""
        agent_id = "agent_list"
        token = await create_agent_token(mock_redis, agent_id)
        await mark_otp_verified(mock_redis, agent_id)
        auth_headers = {"Authorization": f"Bearer {token}"}

        resp = await client.get("/admin/high-risk/whitelist", headers=auth_headers)

        data = resp.json()
        assert data["code"] == 4003

    @pytest.mark.asyncio
    async def test_whitelist_endpoint_with_admin_otp(
        self, client, db_session, mock_redis
    ):
        """With admin role and OTP, the whitelist endpoint lists five categories."""
        admin_id = "admin_list"
        token = await create_admin_token(mock_redis, admin_id)
        await mark_otp_verified(mock_redis, admin_id)
        auth_headers = {"Authorization": f"Bearer {token}"}

        resp = await client.get("/admin/high-risk/whitelist", headers=auth_headers)

        data = resp.json()
        assert data["code"] == 0
        assert data["data"]["total_categories"] == 5
        assert len(data["data"]["categories"]) == 5
        assert data["data"]["ttl_seconds"] == 1800

    @pytest.mark.asyncio
    async def test_no_token_returns_403(self, client, db_session, mock_redis):
        """A request without any token is answered with HTTP 403.

        Per the original note, FastAPI's HTTPBearer rejects a missing
        Authorization header with 403 Forbidden, unlike the 401 used for an
        invalid token, and that response does not go through the
        application's own exception handler.
        """
        resp = await client.post("/admin/high-risk/demo/role_change")
        assert resp.status_code == 403

    @pytest.mark.asyncio
    async def test_invalid_token_returns_401(self, client, db_session, mock_redis):
        """A request with an unknown bearer token is answered with HTTP 401."""
        bad_headers = {"Authorization": "Bearer invalid_token_xxx"}

        resp = await client.post(
            "/admin/high-risk/demo/role_change",
            headers=bad_headers,
        )

        assert resp.status_code == 401