1729 lines
55 KiB
Python
1729 lines
55 KiB
Python
# =============================================================================
|
||
# 企微IT智能服务台 — 管理后台业务逻辑层
|
||
# =============================================================================
|
||
# 说明:管理后台的核心业务逻辑,包括:
|
||
# 1. 仪表盘数据聚合
|
||
# 2. 配置项分组读取与更新(含变更日志)
|
||
# 3. 坐席 CRUD 管理
|
||
# 4. 外部系统集成配置
|
||
# 5. 快速回复审核
|
||
# 6. 分配模式管理
|
||
# 7. 会话监控
|
||
# 8. 全局搜索
|
||
# =============================================================================
|
||
|
||
import json
|
||
import logging
|
||
from datetime import datetime, date, timedelta
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from sqlalchemy import func, select, or_, and_, case, literal_column
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.models.agent import Agent
|
||
from app.models.config_change_log import ConfigChangeLog
|
||
from app.models.conversation import Conversation
|
||
from app.models.message import Message
|
||
from app.models.quick_reply_template import QuickReplyTemplate
|
||
from app.models.system_config import SystemConfig
|
||
from app.schemas.admin import (
|
||
AdminAgentResponse,
|
||
AdminQuickReplyResponse,
|
||
AssignmentModeItem,
|
||
AssignmentModeResponse,
|
||
ConfigGroupResponse,
|
||
ConfigHistoryItem,
|
||
ConfigItemResponse,
|
||
DashboardOverviewResponse,
|
||
IntegrationConfig,
|
||
IntegrationHealthItem,
|
||
IntegrationResponse,
|
||
MonitorSessionsResponse,
|
||
SearchItem,
|
||
SessionItem,
|
||
SessionStats,
|
||
SystemAlertItem,
|
||
)
|
||
from app.utils.response import AppException, ERR_NOT_FOUND, ERR_PARAMS
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 配置分组映射(前缀 → 分组名称)
|
||
# --------------------------------------------------------------------------
|
||
CONFIG_GROUP_MAP: Dict[str, str] = {
|
||
"ai_": "AI 对话引擎",
|
||
"emergency_": "应急模式",
|
||
"assign_": "消息分配",
|
||
"polling_": "轮询配置",
|
||
"emotion_": "情绪检测",
|
||
"integration_": "外部集成",
|
||
"queue_": "排队策略",
|
||
"satisfaction_": "满意度评价",
|
||
"invite_": "邀请功能",
|
||
"notification_": "通知推送",
|
||
"security_": "安全策略",
|
||
}
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 集成系统定义(硬编码,阶段一不增加表)
|
||
# --------------------------------------------------------------------------
|
||
INTEGRATION_DEFINITIONS = [
|
||
{
|
||
"id": "dify",
|
||
"name": "Dify AI",
|
||
"key_prefix": "integration_dify_",
|
||
"configurable": True,
|
||
"config_type": "url_key", # api_url + api_key
|
||
},
|
||
{
|
||
"id": "ragflow",
|
||
"name": "RAGFlow",
|
||
"key_prefix": "integration_ragflow_",
|
||
"configurable": True,
|
||
"config_type": "url_key", # api_url + api_key
|
||
},
|
||
{
|
||
"id": "huorong",
|
||
"name": "火绒安全",
|
||
"key_prefix": "integration_huorong_",
|
||
"configurable": True,
|
||
"config_type": "access_key", # access_key_id + access_key_secret + base_url
|
||
},
|
||
{
|
||
"id": "lianruan",
|
||
"name": "联软LV7000",
|
||
"key_prefix": "integration_lianruan_",
|
||
"configurable": True,
|
||
"config_type": "account_password", # api_account + api_password + base_url + validate_key
|
||
},
|
||
{
|
||
"id": "data_platform",
|
||
"name": "数据平台",
|
||
"key_prefix": None,
|
||
"configurable": False,
|
||
"config_type": None,
|
||
},
|
||
{
|
||
"id": "beisen",
|
||
"name": "北森 eHR",
|
||
"key_prefix": None,
|
||
"configurable": False,
|
||
"config_type": None,
|
||
},
|
||
]
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# 分配模式定义(硬编码,阶段一仅手动接单可用)
|
||
# --------------------------------------------------------------------------
|
||
ASSIGNMENT_MODES = [
|
||
{"id": "manual", "name": "手动接单", "locked": False, "unlock_at": ""},
|
||
{"id": "round_robin", "name": "轮询分配", "locked": True, "unlock_at": "阶段二"},
|
||
{"id": "least_active", "name": "最少活跃优先", "locked": True, "unlock_at": "阶段二"},
|
||
{"id": "weighted", "name": "加权比例分配", "locked": True, "unlock_at": "阶段三"},
|
||
{"id": "skill_match", "name": "技能匹配分配", "locked": True, "unlock_at": "阶段三"},
|
||
{"id": "priority_queue", "name": "优先队列", "locked": True, "unlock_at": "阶段三"},
|
||
]
|
||
|
||
|
||
# ==========================================================================
|
||
# 仪表盘
|
||
# ==========================================================================
|
||
|
||
async def get_dashboard_overview(db: AsyncSession) -> DashboardOverviewResponse:
|
||
"""获取仪表盘统计数据。
|
||
|
||
聚合查询在线坐席数、今日会话数、待审核数、集成健康状态等。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
|
||
Returns:
|
||
DashboardOverviewResponse: 仪表盘统计数据
|
||
"""
|
||
# 在线坐席数
|
||
online_count_result = await db.execute(
|
||
select(func.count(Agent.id)).where(Agent.status == "online")
|
||
)
|
||
online_agents = online_count_result.scalar() or 0
|
||
|
||
# 今日会话数(今天创建的所有会话)
|
||
today_start = datetime.combine(date.today(), datetime.min.time())
|
||
today_conv_result = await db.execute(
|
||
select(func.count(Conversation.id)).where(
|
||
Conversation.created_at >= today_start
|
||
)
|
||
)
|
||
today_conversations = today_conv_result.scalar() or 0
|
||
|
||
# 待审核快速回复数
|
||
pending_result = await db.execute(
|
||
select(func.count(QuickReplyTemplate.id)).where(
|
||
QuickReplyTemplate.status == "pending_review"
|
||
)
|
||
)
|
||
pending_reviews = pending_result.scalar() or 0
|
||
|
||
# 系统告警 — 阶段一仅基于待审核快速回复生成告警,后续阶段接入更多告警源
|
||
system_alerts: List[SystemAlertItem] = []
|
||
if pending_reviews > 0:
|
||
# 查询待审核模板,用于告警详情
|
||
pending_templates_result = await db.execute(
|
||
select(QuickReplyTemplate)
|
||
.where(QuickReplyTemplate.status == "pending_review")
|
||
.order_by(QuickReplyTemplate.updated_at.desc())
|
||
.limit(5) # 最多展示5条告警
|
||
)
|
||
pending_templates = list(pending_templates_result.scalars().all())
|
||
|
||
for t in pending_templates:
|
||
system_alerts.append(
|
||
SystemAlertItem(
|
||
type="quick_reply_pending",
|
||
content=f"快速回复待审核:{t.content[:50]}{'...' if len(t.content) > 50 else ''}",
|
||
submitter=t.submitted_by or None,
|
||
time=t.updated_at.isoformat() if t.updated_at else "",
|
||
severity="warning",
|
||
)
|
||
)
|
||
|
||
# 集成系统健康状态(通用检查:按config_type判断连接状态)
|
||
integrations_health: List[IntegrationHealthItem] = []
|
||
for integ_def in INTEGRATION_DEFINITIONS:
|
||
if integ_def["configurable"] and integ_def["key_prefix"]:
|
||
prefix = integ_def["key_prefix"]
|
||
config_type = integ_def.get("config_type", "url_key")
|
||
status = "disconnected"
|
||
|
||
if config_type == "url_key":
|
||
# Dify/RAGFlow: 检查 api_url + api_key
|
||
au = await _get_config_value(db, f"{prefix}api_url")
|
||
ak = await _get_config_value(db, f"{prefix}api_key")
|
||
if au and ak:
|
||
status = "connected"
|
||
elif au:
|
||
status = "partial"
|
||
|
||
elif config_type == "access_key":
|
||
# 火绒: 检查 access_key_id + access_key_secret + base_url
|
||
aki = await _get_config_value(db, f"{prefix}access_key_id")
|
||
aks = await _get_config_value(db, f"{prefix}access_key_secret")
|
||
bu = await _get_config_value(db, f"{prefix}base_url")
|
||
if aki and aks and bu:
|
||
status = "connected"
|
||
elif bu:
|
||
status = "partial"
|
||
|
||
elif config_type == "account_password":
|
||
# 联软: 检查 api_account + api_password + base_url
|
||
aa = await _get_config_value(db, f"{prefix}api_account")
|
||
ap = await _get_config_value(db, f"{prefix}api_password")
|
||
bu = await _get_config_value(db, f"{prefix}base_url")
|
||
if aa and ap and bu:
|
||
status = "connected"
|
||
elif bu:
|
||
status = "partial"
|
||
|
||
integrations_health.append(
|
||
IntegrationHealthItem(system=integ_def["name"], status=status)
|
||
)
|
||
else:
|
||
integrations_health.append(
|
||
IntegrationHealthItem(system=integ_def["name"], status="disconnected")
|
||
)
|
||
|
||
# 平均响应时间(首次人工回复距首条员工消息的时间差)
|
||
avg_response_time_str = "—"
|
||
try:
|
||
# 取今日已结单或服务中的会话,计算平均首次响应时间
|
||
conv_ids_result = await db.execute(
|
||
select(Conversation.id).where(
|
||
Conversation.created_at >= today_start,
|
||
Conversation.assigned_agent_id.isnot(None),
|
||
)
|
||
)
|
||
conv_ids = [row[0] for row in conv_ids_result.all()]
|
||
|
||
if conv_ids:
|
||
response_times = []
|
||
for cid in conv_ids[:50]: # 最多统计50个会话,避免性能问题
|
||
# 找该会话首条员工消息
|
||
first_emp_msg = await db.execute(
|
||
select(Message.created_at).where(
|
||
Message.conversation_id == cid,
|
||
Message.sender_type == "employee",
|
||
).order_by(Message.created_at.asc()).limit(1)
|
||
)
|
||
first_emp_time = first_emp_msg.scalar()
|
||
|
||
# 找该会话首条坐席/AI回复
|
||
first_reply = await db.execute(
|
||
select(Message.created_at).where(
|
||
Message.conversation_id == cid,
|
||
Message.sender_type.in_(["agent", "ai"]),
|
||
).order_by(Message.created_at.asc()).limit(1)
|
||
)
|
||
first_reply_time = first_reply.scalar()
|
||
|
||
if first_emp_time and first_reply_time:
|
||
delta = (first_reply_time - first_emp_time).total_seconds()
|
||
if 0 < delta < 3600: # 合理范围内(1小时内)
|
||
response_times.append(delta)
|
||
|
||
if response_times:
|
||
avg_seconds = sum(response_times) / len(response_times)
|
||
if avg_seconds < 60:
|
||
avg_response_time_str = f"{avg_seconds:.0f}秒"
|
||
else:
|
||
avg_response_time_str = f"{avg_seconds / 60:.1f}分钟"
|
||
except Exception as e:
|
||
logger.warning(f"计算平均响应时间失败: {e}")
|
||
|
||
# AI 命中率(有AI实质性回复的会话占比)
|
||
ai_hit_rate_str = "—"
|
||
try:
|
||
total_conv_result = await db.execute(
|
||
select(func.count(Conversation.id)).where(
|
||
Conversation.created_at >= today_start
|
||
)
|
||
)
|
||
total_conv = total_conv_result.scalar() or 0
|
||
|
||
if total_conv > 0:
|
||
ai_conv_result = await db.execute(
|
||
select(func.count(Conversation.id)).where(
|
||
Conversation.created_at >= today_start,
|
||
Conversation.ai_substantive_reply_count > 0,
|
||
)
|
||
)
|
||
ai_conv = ai_conv_result.scalar() or 0
|
||
ai_hit_rate_str = f"{(ai_conv / total_conv) * 100:.0f}%"
|
||
except Exception as e:
|
||
logger.warning(f"计算AI命中率失败: {e}")
|
||
|
||
return DashboardOverviewResponse(
|
||
online_agents=online_agents,
|
||
today_conversations=today_conversations,
|
||
avg_response_time=avg_response_time_str,
|
||
ai_hit_rate=ai_hit_rate_str,
|
||
pending_reviews=pending_reviews,
|
||
system_alerts=system_alerts,
|
||
integrations_health=integrations_health,
|
||
)
|
||
|
||
|
||
# ==========================================================================
|
||
# 配置管理
|
||
# ==========================================================================
|
||
|
||
async def get_config_groups(db: AsyncSession) -> List[ConfigGroupResponse]:
|
||
"""获取全部配置项(按功能分组)。
|
||
|
||
从 system_configs 表读取所有配置,按前缀分组返回。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
|
||
Returns:
|
||
List[ConfigGroupResponse]: 配置分组列表
|
||
"""
|
||
# 查询所有非 integration_ 前缀的配置项
|
||
result = await db.execute(
|
||
select(SystemConfig).order_by(SystemConfig.config_key)
|
||
)
|
||
all_configs = list(result.scalars().all())
|
||
|
||
# 按 key 前缀分组(排除 integration_ 前缀)
|
||
groups_dict: Dict[str, List[ConfigItemResponse]] = {}
|
||
other_items: List[ConfigItemResponse] = []
|
||
|
||
for cfg in all_configs:
|
||
# 跳过 integration_ 前缀的配置(在集成管理中单独展示)
|
||
if cfg.config_key.startswith("integration_"):
|
||
continue
|
||
|
||
# 推断值类型
|
||
value_type = _infer_value_type(cfg.config_value)
|
||
|
||
item = ConfigItemResponse(
|
||
key=cfg.config_key,
|
||
value=cfg.config_value,
|
||
description=cfg.description or "",
|
||
value_type=value_type,
|
||
)
|
||
|
||
# 查找匹配的前缀分组
|
||
matched = False
|
||
for prefix, group_name in CONFIG_GROUP_MAP.items():
|
||
if cfg.config_key.startswith(prefix):
|
||
if group_name not in groups_dict:
|
||
groups_dict[group_name] = []
|
||
groups_dict[group_name].append(item)
|
||
matched = True
|
||
break
|
||
|
||
if not matched:
|
||
other_items.append(item)
|
||
|
||
# 构建分组响应
|
||
groups: List[ConfigGroupResponse] = []
|
||
for prefix, group_name in CONFIG_GROUP_MAP.items():
|
||
if group_name in groups_dict:
|
||
groups.append(
|
||
ConfigGroupResponse(
|
||
name=group_name,
|
||
key_prefix=prefix,
|
||
items=groups_dict[group_name],
|
||
)
|
||
)
|
||
|
||
# 未匹配前缀的配置项放入"其他"分组
|
||
if other_items:
|
||
groups.append(
|
||
ConfigGroupResponse(
|
||
name="其他配置",
|
||
key_prefix="",
|
||
items=other_items,
|
||
)
|
||
)
|
||
|
||
return groups
|
||
|
||
|
||
def _infer_value_type(value: str) -> str:
|
||
"""推断配置值的类型。
|
||
|
||
Args:
|
||
value: 配置值字符串
|
||
|
||
Returns:
|
||
str: 值类型标识
|
||
"""
|
||
if value.lower() in ("true", "false"):
|
||
return "boolean"
|
||
try:
|
||
float(value)
|
||
return "number"
|
||
except (ValueError, TypeError):
|
||
pass
|
||
try:
|
||
parsed = json.loads(value)
|
||
if isinstance(parsed, list):
|
||
return "json_array"
|
||
if isinstance(parsed, dict):
|
||
return "json_object"
|
||
except (json.JSONDecodeError, TypeError):
|
||
pass
|
||
return "string"
|
||
|
||
|
||
async def update_config(
|
||
db: AsyncSession,
|
||
key: str,
|
||
value: str,
|
||
agent_id: str,
|
||
) -> Dict[str, Any]:
|
||
"""更新单个配置项,并记录变更日志。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
key: 配置键
|
||
value: 新的配置值
|
||
agent_id: 操作人 agent_id
|
||
|
||
Returns:
|
||
Dict: 包含 key, old_value, new_value, changed_at
|
||
|
||
Raises:
|
||
AppException: 配置项不存在
|
||
"""
|
||
# 查找配置项
|
||
result = await db.execute(
|
||
select(SystemConfig).where(SystemConfig.config_key == key)
|
||
)
|
||
config = result.scalars().first()
|
||
|
||
if not config:
|
||
raise AppException(1003, f"配置项不存在: {key}")
|
||
|
||
old_value = config.config_value
|
||
|
||
# 写入变更日志
|
||
change_log = ConfigChangeLog(
|
||
config_key=key,
|
||
old_value=old_value,
|
||
new_value=value,
|
||
changed_by=agent_id,
|
||
)
|
||
db.add(change_log)
|
||
|
||
# 更新配置值
|
||
config.config_value = value
|
||
config.updated_at = datetime.now()
|
||
db.add(config)
|
||
|
||
logger.info(f"配置更新: key={key}, old={old_value}, new={value}, by={agent_id}")
|
||
|
||
return {
|
||
"key": key,
|
||
"old_value": old_value,
|
||
"new_value": value,
|
||
"changed_at": datetime.now().isoformat(),
|
||
}
|
||
|
||
|
||
async def get_config_history(
|
||
db: AsyncSession,
|
||
key: str,
|
||
limit: int = 20,
|
||
) -> List[ConfigHistoryItem]:
|
||
"""获取指定配置项的变更历史。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
key: 配置键
|
||
limit: 返回条数上限
|
||
|
||
Returns:
|
||
List[ConfigHistoryItem]: 变更历史列表
|
||
"""
|
||
result = await db.execute(
|
||
select(ConfigChangeLog)
|
||
.where(ConfigChangeLog.config_key == key)
|
||
.order_by(ConfigChangeLog.changed_at.desc())
|
||
.limit(limit)
|
||
)
|
||
logs = list(result.scalars().all())
|
||
|
||
# 批量查询操作人姓名
|
||
agent_ids = list({log.changed_by for log in logs})
|
||
agent_names = {}
|
||
if agent_ids:
|
||
agents_result = await db.execute(
|
||
select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
|
||
)
|
||
agent_names = {row[0]: row[1] for row in agents_result.all()}
|
||
|
||
items = []
|
||
for log in logs:
|
||
items.append(
|
||
ConfigHistoryItem(
|
||
id=log.id,
|
||
config_key=log.config_key,
|
||
old_value=log.old_value,
|
||
new_value=log.new_value,
|
||
changed_by=log.changed_by,
|
||
changed_by_name=agent_names.get(log.changed_by, ""),
|
||
changed_at=log.changed_at,
|
||
)
|
||
)
|
||
|
||
return items
|
||
|
||
|
||
# ==========================================================================
|
||
# 坐席管理
|
||
# ==========================================================================
|
||
|
||
async def list_admin_agents(
|
||
db: AsyncSession,
|
||
status: Optional[str] = None,
|
||
) -> List[AdminAgentResponse]:
|
||
"""获取坐席列表(管理视图,含角色/技能标签/今日结单数)。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
status: 按状态筛选(可选)
|
||
|
||
Returns:
|
||
List[AdminAgentResponse]: 坐席列表
|
||
"""
|
||
stmt = select(Agent).order_by(Agent.name)
|
||
if status:
|
||
stmt = stmt.where(Agent.status == status)
|
||
|
||
result = await db.execute(stmt)
|
||
agents = list(result.scalars().all())
|
||
|
||
# 批量查询今日结单数
|
||
today_start = datetime.combine(date.today(), datetime.min.time())
|
||
agent_ids = [a.id for a in agents]
|
||
today_resolved_map: Dict[str, int] = {}
|
||
|
||
if agent_ids:
|
||
resolved_result = await db.execute(
|
||
select(
|
||
Conversation.assigned_agent_id,
|
||
func.count(Conversation.id),
|
||
)
|
||
.where(
|
||
Conversation.assigned_agent_id.in_(agent_ids),
|
||
Conversation.status == "resolved",
|
||
Conversation.updated_at >= today_start,
|
||
)
|
||
.group_by(Conversation.assigned_agent_id)
|
||
)
|
||
today_resolved_map = dict(resolved_result.all())
|
||
|
||
items = []
|
||
for a in agents:
|
||
resp = AdminAgentResponse(
|
||
id=a.id,
|
||
user_id=a.user_id,
|
||
name=a.name,
|
||
status=a.status,
|
||
role=a.role,
|
||
skill_tags=a.skill_tags or [],
|
||
current_load=a.current_load,
|
||
max_load=a.max_load,
|
||
today_resolved=today_resolved_map.get(a.id, 0),
|
||
created_at=a.created_at,
|
||
updated_at=a.updated_at,
|
||
)
|
||
items.append(resp)
|
||
|
||
return items
|
||
|
||
|
||
async def create_agent(
|
||
db: AsyncSession,
|
||
user_id: str,
|
||
name: str,
|
||
role: str = "agent",
|
||
skill_tags: Optional[List[str]] = None,
|
||
max_load: int = 5,
|
||
) -> AdminAgentResponse:
|
||
"""创建坐席。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
user_id: 企微用户ID
|
||
name: 坐席姓名
|
||
role: 角色(仅允许 admin / agent)
|
||
skill_tags: 技能标签列表
|
||
max_load: 最大同时服务数
|
||
|
||
Returns:
|
||
AdminAgentResponse: 创建的坐席信息
|
||
|
||
Raises:
|
||
AppException: user_id 已存在 或 role 值非法
|
||
"""
|
||
# 校验 role 白名单,防止非法角色值入库
|
||
if role not in ("admin", "agent"):
|
||
raise AppException(1001, f"角色值非法: {role},仅允许 admin 或 agent")
|
||
|
||
# 检查 user_id 是否已存在
|
||
existing = await db.execute(
|
||
select(Agent).where(Agent.user_id == user_id)
|
||
)
|
||
if existing.scalars().first():
|
||
raise AppException(1001, f"坐席 user_id 已存在: {user_id}")
|
||
|
||
agent = Agent(
|
||
user_id=user_id,
|
||
name=name,
|
||
role=role,
|
||
skill_tags=skill_tags or [],
|
||
max_load=max_load,
|
||
status="offline",
|
||
current_load=0,
|
||
)
|
||
db.add(agent)
|
||
await db.flush()
|
||
|
||
logger.info(f"创建坐席: user_id={user_id}, name={name}, role={role}")
|
||
|
||
return AdminAgentResponse(
|
||
id=agent.id,
|
||
user_id=agent.user_id,
|
||
name=agent.name,
|
||
status=agent.status,
|
||
role=agent.role,
|
||
skill_tags=agent.skill_tags or [],
|
||
current_load=agent.current_load,
|
||
max_load=agent.max_load,
|
||
today_resolved=0,
|
||
created_at=agent.created_at,
|
||
updated_at=agent.updated_at,
|
||
)
|
||
|
||
|
||
async def update_agent(
|
||
db: AsyncSession,
|
||
agent_id: str,
|
||
role: Optional[str] = None,
|
||
skill_tags: Optional[List[str]] = None,
|
||
max_load: Optional[int] = None,
|
||
) -> Dict[str, Any]:
|
||
"""更新坐席信息(角色/技能标签/负载上限)。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
agent_id: 坐席ID
|
||
role: 角色(可选)
|
||
skill_tags: 技能标签列表(可选)
|
||
max_load: 最大同时服务数(可选)
|
||
|
||
Returns:
|
||
Dict: 更新后的坐席关键字段
|
||
|
||
Raises:
|
||
AppException: 坐席不存在
|
||
"""
|
||
result = await db.execute(
|
||
select(Agent).where(Agent.id == agent_id)
|
||
)
|
||
agent = result.scalars().first()
|
||
|
||
if not agent:
|
||
raise AppException(3004, "坐席不存在")
|
||
|
||
# 校验 role 白名单,防止非法角色值入库
|
||
if role is not None and role not in ("admin", "agent"):
|
||
raise AppException(1001, f"角色值非法: {role},仅允许 admin 或 agent")
|
||
|
||
if role is not None:
|
||
agent.role = role
|
||
if skill_tags is not None:
|
||
agent.skill_tags = skill_tags
|
||
if max_load is not None:
|
||
agent.max_load = max_load
|
||
|
||
agent.updated_at = datetime.now()
|
||
db.add(agent)
|
||
|
||
logger.info(f"更新坐席: id={agent_id}, role={role}, skill_tags={skill_tags}, max_load={max_load}")
|
||
|
||
return {
|
||
"id": agent.id,
|
||
"role": agent.role,
|
||
"skill_tags": agent.skill_tags or [],
|
||
"max_load": agent.max_load,
|
||
}
|
||
|
||
|
||
async def delete_agent(db: AsyncSession, agent_id: str) -> None:
|
||
"""移除坐席。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
agent_id: 坐席ID
|
||
|
||
Raises:
|
||
AppException: 坐席不存在
|
||
"""
|
||
result = await db.execute(
|
||
select(Agent).where(Agent.id == agent_id)
|
||
)
|
||
agent = result.scalars().first()
|
||
|
||
if not agent:
|
||
raise AppException(3004, "坐席不存在")
|
||
|
||
await db.delete(agent)
|
||
logger.info(f"移除坐席: id={agent_id}, user_id={agent.user_id}")
|
||
|
||
|
||
# ==========================================================================
|
||
# 集成配置管理
|
||
# ==========================================================================
|
||
|
||
async def get_integrations(db: AsyncSession) -> List[IntegrationResponse]:
|
||
"""获取集成系统列表及配置状态。
|
||
|
||
从 system_configs 表读取 integration_ 前缀的配置。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
|
||
Returns:
|
||
List[IntegrationResponse]: 集成系统列表
|
||
"""
|
||
# 查询所有 integration_ 前缀的配置
|
||
result = await db.execute(
|
||
select(SystemConfig).where(
|
||
SystemConfig.config_key.startswith("integration_")
|
||
)
|
||
)
|
||
integ_configs = list(result.scalars().all())
|
||
|
||
# 构建 {prefix: {api_url: ..., api_key: ...}} 映射
|
||
config_map: Dict[str, Dict[str, str]] = {}
|
||
for cfg in integ_configs:
|
||
# integration_dify_api_url → 前缀 integration_dify_
|
||
# 找到对应的 key_prefix
|
||
for integ_def in INTEGRATION_DEFINITIONS:
|
||
prefix = integ_def.get("key_prefix")
|
||
if prefix and cfg.config_key.startswith(prefix):
|
||
if prefix not in config_map:
|
||
config_map[prefix] = {}
|
||
# 去掉前缀得到子键名(如 api_url, api_key)
|
||
sub_key = cfg.config_key[len(prefix):]
|
||
config_map[prefix][sub_key] = cfg.config_value
|
||
break
|
||
|
||
items: List[IntegrationResponse] = []
|
||
for integ_def in INTEGRATION_DEFINITIONS:
|
||
if integ_def["configurable"]:
|
||
prefix = integ_def["key_prefix"]
|
||
config_type = integ_def.get("config_type", "url_key")
|
||
cfg_data = config_map.get(prefix, {})
|
||
|
||
if config_type == "url_key":
|
||
# Dify / RAGFlow 模式:api_url + api_key
|
||
api_url = cfg_data.get("api_url", "")
|
||
api_key = cfg_data.get("api_key", "")
|
||
if api_url and api_key:
|
||
status = "connected"
|
||
elif api_url:
|
||
status = "partial"
|
||
else:
|
||
status = "disconnected"
|
||
items.append(
|
||
IntegrationResponse(
|
||
id=integ_def["id"],
|
||
name=integ_def["name"],
|
||
status=status,
|
||
configurable=True,
|
||
config_type="url_key",
|
||
config=IntegrationConfig(
|
||
api_url=api_url,
|
||
api_key_set=bool(api_key),
|
||
),
|
||
)
|
||
)
|
||
|
||
elif config_type == "access_key":
|
||
# 火绒模式:access_key_id + access_key_secret + base_url
|
||
access_key_id = cfg_data.get("access_key_id", "")
|
||
access_key_secret = cfg_data.get("access_key_secret", "")
|
||
base_url = cfg_data.get("base_url", "")
|
||
if access_key_id and access_key_secret and base_url:
|
||
status = "connected"
|
||
elif base_url:
|
||
status = "partial"
|
||
else:
|
||
status = "disconnected"
|
||
items.append(
|
||
IntegrationResponse(
|
||
id=integ_def["id"],
|
||
name=integ_def["name"],
|
||
status=status,
|
||
configurable=True,
|
||
config_type="access_key",
|
||
config=IntegrationConfig(
|
||
# url_key 模式字段(火绒不需要,但前端卡片复用展示)
|
||
api_url=base_url,
|
||
api_key_set=bool(access_key_id),
|
||
# access_key 模式专属字段
|
||
access_key_id_set=bool(access_key_id),
|
||
access_key_secret_set=bool(access_key_secret),
|
||
base_url=base_url or None,
|
||
),
|
||
)
|
||
)
|
||
|
||
elif config_type == "account_password":
|
||
# 联软模式:api_account + api_password + base_url + validate_key
|
||
api_account = cfg_data.get("api_account", "")
|
||
api_password = cfg_data.get("api_password", "")
|
||
base_url = cfg_data.get("base_url", "")
|
||
if api_account and api_password and base_url:
|
||
status = "connected"
|
||
elif base_url:
|
||
status = "partial"
|
||
else:
|
||
status = "disconnected"
|
||
items.append(
|
||
IntegrationResponse(
|
||
id=integ_def["id"],
|
||
name=integ_def["name"],
|
||
status=status,
|
||
configurable=True,
|
||
config_type="account_password",
|
||
config=IntegrationConfig(
|
||
# 复用字段(前端展示用)
|
||
api_url=base_url,
|
||
api_key_set=bool(api_account),
|
||
# account_password 模式专属字段
|
||
base_url=base_url or None,
|
||
api_account_set=bool(api_account),
|
||
api_password_set=bool(api_password),
|
||
),
|
||
)
|
||
)
|
||
|
||
else:
|
||
items.append(
|
||
IntegrationResponse(
|
||
id=integ_def["id"],
|
||
name=integ_def["name"],
|
||
status="disconnected",
|
||
configurable=False,
|
||
config=None,
|
||
)
|
||
)
|
||
|
||
return items
|
||
|
||
|
||
async def update_integration(
|
||
db: AsyncSession,
|
||
integration_id: str,
|
||
# url_key 模式(Dify / RAGFlow)
|
||
api_url: str = "",
|
||
api_key: str = "",
|
||
# access_key 模式(火绒安全)
|
||
access_key_id: str = "",
|
||
access_key_secret: str = "",
|
||
base_url: str = "",
|
||
# account_password 模式(联软LV7000)
|
||
api_account: str = "",
|
||
api_password: str = "",
|
||
validate_key: str = "",
|
||
agent_id: str = "",
|
||
) -> IntegrationResponse:
|
||
"""更新集成系统配置。
|
||
|
||
支持三种模式:
|
||
- url_key 模式(Dify / RAGFlow):传入 api_url + api_key
|
||
- access_key 模式(火绒安全):传入 access_key_id + access_key_secret + base_url
|
||
- account_password 模式(联软LV7000):传入 api_account + api_password + base_url + validate_key
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
integration_id: 集成系统ID(如 dify/ragflow/huorong/lianruan)
|
||
api_url: API 地址(url_key 模式)
|
||
api_key: API Key(url_key 模式)
|
||
access_key_id: AccessKey ID(access_key 模式)
|
||
access_key_secret: AccessKey Secret(access_key 模式)
|
||
base_url: 内网 Base URL(access_key/account_password 模式)
|
||
api_account: API账号(account_password 模式)
|
||
api_password: API密码(account_password 模式)
|
||
validate_key: 验证密钥(account_password 模式,可选)
|
||
agent_id: 操作人 agent_id
|
||
|
||
Returns:
|
||
IntegrationResponse: 更新后的集成系统信息
|
||
|
||
Raises:
|
||
AppException: 集成系统不存在或不可配置
|
||
"""
|
||
# 查找集成定义
|
||
integ_def = None
|
||
for d in INTEGRATION_DEFINITIONS:
|
||
if d["id"] == integration_id:
|
||
integ_def = d
|
||
break
|
||
|
||
if not integ_def:
|
||
raise AppException(1003, f"集成系统不存在: {integration_id}")
|
||
|
||
if not integ_def["configurable"]:
|
||
raise AppException(1001, f"集成系统不可配置: {integ_def['name']}")
|
||
|
||
config_type = integ_def.get("config_type", "url_key")
|
||
prefix = integ_def["key_prefix"]
|
||
|
||
# ---------- url_key 模式(Dify / RAGFlow)----------
|
||
if config_type == "url_key":
|
||
await _upsert_system_config(db, f"{prefix}api_url", api_url, f"{integ_def['name']} API 地址", agent_id)
|
||
await _upsert_system_config(db, f"{prefix}api_key", api_key, f"{integ_def['name']} API Key", agent_id)
|
||
|
||
if api_url and api_key:
|
||
status = "connected"
|
||
elif api_url:
|
||
status = "partial"
|
||
else:
|
||
status = "disconnected"
|
||
|
||
return IntegrationResponse(
|
||
id=integration_id,
|
||
name=integ_def["name"],
|
||
status=status,
|
||
configurable=True,
|
||
config_type="url_key",
|
||
config=IntegrationConfig(
|
||
api_url=api_url,
|
||
api_key_set=bool(api_key),
|
||
),
|
||
)
|
||
|
||
# ---------- access_key 模式(火绒安全)----------
|
||
elif config_type == "access_key":
|
||
await _upsert_system_config(db, f"{prefix}access_key_id", access_key_id, f"{integ_def['name']} AccessKey ID", agent_id)
|
||
await _upsert_system_config(db, f"{prefix}access_key_secret", access_key_secret, f"{integ_def['name']} AccessKey Secret", agent_id)
|
||
await _upsert_system_config(db, f"{prefix}base_url", base_url, f"{integ_def['name']} Base URL", agent_id)
|
||
|
||
if access_key_id and access_key_secret and base_url:
|
||
status = "connected"
|
||
elif base_url:
|
||
status = "partial"
|
||
else:
|
||
status = "disconnected"
|
||
|
||
return IntegrationResponse(
|
||
id=integration_id,
|
||
name=integ_def["name"],
|
||
status=status,
|
||
configurable=True,
|
||
config_type="access_key",
|
||
config=IntegrationConfig(
|
||
# 前端复用字段(url_key 模式展示用)
|
||
api_url=base_url,
|
||
api_key_set=bool(access_key_id),
|
||
# access_key 模式专属字段
|
||
access_key_id_set=bool(access_key_id),
|
||
access_key_secret_set=bool(access_key_secret),
|
||
base_url=base_url or None,
|
||
),
|
||
)
|
||
|
||
# ---------- account_password 模式(联软LV7000)----------
|
||
elif config_type == "account_password":
|
||
await _upsert_system_config(db, f"{prefix}api_account", api_account, f"{integ_def['name']} API账号", agent_id)
|
||
await _upsert_system_config(db, f"{prefix}api_password", api_password, f"{integ_def['name']} API密码", agent_id)
|
||
await _upsert_system_config(db, f"{prefix}base_url", base_url, f"{integ_def['name']} Base URL", agent_id)
|
||
await _upsert_system_config(db, f"{prefix}validate_key", validate_key, f"{integ_def['name']} 验证密钥", agent_id)
|
||
|
||
if api_account and api_password and base_url:
|
||
status = "connected"
|
||
elif base_url:
|
||
status = "partial"
|
||
else:
|
||
status = "disconnected"
|
||
|
||
return IntegrationResponse(
|
||
id=integration_id,
|
||
name=integ_def["name"],
|
||
status=status,
|
||
configurable=True,
|
||
config_type="account_password",
|
||
config=IntegrationConfig(
|
||
api_url=base_url,
|
||
api_key_set=bool(api_account),
|
||
base_url=base_url or None,
|
||
api_account_set=bool(api_account),
|
||
api_password_set=bool(api_password),
|
||
),
|
||
)
|
||
|
||
raise AppException(1001, f"未知配置类型: {config_type}")
|
||
|
||
|
||
async def _get_config_value(db: AsyncSession, key: str) -> str:
|
||
"""快速读取单个配置值。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
key: 配置键
|
||
|
||
Returns:
|
||
str: 配置值,不存在返回空字符串
|
||
"""
|
||
result = await db.execute(
|
||
select(SystemConfig.config_value).where(
|
||
SystemConfig.config_key == key
|
||
)
|
||
)
|
||
row = result.scalar()
|
||
return row if row else ""
|
||
|
||
|
||
async def _upsert_system_config(
|
||
db: AsyncSession,
|
||
key: str,
|
||
value: str,
|
||
description: str,
|
||
agent_id: str,
|
||
) -> None:
|
||
"""插入或更新 system_configs 记录,并记录变更日志。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
key: 配置键
|
||
value: 配置值
|
||
description: 配置说明
|
||
agent_id: 操作人
|
||
"""
|
||
result = await db.execute(
|
||
select(SystemConfig).where(SystemConfig.config_key == key)
|
||
)
|
||
config = result.scalars().first()
|
||
|
||
if config:
|
||
old_value = config.config_value
|
||
config.config_value = value
|
||
config.updated_at = datetime.now()
|
||
db.add(config)
|
||
|
||
# 记录变更日志
|
||
change_log = ConfigChangeLog(
|
||
config_key=key,
|
||
old_value=old_value,
|
||
new_value=value,
|
||
changed_by=agent_id,
|
||
)
|
||
db.add(change_log)
|
||
else:
|
||
new_config = SystemConfig(
|
||
config_key=key,
|
||
config_value=value,
|
||
description=description,
|
||
)
|
||
db.add(new_config)
|
||
|
||
# 记录新增日志
|
||
change_log = ConfigChangeLog(
|
||
config_key=key,
|
||
old_value="",
|
||
new_value=value,
|
||
changed_by=agent_id,
|
||
)
|
||
db.add(change_log)
|
||
|
||
|
||
# ==========================================================================
|
||
# 快速回复审核
|
||
# ==========================================================================
|
||
|
||
async def list_pending_quick_replies(
|
||
db: AsyncSession,
|
||
category: Optional[str] = None,
|
||
) -> List[AdminQuickReplyResponse]:
|
||
"""获取待审核快速回复模板列表。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
category: 按分类筛选(可选)
|
||
|
||
Returns:
|
||
List[AdminQuickReplyResponse]: 待审核模板列表
|
||
"""
|
||
stmt = (
|
||
select(QuickReplyTemplate)
|
||
.where(QuickReplyTemplate.status == "pending_review")
|
||
.order_by(QuickReplyTemplate.updated_at.desc())
|
||
)
|
||
if category:
|
||
stmt = stmt.where(QuickReplyTemplate.category == category)
|
||
|
||
result = await db.execute(stmt)
|
||
templates = list(result.scalars().all())
|
||
|
||
# 批量查询提交人姓名
|
||
submitted_by_ids = list({t.submitted_by for t in templates if t.submitted_by})
|
||
agent_names: Dict[str, str] = {}
|
||
if submitted_by_ids:
|
||
agents_result = await db.execute(
|
||
select(Agent.id, Agent.name).where(Agent.id.in_(submitted_by_ids))
|
||
)
|
||
agent_names = dict(agents_result.all())
|
||
|
||
items = []
|
||
for t in templates:
|
||
items.append(
|
||
AdminQuickReplyResponse(
|
||
id=t.id,
|
||
category=t.category,
|
||
title=t.title,
|
||
content=t.content,
|
||
variables=t.variables or [],
|
||
status=t.status,
|
||
version=t.version,
|
||
submitted_by=t.submitted_by,
|
||
submitted_by_name=agent_names.get(t.submitted_by or "", ""),
|
||
sort_order=t.sort_order,
|
||
created_at=t.created_at,
|
||
updated_at=t.updated_at,
|
||
)
|
||
)
|
||
|
||
return items
|
||
|
||
|
||
async def review_quick_reply(
|
||
db: AsyncSession,
|
||
template_id: str,
|
||
action: str,
|
||
reason: str,
|
||
agent_id: str,
|
||
) -> Dict[str, Any]:
|
||
"""审核快速回复模板(通过/驳回)。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
template_id: 模板ID
|
||
action: 审核动作(approve/reject)
|
||
reason: 审核原因
|
||
agent_id: 审核人 agent_id
|
||
|
||
Returns:
|
||
Dict: 包含 id, status, version
|
||
|
||
Raises:
|
||
AppException: 模板不存在或动作非法
|
||
"""
|
||
if action not in ("approve", "reject"):
|
||
raise AppException(1001, "审核动作只能是 approve 或 reject")
|
||
|
||
result = await db.execute(
|
||
select(QuickReplyTemplate).where(QuickReplyTemplate.id == template_id)
|
||
)
|
||
template = result.scalars().first()
|
||
|
||
if not template:
|
||
raise ERR_NOT_FOUND
|
||
|
||
# 校验模板状态:仅允许审核 pending_review 状态的模板
|
||
if template.status != "pending_review":
|
||
raise AppException(
|
||
1001,
|
||
f"当前模板状态为 {template.status},仅 pending_review 状态可审核"
|
||
)
|
||
|
||
if action == "approve":
|
||
template.status = "approved"
|
||
template.version = (template.version or 1) + 1
|
||
else:
|
||
template.status = "rejected"
|
||
|
||
template.updated_at = datetime.now()
|
||
db.add(template)
|
||
|
||
logger.info(
|
||
f"快速回复审核: id={template_id}, action={action}, "
|
||
f"by={agent_id}, reason={reason}"
|
||
)
|
||
|
||
return {
|
||
"id": template.id,
|
||
"status": template.status,
|
||
"version": template.version,
|
||
}
|
||
|
||
|
||
# ==========================================================================
|
||
# 分配模式
|
||
# ==========================================================================
|
||
|
||
async def get_assignment_mode(db: AsyncSession) -> AssignmentModeResponse:
|
||
"""获取当前分配模式。
|
||
|
||
从 system_configs 表读取 assign_mode 配置。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
|
||
Returns:
|
||
AssignmentModeResponse: 分配模式信息
|
||
"""
|
||
# 读取当前分配模式
|
||
result = await db.execute(
|
||
select(SystemConfig).where(SystemConfig.config_key == "assign_mode")
|
||
)
|
||
config = result.scalars().first()
|
||
current_mode = config.config_value if config else "manual"
|
||
|
||
# 构建模式列表
|
||
modes = [
|
||
AssignmentModeItem(
|
||
id=m["id"],
|
||
name=m["name"],
|
||
enabled=(m["id"] == current_mode),
|
||
locked=m["locked"],
|
||
unlock_at=m["unlock_at"],
|
||
)
|
||
for m in ASSIGNMENT_MODES
|
||
]
|
||
|
||
return AssignmentModeResponse(
|
||
current_mode=current_mode,
|
||
modes=modes,
|
||
)
|
||
|
||
|
||
async def update_assignment_mode(
|
||
db: AsyncSession,
|
||
mode: str,
|
||
agent_id: str,
|
||
) -> AssignmentModeResponse:
|
||
"""切换分配模式(阶段一仅允许手动接单)。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
mode: 分配模式ID
|
||
agent_id: 操作人 agent_id
|
||
|
||
Returns:
|
||
AssignmentModeResponse: 更新后的分配模式信息
|
||
|
||
Raises:
|
||
AppException: 模式不存在或已锁定
|
||
"""
|
||
# 验证模式是否合法
|
||
mode_def = None
|
||
for m in ASSIGNMENT_MODES:
|
||
if m["id"] == mode:
|
||
mode_def = m
|
||
break
|
||
|
||
if not mode_def:
|
||
raise AppException(1001, f"无效的分配模式: {mode}")
|
||
|
||
if mode_def["locked"]:
|
||
raise AppException(1001, f"分配模式已锁定,将在{mode_def['unlock_at']}解锁")
|
||
|
||
# 更新或创建 assign_mode 配置
|
||
await _upsert_system_config(db, "assign_mode", mode, "消息分配模式", agent_id)
|
||
|
||
logger.info(f"切换分配模式: mode={mode}, by={agent_id}")
|
||
|
||
return await get_assignment_mode(db)
|
||
|
||
|
||
# ==========================================================================
|
||
# 会话监控
|
||
# ==========================================================================
|
||
|
||
async def get_monitor_sessions(
|
||
db: AsyncSession,
|
||
status: Optional[str] = None,
|
||
) -> MonitorSessionsResponse:
|
||
"""获取实时会话列表(Demo预览)。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
status: 按状态筛选(可选,默认非 resolved)
|
||
|
||
Returns:
|
||
MonitorSessionsResponse: 会话监控数据
|
||
"""
|
||
today_start = datetime.combine(date.today(), datetime.min.time())
|
||
|
||
# 统计数据
|
||
in_progress_result = await db.execute(
|
||
select(func.count(Conversation.id)).where(
|
||
Conversation.status == "serving"
|
||
)
|
||
)
|
||
in_progress = in_progress_result.scalar() or 0
|
||
|
||
queued_result = await db.execute(
|
||
select(func.count(Conversation.id)).where(
|
||
Conversation.status == "queued"
|
||
)
|
||
)
|
||
queued = queued_result.scalar() or 0
|
||
|
||
resolved_today_result = await db.execute(
|
||
select(func.count(Conversation.id)).where(
|
||
Conversation.status == "resolved",
|
||
Conversation.updated_at >= today_start,
|
||
)
|
||
)
|
||
resolved_today = resolved_today_result.scalar() or 0
|
||
|
||
stats = SessionStats(
|
||
in_progress=in_progress,
|
||
queued=queued,
|
||
resolved_today=resolved_today,
|
||
alerts=0,
|
||
)
|
||
|
||
# 会话列表(默认查询非 resolved 的会话)
|
||
stmt = select(Conversation).order_by(Conversation.created_at.desc())
|
||
if status:
|
||
stmt = stmt.where(Conversation.status == status)
|
||
else:
|
||
stmt = stmt.where(Conversation.status != "resolved")
|
||
|
||
result = await db.execute(stmt.limit(50))
|
||
conversations = list(result.scalars().all())
|
||
|
||
# 批量查询坐席姓名
|
||
agent_ids = list({c.assigned_agent_id for c in conversations if c.assigned_agent_id})
|
||
agent_names: Dict[str, str] = {}
|
||
if agent_ids:
|
||
agents_result = await db.execute(
|
||
select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
|
||
)
|
||
agent_names = dict(agents_result.all())
|
||
|
||
items = []
|
||
for c in conversations:
|
||
items.append(
|
||
SessionItem(
|
||
id=c.id,
|
||
employee_name=c.employee_name,
|
||
status=c.status,
|
||
assigned_agent_name=agent_names.get(c.assigned_agent_id or "", ""),
|
||
urgency_score=c.urgency_score,
|
||
created_at=c.created_at,
|
||
last_message_summary=c.last_message_summary,
|
||
)
|
||
)
|
||
|
||
return MonitorSessionsResponse(stats=stats, items=items)
|
||
|
||
|
||
# ==========================================================================
|
||
# 全局搜索
|
||
# ==========================================================================
|
||
|
||
async def global_search(
|
||
db: AsyncSession,
|
||
query: str,
|
||
) -> List[SearchItem]:
|
||
"""全局搜索配置项、坐席、快速回复。
|
||
|
||
按类型优先级排序:配置项 > 坐席 > 快速回复,同类型按名称排序。
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
query: 搜索关键词
|
||
|
||
Returns:
|
||
List[SearchItem]: 搜索结果列表
|
||
"""
|
||
items: List[SearchItem] = []
|
||
keyword = f"%{query}%"
|
||
|
||
# 搜索配置项
|
||
config_result = await db.execute(
|
||
select(SystemConfig).where(
|
||
or_(
|
||
SystemConfig.config_key.ilike(keyword),
|
||
SystemConfig.description.ilike(keyword),
|
||
)
|
||
).limit(10)
|
||
)
|
||
for cfg in config_result.scalars().all():
|
||
items.append(
|
||
SearchItem(
|
||
type="config",
|
||
id=cfg.config_key,
|
||
name=cfg.description or cfg.config_key,
|
||
route="/admin/configs",
|
||
)
|
||
)
|
||
|
||
# 搜索坐席
|
||
agent_result = await db.execute(
|
||
select(Agent).where(
|
||
or_(
|
||
Agent.name.ilike(keyword),
|
||
Agent.user_id.ilike(keyword),
|
||
)
|
||
).limit(10)
|
||
)
|
||
for a in agent_result.scalars().all():
|
||
items.append(
|
||
SearchItem(
|
||
type="agent",
|
||
id=a.id,
|
||
name=a.name,
|
||
route="/admin/agents",
|
||
)
|
||
)
|
||
|
||
# 搜索快速回复
|
||
qr_result = await db.execute(
|
||
select(QuickReplyTemplate).where(
|
||
or_(
|
||
QuickReplyTemplate.title.ilike(keyword),
|
||
QuickReplyTemplate.content.ilike(keyword),
|
||
QuickReplyTemplate.category.ilike(keyword),
|
||
)
|
||
).limit(10)
|
||
)
|
||
for qr in qr_result.scalars().all():
|
||
items.append(
|
||
SearchItem(
|
||
type="quick_reply",
|
||
id=qr.id,
|
||
name=qr.title,
|
||
route="/admin/quick-replies",
|
||
)
|
||
)
|
||
|
||
return items
|
||
|
||
|
||
# ==========================================================================
|
||
# P2: 会话审计
|
||
# ==========================================================================
|
||
|
||
async def list_audit_conversations(
|
||
db: AsyncSession,
|
||
status: Optional[str] = None,
|
||
agent_id: Optional[str] = None,
|
||
keyword: Optional[str] = None,
|
||
date_from: Optional[str] = None,
|
||
date_to: Optional[str] = None,
|
||
page: int = 1,
|
||
page_size: int = 20,
|
||
) -> Dict[str, Any]:
|
||
"""获取会话审计列表(支持分页+多条件筛选)。"""
|
||
stmt = select(Conversation)
|
||
count_stmt = select(func.count(Conversation.id))
|
||
|
||
filters = []
|
||
if status:
|
||
filters.append(Conversation.status == status)
|
||
if agent_id:
|
||
filters.append(Conversation.assigned_agent_id == agent_id)
|
||
if keyword:
|
||
like_pattern = f"%{keyword}%"
|
||
filters.append(
|
||
or_(
|
||
Conversation.employee_name.ilike(like_pattern),
|
||
Conversation.last_message_summary.ilike(like_pattern),
|
||
)
|
||
)
|
||
if date_from:
|
||
try:
|
||
dt_from = datetime.strptime(date_from, "%Y-%m-%d")
|
||
filters.append(Conversation.created_at >= dt_from)
|
||
except ValueError:
|
||
pass
|
||
if date_to:
|
||
try:
|
||
dt_to = datetime.strptime(date_to, "%Y-%m-%d") + timedelta(days=1)
|
||
filters.append(Conversation.created_at < dt_to)
|
||
except ValueError:
|
||
pass
|
||
|
||
if filters:
|
||
stmt = stmt.where(and_(*filters))
|
||
count_stmt = count_stmt.where(and_(*filters))
|
||
|
||
total_result = await db.execute(count_stmt)
|
||
total = total_result.scalar() or 0
|
||
|
||
offset = (page - 1) * page_size
|
||
stmt = stmt.order_by(Conversation.created_at.desc()).offset(offset).limit(page_size)
|
||
result = await db.execute(stmt)
|
||
conversations = list(result.scalars().all())
|
||
|
||
agent_ids = list({c.assigned_agent_id for c in conversations if c.assigned_agent_id})
|
||
agent_names: Dict[str, str] = {}
|
||
if agent_ids:
|
||
agents_result = await db.execute(
|
||
select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
|
||
)
|
||
agent_names = dict(agents_result.all())
|
||
|
||
items = []
|
||
for c in conversations:
|
||
items.append({
|
||
"id": c.id,
|
||
"employee_name": c.employee_name or "",
|
||
"department": c.department or "",
|
||
"status": c.status,
|
||
"assigned_agent_name": agent_names.get(c.assigned_agent_id or "", ""),
|
||
"urgency_score": c.urgency_score or 1,
|
||
"created_at": c.created_at.isoformat() if c.created_at else "",
|
||
"updated_at": c.updated_at.isoformat() if c.updated_at else "",
|
||
"last_message_summary": c.last_message_summary or "",
|
||
})
|
||
|
||
return {"items": items, "total": total, "page": page, "page_size": page_size}
|
||
|
||
|
||
async def get_audit_conversation_detail(
|
||
db: AsyncSession,
|
||
conversation_id: str,
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""获取会话审计详情(含消息列表)。"""
|
||
result = await db.execute(
|
||
select(Conversation).where(Conversation.id == conversation_id)
|
||
)
|
||
conv = result.scalars().first()
|
||
if not conv:
|
||
return None
|
||
|
||
msgs_result = await db.execute(
|
||
select(Message)
|
||
.where(Message.conversation_id == conversation_id)
|
||
.order_by(Message.created_at.asc())
|
||
.limit(200)
|
||
)
|
||
messages = list(msgs_result.scalars().all())
|
||
|
||
agent_name = ""
|
||
if conv.assigned_agent_id:
|
||
agent_result = await db.execute(
|
||
select(Agent.name).where(Agent.id == conv.assigned_agent_id)
|
||
)
|
||
agent_name = agent_result.scalar() or ""
|
||
|
||
return {
|
||
"id": conv.id,
|
||
"employee_name": conv.employee_name or "",
|
||
"employee_id": conv.employee_id or "",
|
||
"department": conv.department or "",
|
||
"position": conv.position or "",
|
||
"status": conv.status,
|
||
"assigned_agent_name": agent_name,
|
||
"urgency_score": conv.urgency_score or 1,
|
||
"tags": conv.tags or [],
|
||
"created_at": conv.created_at.isoformat() if conv.created_at else "",
|
||
"updated_at": conv.updated_at.isoformat() if conv.updated_at else "",
|
||
"last_message_summary": conv.last_message_summary or "",
|
||
"messages": [
|
||
{
|
||
"id": m.id,
|
||
"sender_type": m.sender_type,
|
||
"sender_name": m.sender_name or "",
|
||
"content": m.content or "",
|
||
"msg_type": m.msg_type or "text",
|
||
"created_at": m.created_at.isoformat() if m.created_at else "",
|
||
}
|
||
for m in messages
|
||
],
|
||
}
|
||
|
||
|
||
# ==========================================================================
|
||
# P2: 坐席绩效统计
|
||
# ==========================================================================
|
||
|
||
async def get_agent_performance(
|
||
db: AsyncSession,
|
||
date_from: Optional[str] = None,
|
||
date_to: Optional[str] = None,
|
||
) -> List[Dict[str, Any]]:
|
||
"""获取坐席绩效统计。"""
|
||
today = date.today()
|
||
if date_from:
|
||
try:
|
||
dt_from = datetime.strptime(date_from, "%Y-%m-%d")
|
||
except ValueError:
|
||
dt_from = datetime(today.year, today.month, 1)
|
||
else:
|
||
dt_from = datetime(today.year, today.month, 1)
|
||
|
||
if date_to:
|
||
try:
|
||
dt_to = datetime.strptime(date_to, "%Y-%m-%d") + timedelta(days=1)
|
||
except ValueError:
|
||
dt_to = datetime.now()
|
||
else:
|
||
dt_to = datetime.now()
|
||
|
||
agents_result = await db.execute(select(Agent).order_by(Agent.name))
|
||
agents = list(agents_result.scalars().all())
|
||
if not agents:
|
||
return []
|
||
|
||
agent_ids = [a.id for a in agents]
|
||
|
||
total_conv_result = await db.execute(
|
||
select(Conversation.assigned_agent_id, func.count(Conversation.id))
|
||
.where(
|
||
Conversation.assigned_agent_id.in_(agent_ids),
|
||
Conversation.created_at >= dt_from,
|
||
Conversation.created_at < dt_to,
|
||
)
|
||
.group_by(Conversation.assigned_agent_id)
|
||
)
|
||
total_conv_map = dict(total_conv_result.all())
|
||
|
||
resolved_result = await db.execute(
|
||
select(Conversation.assigned_agent_id, func.count(Conversation.id))
|
||
.where(
|
||
Conversation.assigned_agent_id.in_(agent_ids),
|
||
Conversation.status == "resolved",
|
||
Conversation.created_at >= dt_from,
|
||
Conversation.created_at < dt_to,
|
||
)
|
||
.group_by(Conversation.assigned_agent_id)
|
||
)
|
||
resolved_map = dict(resolved_result.all())
|
||
|
||
today_start = datetime.combine(today, datetime.min.time())
|
||
today_conv_result = await db.execute(
|
||
select(Conversation.assigned_agent_id, func.count(Conversation.id))
|
||
.where(
|
||
Conversation.assigned_agent_id.in_(agent_ids),
|
||
Conversation.created_at >= today_start,
|
||
)
|
||
.group_by(Conversation.assigned_agent_id)
|
||
)
|
||
today_conv_map = dict(today_conv_result.all())
|
||
|
||
items = []
|
||
for a in agents:
|
||
total = total_conv_map.get(a.id, 0)
|
||
resolved = resolved_map.get(a.id, 0)
|
||
today_count = today_conv_map.get(a.id, 0)
|
||
resolution_rate = f"{(resolved / total * 100):.0f}%" if total > 0 else "—"
|
||
|
||
items.append({
|
||
"id": a.id,
|
||
"user_id": a.user_id,
|
||
"name": a.name,
|
||
"status": a.status,
|
||
"role": a.role,
|
||
"skill_tags": a.skill_tags or [],
|
||
"current_load": a.current_load or 0,
|
||
"max_load": a.max_load or 5,
|
||
"total_conversations": total,
|
||
"resolved_conversations": resolved,
|
||
"today_conversations": today_count,
|
||
"resolution_rate": resolution_rate,
|
||
})
|
||
|
||
return items
|
||
|
||
|
||
# ==========================================================================
|
||
# P2: 系统日志
|
||
# ==========================================================================
|
||
|
||
async def get_system_logs(
|
||
db: AsyncSession,
|
||
page: int = 1,
|
||
page_size: int = 50,
|
||
) -> Dict[str, Any]:
|
||
"""获取系统日志(配置变更日志)。"""
|
||
count_result = await db.execute(select(func.count(ConfigChangeLog.id)))
|
||
total = count_result.scalar() or 0
|
||
|
||
offset = (page - 1) * page_size
|
||
result = await db.execute(
|
||
select(ConfigChangeLog)
|
||
.order_by(ConfigChangeLog.changed_at.desc())
|
||
.offset(offset)
|
||
.limit(page_size)
|
||
)
|
||
logs = list(result.scalars().all())
|
||
|
||
agent_ids = list({log.changed_by for log in logs if log.changed_by})
|
||
agent_names: Dict[str, str] = {}
|
||
if agent_ids:
|
||
agents_result = await db.execute(
|
||
select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
|
||
)
|
||
agent_names = dict(agents_result.all())
|
||
|
||
items = []
|
||
for log in logs:
|
||
items.append({
|
||
"id": log.id,
|
||
"log_type": "config_change",
|
||
"config_key": log.config_key,
|
||
"old_value": log.old_value or "",
|
||
"new_value": log.new_value or "",
|
||
"changed_by": log.changed_by or "",
|
||
"changed_by_name": agent_names.get(log.changed_by or "", ""),
|
||
"changed_at": log.changed_at.isoformat() if log.changed_at else "",
|
||
})
|
||
|
||
return {"items": items, "total": total, "page": page, "page_size": page_size}
|