Files

446 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — AI Wingman 服务(坐席智能副驾驶)
# =============================================================================
# 说明:复用 Dify 基础设施,使用独立的 Wingman AgentAgent 2),
# 与员工端 AIAgent 1)共用知识库但 system prompt 不同。
#
# 核心能力:
# 1. 生成 AI 草稿回复 — 基于对话上下文为坐席生成专业回复
# 2. 生成会话自动摘要 — 结单时自动提取问题/原因/解决方案
# 3. 生成自动标签建议 — 基于对话内容建议标签分类
#
# 降级策略:Wingman Agent 不可用时返回友好错误信息,不抛异常
# =============================================================================
import json
import logging
from typing import Any, Dict, List, Optional
import httpx
from app.config import settings
logger = logging.getLogger(__name__)
class WingmanService:
"""AI Wingman 服务 — 坐席智能副驾驶。
复用 Dify 基础设施,使用独立的 Wingman AgentAgent 2),
与员工端 AI(Agent 1)共用知识库但 system prompt 不同。
三个核心方法使用不同的 system prompt
- 草稿生成:生成坐席可采纳的专业回复
- 摘要生成:提取结构化的会话摘要
- 标签建议:建议标签分类和优先级
"""
# --------------------------------------------------------------------------
# System Prompt 定义
# --------------------------------------------------------------------------
_DRAFT_SYSTEM_PROMPT: str = (
"你是一个IT服务坐席助手,基于对话上下文为坐席生成专业、准确的回复草稿。"
"直接输出回复内容,不要解释。"
)
_SUMMARY_SYSTEM_PROMPT: str = (
"你是一个IT服务分析助手,基于完整对话生成结构化摘要,"
"包含:问题、原因、解决方案。以JSON格式输出。"
"输出格式:{\"problem\": \"问题描述\", \"cause\": \"原因\", \"solution\": \"解决方案\"}"
)
_TAGS_SYSTEM_PROMPT: str = (
"你是一个IT服务分类助手,基于对话内容建议标签分类。以JSON格式输出。"
"输出格式:{\"suggested_tags\": [\"标签1\", \"标签2\"], \"category\": \"分类\", \"priority\": \"low/medium/high\"}"
)
def __init__(self):
"""初始化 Wingman 服务。
从配置读取 Wingman Agent 的 API 地址和认证信息。
独立于 AIService,使用自己的 httpx 客户端。
"""
# Wingman Agent 专用 API 端点
self.api_url = settings.dify_wingman_api_url
# Wingman Agent API Key
self.api_key = settings.dify_wingman_api_key
# 请求超时(秒)
self.timeout = settings.dify_wingman_timeout
# httpx 异步客户端(复用连接池)
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
"""获取或创建 httpx 异步客户端(懒加载)。
复用连接池,避免每次请求都创建新连接。
Returns:
httpx.AsyncClient: 异步 HTTP 客户端实例
"""
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(self.timeout),
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
)
return self._client
async def close(self):
"""关闭 httpx 客户端,释放连接池资源。"""
if self._client and not self._client.is_closed:
await self._client.aclose()
self._client = None
logger.debug("WingmanService httpx client closed")
# --------------------------------------------------------------------------
# 核心方法 1:生成 AI 草稿回复
# --------------------------------------------------------------------------
async def generate_draft(
self,
conversation_id: str,
messages: List[Dict[str, Any]],
db: Any = None,
) -> Dict[str, Any]:
"""生成 AI 草稿回复。
传入当前会话的完整消息历史,让 Wingman Agent 基于上下文
生成坐席可以采纳的草稿回复。
Args:
conversation_id: 会话ID
messages: 会话消息历史列表,每条消息包含 sender_type/content 等
db: 数据库会话(可选,当前未使用)
Returns:
Dict: {
"content": str, # 草稿内容
"confidence": float, # 置信度(0-1
"reasoning": str, # 生成推理说明
}
"""
# 构建对话上下文消息列表
context_messages = self._build_context_messages(
messages, self._DRAFT_SYSTEM_PROMPT
)
try:
result = await self._call_wingman_api(context_messages)
if result is None:
return {
"content": "",
"confidence": 0.0,
"reasoning": "Wingman 服务暂不可用",
}
reply_content = result
# 基于回复长度和内容质量估算置信度
confidence = self._estimate_confidence(reply_content)
return {
"content": reply_content,
"confidence": confidence,
"reasoning": f"基于最近 {len(messages)} 条对话上下文生成",
}
except Exception as e:
logger.error(f"Wingman 草稿生成失败: {e}")
return {
"content": "",
"confidence": 0.0,
"reasoning": f"AI 服务异常: {str(e)}",
}
# --------------------------------------------------------------------------
# 核心方法 2:生成会话自动摘要
# --------------------------------------------------------------------------
async def generate_summary(
self,
conversation_id: str,
messages: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""生成会话自动摘要。
基于完整对话生成结构化摘要,包含问题、原因、解决方案。
结单时自动调用。
Args:
conversation_id: 会话ID
messages: 会话消息历史列表
Returns:
Dict: {
"problem": str, # 问题描述
"cause": str, # 原因分析
"solution": str, # 解决方案
}
"""
context_messages = self._build_context_messages(
messages, self._SUMMARY_SYSTEM_PROMPT
)
# 默认摘要(降级时使用)
default_summary = {
"problem": "无法自动生成摘要",
"cause": "",
"solution": "",
}
try:
result = await self._call_wingman_api(context_messages)
if result is None:
return default_summary
# 尝试解析 JSON 格式的摘要
parsed = self._parse_json_response(result, default_summary)
return {
"problem": parsed.get("problem", default_summary["problem"]),
"cause": parsed.get("cause", default_summary["cause"]),
"solution": parsed.get("solution", default_summary["solution"]),
}
except Exception as e:
logger.error(f"Wingman 摘要生成失败: {e}")
return default_summary
# --------------------------------------------------------------------------
# 核心方法 3:生成自动标签建议
# --------------------------------------------------------------------------
async def suggest_tags(
self,
conversation_id: str,
messages: List[Dict[str, Any]],
existing_tags: Dict[str, Any] = None,
) -> Dict[str, Any]:
"""生成自动标签建议。
基于对话内容建议标签分类,包含标签列表、分类和优先级。
Args:
conversation_id: 会话ID
messages: 会话消息历史列表
existing_tags: 已有标签(可选,用于避免重复建议)
Returns:
Dict: {
"suggested_tags": list[str], # 建议标签列表
"category": str, # 分类
"priority": str, # 优先级: low/medium/high
}
"""
context_messages = self._build_context_messages(
messages, self._TAGS_SYSTEM_PROMPT
)
# 默认标签建议(降级时使用)
default_tags = {
"suggested_tags": [],
"category": "",
"priority": "medium",
}
try:
result = await self._call_wingman_api(context_messages)
if result is None:
return default_tags
# 尝试解析 JSON 格式的标签建议
parsed = self._parse_json_response(result, default_tags)
return {
"suggested_tags": parsed.get("suggested_tags", []),
"category": parsed.get("category", ""),
"priority": parsed.get("priority", "medium"),
}
except Exception as e:
logger.error(f"Wingman 标签建议失败: {e}")
return default_tags
# --------------------------------------------------------------------------
# 内部方法
# --------------------------------------------------------------------------
def _build_context_messages(
self,
messages: List[Dict[str, Any]],
system_prompt: str,
) -> List[Dict[str, str]]:
"""构建发送给 Wingman Agent 的消息列表。
将数据库中的消息历史转换为 OpenAI Chat Completions 格式的
messages 列表,包含 system prompt 和对话上下文。
Args:
messages: 数据库消息列表
system_prompt: 当前场景的 system prompt
Returns:
List[Dict]: OpenAI 格式的消息列表
"""
# 构建上下文消息列表
context: List[Dict[str, str]] = [
{"role": "system", "content": system_prompt}
]
# 角色映射:数据库 sender_type → OpenAI role
role_map = {
"employee": "user", # 员工消息 → user
"agent": "assistant", # 坐席消息 → assistant
"ai": "assistant", # AI消息 → assistant
"system": "system", # 系统消息 → system
}
for msg in messages:
role = role_map.get(msg.get("sender_type", ""), "user")
content = msg.get("content", "")
if content:
# 跳过系统消息(已有 system prompt
if msg.get("sender_type") == "system":
continue
context.append({"role": role, "content": content})
return context
async def _call_wingman_api(
self,
context_messages: List[Dict[str, str]],
) -> Optional[str]:
"""调用 Wingman Agent API(非流式)。
Args:
context_messages: OpenAI 格式的消息列表
Returns:
Optional[str]: AI 回复内容,失败时返回 None
"""
payload = {
"model": "Chat",
"messages": context_messages,
"stream": False,
"temperature": 0.3, # 适中的温度,保证准确性同时有一定灵活性
}
try:
client = await self._get_client()
logger.info(f"调用 Wingman API: messages_count={len(context_messages)}")
response = await client.post(self.api_url, json=payload)
response.raise_for_status()
data = response.json()
# 解析 OpenAI 兼容格式的返回
choices = data.get("choices", [])
if not choices:
logger.warning("Wingman API 返回空 choices")
return None
reply_content = choices[0]["message"]["content"]
logger.info(f"Wingman API 返回: content_length={len(reply_content)}")
return reply_content
except httpx.TimeoutException:
logger.error("Wingman API 超时")
return None
except httpx.HTTPStatusError as e:
logger.error(f"Wingman API HTTP 错误: status={e.response.status_code}")
return None
except Exception as e:
logger.error(f"Wingman API 调用失败: {e}")
return None
def _parse_json_response(
self,
content: str,
default: Dict[str, Any],
) -> Dict[str, Any]:
"""解析 AI 返回的 JSON 内容。
Wingman Agent 可能返回带 markdown 代码块的 JSON
也可能返回纯 JSON。此方法尝试多种解析方式。
Args:
content: AI 返回的原始文本
default: 解析失败时的默认值
Returns:
Dict: 解析后的字典,失败时返回默认值
"""
if not content:
return default
# 尝试 1:直接解析
try:
return json.loads(content)
except json.JSONDecodeError:
pass
# 尝试 2:提取 markdown 代码块中的 JSON
# AI 可能返回 ```json ... ``` 格式
import re
json_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1).strip())
except json.JSONDecodeError:
pass
# 尝试 3:查找第一个 { 到最后一个 } 之间的内容
start = content.find('{')
end = content.rfind('}')
if start != -1 and end != -1 and end > start:
try:
return json.loads(content[start:end + 1])
except json.JSONDecodeError:
pass
logger.warning(f"Wingman JSON 解析失败,使用默认值: {content[:200]}")
return default
def _estimate_confidence(self, content: str) -> float:
"""估算 AI 草稿回复的置信度。
基于回复长度和内容特征估算一个粗略的置信度值。
- 回复过短(< 10 字符):低置信度
- 回复包含不确定措辞:降低置信度
- 回复长度适中、内容具体:高置信度
Args:
content: AI 回复内容
Returns:
float: 置信度(0.0 - 1.0
"""
if not content or len(content.strip()) < 5:
return 0.2
confidence = 0.8 # 基础置信度
# 回复过短降低置信度
if len(content) < 10:
confidence -= 0.3
elif len(content) < 30:
confidence -= 0.1
# 包含不确定措辞降低置信度
uncertain_phrases = ["可能", "大概", "也许", "不确定", "建议您"]
for phrase in uncertain_phrases:
if phrase in content:
confidence -= 0.05
# 包含具体步骤或链接提高置信度
confident_phrases = ["步骤", "请按以下", "点击", "打开", "http"]
for phrase in confident_phrases:
if phrase in content:
confidence += 0.05
# 限制在 0.0 - 1.0 范围内
return max(0.0, min(1.0, confidence))