Files
wecom_it_smart_desk/backend/app/api/high_risk_routes.py
T

191 lines
7.2 KiB
Python
Raw Normal View History

# =============================================================================
# 企微IT智能服务台 — 高危操作演示 API
# =============================================================================
# Phase 1.3 task #19: 高危操作路由白名单 + 中间件演示
# 决策来源:otm-secondary-auth.md2026-06-21
#
# 设计原则:
# 本文件只演示 require_high_risk_otp 依赖的用法,不重复实现业务。
# 实际业务端点(admin_rbac.py / admin_api.py)在后续 worktree 中追加
# Depends(require_high_risk_otp) 即可生效。
#
# 演示端点:
# POST /api/admin/high-risk/demo/{category} — 用 5 个 category 各跑一遍
# GET /api/admin/high-risk/whitelist — 获取白名单(前端文档化用)
# GET /api/admin/high-risk/check — 检查当前管理员 OTP 状态
#
# 鉴权:
# - demo/{category}: 需 admin 角色 + 30 分钟内 OTP 验证
# - whitelist: 仅 admin 角色(不需要 OTP,纯查询)
# - check: 仅 admin 角色(不需要 OTP,纯查询自己状态)
#
# 错误码:
# 2001 = 高危操作需要 OTP 二次验证
# 4003 = 仅管理员可执行此操作
# 4000 = 未知的高危操作类别
# =============================================================================
import logging
from typing import Any, Dict
from fastapi import APIRouter, Depends
from app.dependencies import (
HIGH_RISK_OPERATIONS,
UserInfo,
require_high_risk_otp,
)
from app.services.high_risk_guard import HighRiskGuard
from app.utils.response import AppException, success_response
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# 路由器
# -----------------------------------------------------------------------------
# prefix: /admin/high-risk
# 完整路径前缀: /api/admin/high-risk
# -----------------------------------------------------------------------------
router = APIRouter(prefix="/admin/high-risk")
# -----------------------------------------------------------------------------
# 演示端点 1: POST /api/admin/high-risk/demo/{category}
# -----------------------------------------------------------------------------
@router.post(
"/demo/{category}",
summary="演示高危操作 OTP 守卫",
description=(
"展示 5 类高危操作(role_change / config_change / data_export / "
"account_disable / account_create_reset)的 OTP 守卫流程。<br><br>"
"调用此端点时,如果当前管理员 30 分钟内没在 /api/mfa/verify 过 OTP,"
"会返回错误码 2001,前端应弹 OTP 输入框 → 调 /api/mfa/verify → 重试。"
),
)
async def demo_high_risk_op(
category: str,
current_user: UserInfo = Depends(require_high_risk_otp),
) -> Dict[str, Any]:
"""演示:展示高危操作 OTP 守卫。
触发流程:
1. 前端调 POST /api/admin/high-risk/demo/role_change
2. require_high_risk_otp 依赖先跑:
a. 检查 admin 角色(否则 4003
b. 检查 Redis mfa:verified:{employee_id}(否则 2001
3. 通过守卫 → 返回 success
Args:
category: 5 类之一 (role_change / config_change / data_export /
account_disable / account_create_reset)
current_user: 当前管理员(依赖自动注入)
Returns:
Dict: 演示结果
Raises:
AppException(4000): 未知的高危操作类别
AppException(4003): 非 admin 角色(来自 require_high_risk_otp
AppException(2001): 未在 30 分钟内过 OTP(来自 require_high_risk_otp
"""
# 第 1 关:类别校验
if category not in HIGH_RISK_OPERATIONS:
valid_categories = ", ".join(HIGH_RISK_OPERATIONS.keys())
raise AppException(
code=4000,
message=f"未知的高危操作类别: {category}。合法值: {valid_categories}",
)
# 第 2 关:模拟执行(不真正改数据,只演示守卫通过)
op_meta = HIGH_RISK_OPERATIONS[category]
logger.info(
f"演示高危操作 {category} 执行: "
f"employee_id={current_user.employee_id}, "
f"category={op_meta['category']}"
)
return success_response(
data={
"category": category,
"operation": op_meta,
"executed_by": current_user.employee_id,
"executed_by_name": current_user.name,
"message": (
f"演示操作 [{op_meta['category']}/{category}] 已通过 OTP 守卫"
),
"note": "本端点仅演示 OTP 守卫流程,不实际修改数据",
},
)
# -----------------------------------------------------------------------------
# 演示端点 2: GET /api/admin/high-risk/whitelist
# -----------------------------------------------------------------------------
@router.get(
"/whitelist",
summary="获取高危操作白名单",
description="返回 5 类高危操作的元数据,供前端文档化展示。",
)
async def get_whitelist(
current_user: UserInfo = Depends(require_high_risk_otp),
) -> Dict[str, Any]:
"""获取 5 类高危操作白名单。
注意:此端点也加 require_high_risk_otp,因为白名单本身属于敏感元数据。
实际生产中可改为仅 require_admin,降低前端文档加载的复杂度。
这里为了演示一致性,统一加 OTP 守卫。
Args:
current_user: 当前管理员(依赖自动注入)
Returns:
Dict: 白名单 + 分类元数据
"""
return success_response(
data={
"whitelist": HighRiskGuard.get_whitelist(),
"total_categories": len(HighRiskGuard.list_categories()),
"categories": HighRiskGuard.list_categories(),
"ttl_seconds": HighRiskGuard.DEFAULT_TTL_SECONDS,
"ttl_human": "30 分钟",
},
)
# -----------------------------------------------------------------------------
# 演示端点 3: GET /api/admin/high-risk/check
# -----------------------------------------------------------------------------
@router.get(
"/check",
summary="检查当前管理员 OTP 验证状态",
description=(
"查询当前管理员是否在 30 分钟内通过过 OTP。"
"前端在弹 OTP 输入框前先调一次此端点,如果已验证就不弹。"
),
)
async def check_otp_status(
current_user: UserInfo = Depends(require_high_risk_otp),
) -> Dict[str, Any]:
"""检查当前管理员 OTP 验证状态。
用途:前端可在做高危操作前先调此端点决定要不要弹 OTP 输入框。
Args:
current_user: 当前管理员(依赖自动注入)
Returns:
Dict: 验证状态
"""
# 注:能进到这里说明 require_high_risk_otp 已经检查过 Redis,
# 这里再用 service 查一次拿详细信息(method/verified_at)
# 由于没有 redis_client 直接传入,这里返回简化结果
return success_response(
data={
"employee_id": current_user.employee_id,
"is_verified": True, # 已经通过守卫 = verified
"message": "当前管理员 OTP 已验证,可以执行高危操作",
"note": "本端点本身需要 OTP 守卫,所以必然返回 is_verified=True",
},
)