177 lines
6.1 KiB
Python
177 lines
6.1 KiB
Python
|
|
# =============================================================================
|
|||
|
|
# 企微IT智能服务台 — 外部系统数据缓存层
|
|||
|
|
# =============================================================================
|
|||
|
|
# 说明:
|
|||
|
|
# 1. 封装外部系统数据的缓存读写逻辑
|
|||
|
|
# 2. 统一缓存key格式:ext:{system}:{method}:{param_hash}
|
|||
|
|
# 3. 不同数据类型使用不同TTL(终端映射30分钟、安全状态5分钟等)
|
|||
|
|
# 4. Redis不可用时自动降级(不缓存,直接透传)
|
|||
|
|
#
|
|||
|
|
# 与 CacheService 的关系:
|
|||
|
|
# CacheService 是全局Redis客户端封装,ExternalSystemCache 基于它
|
|||
|
|
# 添加外部系统专用的缓存策略(TTL、key格式、刷新机制)
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
import hashlib
|
|||
|
|
import json
|
|||
|
|
import logging
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import Any, Dict, Optional
|
|||
|
|
|
|||
|
|
from app.services.cache_service import CacheService
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# 缓存TTL配置(秒)
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
CACHE_TTL = {
|
|||
|
|
# 终端映射(员工→终端)— 映射关系不常变,缓存较长
|
|||
|
|
"terminal_mapping": 30 * 60, # 30分钟
|
|||
|
|
# 终端详情(硬件/软件)— 硬件配置极少变,缓存最长
|
|||
|
|
"terminal_detail": 60 * 60, # 60分钟
|
|||
|
|
# 安全状态(漏洞/病毒)— 安全状态需近实时,缓存短
|
|||
|
|
"security_status": 5 * 60, # 5分钟
|
|||
|
|
# VPN在线状态 — 在线状态变化快,缓存最短
|
|||
|
|
"vpn_status": 1 * 60, # 1分钟
|
|||
|
|
# eHR员工信息 — 静态数据,缓存最长
|
|||
|
|
"employee_info": 24 * 60 * 60, # 24小时
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ExternalSystemCache:
|
|||
|
|
"""外部系统数据缓存
|
|||
|
|
|
|||
|
|
做什么:为外部系统查询结果提供统一缓存读写
|
|||
|
|
为什么:减少外部API调用频率,降低延迟和出错率
|
|||
|
|
|
|||
|
|
降级策略:Redis不可用时,缓存读写均跳过,直接透传到外部系统
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, cache_service: Optional[CacheService] = None):
|
|||
|
|
"""初始化缓存层
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
cache_service: Redis缓存服务实例。None时降级为无缓存模式
|
|||
|
|
"""
|
|||
|
|
self._cache = cache_service
|
|||
|
|
|
|||
|
|
@staticmethod
|
|||
|
|
def _make_key(system: str, method: str, param: str) -> str:
|
|||
|
|
"""生成缓存key
|
|||
|
|
|
|||
|
|
做什么:按统一格式生成缓存key
|
|||
|
|
为什么:避免不同系统/方法的key冲突
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
system: 系统标识(lianruan/huorong/atrust/ehr)
|
|||
|
|
method: 方法名(terminal_mapping/terminal_detail/...)
|
|||
|
|
param: 查询参数(用户名/计算机名等)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
缓存key,格式: ext:lianruan:terminal_mapping:abc123
|
|||
|
|
"""
|
|||
|
|
# 对参数做哈希,避免特殊字符问题
|
|||
|
|
param_hash = hashlib.md5(param.encode()).hexdigest()[:12]
|
|||
|
|
return f"ext:{system}:{method}:{param_hash}"
|
|||
|
|
|
|||
|
|
async def get(self, system: str, method: str, param: str) -> Optional[Dict]:
|
|||
|
|
"""从缓存读取数据
|
|||
|
|
|
|||
|
|
做什么:按系统+方法+参数查找缓存
|
|||
|
|
为什么:命中缓存可避免一次外部API调用
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
system: 系统标识
|
|||
|
|
method: 方法名
|
|||
|
|
param: 查询参数
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
缓存的字典数据,或 None(未命中/Redis不可用)
|
|||
|
|
"""
|
|||
|
|
if not self._cache or not self._cache.redis:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
key = self._make_key(system, method, param)
|
|||
|
|
try:
|
|||
|
|
data = await self._cache.get(key)
|
|||
|
|
if data:
|
|||
|
|
logger.debug(f"缓存命中: {key}")
|
|||
|
|
return json.loads(data) if isinstance(data, str) else data
|
|||
|
|
return None
|
|||
|
|
except Exception as e:
|
|||
|
|
# Redis错误不阻断业务,降级为无缓存
|
|||
|
|
logger.warning(f"缓存读取失败(降级为无缓存): {key}, error={e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
async def set(
|
|||
|
|
self,
|
|||
|
|
system: str,
|
|||
|
|
method: str,
|
|||
|
|
param: str,
|
|||
|
|
data: Dict,
|
|||
|
|
ttl_override: Optional[int] = None,
|
|||
|
|
) -> bool:
|
|||
|
|
"""写入缓存
|
|||
|
|
|
|||
|
|
做什么:将外部系统查询结果存入缓存
|
|||
|
|
为什么:后续相同查询可直接命中缓存
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
system: 系统标识
|
|||
|
|
method: 方法名
|
|||
|
|
param: 查询参数
|
|||
|
|
data: 要缓存的数据
|
|||
|
|
ttl_override: 自定义TTL(秒),None则使用默认TTL
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
True=成功, False=失败/Redis不可用
|
|||
|
|
"""
|
|||
|
|
if not self._cache or not self._cache.redis:
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
key = self._make_key(system, method, param)
|
|||
|
|
ttl = ttl_override or CACHE_TTL.get(method, 5 * 60) # 默认5分钟
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 添加缓存时间戳,便于判断数据新鲜度
|
|||
|
|
data_with_meta = {
|
|||
|
|
**data,
|
|||
|
|
"_cached_at": datetime.now().isoformat(),
|
|||
|
|
"_source_system": system,
|
|||
|
|
}
|
|||
|
|
await self._cache.set(key, json.dumps(data_with_meta, default=str), ex=ttl)
|
|||
|
|
logger.debug(f"缓存写入: {key}, TTL={ttl}s")
|
|||
|
|
return True
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"缓存写入失败: {key}, error={e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
async def invalidate(self, system: str, method: str, param: str) -> bool:
|
|||
|
|
"""主动失效缓存
|
|||
|
|
|
|||
|
|
做什么:删除指定缓存条目
|
|||
|
|
为什么:外部数据变更时(如终端隔离后),需主动失效缓存
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
system: 系统标识
|
|||
|
|
method: 方法名
|
|||
|
|
param: 查询参数
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
True=成功, False=失败/Redis不可用
|
|||
|
|
"""
|
|||
|
|
if not self._cache or not self._cache.redis:
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
key = self._make_key(system, method, param)
|
|||
|
|
try:
|
|||
|
|
await self._cache.delete(key)
|
|||
|
|
logger.debug(f"缓存失效: {key}")
|
|||
|
|
return True
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"缓存失效失败: {key}, error={e}")
|
|||
|
|
return False
|