# ============================================================================= # 企微IT智能服务台 — 统一 Token 服务 # ============================================================================= # 说明:统一 Token 管理,支持以下功能: # 1. 创建统一格式的 Token(包含角色信息) # 2. 验证 Token 并获取用户信息 # 3. 切换当前角色 # 4. 兼容旧格式 Token(employee:token 和 agent:token) # ============================================================================= import json import logging import secrets from datetime import datetime from typing import Dict, List, Optional import redis.asyncio as aioredis from app.config import settings logger = logging.getLogger(__name__) # Token TTL(8小时) TOKEN_TTL_SECONDS = 8 * 60 * 60 # 统一 Token Key 前缀 UNIFIED_TOKEN_PREFIX = "user:token:" # 旧格式 Token Key 前缀(兼容) EMPLOYEE_TOKEN_PREFIX = "employee:token:" AGENT_TOKEN_PREFIX = "agent:token:" class TokenService: """统一 Token 服务。 管理用户 Token 的创建、验证、角色切换等操作。 """ def __init__(self, redis_client: aioredis.Redis): """初始化 Token 服务。 Args: redis_client: Redis 异步客户端 """ self.redis = redis_client async def create_token( self, employee_id: str, name: str, roles: List[str], department: Optional[str] = None, avatar: Optional[str] = None, login_source: str = "portal", ) -> str: """创建统一格式的 Token。 Args: employee_id: 企微 UserID name: 用户姓名 roles: 角色列表(如 ["user", "agent"]) department: 部门(可选) avatar: 头像URL(可选) login_source: 登录来源(portal/agent/h5) Returns: str: Token 字符串 """ # 生成 Token token = secrets.token_urlsafe(32) # 构建 Token 数据 token_data = { "employee_id": employee_id, "name": name, "department": department or "", "avatar": avatar or "", "roles": roles, "current_role": self._get_default_role(roles), "login_source": login_source, "created_at": datetime.now().isoformat(), "last_active": datetime.now().isoformat(), } # 存入 Redis(统一格式) await self.redis.setex( f"{UNIFIED_TOKEN_PREFIX}{token}", TOKEN_TTL_SECONDS, json.dumps(token_data, ensure_ascii=False), ) # 同时存入旧格式(兼容性) if login_source == "agent": await self.redis.setex( f"{AGENT_TOKEN_PREFIX}{token}", TOKEN_TTL_SECONDS, employee_id, ) else: await self.redis.setex( f"{EMPLOYEE_TOKEN_PREFIX}{token}", TOKEN_TTL_SECONDS, employee_id, ) logger.info(f"创建 Token: employee_id={employee_id}, roles={roles}, source={login_source}") return token async def get_user_info(self, token: str) -> Optional[Dict]: """获取 Token 对应的用户信息。 支持新旧两种 Token 格式。 Args: token: Token 字符串 Returns: Optional[Dict]: 用户信息字典,如果 Token 无效返回 None """ # 1. 尝试统一格式 data = await self.redis.get(f"{UNIFIED_TOKEN_PREFIX}{token}") if data: try: user_info = json.loads(data) # 更新最后活跃时间 user_info["last_active"] = datetime.now().isoformat() await self.redis.setex( f"{UNIFIED_TOKEN_PREFIX}{token}", TOKEN_TTL_SECONDS, json.dumps(user_info, ensure_ascii=False), ) return user_info except json.JSONDecodeError: logger.error(f"Token 数据解析失败: {token[:10]}...") return None # 2. 尝试旧格式(employee:token) employee_id = await self.redis.get(f"{EMPLOYEE_TOKEN_PREFIX}{token}") if employee_id: employee_id = employee_id.decode("utf-8") if isinstance(employee_id, bytes) else employee_id # 获取员工信息缓存 info_data = await self.redis.get(f"employee:info:{employee_id}") if info_data: try: info = json.loads(info_data) return { "employee_id": employee_id, "name": info.get("employee_name", ""), "department": info.get("department", ""), "avatar": info.get("avatar", ""), "roles": ["user"], "current_role": "user", "login_source": "h5", "created_at": datetime.now().isoformat(), "last_active": datetime.now().isoformat(), } except json.JSONDecodeError: pass # 降级:只有 employee_id return { "employee_id": employee_id, "name": "", "department": "", "avatar": "", "roles": ["user"], "current_role": "user", "login_source": "h5", "created_at": datetime.now().isoformat(), "last_active": datetime.now().isoformat(), } # 3. 尝试旧格式(agent:token) agent_id = await self.redis.get(f"{AGENT_TOKEN_PREFIX}{token}") if agent_id: agent_id = agent_id.decode("utf-8") if isinstance(agent_id, bytes) else agent_id return { "employee_id": agent_id, "name": "", "department": "", "avatar": "", "roles": ["user", "agent"], "current_role": "agent", "login_source": "agent", "created_at": datetime.now().isoformat(), "last_active": datetime.now().isoformat(), } return None async def switch_role(self, token: str, new_role: str) -> bool: """切换当前角色。 Args: token: Token 字符串 new_role: 目标角色标识 Returns: bool: 是否切换成功 """ # 获取当前用户信息 user_info = await self.get_user_info(token) if not user_info: return False # 验证用户是否有目标角色 if new_role not in user_info.get("roles", []): logger.warning(f"用户 {user_info['employee_id']} 没有 {new_role} 角色") return False # 更新当前角色 user_info["current_role"] = new_role user_info["last_active"] = datetime.now().isoformat() # 保存到 Redis(统一格式) await self.redis.setex( f"{UNIFIED_TOKEN_PREFIX}{token}", TOKEN_TTL_SECONDS, json.dumps(user_info, ensure_ascii=False), ) # 同时更新旧格式(如果存在) # 注意:旧格式只存储 employee_id,不需要更新 logger.info(f"用户 {user_info['employee_id']} 切换角色到 {new_role}") return True async def invalidate_token(self, token: str) -> None: """使 Token 失效。 删除统一格式和旧格式的 Token。 Args: token: Token 字符串 """ # 删除统一格式 await self.redis.delete(f"{UNIFIED_TOKEN_PREFIX}{token}") # 删除旧格式 await self.redis.delete(f"{EMPLOYEE_TOKEN_PREFIX}{token}") await self.redis.delete(f"{AGENT_TOKEN_PREFIX}{token}") logger.info(f"Token 已失效: {token[:10]}...") def _get_default_role(self, roles: List[str]) -> str: """获取默认角色。 优先级:admin > agent > user Args: roles: 角色列表 Returns: str: 默认角色标识 """ if "admin" in roles: return "admin" elif "agent" in roles: return "agent" else: return "user"