Files

124 lines
4.0 KiB
Python

# =============================================================================
# 企微IT智能服务台 — access_token 缓存管理器
# =============================================================================
# 说明:管理企微 access_token 的获取和缓存
# 1. 优先从 Redis 缓存获取
# 2. 缓存不存在或即将过期则重新获取
# 3. access_token 有效期 7200 秒,提前 300 秒刷新
# =============================================================================
import logging
from typing import Optional
import httpx
import redis.asyncio as aioredis
from app.config import settings
logger = logging.getLogger(__name__)
class TokenManager:
"""企微 access_token 缓存管理器。
封装 access_token 的获取、缓存和自动刷新逻辑。
使用 Redis 作为缓存存储,避免频繁调用企微 API。
Attributes:
redis: Redis 异步客户端
corp_id: 企业ID
corp_secret: 应用Secret
"""
# Redis 缓存 key
CACHE_KEY = "wecom:access_token"
# access_token 有效期(秒)
TOKEN_EXPIRES = 7200
# 提前刷新时间(秒)
BUFFER_SECONDS = 300
def __init__(self, redis_client: aioredis.Redis):
"""初始化 token 管理器。
Args:
redis_client: Redis 异步客户端实例
"""
self.redis = redis_client
self.corp_id = settings.wecom_corp_id
self.corp_secret = settings.wecom_secret
self.client = httpx.AsyncClient(timeout=httpx.Timeout(connect=5.0, read=10.0))
async def get_token(self) -> str:
"""获取 access_token。
优先从 Redis 缓存获取,缓存未命中则调用企微 API 获取。
Returns:
str: access_token 字符串
Raises:
Exception: 获取失败
"""
# 1. 尝试从缓存获取
cached = await self.redis.get(self.CACHE_KEY)
if cached:
logger.debug("从缓存获取 access_token")
return cached.decode("utf-8")
# 2. 缓存未命中,刷新 token
return await self._refresh_token()
async def _refresh_token(self) -> str:
"""调用企微 API 刷新 access_token。
对应企微API:
GET https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=ID&corpsecret=SECRET
Returns:
str: 新获取的 access_token
Raises:
Exception: 获取失败
"""
logger.info("刷新 access_token")
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
params = {
"corpid": self.corp_id,
"corpsecret": self.corp_secret,
}
try:
response = await self.client.get(url, params=params)
result = response.json()
if result.get("errcode") != 0:
error_msg = result.get("errmsg", "未知错误")
logger.error(f"获取 access_token 失败: {error_msg}")
raise Exception(f"企微API错误: {error_msg}")
access_token = result["access_token"]
expires_in = result.get("expires_in", self.TOKEN_EXPIRES)
# 缓存到 Redis,TTL = 有效期 - 提前刷新时间
cache_ttl = max(expires_in - self.BUFFER_SECONDS, 60)
await self.redis.setex(self.CACHE_KEY, cache_ttl, access_token)
logger.info(f"access_token 刷新成功,TTL={cache_ttl}")
return access_token
except httpx.HTTPError as e:
logger.error(f"获取 access_token 网络错误: {e}")
raise Exception(f"网络错误: {e}") from e
async def invalidate(self) -> None:
"""手动使缓存失效。
当检测到 token 过期或无效时调用,强制下次请求刷新。
"""
await self.redis.delete(self.CACHE_KEY)
logger.info("access_token 缓存已手动清除")
async def close(self) -> None:
"""关闭 HTTP 客户端。"""
await self.client.aclose()