Files

272 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — AI 服务(Dify 接入)
# =============================================================================
# 做什么:封装 Dify API 调用,实现 AI 自动回复
# 为什么:
# - ARCHITECTURE.md 设计了 ai_handling 状态,但当前未实现
# - 现有系统交接文档提供了 Dify API 地址和 Key
# - 这是实现「AI 自助解决」的核心模块
# 依赖:需要 Dify API 可达(生产环境 http://yw-dify.dc.servyou-it.com
# =============================================================================
import json
import logging
import asyncio
from typing import Any, Dict, List, Optional, AsyncGenerator
import httpx
from app.config import settings
logger = logging.getLogger(__name__)
class AIService:
"""AI 服务:封装 Dify API,提供 AI 回复能力。
支持两种调用模式:
1. 非流式(简单场景):一次性获取完整回复
2. 流式(推荐):SSE 流式返回,前端可逐字显示
参考:现有系统交接文档
- API URL: http://yw-dify.dc.servyou-it.com/dify2openai/v1/chat/completions
- Key: http://yw-dify.dc.servyou-it.com/v1|app-UaTWYdBSwN6VktKQlbh5YN5H|Chat
"""
def __init__(self):
"""初始化 AI 服务。
做什么:从配置读取 Dify API 地址和认证信息
为什么:集中管理 API 配置,便于切换测试/生产环境
"""
# Dify 兼容 OpenAI 格式的 API 端点
self.api_url = settings.dify_api_url
# Dify API Key(格式:base_url|app_id|app_name
self.api_key = settings.dify_api_key
# 请求超时(秒)
self.timeout = settings.dify_timeout
# httpx 异步客户端(复用连接池)
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
"""获取或创建 httpx 异步客户端。
做什么:懒加载 httpx.AsyncClient,复用连接池
为什么:避免每次请求都创建新连接,提升性能
"""
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(self.timeout),
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
)
return self._client
async def close(self):
"""关闭 httpx 客户端。
做什么:释放连接池资源
为什么:避免连接泄漏,尤其在长期运行的 FastAPI 应用中
"""
if self._client and not self._client.is_closed:
await self._client.aclose()
self._client = None
logger.debug("AIService httpx client closed")
# --------------------------------------------------------------------------
# 非流式调用:一次性获取 AI 完整回复
# --------------------------------------------------------------------------
async def get_reply(
self,
message: str,
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
) -> Dict[str, Any]:
"""调用 Dify API 获取 AI 回复(非流式)。
Args:
message: 员工发送的消息内容
conversation_id: 会话ID(用于 Dify 多轮对话上下文)
user_id: 员工企微 UserID(用于 Dify 用户标识)
Returns:
Dict: {
"content": str, # AI 回复内容
"hit": bool, # 是否命中知识库(可回复)
"conversation_id": str, # Dify 会话ID(用于后续多轮对话)
"usage": dict, # Token 用量(可选)
}
做什么:发送消息到 Dify,解析返回内容,判断是否能回复
为什么:
- 非流式适合简单场景,代码简单
- 返回结构兼容 OpenAI Chat Completions 格式
- 通过回复内容判断是否命中知识库(有实质内容 = 命中)
"""
payload = {
"model": "Chat", # Dify 应用名称(来自 API Key 格式)
"messages": [
{"role": "user", "content": message}
],
"stream": False, # 非流式
"temperature": 0.1, # 低温度,保证回答稳定性
}
# 传入 Dify 会话ID,保持多轮对话上下文
if conversation_id:
payload["conversation_id"] = conversation_id
# 传入用户标识(Dify 侧用于日志和追溯)
if user_id:
payload["user"] = user_id
try:
client = await self._get_client()
logger.info(f"调用 Dify API: message={message[:50]}...")
response = await client.post(self.api_url, json=payload)
response.raise_for_status()
data = response.json()
# 解析 OpenAI 兼容格式的返回
# 格式:{"choices": [{"message": {"content": "..."}}]}
choices = data.get("choices", [])
if not choices:
logger.warning("Dify API 返回空 choices")
return {
"content": "",
"hit": False,
"conversation_id": conversation_id or "",
"usage": {},
}
reply_content = choices[0]["message"]["content"]
# 判断是否命中知识库:
# 策略1:检查内容是否为空或过长(Dify 可能返回提示语)
# 策略2:检查是否包含「抱歉」「不知道」等无法回答的特征词
hit = self._check_knowledge_hit(reply_content)
# 提取 Dify 返回的 conversation_id(用于多轮对话)
dify_conv_id = data.get("conversation_id", conversation_id or "")
logger.info(
f"Dify API 返回: hit={hit}, "
f"content_length={len(reply_content)}, "
f"conv_id={dify_conv_id[:20] if dify_conv_id else '(new)'}"
)
return {
"content": reply_content,
"hit": hit,
"conversation_id": dify_conv_id,
"usage": data.get("usage", {}),
}
except httpx.TimeoutException:
logger.error("Dify API 超时")
return {
"content": "⏰ AI 服务响应超时,请稍后再试或输入「IT」转人工。",
"hit": False,
"conversation_id": conversation_id or "",
"usage": {},
}
except httpx.HTTPStatusError as e:
logger.error(f"Dify API HTTP 错误: status={e.response.status_code}")
return {
"content": "⚠️ AI 服务暂时不可用,请输入「IT」转人工。",
"hit": False,
"conversation_id": conversation_id or "",
"usage": {},
}
except Exception as e:
logger.error(f"Dify API 调用失败: {e}")
return {
"content": "⚠️ AI 服务异常,请输入「IT」转人工。",
"hit": False,
"conversation_id": conversation_id or "",
"usage": {},
}
# --------------------------------------------------------------------------
# 流式调用:SSE 流式返回(供 WebSocket 推送给前端)
# --------------------------------------------------------------------------
async def get_reply_stream(
self,
message: str,
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
"""调用 Dify API 获取流式 AI 回复(SSE)。
Args:
message: 员工发送的消息内容
conversation_id: Dify 会话ID
user_id: 员工企微 UserID
Yields:
Dict: {
"delta": str, # 增量内容
"finished": bool, # 是否结束
"conversation_id": str,
"hit": bool, # 最终判断是否命中
}
做什么:SSE 流式读取 Dify 返回,逐块 yield 给调用方
为什么:
- 流式返回能提升用户体验(不用等 AI 全部生成完才显示)
- 通过 WebSocket 推送增量内容到 H5 前端
- 目前第一步先实现非流式,流式作为后续优化
"""
# TODO: 第一步简化,先 yield 完整内容(非真正流式)
# 后续优化:解析 SSE 事件流,逐块 yield
result = await self.get_reply(message, conversation_id, user_id)
yield {
"delta": result["content"],
"finished": True,
"conversation_id": result["conversation_id"],
"hit": result["hit"],
}
# --------------------------------------------------------------------------
# 判断是否命中知识库
# --------------------------------------------------------------------------
def _check_knowledge_hit(self, content: str) -> bool:
"""判断 AI 回复是否命中知识库(可以回答用户问题)。
Args:
content: AI 回复内容
Returns:
bool: True=命中(可以回复),False=未命中(需转人工)
做什么:分析 AI 回复内容,判断是否能有效回答问题
为什么:
- Dify 在无法回答时通常会返回固定提示语
- 参考现有系统:「抱歉,您的问题可能不在服务业务范围内」
- 命中 = 有实质内容且不像是「无法回答」的提示
"""
if not content or len(content.strip()) < 5:
return False
# 未命中特征词(Dify 无法回答时的典型回复)
miss_keywords = [
"抱歉", "对不起", "不知道", "无法回答",
"不在服务范围内", "超出我的能力", "暂不支持",
"请转人工", "联系管理员",
]
content_lower = content.lower()
# 如果回复中包含多个未命中特征词 → 判断为未命中
miss_count = sum(1 for kw in miss_keywords if kw in content_lower)
if miss_count >= 2:
return False
# 如果回复长度过短(< 10 字符)且包含特征词 → 未命中
if len(content) < 10 and any(kw in content_lower for kw in miss_keywords):
return False
return True