124 lines
4.0 KiB
Python
124 lines
4.0 KiB
Python
# =============================================================================
|
|
# 企微IT智能服务台 — access_token 缓存管理器
|
|
# =============================================================================
|
|
# 说明:管理企微 access_token 的获取和缓存
|
|
# 1. 优先从 Redis 缓存获取
|
|
# 2. 缓存不存在或即将过期则重新获取
|
|
# 3. access_token 有效期 7200 秒,提前 300 秒刷新
|
|
# =============================================================================
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
import redis.asyncio as aioredis
|
|
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TokenManager:
|
|
"""企微 access_token 缓存管理器。
|
|
|
|
封装 access_token 的获取、缓存和自动刷新逻辑。
|
|
使用 Redis 作为缓存存储,避免频繁调用企微 API。
|
|
|
|
Attributes:
|
|
redis: Redis 异步客户端
|
|
corp_id: 企业ID
|
|
corp_secret: 应用Secret
|
|
"""
|
|
|
|
# Redis 缓存 key
|
|
CACHE_KEY = "wecom:access_token"
|
|
# access_token 有效期(秒)
|
|
TOKEN_EXPIRES = 7200
|
|
# 提前刷新时间(秒)
|
|
BUFFER_SECONDS = 300
|
|
|
|
def __init__(self, redis_client: aioredis.Redis):
|
|
"""初始化 token 管理器。
|
|
|
|
Args:
|
|
redis_client: Redis 异步客户端实例
|
|
"""
|
|
self.redis = redis_client
|
|
self.corp_id = settings.wecom_corp_id
|
|
self.corp_secret = settings.wecom_secret
|
|
self.client = httpx.AsyncClient(timeout=httpx.Timeout(connect=5.0, read=10.0))
|
|
|
|
async def get_token(self) -> str:
|
|
"""获取 access_token。
|
|
|
|
优先从 Redis 缓存获取,缓存未命中则调用企微 API 获取。
|
|
|
|
Returns:
|
|
str: access_token 字符串
|
|
|
|
Raises:
|
|
Exception: 获取失败
|
|
"""
|
|
# 1. 尝试从缓存获取
|
|
cached = await self.redis.get(self.CACHE_KEY)
|
|
if cached:
|
|
logger.debug("从缓存获取 access_token")
|
|
return cached.decode("utf-8")
|
|
|
|
# 2. 缓存未命中,刷新 token
|
|
return await self._refresh_token()
|
|
|
|
async def _refresh_token(self) -> str:
|
|
"""调用企微 API 刷新 access_token。
|
|
|
|
对应企微API:
|
|
GET https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=ID&corpsecret=SECRET
|
|
|
|
Returns:
|
|
str: 新获取的 access_token
|
|
|
|
Raises:
|
|
Exception: 获取失败
|
|
"""
|
|
logger.info("刷新 access_token")
|
|
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
|
|
params = {
|
|
"corpid": self.corp_id,
|
|
"corpsecret": self.corp_secret,
|
|
}
|
|
|
|
try:
|
|
response = await self.client.get(url, params=params)
|
|
result = response.json()
|
|
|
|
if result.get("errcode") != 0:
|
|
error_msg = result.get("errmsg", "未知错误")
|
|
logger.error(f"获取 access_token 失败: {error_msg}")
|
|
raise Exception(f"企微API错误: {error_msg}")
|
|
|
|
access_token = result["access_token"]
|
|
expires_in = result.get("expires_in", self.TOKEN_EXPIRES)
|
|
|
|
# 缓存到 Redis,TTL = 有效期 - 提前刷新时间
|
|
cache_ttl = max(expires_in - self.BUFFER_SECONDS, 60)
|
|
await self.redis.setex(self.CACHE_KEY, cache_ttl, access_token)
|
|
|
|
logger.info(f"access_token 刷新成功,TTL={cache_ttl}秒")
|
|
return access_token
|
|
|
|
except httpx.HTTPError as e:
|
|
logger.error(f"获取 access_token 网络错误: {e}")
|
|
raise Exception(f"网络错误: {e}") from e
|
|
|
|
async def invalidate(self) -> None:
|
|
"""手动使缓存失效。
|
|
|
|
当检测到 token 过期或无效时调用,强制下次请求刷新。
|
|
"""
|
|
await self.redis.delete(self.CACHE_KEY)
|
|
logger.info("access_token 缓存已手动清除")
|
|
|
|
async def close(self) -> None:
|
|
"""关闭 HTTP 客户端。"""
|
|
await self.client.aclose()
|