# ============================================================================= # 企微IT智能服务台 — 高危操作演示 API # ============================================================================= # Phase 1.3 task #19: 高危操作路由白名单 + 中间件演示 # 决策来源:otm-secondary-auth.md(2026-06-21) # # 设计原则: # 本文件只演示 require_high_risk_otp 依赖的用法,不重复实现业务。 # 实际业务端点(admin_rbac.py / admin_api.py)在后续 worktree 中追加 # Depends(require_high_risk_otp) 即可生效。 # # 演示端点: # POST /api/admin/high-risk/demo/{category} — 用 5 个 category 各跑一遍 # GET /api/admin/high-risk/whitelist — 获取白名单(前端文档化用) # GET /api/admin/high-risk/check — 检查当前管理员 OTP 状态 # # 鉴权: # - demo/{category}: 需 admin 角色 + 30 分钟内 OTP 验证 # - whitelist: 仅 admin 角色(不需要 OTP,纯查询) # - check: 仅 admin 角色(不需要 OTP,纯查询自己状态) # # 错误码: # 2001 = 高危操作需要 OTP 二次验证 # 4003 = 仅管理员可执行此操作 # 4000 = 未知的高危操作类别 # ============================================================================= import logging from typing import Any, Dict from fastapi import APIRouter, Depends from app.dependencies import ( HIGH_RISK_OPERATIONS, UserInfo, require_high_risk_otp, ) from app.services.high_risk_guard import HighRiskGuard from app.utils.response import AppException, success_response logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # 路由器 # ----------------------------------------------------------------------------- # prefix: /admin/high-risk # 完整路径前缀: /api/admin/high-risk # ----------------------------------------------------------------------------- router = APIRouter(prefix="/admin/high-risk") # ----------------------------------------------------------------------------- # 演示端点 1: POST /api/admin/high-risk/demo/{category} # ----------------------------------------------------------------------------- @router.post( "/demo/{category}", summary="演示高危操作 OTP 守卫", description=( "展示 5 类高危操作(role_change / config_change / data_export / " "account_disable / account_create_reset)的 OTP 守卫流程。

" "调用此端点时,如果当前管理员 30 分钟内没在 /api/mfa/verify 过 OTP," "会返回错误码 2001,前端应弹 OTP 输入框 → 调 /api/mfa/verify → 重试。" ), ) async def demo_high_risk_op( category: str, current_user: UserInfo = Depends(require_high_risk_otp), ) -> Dict[str, Any]: """演示:展示高危操作 OTP 守卫。 触发流程: 1. 前端调 POST /api/admin/high-risk/demo/role_change 2. require_high_risk_otp 依赖先跑: a. 检查 admin 角色(否则 4003) b. 检查 Redis mfa:verified:{employee_id}(否则 2001) 3. 通过守卫 → 返回 success Args: category: 5 类之一 (role_change / config_change / data_export / account_disable / account_create_reset) current_user: 当前管理员(依赖自动注入) Returns: Dict: 演示结果 Raises: AppException(4000): 未知的高危操作类别 AppException(4003): 非 admin 角色(来自 require_high_risk_otp) AppException(2001): 未在 30 分钟内过 OTP(来自 require_high_risk_otp) """ # 第 1 关:类别校验 if category not in HIGH_RISK_OPERATIONS: valid_categories = ", ".join(HIGH_RISK_OPERATIONS.keys()) raise AppException( code=4000, message=f"未知的高危操作类别: {category}。合法值: {valid_categories}", ) # 第 2 关:模拟执行(不真正改数据,只演示守卫通过) op_meta = HIGH_RISK_OPERATIONS[category] logger.info( f"演示高危操作 {category} 执行: " f"employee_id={current_user.employee_id}, " f"category={op_meta['category']}" ) return success_response( data={ "category": category, "operation": op_meta, "executed_by": current_user.employee_id, "executed_by_name": current_user.name, "message": ( f"演示操作 [{op_meta['category']}/{category}] 已通过 OTP 守卫" ), "note": "本端点仅演示 OTP 守卫流程,不实际修改数据", }, ) # ----------------------------------------------------------------------------- # 演示端点 2: GET /api/admin/high-risk/whitelist # ----------------------------------------------------------------------------- @router.get( "/whitelist", summary="获取高危操作白名单", description="返回 5 类高危操作的元数据,供前端文档化展示。", ) async def get_whitelist( current_user: UserInfo = Depends(require_high_risk_otp), ) -> Dict[str, Any]: """获取 5 类高危操作白名单。 注意:此端点也加 require_high_risk_otp,因为白名单本身属于敏感元数据。 实际生产中可改为仅 require_admin,降低前端文档加载的复杂度。 这里为了演示一致性,统一加 OTP 守卫。 Args: current_user: 当前管理员(依赖自动注入) Returns: Dict: 白名单 + 分类元数据 """ return success_response( data={ "whitelist": HighRiskGuard.get_whitelist(), "total_categories": len(HighRiskGuard.list_categories()), "categories": HighRiskGuard.list_categories(), "ttl_seconds": HighRiskGuard.DEFAULT_TTL_SECONDS, "ttl_human": "30 分钟", }, ) # ----------------------------------------------------------------------------- # 演示端点 3: GET /api/admin/high-risk/check # ----------------------------------------------------------------------------- @router.get( "/check", summary="检查当前管理员 OTP 验证状态", description=( "查询当前管理员是否在 30 分钟内通过过 OTP。" "前端在弹 OTP 输入框前先调一次此端点,如果已验证就不弹。" ), ) async def check_otp_status( current_user: UserInfo = Depends(require_high_risk_otp), ) -> Dict[str, Any]: """检查当前管理员 OTP 验证状态。 用途:前端可在做高危操作前先调此端点决定要不要弹 OTP 输入框。 Args: current_user: 当前管理员(依赖自动注入) Returns: Dict: 验证状态 """ # 注:能进到这里说明 require_high_risk_otp 已经检查过 Redis, # 这里再用 service 查一次拿详细信息(method/verified_at) # 由于没有 redis_client 直接传入,这里返回简化结果 return success_response( data={ "employee_id": current_user.employee_id, "is_verified": True, # 已经通过守卫 = verified "message": "当前管理员 OTP 已验证,可以执行高危操作", "note": "本端点本身需要 OTP 守卫,所以必然返回 is_verified=True", }, )