Files

446 lines
16 KiB
Python
Raw Permalink Normal View History

# =============================================================================
# 企微IT智能服务台 — AI Wingman 服务(坐席智能副驾驶)
# =============================================================================
# 说明:复用 Dify 基础设施,使用独立的 Wingman AgentAgent 2),
# 与员工端 AIAgent 1)共用知识库但 system prompt 不同。
#
# 核心能力:
# 1. 生成 AI 草稿回复 — 基于对话上下文为坐席生成专业回复
# 2. 生成会话自动摘要 — 结单时自动提取问题/原因/解决方案
# 3. 生成自动标签建议 — 基于对话内容建议标签分类
#
# 降级策略:Wingman Agent 不可用时返回友好错误信息,不抛异常
# =============================================================================
import json
import logging
from typing import Any, Dict, List, Optional
import httpx
from app.config import settings
logger = logging.getLogger(__name__)
class WingmanService:
"""AI Wingman 服务 — 坐席智能副驾驶。
复用 Dify 基础设施,使用独立的 Wingman AgentAgent 2),
与员工端 AI(Agent 1)共用知识库但 system prompt 不同。
三个核心方法使用不同的 system prompt
- 草稿生成:生成坐席可采纳的专业回复
- 摘要生成:提取结构化的会话摘要
- 标签建议:建议标签分类和优先级
"""
# --------------------------------------------------------------------------
# System Prompt 定义
# --------------------------------------------------------------------------
_DRAFT_SYSTEM_PROMPT: str = (
"你是一个IT服务坐席助手,基于对话上下文为坐席生成专业、准确的回复草稿。"
"直接输出回复内容,不要解释。"
)
_SUMMARY_SYSTEM_PROMPT: str = (
"你是一个IT服务分析助手,基于完整对话生成结构化摘要,"
"包含:问题、原因、解决方案。以JSON格式输出。"
"输出格式:{\"problem\": \"问题描述\", \"cause\": \"原因\", \"solution\": \"解决方案\"}"
)
_TAGS_SYSTEM_PROMPT: str = (
"你是一个IT服务分类助手,基于对话内容建议标签分类。以JSON格式输出。"
"输出格式:{\"suggested_tags\": [\"标签1\", \"标签2\"], \"category\": \"分类\", \"priority\": \"low/medium/high\"}"
)
def __init__(self):
"""初始化 Wingman 服务。
从配置读取 Wingman Agent 的 API 地址和认证信息。
独立于 AIService,使用自己的 httpx 客户端。
"""
# Wingman Agent 专用 API 端点
self.api_url = settings.dify_wingman_api_url
# Wingman Agent API Key
self.api_key = settings.dify_wingman_api_key
# 请求超时(秒)
self.timeout = settings.dify_wingman_timeout
# httpx 异步客户端(复用连接池)
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
"""获取或创建 httpx 异步客户端(懒加载)。
复用连接池,避免每次请求都创建新连接。
Returns:
httpx.AsyncClient: 异步 HTTP 客户端实例
"""
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(self.timeout),
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
)
return self._client
async def close(self):
"""关闭 httpx 客户端,释放连接池资源。"""
if self._client and not self._client.is_closed:
await self._client.aclose()
self._client = None
logger.debug("WingmanService httpx client closed")
# --------------------------------------------------------------------------
# 核心方法 1:生成 AI 草稿回复
# --------------------------------------------------------------------------
async def generate_draft(
self,
conversation_id: str,
messages: List[Dict[str, Any]],
db: Any = None,
) -> Dict[str, Any]:
"""生成 AI 草稿回复。
传入当前会话的完整消息历史,让 Wingman Agent 基于上下文
生成坐席可以采纳的草稿回复。
Args:
conversation_id: 会话ID
messages: 会话消息历史列表,每条消息包含 sender_type/content 等
db: 数据库会话(可选,当前未使用)
Returns:
Dict: {
"content": str, # 草稿内容
"confidence": float, # 置信度(0-1
"reasoning": str, # 生成推理说明
}
"""
# 构建对话上下文消息列表
context_messages = self._build_context_messages(
messages, self._DRAFT_SYSTEM_PROMPT
)
try:
result = await self._call_wingman_api(context_messages)
if result is None:
return {
"content": "",
"confidence": 0.0,
"reasoning": "Wingman 服务暂不可用",
}
reply_content = result
# 基于回复长度和内容质量估算置信度
confidence = self._estimate_confidence(reply_content)
return {
"content": reply_content,
"confidence": confidence,
"reasoning": f"基于最近 {len(messages)} 条对话上下文生成",
}
except Exception as e:
logger.error(f"Wingman 草稿生成失败: {e}")
return {
"content": "",
"confidence": 0.0,
"reasoning": f"AI 服务异常: {str(e)}",
}
# --------------------------------------------------------------------------
# 核心方法 2:生成会话自动摘要
# --------------------------------------------------------------------------
async def generate_summary(
self,
conversation_id: str,
messages: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""生成会话自动摘要。
基于完整对话生成结构化摘要,包含问题、原因、解决方案。
结单时自动调用。
Args:
conversation_id: 会话ID
messages: 会话消息历史列表
Returns:
Dict: {
"problem": str, # 问题描述
"cause": str, # 原因分析
"solution": str, # 解决方案
}
"""
context_messages = self._build_context_messages(
messages, self._SUMMARY_SYSTEM_PROMPT
)
# 默认摘要(降级时使用)
default_summary = {
"problem": "无法自动生成摘要",
"cause": "",
"solution": "",
}
try:
result = await self._call_wingman_api(context_messages)
if result is None:
return default_summary
# 尝试解析 JSON 格式的摘要
parsed = self._parse_json_response(result, default_summary)
return {
"problem": parsed.get("problem", default_summary["problem"]),
"cause": parsed.get("cause", default_summary["cause"]),
"solution": parsed.get("solution", default_summary["solution"]),
}
except Exception as e:
logger.error(f"Wingman 摘要生成失败: {e}")
return default_summary
# --------------------------------------------------------------------------
# 核心方法 3:生成自动标签建议
# --------------------------------------------------------------------------
async def suggest_tags(
self,
conversation_id: str,
messages: List[Dict[str, Any]],
existing_tags: Dict[str, Any] = None,
) -> Dict[str, Any]:
"""生成自动标签建议。
基于对话内容建议标签分类,包含标签列表、分类和优先级。
Args:
conversation_id: 会话ID
messages: 会话消息历史列表
existing_tags: 已有标签(可选,用于避免重复建议)
Returns:
Dict: {
"suggested_tags": list[str], # 建议标签列表
"category": str, # 分类
"priority": str, # 优先级: low/medium/high
}
"""
context_messages = self._build_context_messages(
messages, self._TAGS_SYSTEM_PROMPT
)
# 默认标签建议(降级时使用)
default_tags = {
"suggested_tags": [],
"category": "",
"priority": "medium",
}
try:
result = await self._call_wingman_api(context_messages)
if result is None:
return default_tags
# 尝试解析 JSON 格式的标签建议
parsed = self._parse_json_response(result, default_tags)
return {
"suggested_tags": parsed.get("suggested_tags", []),
"category": parsed.get("category", ""),
"priority": parsed.get("priority", "medium"),
}
except Exception as e:
logger.error(f"Wingman 标签建议失败: {e}")
return default_tags
# --------------------------------------------------------------------------
# 内部方法
# --------------------------------------------------------------------------
def _build_context_messages(
self,
messages: List[Dict[str, Any]],
system_prompt: str,
) -> List[Dict[str, str]]:
"""构建发送给 Wingman Agent 的消息列表。
将数据库中的消息历史转换为 OpenAI Chat Completions 格式的
messages 列表,包含 system prompt 和对话上下文。
Args:
messages: 数据库消息列表
system_prompt: 当前场景的 system prompt
Returns:
List[Dict]: OpenAI 格式的消息列表
"""
# 构建上下文消息列表
context: List[Dict[str, str]] = [
{"role": "system", "content": system_prompt}
]
# 角色映射:数据库 sender_type → OpenAI role
role_map = {
"employee": "user", # 员工消息 → user
"agent": "assistant", # 坐席消息 → assistant
"ai": "assistant", # AI消息 → assistant
"system": "system", # 系统消息 → system
}
for msg in messages:
role = role_map.get(msg.get("sender_type", ""), "user")
content = msg.get("content", "")
if content:
# 跳过系统消息(已有 system prompt
if msg.get("sender_type") == "system":
continue
context.append({"role": role, "content": content})
return context
async def _call_wingman_api(
self,
context_messages: List[Dict[str, str]],
) -> Optional[str]:
"""调用 Wingman Agent API(非流式)。
Args:
context_messages: OpenAI 格式的消息列表
Returns:
Optional[str]: AI 回复内容,失败时返回 None
"""
payload = {
"model": "Chat",
"messages": context_messages,
"stream": False,
"temperature": 0.3, # 适中的温度,保证准确性同时有一定灵活性
}
try:
client = await self._get_client()
logger.info(f"调用 Wingman API: messages_count={len(context_messages)}")
response = await client.post(self.api_url, json=payload)
response.raise_for_status()
data = response.json()
# 解析 OpenAI 兼容格式的返回
choices = data.get("choices", [])
if not choices:
logger.warning("Wingman API 返回空 choices")
return None
reply_content = choices[0]["message"]["content"]
logger.info(f"Wingman API 返回: content_length={len(reply_content)}")
return reply_content
except httpx.TimeoutException:
logger.error("Wingman API 超时")
return None
except httpx.HTTPStatusError as e:
logger.error(f"Wingman API HTTP 错误: status={e.response.status_code}")
return None
except Exception as e:
logger.error(f"Wingman API 调用失败: {e}")
return None
def _parse_json_response(
self,
content: str,
default: Dict[str, Any],
) -> Dict[str, Any]:
"""解析 AI 返回的 JSON 内容。
Wingman Agent 可能返回带 markdown 代码块的 JSON
也可能返回纯 JSON。此方法尝试多种解析方式。
Args:
content: AI 返回的原始文本
default: 解析失败时的默认值
Returns:
Dict: 解析后的字典,失败时返回默认值
"""
if not content:
return default
# 尝试 1:直接解析
try:
return json.loads(content)
except json.JSONDecodeError:
pass
# 尝试 2:提取 markdown 代码块中的 JSON
# AI 可能返回 ```json ... ``` 格式
import re
json_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1).strip())
except json.JSONDecodeError:
pass
# 尝试 3:查找第一个 { 到最后一个 } 之间的内容
start = content.find('{')
end = content.rfind('}')
if start != -1 and end != -1 and end > start:
try:
return json.loads(content[start:end + 1])
except json.JSONDecodeError:
pass
logger.warning(f"Wingman JSON 解析失败,使用默认值: {content[:200]}")
return default
def _estimate_confidence(self, content: str) -> float:
"""估算 AI 草稿回复的置信度。
基于回复长度和内容特征估算一个粗略的置信度值。
- 回复过短(< 10 字符):低置信度
- 回复包含不确定措辞:降低置信度
- 回复长度适中、内容具体:高置信度
Args:
content: AI 回复内容
Returns:
float: 置信度(0.0 - 1.0
"""
if not content or len(content.strip()) < 5:
return 0.2
confidence = 0.8 # 基础置信度
# 回复过短降低置信度
if len(content) < 10:
confidence -= 0.3
elif len(content) < 30:
confidence -= 0.1
# 包含不确定措辞降低置信度
uncertain_phrases = ["可能", "大概", "也许", "不确定", "建议您"]
for phrase in uncertain_phrases:
if phrase in content:
confidence -= 0.05
# 包含具体步骤或链接提高置信度
confident_phrases = ["步骤", "请按以下", "点击", "打开", "http"]
for phrase in confident_phrases:
if phrase in content:
confidence += 0.05
# 限制在 0.0 - 1.0 范围内
return max(0.0, min(1.0, confidence))