Files
wecom_it_smart_desk/backend/app/dependencies.py
T
Simon 8bfd0cfdc3 fix: v0.5.6 require_role 装饰器 signature + 3 个 schema 同步 migration
🛠️ Bug 修复:
- backend/app/dependencies.py: 修 require_role 装饰器
  问题:@wraps 让 FastAPI 看到 __wrapped__ 签名,Depends 默认值未被解析,
        current_user 实际是 Depends 对象 → 'Depends' object has no attribute 'roles'
  修法:用 inspect 合并签名 + 手动设 wrapper.__signature__,
        把 current_user 加进 FastAPI 看到的参数列表
  影响:所有用 @require_role 的 endpoint 在生产都受影响,修后正常

📦 Dependencies:
- backend/requirements.txt: pydantic 2.7.4
  原因:2.7.5 被 PyPI yank,清华源不缓存,build 失败
  (本次不进生产,但合并时一起跟)

🗃️ Alembic migrations(3 个,生产必跑):
- 010_add_agent_otp: agents.otp_secret + agents.otp_enabled
  背景:Agent 模型加了 OTP 字段但没建 migration,坐席登录报
        'column agents.otp_secret does not exist'
  字段:otp_secret VARCHAR(64) NULL, otp_enabled BOOLEAN DEFAULT false
  安全:nullable + default,现有坐席不受影响

- 011_add_conversation_impact: conversations 3 个评估字段
  背景:坐席发消息 500 报 'column conversations.impact_scope does not exist'
  字段:impact_scope INT DEFAULT 0, is_blocking BOOL DEFAULT false,
        emotion_state VARCHAR(20) DEFAULT 'normal'
  安全:都有 default,现有会话自动填默认值

- 012_sync_remaining_fields: 模型 vs DB 剩余漂移
  背景:dev-check-schema-drift 找到 4 个 dev 模式下没暴露的字段
  字段:conversations.dify_conversation_id VARCHAR(128) NULL,
        employees.it_level VARCHAR(20) DEFAULT 'silver',
        employees.it_level_source VARCHAR(20) DEFAULT 'system',
        employees.notes JSON DEFAULT '{}'
  安全:都有 default,现有数据自动填默认值

部署:
  cd /app && python -m alembic upgrade head
  docker compose restart backend
  验证:curl http://10.90.5.110:8000/health → 200
2026-06-16 19:24:27 +08:00

284 lines
7.8 KiB
Python

