# Health Check + Error Codes + Structured Logging: Audit and Improvement Plan

**Audit date**: 2026-06-15
**Auditor**: Claude (full-load batch run)
**Related**: [[风险跟踪表]] / [[后端架构]] / [[Dockerfile优化与镜像审计]]

---

## 📌 1. Health check: current state

### 1.1 Current implementation

**Endpoint**: `backend/app/main.py:506`

```python
@app.get("/health", tags=["系统"])
async def health_check():
    """Health check endpoint."""
    return {"status": "ok", "service": "wecom-it-smart-desk"}
```

**Docker compose healthcheck**:

```yaml
healthcheck:
  test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()"]
  interval: 30s
  timeout: 10s
  retries: 3
  start_period: 40s
```

### 1.2 Issue list

| # | Issue | Severity | Impact |
|---|---|---|---|
| H-1 | `/health` does not verify the DB connection | 🟠 Medium | DB is down, but the healthcheck still reports OK |
| H-2 | `/health` does not verify the Redis connection | 🟠 Medium | Redis is down, but the healthcheck still reports OK |
| H-3 | `/health` does not report version/build | 🟡 | Harder to troubleshoot |
| H-4 | `/health` always returns 200, with no degraded state | 🟡 | Hard to tell "online but degraded" apart from healthy |
| H-5 | No separate `/ready` and `/live` endpoints | 🟡 | Not K8s-friendly |
| H-6 | Docker healthcheck switched to urllib, fixed ✅ (P1-3) | 🟢 | Done |

### 1.3 Improved version (full healthcheck)

```python
# backend/app/api/health.py (new file)
import os
import platform
import time
from datetime import datetime

import psutil
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from sqlalchemy import text

from app.database import async_session_maker
from app.utils.token_manager import get_token_manager

router = APIRouter(tags=["系统"])

START_TIME = time.time()


@router.get("/health")
async def health_check():
    """Liveness probe - is the process alive?

    Used by: K8s livenessProbe / Docker healthcheck
    Returns: always 200 as long as the process has not crashed
    """
    return {
        "status": "ok",
        "service": "wecom-it-smart-desk",
        "uptime_seconds": int(time.time() - START_TIME),
    }


@router.get("/ready")
async def readiness_check():
    """Readiness probe - is the process ready to take traffic?

    Used by: K8s readinessProbe / load balancer
    Verifies: actual DB + Redis connectivity
    """
    checks = {
        "database": False,
        "redis": False,
        "wecom_token": False,
    }

    # 1. DB check
    try:
        async with async_session_maker() as session:
            result = await session.execute(text("SELECT 1"))
            result.scalar()
        checks["database"] = True
    except Exception as e:
        checks["database_error"] = str(e)[:200]

    # 2. Redis check
    try:
        tm = get_token_manager()
        client = await tm.get_redis()
        await client.ping()
        checks["redis"] = True
    except Exception as e:
        checks["redis_error"] = str(e)[:200]

    # 3. WeCom token check (optional)
    # Note: as written, a failed token check also marks the service as not ready;
    # exclude "wecom_token" from all_ok if it should be purely informational.
    try:
        tm = get_token_manager()
        token = await tm.get_access_token()
        checks["wecom_token"] = bool(token)
    except Exception as e:
        checks["wecom_error"] = str(e)[:200]

    # Only the boolean check results count; "*_error" entries are diagnostics.
    all_ok = all(v for k, v in checks.items() if not k.endswith("_error"))
    status_code = 200 if all_ok else 503

    return JSONResponse(
        status_code=status_code,
        content={
            "status": "ready" if all_ok else "degraded",
            "service": "wecom-it-smart-desk",
            "uptime_seconds": int(time.time() - START_TIME),
            "checks": checks,
            "timestamp": datetime.now().isoformat(),
        },
    )


@router.get("/metrics")
async def metrics():
    """Metrics endpoint (lightweight version)

    Intended for: Prometheus scraping
    Output: key business/technical metrics
    Note: this returns JSON, not the Prometheus text exposition format,
    so direct scraping needs a format change or an exporter.
    """
    process = psutil.Process()
    return {
        "process": {
            # The first cpu_percent() call on a new Process object returns 0.0.
            "cpu_percent": process.cpu_percent(),
            "memory_mb": process.memory_info().rss / 1024 / 1024,
            "threads": process.num_threads(),
            "uptime_seconds": int(time.time() - START_TIME),
        },
        "system": {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_percent": psutil.disk_usage('/').percent,
        },
    }


@router.get("/version")
async def version():
    """Version info endpoint

    Purpose: troubleshooting / deployment confirmation
    """
    return {
        "service": "wecom-it-smart-desk",
        "version": os.getenv("APP_VERSION", "dev"),
        "git_sha": os.getenv("GIT_SHA", "unknown")[:8],
        "build_time": os.getenv("BUILD_TIME", "unknown"),
        # Report the interpreter actually running instead of a hard-coded "3.12".
        "python": platform.python_version(),
    }
```

### 1.4 Docker compose update

```yaml
healthcheck:
  test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()"]
  interval: 30s
  timeout: 10s
  retries: 3
  start_period: 40s

# Advanced (optional, for the K8s migration)
# Note: `readiness` is not a Docker Compose key; this block only sketches
# the parameters for a future K8s readinessProbe.
readiness:
  test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/ready').read()"]
  interval: 10s
  timeout: 5s
  retries: 3
```

---

## 📌 2. Error code system: current state and improvements

### 2.1 Current state (`backend/app/utils/response.py`)

**Existing error codes** (18):

- **1000+ General** (5): ERR_PARAMS / UNAUTHORIZED / NOT_FOUND / FORBIDDEN / INTERNAL
- **2000+ WeCom** (6): WECOM_TOKEN / SEND / DECRYPT / ENCRYPT / VERIFY / USER_INFO
- **3000+ Business** (7): AGENT_OFFLINE / CONVERSATION_RESOLVED / CONVERSATION_NOT_FOUND / AGENT_NOT_FOUND / AGENT_BUSY / DUPLICATE_ASSIGN / GRAB_*

**Format**:

```json
{"code": 0, "data": {}, "message": "success"}
```

### 2.2 Issue list

| # | Issue | Severity | Fix |
|---|---|---|---|
| E-1 | Error codes have no standard enum class (constants only) | 🟡 | Add an `ErrorCode` Enum |
| E-2 | HTTP 200 with a non-zero `code` (against REST convention) | 🟡 | Evaluate: returning 4xx/5xx is also an option; agree on it with the frontend |
| E-3 | No error tracking ID (correlation_id) | 🟠 | Add a `trace_id` field |
| E-4 | Error responses have no `documentation_url` | 🟢 | Add one, linking to the docs |
| E-5 | No i18n (Chinese is hard-coded) | 🟡 | Internationalize error messages |
| E-6 | Frontend error handling is scattered (no unified interception) | 🟠 | Add an axios interceptor + error code mapping table |

### 2.3 Improved error code system

**New file `backend/app/utils/error_codes.py`**:

```python
# =============================================================================
# Error code system - standard enum
# =============================================================================
# Convention:
# - 0    = success
# - 1xxx = general errors
# - 2xxx = auth / session
# - 3xxx = WeCom API
# - 4xxx = business - conversation
# - 5xxx = business - agent
# - 6xxx = business - configuration
# - 7xxx = external system integration
# - 9xxx = fallback
# =============================================================================
from enum import Enum


class ErrorCode(int, Enum):
    """Unified error code enum"""

    # 0: success
    SUCCESS = 0

    # 1xxx: general errors
    PARAMS_INVALID = 1001        # invalid parameters
    UNAUTHORIZED = 1002          # not authenticated
    NOT_FOUND = 1003             # resource not found
    FORBIDDEN = 1004             # no permission
    INTERNAL = 1005              # server error
    RATE_LIMITED = 1006          # rate limited
    SERVICE_UNAVAILABLE = 1007   # service unavailable
    TIMEOUT = 1008               # timeout

    # 2xxx: auth
    AUTH_TOKEN_MISSING = 2001    # token missing
    AUTH_TOKEN_EXPIRED = 2002    # token expired
    AUTH_TOKEN_INVALID = 2003    # token invalid
    AUTH_OTP_REQUIRED = 2004     # OTP required
    AUTH_OTP_INVALID = 2005      # wrong OTP
    AUTH_PASSWORD_WRONG = 2006   # wrong password
    AUTH_AGENT_DISABLED = 2007   # agent disabled

    # 3xxx: WeCom API
    WECOM_TOKEN_FAIL = 3001      # failed to obtain WeCom token
    WECOM_SEND_FAIL = 3002       # failed to send WeCom message
    WECOM_DECRYPT_FAIL = 3003    # failed to decrypt WeCom message
    WECOM_ENCRYPT_FAIL = 3004    # failed to encrypt WeCom message
    WECOM_VERIFY_FAIL = 3005     # WeCom callback signature verification failed
    WECOM_USER_INFO_FAIL = 3006  # failed to fetch WeCom user info
    WECOM_API_ERROR = 3099       # generic WeCom API error

    # 4xxx: business - conversation
    CONV_NOT_FOUND = 4001        # conversation not found
    CONV_RESOLVED = 4002         # conversation already resolved
    CONV_NO_AGENT = 4003         # no agent available
    CONV_DUPLICATE_ASSIGN = 4004 # duplicate assignment
    CONV_GRAB_DENIED = 4005      # grab failed

    # 5xxx: business - agent
    AGENT_NOT_FOUND = 5001         # agent not found
    AGENT_OFFLINE = 5002           # agent offline
    AGENT_BUSY = 5003              # agent at full capacity
    AGENT_GRAB_SELF = 5004         # cannot take over your own conversation
    AGENT_GRAB_NOT_SERVING = 5005  # can only take over conversations being served

    # 6xxx: business - configuration
    CONFIG_NOT_FOUND = 6001      # configuration not found
    CONFIG_INVALID = 6002        # invalid configuration value

    # 7xxx: external integrations
    HUORONG_API_FAIL = 7001      # Huorong API
    LIANRUAN_API_FAIL = 7002     # Lianruan API
    ATRUST_API_FAIL = 7003       # aTrust API
    EHR_API_FAIL = 7004          # eHR API
    DIFY_API_FAIL = 7005         # Dify API

    # 9xxx: fallback
    UNKNOWN = 9999


# Error code -> HTTP status code (optional, defaults to 200)
HTTP_STATUS_MAP = {
    ErrorCode.SUCCESS: 200,
    ErrorCode.PARAMS_INVALID: 422,
    ErrorCode.UNAUTHORIZED: 401,
    ErrorCode.NOT_FOUND: 404,
    ErrorCode.FORBIDDEN: 403,
    ErrorCode.INTERNAL: 500,
    ErrorCode.RATE_LIMITED: 429,
    ErrorCode.SERVICE_UNAVAILABLE: 503,
    ErrorCode.TIMEOUT: 504,
    ErrorCode.AUTH_TOKEN_MISSING: 401,
    ErrorCode.AUTH_TOKEN_EXPIRED: 401,
    ErrorCode.AUTH_TOKEN_INVALID: 401,
    ErrorCode.AUTH_OTP_REQUIRED: 401,
    ErrorCode.AUTH_OTP_INVALID: 401,
    ErrorCode.AUTH_PASSWORD_WRONG: 401,
    ErrorCode.AUTH_AGENT_DISABLED: 403,
    # Business errors default to 200 and are distinguished by `code`.
    # This can be tuned, e.g. 404 for 4xxx resource errors, 409 for 5xxx state errors.
}
```

**Update `response.py`**:

```python
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

from fastapi import Request
from fastapi.responses import JSONResponse

from app.utils.error_codes import ErrorCode, HTTP_STATUS_MAP
from app.utils.logging_config import request_id_var

logger = logging.getLogger(__name__)


def error_response(
    code: ErrorCode,
    message: str,
    data: Any = None,
    trace_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Build an error response (now with trace_id)"""
    return {
        "code": int(code),
        "message": message,
        "data": data or {},
        "trace_id": trace_id,
        "timestamp": datetime.now().isoformat(),
    }


class AppException(Exception):
    def __init__(
        self,
        code: ErrorCode,
        message: str,
        data: Any = None,
        http_status: Optional[int] = None,
    ):
        self.code = code
        self.message = message
        self.data = data
        # Codes missing from HTTP_STATUS_MAP (business errors) fall back to 200.
        self.http_status = http_status or HTTP_STATUS_MAP.get(code, 200)
        super().__init__(message)


async def app_exception_handler(request: Request, exc: AppException) -> JSONResponse:
    # Reuse the ID set by the request_id middleware (section 3.2) when present,
    # so the error body and the request log share one ID; otherwise generate one.
    trace_id = (
        request_id_var.get()
        or request.headers.get("X-Request-ID")
        or str(uuid.uuid4())
    )

    # Write to the log
    logger.warning(
        f"[{trace_id}] {request.method} {request.url.path} "
        f"-> {exc.code.value} {exc.message}"
    )

    return JSONResponse(
        status_code=exc.http_status,
        content=error_response(exc.code, exc.message, exc.data, trace_id),
        headers={"X-Trace-ID": trace_id},
    )
```

### 2.4 Frontend error code mapping (axios interceptor)

**New file `frontend-admin/src/api/error-handler.ts`** (the other frontends are similar):

```typescript
import { ElMessage } from 'element-plus'

// Error code -> user-facing message (UI strings stay in Chinese until i18n, see E-5)
const ERROR_MESSAGES: Record<number, string> = {
  1001: '参数错误,请检查输入',      // invalid parameters, check your input
  1002: '登录已过期,请重新登录',    // session expired, log in again
  1003: '资源不存在',                // resource not found
  1004: '无权限访问',                // no permission
  1005: '服务器错误,请稍后重试',    // server error, retry later
  1006: '操作过快,请稍候再试',      // too many requests, retry shortly
  2001: '请先登录',                  // please log in first
  2002: '登录已过期',                // session expired
  2003: '身份验证失败',              // authentication failed
  2004: '请输入动态码',              // enter the OTP
  2005: '动态码错误',                // wrong OTP
  2006: '密码错误',                  // wrong password
  4001: '会话不存在',                // conversation not found
  4002: '会话已结束',                // conversation ended
  5001: '坐席不存在',                // agent not found
  5002: '坐席离线',                  // agent offline
  5003: '坐席已满载',                // agent at full capacity
  9999: '未知错误',                  // unknown error
}

export function handleError(code: number, message: string, traceId?: string) {
  // Fallback message: '操作失败' = "operation failed"
  const userMsg = ERROR_MESSAGES[code] || message || '操作失败'

  // Special handling: auth failures clear the token and redirect to login
  if ([1002, 2001, 2002, 2003].includes(code)) {
    localStorage.removeItem('token')
    window.location.href = '/login'
  }

  ElMessage.error(userMsg)

  // Show trace_id in the development environment
  if (import.meta.env.DEV && traceId) {
    console.error(`[TraceID: ${traceId}] Code: ${code}, Message: ${message}`)
  }
}
```

---

## 📌 3. Structured logging

### 3.1 Current state

```python
# backend/app/main.py
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s",
)
```

**Issues**:

- 🟡 Plain-text format, hard to query/aggregate
- 🟡 No trace_id
- 🟡 No request/response records
- 🟡 No structured fields (user/conversation/operation)

### 3.2 Improved version

**New file `backend/app/utils/logging_config.py`**:

```python
import json
import logging
import sys
from contextvars import ContextVar
from typing import Optional

logger = logging.getLogger(__name__)

# Per-request context
request_id_var: ContextVar[Optional[str]] = ContextVar('request_id', default=None)
user_id_var: ContextVar[Optional[str]] = ContextVar('user_id', default=None)


class JSONFormatter(logging.Formatter):
    """JSON formatter - suitable for ELK / Loki / CloudWatch parsing"""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }

        # Request context
        if request_id_var.get():
            log_data["request_id"] = request_id_var.get()
        if user_id_var.get():
            log_data["user_id"] = user_id_var.get()

        # Extra fields passed via extra={"extra_data": {...}}
        if hasattr(record, "extra_data"):
            log_data.update(record.extra_data)

        # Exception
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # default=str keeps non-JSON-serializable values from breaking logging.
        return json.dumps(log_data, ensure_ascii=False, default=str)


def setup_logging(level: str = "INFO", json_format: bool = True):
    """Configure logging"""
    root = logging.getLogger()
    root.setLevel(level)

    # Remove existing handlers
    for handler in root.handlers[:]:
        root.removeHandler(handler)

    handler = logging.StreamHandler(sys.stdout)
    if json_format:
        handler.setFormatter(JSONFormatter())
    else:
        handler.setFormatter(logging.Formatter(
            "[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s"
        ))
    root.addHandler(handler)


# Business logging helpers
def log_business(
    event: str,
    *,
    user_id: Optional[str] = None,
    conversation_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    **kwargs
):
    """Record a business log entry (structured)"""
    extra_data = {
        "event": event,
        "user_id": user_id,
        "conversation_id": conversation_id,
        "agent_id": agent_id,
        **kwargs,
    }
    # Drop unset fields so a None user_id does not overwrite the context value.
    extra_data = {k: v for k, v in extra_data.items() if v is not None}
    logger.info(f"business_event: {event}", extra={"extra_data": extra_data})


def log_security(
    event: str,
    *,
    user_id: Optional[str] = None,
    ip: Optional[str] = None,
    **kwargs
):
    """Record a security log entry (WARNING level, category=security, for auditing)"""
    extra_data = {
        "event": event,
        "category": "security",
        "user_id": user_id,
        "ip": ip,
        **kwargs,
    }
    # Drop unset fields so a None user_id does not overwrite the context value.
    extra_data = {k: v for k, v in extra_data.items() if v is not None}
    logger.warning(f"security_event: {event}", extra={"extra_data": extra_data})
```

**Middleware - inject request_id**:

```python
# backend/app/main.py
import logging
import time
import uuid

from fastapi import Request

from app.utils.logging_config import setup_logging, request_id_var, user_id_var

logger = logging.getLogger(__name__)


@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
    # Take the incoming trace_id or create one
    trace_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    request_id_var.set(trace_id)

    # Log request start
    start = time.time()
    logger.info(
        f"request_start: {request.method} {request.url.path}",
        extra={"extra_data": {
            "method": request.method,
            "path": request.url.path,
            "client": request.client.host if request.client else "?",
        }}
    )

    response = await call_next(request)

    # Log request end
    duration = time.time() - start
    logger.info(
        f"request_end: {response.status_code} in {duration:.3f}s",
        extra={"extra_data": {
            "method": request.method,
            "path": request.url.path,
            "status": response.status_code,
            "duration_ms": int(duration * 1000),
        }}
    )

    response.headers["X-Request-ID"] = trace_id
    return response
```

### 3.3 Log aggregation options

| Option | Suited for | Adoption cost |
|---|---|---|
| **stdout + Docker logs** | Small scale / troubleshooting | 🟢 0 |
| **Loki + Promtail** | Medium scale / log queries | 🟡 Medium |
| **ELK (Elasticsearch + Logstash + Kibana)** | Large scale / full-text search | 🟠 High |
| **CloudWatch / Alibaba Cloud SLS** | Public cloud | 🟡 Depends on the cloud |

**Short term**: write to stdout, have Docker collect into `/var/log/wecom-it-desk/*.log`, and query with scripts + grep
**Mid term**: Loki + Grafana (deployed on the local NAS)
**Long term**: ELK / cloud-native logging

---

## 📌 4. Implementation path

### 4.1 Immediately (this batch run)

- [x] Finish the audit report (this file)
- [ ] Add `backend/app/utils/error_codes.py` (Enum)
- [ ] Add `backend/app/utils/logging_config.py` (JSON formatter)
- [ ] Update `main.py` to add the `/ready` `/metrics` `/version` endpoints
- [ ] Add the request_id middleware

### 4.2 Next week

- [ ] Add `api/error-handler.ts` to the 4 frontends
- [ ] Add axios interceptors to the 4 frontends (capturing trace_id)
- [ ] Add `LOG_LEVEL=INFO` + `LOG_FORMAT=json` to the `.env` config

### 4.3 This quarter

- [ ] Deploy Loki + Promtail
- [ ] Grafana dashboards (Loki data source)
- [ ] Alerts on key business events (login failure / agent offline)

---

## 📌 5. Related documents

- [[风险跟踪表]] M-3 (no unified error codes) / M-5 (no health check)
- [[后端架构]] §5 Error handling
- [[Dockerfile优化与镜像审计]] - healthcheck
- [[前端审计报告]] U-2 (global error boundary)

---

*This audit was produced by the 2026-06-15 Claude full-load batch run and is pending review.*
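
**Appendix sketch: reading `LOG_LEVEL` / `LOG_FORMAT`**

The `.env` item in §4.2 (`LOG_LEVEL=INFO` + `LOG_FORMAT=json`) could be consumed at startup roughly as follows. This is a minimal, self-contained sketch rather than the project's implementation: `configure_from_env` and `MiniJSONFormatter` are hypothetical names, the formatter is a simplified stand-in for the `JSONFormatter` proposed in §3.2, and treating any `LOG_FORMAT` value other than `json` as plain text is an assumption.

```python
import json
import logging
import os
import sys


class MiniJSONFormatter(logging.Formatter):
    """Simplified stand-in for the JSONFormatter in section 3.2 (hypothetical)."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps(
            {
                "level": record.levelname,
                "logger": record.name,
                "message": record.getMessage(),
            },
            ensure_ascii=False,
        )


def configure_from_env(env=None) -> logging.Handler:
    """Configure the root logger from LOG_LEVEL / LOG_FORMAT; return the handler."""
    env = os.environ if env is None else env

    # Resolve the level name; fall back to INFO when it is not a known level.
    level = getattr(logging, env.get("LOG_LEVEL", "INFO").upper(), None)
    if not isinstance(level, int):
        level = logging.INFO

    handler = logging.StreamHandler(sys.stdout)
    if env.get("LOG_FORMAT", "json").lower() == "json":
        handler.setFormatter(MiniJSONFormatter())
    else:
        handler.setFormatter(
            logging.Formatter("[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s")
        )

    # Replace existing root handlers so output is not duplicated.
    root = logging.getLogger()
    for old in root.handlers[:]:
        root.removeHandler(old)
    root.addHandler(handler)
    root.setLevel(level)
    return handler
```

Calling `configure_from_env()` once at application startup, before the first log line, would let the same image switch between JSON and text output per environment without a code change.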