264 lines
8.4 KiB
Python
264 lines
8.4 KiB
Python
# =============================================================================
|
||
# 企微IT智能服务台 — 统一 Token 服务
|
||
# =============================================================================
|
||
# 说明:统一 Token 管理,支持以下功能:
|
||
# 1. 创建统一格式的 Token(包含角色信息)
|
||
# 2. 验证 Token 并获取用户信息
|
||
# 3. 切换当前角色
|
||
# 4. 兼容旧格式 Token(employee:token 和 agent:token)
|
||
# =============================================================================
|
||
|
||
import json
|
||
import logging
|
||
import secrets
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional
|
||
|
||
import redis.asyncio as aioredis
|
||
|
||
from app.config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Token TTL(8小时)
|
||
TOKEN_TTL_SECONDS = 8 * 60 * 60
|
||
|
||
# 统一 Token Key 前缀
|
||
UNIFIED_TOKEN_PREFIX = "user:token:"
|
||
|
||
# 旧格式 Token Key 前缀(兼容)
|
||
EMPLOYEE_TOKEN_PREFIX = "employee:token:"
|
||
AGENT_TOKEN_PREFIX = "agent:token:"
|
||
|
||
|
||
class TokenService:
|
||
"""统一 Token 服务。
|
||
|
||
管理用户 Token 的创建、验证、角色切换等操作。
|
||
"""
|
||
|
||
def __init__(self, redis_client: aioredis.Redis):
|
||
"""初始化 Token 服务。
|
||
|
||
Args:
|
||
redis_client: Redis 异步客户端
|
||
"""
|
||
self.redis = redis_client
|
||
|
||
async def create_token(
|
||
self,
|
||
employee_id: str,
|
||
name: str,
|
||
roles: List[str],
|
||
department: Optional[str] = None,
|
||
avatar: Optional[str] = None,
|
||
login_source: str = "portal",
|
||
) -> str:
|
||
"""创建统一格式的 Token。
|
||
|
||
Args:
|
||
employee_id: 企微 UserID
|
||
name: 用户姓名
|
||
roles: 角色列表(如 ["user", "agent"])
|
||
department: 部门(可选)
|
||
avatar: 头像URL(可选)
|
||
login_source: 登录来源(portal/agent/h5)
|
||
|
||
Returns:
|
||
str: Token 字符串
|
||
"""
|
||
# 生成 Token
|
||
token = secrets.token_urlsafe(32)
|
||
|
||
# 构建 Token 数据
|
||
token_data = {
|
||
"employee_id": employee_id,
|
||
"name": name,
|
||
"department": department or "",
|
||
"avatar": avatar or "",
|
||
"roles": roles,
|
||
"current_role": self._get_default_role(roles),
|
||
"login_source": login_source,
|
||
"created_at": datetime.now().isoformat(),
|
||
"last_active": datetime.now().isoformat(),
|
||
}
|
||
|
||
# 存入 Redis(统一格式)
|
||
await self.redis.setex(
|
||
f"{UNIFIED_TOKEN_PREFIX}{token}",
|
||
TOKEN_TTL_SECONDS,
|
||
json.dumps(token_data, ensure_ascii=False),
|
||
)
|
||
|
||
# 同时存入旧格式(兼容性)
|
||
if login_source == "agent":
|
||
await self.redis.setex(
|
||
f"{AGENT_TOKEN_PREFIX}{token}",
|
||
TOKEN_TTL_SECONDS,
|
||
employee_id,
|
||
)
|
||
else:
|
||
await self.redis.setex(
|
||
f"{EMPLOYEE_TOKEN_PREFIX}{token}",
|
||
TOKEN_TTL_SECONDS,
|
||
employee_id,
|
||
)
|
||
|
||
logger.info(f"创建 Token: employee_id={employee_id}, roles={roles}, source={login_source}")
|
||
|
||
return token
|
||
|
||
async def get_user_info(self, token: str) -> Optional[Dict]:
|
||
"""获取 Token 对应的用户信息。
|
||
|
||
支持新旧两种 Token 格式。
|
||
|
||
Args:
|
||
token: Token 字符串
|
||
|
||
Returns:
|
||
Optional[Dict]: 用户信息字典,如果 Token 无效返回 None
|
||
"""
|
||
# 1. 尝试统一格式
|
||
data = await self.redis.get(f"{UNIFIED_TOKEN_PREFIX}{token}")
|
||
if data:
|
||
try:
|
||
user_info = json.loads(data)
|
||
# 更新最后活跃时间
|
||
user_info["last_active"] = datetime.now().isoformat()
|
||
await self.redis.setex(
|
||
f"{UNIFIED_TOKEN_PREFIX}{token}",
|
||
TOKEN_TTL_SECONDS,
|
||
json.dumps(user_info, ensure_ascii=False),
|
||
)
|
||
return user_info
|
||
except json.JSONDecodeError:
|
||
logger.error(f"Token 数据解析失败: {token[:10]}...")
|
||
return None
|
||
|
||
# 2. 尝试旧格式(employee:token)
|
||
employee_id = await self.redis.get(f"{EMPLOYEE_TOKEN_PREFIX}{token}")
|
||
if employee_id:
|
||
employee_id = employee_id.decode("utf-8") if isinstance(employee_id, bytes) else employee_id
|
||
# 获取员工信息缓存
|
||
info_data = await self.redis.get(f"employee:info:{employee_id}")
|
||
if info_data:
|
||
try:
|
||
info = json.loads(info_data)
|
||
return {
|
||
"employee_id": employee_id,
|
||
"name": info.get("employee_name", ""),
|
||
"department": info.get("department", ""),
|
||
"avatar": info.get("avatar", ""),
|
||
"roles": ["user"],
|
||
"current_role": "user",
|
||
"login_source": "h5",
|
||
"created_at": datetime.now().isoformat(),
|
||
"last_active": datetime.now().isoformat(),
|
||
}
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
# 降级:只有 employee_id
|
||
return {
|
||
"employee_id": employee_id,
|
||
"name": "",
|
||
"department": "",
|
||
"avatar": "",
|
||
"roles": ["user"],
|
||
"current_role": "user",
|
||
"login_source": "h5",
|
||
"created_at": datetime.now().isoformat(),
|
||
"last_active": datetime.now().isoformat(),
|
||
}
|
||
|
||
# 3. 尝试旧格式(agent:token)
|
||
agent_id = await self.redis.get(f"{AGENT_TOKEN_PREFIX}{token}")
|
||
if agent_id:
|
||
agent_id = agent_id.decode("utf-8") if isinstance(agent_id, bytes) else agent_id
|
||
return {
|
||
"employee_id": agent_id,
|
||
"name": "",
|
||
"department": "",
|
||
"avatar": "",
|
||
"roles": ["user", "agent"],
|
||
"current_role": "agent",
|
||
"login_source": "agent",
|
||
"created_at": datetime.now().isoformat(),
|
||
"last_active": datetime.now().isoformat(),
|
||
}
|
||
|
||
return None
|
||
|
||
async def switch_role(self, token: str, new_role: str) -> bool:
|
||
"""切换当前角色。
|
||
|
||
Args:
|
||
token: Token 字符串
|
||
new_role: 目标角色标识
|
||
|
||
Returns:
|
||
bool: 是否切换成功
|
||
"""
|
||
# 获取当前用户信息
|
||
user_info = await self.get_user_info(token)
|
||
if not user_info:
|
||
return False
|
||
|
||
# 验证用户是否有目标角色
|
||
if new_role not in user_info.get("roles", []):
|
||
logger.warning(f"用户 {user_info['employee_id']} 没有 {new_role} 角色")
|
||
return False
|
||
|
||
# 更新当前角色
|
||
user_info["current_role"] = new_role
|
||
user_info["last_active"] = datetime.now().isoformat()
|
||
|
||
# 保存到 Redis(统一格式)
|
||
await self.redis.setex(
|
||
f"{UNIFIED_TOKEN_PREFIX}{token}",
|
||
TOKEN_TTL_SECONDS,
|
||
json.dumps(user_info, ensure_ascii=False),
|
||
)
|
||
|
||
# 同时更新旧格式(如果存在)
|
||
# 注意:旧格式只存储 employee_id,不需要更新
|
||
|
||
logger.info(f"用户 {user_info['employee_id']} 切换角色到 {new_role}")
|
||
return True
|
||
|
||
async def invalidate_token(self, token: str) -> None:
|
||
"""使 Token 失效。
|
||
|
||
删除统一格式和旧格式的 Token。
|
||
|
||
Args:
|
||
token: Token 字符串
|
||
"""
|
||
# 删除统一格式
|
||
await self.redis.delete(f"{UNIFIED_TOKEN_PREFIX}{token}")
|
||
|
||
# 删除旧格式
|
||
await self.redis.delete(f"{EMPLOYEE_TOKEN_PREFIX}{token}")
|
||
await self.redis.delete(f"{AGENT_TOKEN_PREFIX}{token}")
|
||
|
||
logger.info(f"Token 已失效: {token[:10]}...")
|
||
|
||
def _get_default_role(self, roles: List[str]) -> str:
|
||
"""获取默认角色。
|
||
|
||
优先级:admin > agent > user
|
||
|
||
Args:
|
||
roles: 角色列表
|
||
|
||
Returns:
|
||
str: 默认角色标识
|
||
"""
|
||
if "admin" in roles:
|
||
return "admin"
|
||
elif "agent" in roles:
|
||
return "agent"
|
||
else:
|
||
return "user"
|