Files

264 lines
8.4 KiB
Python
Raw Permalink Normal View History

# =============================================================================
# 企微IT智能服务台 — 统一 Token 服务
# =============================================================================
# 说明:统一 Token 管理,支持以下功能:
# 1. 创建统一格式的 Token(包含角色信息)
# 2. 验证 Token 并获取用户信息
# 3. 切换当前角色
# 4. 兼容旧格式 Tokenemployee:token 和 agent:token
# =============================================================================
import json
import logging
import secrets
from datetime import datetime
from typing import Dict, List, Optional
import redis.asyncio as aioredis
from app.config import settings
logger = logging.getLogger(__name__)
# Token TTL8小时)
TOKEN_TTL_SECONDS = 8 * 60 * 60
# 统一 Token Key 前缀
UNIFIED_TOKEN_PREFIX = "user:token:"
# 旧格式 Token Key 前缀(兼容)
EMPLOYEE_TOKEN_PREFIX = "employee:token:"
AGENT_TOKEN_PREFIX = "agent:token:"
class TokenService:
"""统一 Token 服务。
管理用户 Token 的创建、验证、角色切换等操作。
"""
def __init__(self, redis_client: aioredis.Redis):
"""初始化 Token 服务。
Args:
redis_client: Redis 异步客户端
"""
self.redis = redis_client
async def create_token(
self,
employee_id: str,
name: str,
roles: List[str],
department: Optional[str] = None,
avatar: Optional[str] = None,
login_source: str = "portal",
) -> str:
"""创建统一格式的 Token。
Args:
employee_id: 企微 UserID
name: 用户姓名
roles: 角色列表(如 ["user", "agent"]
department: 部门(可选)
avatar: 头像URL(可选)
login_source: 登录来源(portal/agent/h5
Returns:
str: Token 字符串
"""
# 生成 Token
token = secrets.token_urlsafe(32)
# 构建 Token 数据
token_data = {
"employee_id": employee_id,
"name": name,
"department": department or "",
"avatar": avatar or "",
"roles": roles,
"current_role": self._get_default_role(roles),
"login_source": login_source,
"created_at": datetime.now().isoformat(),
"last_active": datetime.now().isoformat(),
}
# 存入 Redis(统一格式)
await self.redis.setex(
f"{UNIFIED_TOKEN_PREFIX}{token}",
TOKEN_TTL_SECONDS,
json.dumps(token_data, ensure_ascii=False),
)
# 同时存入旧格式(兼容性)
if login_source == "agent":
await self.redis.setex(
f"{AGENT_TOKEN_PREFIX}{token}",
TOKEN_TTL_SECONDS,
employee_id,
)
else:
await self.redis.setex(
f"{EMPLOYEE_TOKEN_PREFIX}{token}",
TOKEN_TTL_SECONDS,
employee_id,
)
logger.info(f"创建 Token: employee_id={employee_id}, roles={roles}, source={login_source}")
return token
async def get_user_info(self, token: str) -> Optional[Dict]:
"""获取 Token 对应的用户信息。
支持新旧两种 Token 格式。
Args:
token: Token 字符串
Returns:
Optional[Dict]: 用户信息字典,如果 Token 无效返回 None
"""
# 1. 尝试统一格式
data = await self.redis.get(f"{UNIFIED_TOKEN_PREFIX}{token}")
if data:
try:
user_info = json.loads(data)
# 更新最后活跃时间
user_info["last_active"] = datetime.now().isoformat()
await self.redis.setex(
f"{UNIFIED_TOKEN_PREFIX}{token}",
TOKEN_TTL_SECONDS,
json.dumps(user_info, ensure_ascii=False),
)
return user_info
except json.JSONDecodeError:
logger.error(f"Token 数据解析失败: {token[:10]}...")
return None
# 2. 尝试旧格式(employee:token
employee_id = await self.redis.get(f"{EMPLOYEE_TOKEN_PREFIX}{token}")
if employee_id:
employee_id = employee_id.decode("utf-8") if isinstance(employee_id, bytes) else employee_id
# 获取员工信息缓存
info_data = await self.redis.get(f"employee:info:{employee_id}")
if info_data:
try:
info = json.loads(info_data)
return {
"employee_id": employee_id,
"name": info.get("employee_name", ""),
"department": info.get("department", ""),
"avatar": info.get("avatar", ""),
"roles": ["user"],
"current_role": "user",
"login_source": "h5",
"created_at": datetime.now().isoformat(),
"last_active": datetime.now().isoformat(),
}
except json.JSONDecodeError:
pass
# 降级:只有 employee_id
return {
"employee_id": employee_id,
"name": "",
"department": "",
"avatar": "",
"roles": ["user"],
"current_role": "user",
"login_source": "h5",
"created_at": datetime.now().isoformat(),
"last_active": datetime.now().isoformat(),
}
# 3. 尝试旧格式(agent:token
agent_id = await self.redis.get(f"{AGENT_TOKEN_PREFIX}{token}")
if agent_id:
agent_id = agent_id.decode("utf-8") if isinstance(agent_id, bytes) else agent_id
return {
"employee_id": agent_id,
"name": "",
"department": "",
"avatar": "",
"roles": ["user", "agent"],
"current_role": "agent",
"login_source": "agent",
"created_at": datetime.now().isoformat(),
"last_active": datetime.now().isoformat(),
}
return None
async def switch_role(self, token: str, new_role: str) -> bool:
"""切换当前角色。
Args:
token: Token 字符串
new_role: 目标角色标识
Returns:
bool: 是否切换成功
"""
# 获取当前用户信息
user_info = await self.get_user_info(token)
if not user_info:
return False
# 验证用户是否有目标角色
if new_role not in user_info.get("roles", []):
logger.warning(f"用户 {user_info['employee_id']} 没有 {new_role} 角色")
return False
# 更新当前角色
user_info["current_role"] = new_role
user_info["last_active"] = datetime.now().isoformat()
# 保存到 Redis(统一格式)
await self.redis.setex(
f"{UNIFIED_TOKEN_PREFIX}{token}",
TOKEN_TTL_SECONDS,
json.dumps(user_info, ensure_ascii=False),
)
# 同时更新旧格式(如果存在)
# 注意:旧格式只存储 employee_id,不需要更新
logger.info(f"用户 {user_info['employee_id']} 切换角色到 {new_role}")
return True
async def invalidate_token(self, token: str) -> None:
"""使 Token 失效。
删除统一格式和旧格式的 Token。
Args:
token: Token 字符串
"""
# 删除统一格式
await self.redis.delete(f"{UNIFIED_TOKEN_PREFIX}{token}")
# 删除旧格式
await self.redis.delete(f"{EMPLOYEE_TOKEN_PREFIX}{token}")
await self.redis.delete(f"{AGENT_TOKEN_PREFIX}{token}")
logger.info(f"Token 已失效: {token[:10]}...")
def _get_default_role(self, roles: List[str]) -> str:
"""获取默认角色。
优先级:admin > agent > user
Args:
roles: 角色列表
Returns:
str: 默认角色标识
"""
if "admin" in roles:
return "admin"
elif "agent" in roles:
return "agent"
else:
return "user"