Files

264 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# WeCom IT Smart Service Desk — Unified Token Service
# =============================================================================
# Overview: unified token management, providing:
# 1. Creating tokens in a unified format (including role information)
# 2. Validating a token and retrieving the user's information
# 3. Switching the current role
# 4. Compatibility with legacy token formats (employee:token and agent:token)
# =============================================================================
import json
import logging
import secrets
from datetime import datetime
from typing import Dict, List, Optional
import redis.asyncio as aioredis
from app.config import settings
logger = logging.getLogger(__name__)
# Token time-to-live in seconds (8 hours)
TOKEN_TTL_SECONDS = 8 * 60 * 60
# Redis key prefix for unified-format tokens
UNIFIED_TOKEN_PREFIX = "user:token:"
# Redis key prefixes for legacy-format tokens (kept for compatibility)
EMPLOYEE_TOKEN_PREFIX = "employee:token:"
AGENT_TOKEN_PREFIX = "agent:token:"
class TokenService:
    """Unified token service.

    Creates, validates, role-switches and invalidates login tokens stored in
    Redis. Tokens are stored as a JSON document under the unified key prefix;
    for compatibility, a legacy key holding only the employee id is written
    alongside it, and tokens that exist only under a legacy key are still
    accepted by :meth:`get_user_info`.
    """

    def __init__(self, redis_client: "aioredis.Redis"):
        """Initialize the token service.

        Args:
            redis_client: Asynchronous Redis client used for all storage.
        """
        self.redis = redis_client

    async def create_token(
        self,
        employee_id: str,
        name: str,
        roles: List[str],
        department: Optional[str] = None,
        avatar: Optional[str] = None,
        login_source: str = "portal",
    ) -> str:
        """Create a token in the unified format and store it in Redis.

        Args:
            employee_id: WeCom UserID.
            name: Display name of the user.
            roles: Role list, e.g. ``["user", "agent"]``.
            department: Department (optional, stored as "" when missing).
            avatar: Avatar URL (optional, stored as "" when missing).
            login_source: Login source (portal/agent/h5).

        Returns:
            The newly generated token string.
        """
        token = secrets.token_urlsafe(32)

        # Take the timestamp once so created_at and last_active are identical
        # for a freshly created token.
        now = datetime.now().isoformat()
        token_data = {
            "employee_id": employee_id,
            "name": name,
            "department": department or "",
            "avatar": avatar or "",
            "roles": roles,
            "current_role": self._get_default_role(roles),
            "login_source": login_source,
            "created_at": now,
            "last_active": now,
        }

        # Unified-format record.
        await self.redis.setex(
            f"{UNIFIED_TOKEN_PREFIX}{token}",
            TOKEN_TTL_SECONDS,
            json.dumps(token_data, ensure_ascii=False),
        )

        # Legacy-format record (compatibility): the value is only the
        # employee id; the prefix depends on the login source.
        legacy_prefix = (
            AGENT_TOKEN_PREFIX if login_source == "agent" else EMPLOYEE_TOKEN_PREFIX
        )
        await self.redis.setex(
            f"{legacy_prefix}{token}",
            TOKEN_TTL_SECONDS,
            employee_id,
        )

        logger.info(
            "创建 Token: employee_id=%s, roles=%s, source=%s",
            employee_id,
            roles,
            login_source,
        )
        return token

    async def get_user_info(self, token: str) -> Optional[Dict]:
        """Return the user information associated with a token.

        Looks the token up in the unified format first, then under the legacy
        ``employee:token:`` prefix, then under the legacy ``agent:token:``
        prefix. A hit in the unified format also refreshes ``last_active``
        and the key's TTL.

        Args:
            token: Token string.

        Returns:
            The user-info dict, or ``None`` when the token is empty, unknown,
            or its stored payload cannot be parsed into a JSON object.
        """
        if not token:
            # An empty token would otherwise query the bare prefix keys.
            return None

        # 1. Unified format.
        unified_key = f"{UNIFIED_TOKEN_PREFIX}{token}"
        data = await self.redis.get(unified_key)
        if data:
            try:
                user_info = json.loads(data)
            except ValueError:
                # Covers json.JSONDecodeError and UnicodeDecodeError
                # (both are ValueError subclasses).
                user_info = None
            if not isinstance(user_info, dict):
                logger.error("Token 数据解析失败: %s...", token[:10])
                return None

            user_info["last_active"] = datetime.now().isoformat()
            # XX: only rewrite the key if it still exists, so a token that was
            # invalidated between the read above and this write is not
            # silently re-created.
            await self.redis.set(
                unified_key,
                json.dumps(user_info, ensure_ascii=False),
                ex=TOKEN_TTL_SECONDS,
                xx=True,
            )
            return user_info

        # 2. Legacy format (employee:token).
        employee_id = await self.redis.get(f"{EMPLOYEE_TOKEN_PREFIX}{token}")
        if employee_id:
            employee_id = self._to_text(employee_id)
            profile = await self._load_employee_profile(employee_id)
            return self._build_legacy_user_info(
                employee_id,
                roles=["user"],
                current_role="user",
                login_source="h5",
                profile=profile,
            )

        # 3. Legacy format (agent:token).
        agent_id = await self.redis.get(f"{AGENT_TOKEN_PREFIX}{token}")
        if agent_id:
            return self._build_legacy_user_info(
                self._to_text(agent_id),
                roles=["user", "agent"],
                current_role="agent",
                login_source="agent",
            )

        return None

    async def switch_role(self, token: str, new_role: str) -> bool:
        """Switch the current role recorded for a token.

        Args:
            token: Token string.
            new_role: Identifier of the role to switch to.

        Returns:
            ``True`` when the role was switched; ``False`` when the token is
            invalid or the user does not hold ``new_role``.
        """
        user_info = await self.get_user_info(token)
        if not user_info:
            return False

        if new_role not in user_info.get("roles", []):
            logger.warning(
                "用户 %s 没有 %s 角色", user_info.get("employee_id"), new_role
            )
            return False

        user_info["current_role"] = new_role
        user_info["last_active"] = datetime.now().isoformat()

        # Persist in the unified format. The legacy records store only the
        # employee id, so they need no update.
        await self.redis.setex(
            f"{UNIFIED_TOKEN_PREFIX}{token}",
            TOKEN_TTL_SECONDS,
            json.dumps(user_info, ensure_ascii=False),
        )

        logger.info(
            "用户 %s 切换角色到 %s", user_info.get("employee_id"), new_role
        )
        return True

    async def invalidate_token(self, token: str) -> None:
        """Invalidate a token by deleting its unified and legacy records.

        Args:
            token: Token string.
        """
        # One multi-key DEL instead of three separate round trips.
        await self.redis.delete(
            f"{UNIFIED_TOKEN_PREFIX}{token}",
            f"{EMPLOYEE_TOKEN_PREFIX}{token}",
            f"{AGENT_TOKEN_PREFIX}{token}",
        )
        logger.info("Token 已失效: %s...", token[:10])

    def _get_default_role(self, roles: List[str]) -> str:
        """Pick the default role for a role list.

        Priority: admin > agent > user.

        Args:
            roles: Role list.

        Returns:
            ``"admin"`` or ``"agent"`` when present in ``roles`` (in that
            priority order), otherwise ``"user"``.
        """
        for role in ("admin", "agent"):
            if role in roles:
                return role
        return "user"

    @staticmethod
    def _to_text(value) -> str:
        """Decode a Redis reply to ``str`` when the client returned bytes."""
        return value.decode("utf-8") if isinstance(value, bytes) else value

    async def _load_employee_profile(self, employee_id: str) -> Dict:
        """Load the cached ``employee:info:<id>`` record as a dict.

        Returns an empty dict when the cache entry is missing or is not a
        JSON object, so the caller falls back to an id-only user record.
        """
        raw = await self.redis.get(f"employee:info:{employee_id}")
        if not raw:
            return {}
        try:
            profile = json.loads(raw)
        except ValueError:
            profile = None
        if not isinstance(profile, dict):
            # Previously swallowed silently; keep the best-effort fallback
            # but leave a trace for diagnosis.
            logger.warning("员工信息缓存解析失败: employee_id=%s", employee_id)
            return {}
        return profile

    @staticmethod
    def _build_legacy_user_info(
        employee_id: str,
        roles: List[str],
        current_role: str,
        login_source: str,
        profile: Optional[Dict] = None,
    ) -> Dict:
        """Build a unified-shape user-info dict for a legacy-format token."""
        profile = profile or {}
        now = datetime.now().isoformat()
        return {
            "employee_id": employee_id,
            "name": profile.get("employee_name", ""),
            "department": profile.get("department", ""),
            "avatar": profile.get("avatar", ""),
            "roles": roles,
            "current_role": current_role,
            "login_source": login_source,
            "created_at": now,
            "last_active": now,
        }