# =============================================================================
# 企微IT智能服务台 — 统一认证依赖
# =============================================================================
# 说明:提供统一的认证依赖函数,支持:
# 1. get_current_user: 获取当前用户信息(包含角色)
# 2. require_role: 角色验证装饰器
# 3. require_admin: 管理员权限验证
# =============================================================================
import inspect
import json
import logging
from dataclasses import dataclass
from functools import wraps
from typing import List, Optional
import redis.asyncio as aioredis
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.config import settings
from app.services.token_service import TokenService
logger = logging.getLogger(__name__)
# HTTP Bearer 认证方案
security = HTTPBearer()
@dataclass
class UserInfo:
"""用户信息数据类。
Attributes:
employee_id: 企微 UserID
name: 用户姓名
department: 部门
avatar: 头像URL
roles: 角色列表
current_role: 当前选择的角色
login_source: 登录来源
"""
employee_id: str
name: str
department: str
avatar: str
roles: List[str]
current_role: str
login_source: str
# Redis 连接池(单例)
_redis_pool: Optional[aioredis.Redis] = None
async def get_redis() -> aioredis.Redis:
"""获取 Redis 连接。
Returns:
aioredis.Redis: Redis 异步客户端
"""
global _redis_pool
if _redis_pool is None:
_redis_pool = settings.create_redis_client()
return _redis_pool
# 共享服务实例(用于 wecom_callback.py 等模块)
# 这些函数提供同步获取服务实例的方式,用于非 FastAPI DI 的场景
def get_shared_redis() -> aioredis.Redis:
"""获取 Redis 客户端(同步版本,用于非 async 场景)。
Returns:
aioredis.Redis: Redis 客户端实例
"""
return settings.create_redis_client()
def get_shared_wecom_service():
"""获取 WecomService 共享实例。
Returns:
WecomService: 企微服务实例
"""
from app.services.wecom_service import WecomService
return WecomService(settings.create_redis_client())
def get_shared_ai_handler():
"""获取 AIHandler 共享实例。
Returns:
AIHandler: AI 处理器实例
"""
from app.services.ai_handler import AIHandler
from app.services.ai_service import AIService
return AIHandler(ai_service=AIService())
# FastAPI Depends 函数(用于路由依赖注入)
async def dep_redis() -> aioredis.Redis:
"""Redis 客户端依赖注入。
Returns:
aioredis.Redis: Redis 异步客户端
"""
return await get_redis()
def dep_wecom_service():
"""WecomService 依赖注入。
Returns:
WecomService: 企微服务实例
"""
from app.services.wecom_service import WecomService
return WecomService(settings.create_redis_client())
def dep_ai_handler():
"""AIHandler 依赖注入。
Returns:
AIHandler: AI 处理器实例
"""
from app.services.ai_handler import AIHandler
from app.services.ai_service import AIService
return AIHandler(ai_service=AIService())
def dep_wingman_service():
"""WingmanService 依赖注入。
Returns:
WingmanService: AI Wingman 服务实例
"""
from app.services.wingman_service import WingmanService
return WingmanService()
# 应用生命周期管理函数
async def init_shared_services():
"""初始化共享服务(应用启动时调用)。
创建 Redis 连接池,初始化共享服务实例。
"""
global _redis_pool
_redis_pool = settings.create_redis_client()
logger.info("共享服务初始化完成")
async def cleanup_shared_services():
"""清理共享服务(应用关闭时调用)。
关闭 Redis 连接池。
"""
global _redis_pool
if _redis_pool:
await _redis_pool.close()
_redis_pool = None
logger.info("共享服务清理完成")
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> UserInfo:
"""统一认证依赖:从 Token 获取用户信息。
支持新旧两种 Token 格式。
Args:
credentials: HTTP Bearer Token
Returns:
UserInfo: 用户信息
Raises:
HTTPException: Token 无效或已过期
"""
token = credentials.credentials
# 获取 Redis 连接
redis_client = await get_redis()
# 创建 Token 服务
token_service = TokenService(redis_client)
# 获取用户信息
user_info = await token_service.get_user_info(token)
if not user_info:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 无效或已过期",
headers={"WWW-Authenticate": "Bearer"},
)
return UserInfo(
employee_id=user_info["employee_id"],
name=user_info.get("name", ""),
department=user_info.get("department", ""),
avatar=user_info.get("avatar", ""),
roles=user_info.get("roles", ["user"]),
current_role=user_info.get("current_role", "user"),
login_source=user_info.get("login_source", "portal"),
)
def require_role(*required_roles: str):
"""角色验证装饰器。
检查用户是否拥有指定角色之一。
Args:
*required_roles: 允许的角色列表
Returns:
装饰器函数
Example:
@router.get("/api/admin/dashboard")
@require_role("admin")
async def get_dashboard(current_user: UserInfo = Depends(get_current_user)):
pass
"""
def decorator(func):
# 合并 func 签名 + current_user 参数,让 FastAPI 能正确解析 Depends
# (v0.5.6 修复:之前用 @wraps,FastAPI 看到的是 __wrapped__ 的签名,
# 没有 current_user,导致 Depends 默认值未被解析,current_user 实际是 Depends 对象)
sig = inspect.signature(func)
params = list(sig.parameters.values())
params.append(
inspect.Parameter(
'current_user',
inspect.Parameter.KEYWORD_ONLY,
annotation=UserInfo,
default=Depends(get_current_user),
)
)
new_sig = sig.replace(parameters=params)
@wraps(func)
async def wrapper(*args, **kwargs):
# FastAPI 已经把 current_user 注入了 kwargs
current_user = kwargs.pop('current_user')
# 检查用户是否有任一所需角色
user_roles = set(current_user.roles)
required = set(required_roles)
if not user_roles.intersection(required):
logger.warning(
f"用户 {current_user.employee_id} 角色不足: "
f"拥有 {current_user.roles}, 需要 {required_roles}"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"需要以下角色之一: {', '.join(required_roles)}",
)
return await func(*args, current_user=current_user, **kwargs)
# 关键:让 FastAPI 用合并后的签名,这样它能看到 current_user 这个 Depends 参数
wrapper.__signature__ = new_sig
return wrapper
return decorator
def require_admin(func):
"""管理员权限验证装饰器。
等同于 @require_role("admin")。
Example:
@router.get("/api/admin/dashboard")
@require_admin
async def get_dashboard(current_user: UserInfo = Depends(get_current_user)):
pass
"""
return require_role("admin")(func)