# ============================================================================= # 企微IT智能服务台 — MFA(TOTP)服务封装 # ============================================================================= # 说明:把 pyotp + qrcode 的使用集中到 service 层,API 层只关心业务流程 # 设计要点: # 1. secret 生成/校验/二维码生成 — 全部静态方法,无状态 # 2. valid_window=1 允许 ±30s 容忍(防用户手机秒数漂移) # 3. Redis 验证标记独立 key(与 otp_secret 共存,不冲突) # key 格式: mfa:verified:{employee_id}, TTL 1800s(30 分钟复用) # 4. backup codes 在决策阶段已废止(otm-secondary-auth.md),所以本服务 # 不实现 backup code 逻辑,丢手机场景走 admin reset # ============================================================================= import base64 import io import logging from typing import Tuple import pyotp import qrcode import redis.asyncio as aioredis logger = logging.getLogger(__name__) # MFA 验证状态在 Redis 里的存活时间(秒) # 跟 otm-secondary-auth.md 决策一致:30 分钟复用窗口 MFA_VERIFIED_TTL_SECONDS = 1800 class MFAService: """MFA TOTP 服务 — 封装 pyotp 二维码生成与验证。 所有方法都是纯函数/静态方法,无内部状态。 Redis 由调用方注入,便于测试时 mock。 """ # -------------------------------------------------------------------------- # Secret 生成 # -------------------------------------------------------------------------- @staticmethod def generate_secret() -> str: """生成新的 TOTP 共享密钥。 Returns: str: 32 字符 base32 编码的随机密钥 """ return pyotp.random_base32() # -------------------------------------------------------------------------- # 二维码生成 # -------------------------------------------------------------------------- @staticmethod def build_provisioning_uri(secret: str, employee_id: str) -> str: """构造 otpauth:// URI,供 Authenticator 扫码识别。 Args: secret: TOTP 共享密钥(base32) employee_id: 用户标识(扫码后显示的账户名) Returns: str: otpauth://totp/... 格式 URI """ totp = pyotp.TOTP(secret) return totp.provisioning_uri( name=employee_id, issuer_name="企微IT智能服务台", ) @staticmethod def render_qrcode_base64(otpauth_url: str) -> str: """把 otpauth URI 渲染成 PNG 并返回 base64 字符串。 Args: otpauth_url: otpauth:// URI Returns: str: PNG 的 base64(不含 data:image/png;base64, 前缀, 由前端自行拼接或直接用 data URL) """ img = qrcode.make(otpauth_url) buf = io.BytesIO() img.save(buf, format="PNG") return base64.b64encode(buf.getvalue()).decode("utf-8") # -------------------------------------------------------------------------- # 验证码校验 # -------------------------------------------------------------------------- @staticmethod def verify_code(secret: str, otp_code: str, valid_window: int = 1) -> bool: """校验用户输入的 6 位 OTP 码。 Args: secret: TOTP 共享密钥(base32) otp_code: 用户输入的 6 位码 valid_window: 时间容忍窗口(1 = 允许当前 ±30s) Returns: bool: True=验证通过, False=验证失败 """ if not secret or not otp_code: return False try: totp = pyotp.TOTP(secret) return bool(totp.verify(otp_code, valid_window=valid_window)) except Exception as e: # 任意异常(secret 格式错、码非数字等)都视为验证失败 logger.warning(f"MFA verify_code 异常: {e}") return False # -------------------------------------------------------------------------- # 高层便捷方法:启动绑定 # -------------------------------------------------------------------------- @staticmethod def start_binding(employee_id: str) -> Tuple[str, str, str]: """一次性生成绑定所需的全部数据(secret + URI + QR)。 Args: employee_id: 用户标识 Returns: Tuple[str, str, str]: (secret, otpauth_url, qr_code_base64) """ secret = MFAService.generate_secret() otpauth_url = MFAService.build_provisioning_uri(secret, employee_id) qr_base64 = MFAService.render_qrcode_base64(otpauth_url) return secret, otpauth_url, qr_base64 # -------------------------------------------------------------------------- # Redis 验证标记(30 分钟复用) # -------------------------------------------------------------------------- @staticmethod async def mark_verified( redis: aioredis.Redis, employee_id: str, ttl_seconds: int = MFA_VERIFIED_TTL_SECONDS ) -> None: """在 Redis 里写"已验证"标记,后续敏感操作直接查这个 key。 Args: redis: Redis 客户端 employee_id: 用户标识 ttl_seconds: 存活秒数,默认 1800s """ key = f"mfa:verified:{employee_id}" await redis.set(key, "1", ex=ttl_seconds) @staticmethod async def is_verified(redis: aioredis.Redis, employee_id: str) -> bool: """检查用户当前是否有未过期的 MFA 验证标记。 Args: redis: Redis 客户端 employee_id: 用户标识 Returns: bool: True=在 30 分钟复用窗口内 """ key = f"mfa:verified:{employee_id}" return bool(await redis.exists(key)) @staticmethod async def clear_verified(redis: aioredis.Redis, employee_id: str) -> None: """清除 Redis 验证标记(关闭 MFA 时调用)。""" key = f"mfa:verified:{employee_id}" await redis.delete(key) @staticmethod async def get_verified_ttl(redis: aioredis.Redis, employee_id: str) -> int: """获取 Redis 验证标记剩余秒数(测试用,生产路径用不到)。 Args: redis: Redis 客户端 employee_id: 用户标识 Returns: int: 剩余秒数(无 key 返回 -2) """ key = f"mfa:verified:{employee_id}" ttl = await redis.ttl(key) return int(ttl) if ttl is not None else -2