Files
wecom_it_smart_desk/backend/app/services/external/cache.py
T

177 lines
6.1 KiB
Python
Raw Normal View History

# =============================================================================
# 企微IT智能服务台 — 外部系统数据缓存层
# =============================================================================
# 说明:
# 1. 封装外部系统数据的缓存读写逻辑
# 2. 统一缓存key格式:ext:{system}:{method}:{param_hash}
# 3. 不同数据类型使用不同TTL(终端映射30分钟、安全状态5分钟等)
# 4. Redis不可用时自动降级(不缓存,直接透传)
#
# 与 CacheService 的关系:
# CacheService 是全局Redis客户端封装,ExternalSystemCache 基于它
# 添加外部系统专用的缓存策略(TTL、key格式、刷新机制)
# =============================================================================
import hashlib
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional
from app.services.cache_service import CacheService
logger = logging.getLogger(__name__)
# =============================================================================
# 缓存TTL配置(秒)
# =============================================================================
CACHE_TTL = {
# 终端映射(员工→终端)— 映射关系不常变,缓存较长
"terminal_mapping": 30 * 60, # 30分钟
# 终端详情(硬件/软件)— 硬件配置极少变,缓存最长
"terminal_detail": 60 * 60, # 60分钟
# 安全状态(漏洞/病毒)— 安全状态需近实时,缓存短
"security_status": 5 * 60, # 5分钟
# VPN在线状态 — 在线状态变化快,缓存最短
"vpn_status": 1 * 60, # 1分钟
# eHR员工信息 — 静态数据,缓存最长
"employee_info": 24 * 60 * 60, # 24小时
}
class ExternalSystemCache:
"""外部系统数据缓存
做什么:为外部系统查询结果提供统一缓存读写
为什么:减少外部API调用频率,降低延迟和出错率
降级策略:Redis不可用时,缓存读写均跳过,直接透传到外部系统
"""
def __init__(self, cache_service: Optional[CacheService] = None):
"""初始化缓存层
Args:
cache_service: Redis缓存服务实例。None时降级为无缓存模式
"""
self._cache = cache_service
@staticmethod
def _make_key(system: str, method: str, param: str) -> str:
"""生成缓存key
做什么:按统一格式生成缓存key
为什么:避免不同系统/方法的key冲突
Args:
system: 系统标识(lianruan/huorong/atrust/ehr
method: 方法名(terminal_mapping/terminal_detail/...
param: 查询参数(用户名/计算机名等)
Returns:
缓存key,格式: ext:lianruan:terminal_mapping:abc123
"""
# 对参数做哈希,避免特殊字符问题
param_hash = hashlib.md5(param.encode()).hexdigest()[:12]
return f"ext:{system}:{method}:{param_hash}"
async def get(self, system: str, method: str, param: str) -> Optional[Dict]:
"""从缓存读取数据
做什么:按系统+方法+参数查找缓存
为什么:命中缓存可避免一次外部API调用
Args:
system: 系统标识
method: 方法名
param: 查询参数
Returns:
缓存的字典数据,或 None(未命中/Redis不可用)
"""
if not self._cache or not self._cache.redis:
return None
key = self._make_key(system, method, param)
try:
data = await self._cache.get(key)
if data:
logger.debug(f"缓存命中: {key}")
return json.loads(data) if isinstance(data, str) else data
return None
except Exception as e:
# Redis错误不阻断业务,降级为无缓存
logger.warning(f"缓存读取失败(降级为无缓存): {key}, error={e}")
return None
async def set(
self,
system: str,
method: str,
param: str,
data: Dict,
ttl_override: Optional[int] = None,
) -> bool:
"""写入缓存
做什么:将外部系统查询结果存入缓存
为什么:后续相同查询可直接命中缓存
Args:
system: 系统标识
method: 方法名
param: 查询参数
data: 要缓存的数据
ttl_override: 自定义TTL(秒),None则使用默认TTL
Returns:
True=成功, False=失败/Redis不可用
"""
if not self._cache or not self._cache.redis:
return False
key = self._make_key(system, method, param)
ttl = ttl_override or CACHE_TTL.get(method, 5 * 60) # 默认5分钟
try:
# 添加缓存时间戳,便于判断数据新鲜度
data_with_meta = {
**data,
"_cached_at": datetime.now().isoformat(),
"_source_system": system,
}
await self._cache.set(key, json.dumps(data_with_meta, default=str), ex=ttl)
logger.debug(f"缓存写入: {key}, TTL={ttl}s")
return True
except Exception as e:
logger.warning(f"缓存写入失败: {key}, error={e}")
return False
async def invalidate(self, system: str, method: str, param: str) -> bool:
"""主动失效缓存
做什么:删除指定缓存条目
为什么:外部数据变更时(如终端隔离后),需主动失效缓存
Args:
system: 系统标识
method: 方法名
param: 查询参数
Returns:
True=成功, False=失败/Redis不可用
"""
if not self._cache or not self._cache.redis:
return False
key = self._make_key(system, method, param)
try:
await self._cache.delete(key)
logger.debug(f"缓存失效: {key}")
return True
except Exception as e:
logger.warning(f"缓存失效失败: {key}, error={e}")
return False