Files

1729 lines
55 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 管理后台业务逻辑层
# =============================================================================
# 说明:管理后台的核心业务逻辑,包括:
# 1. 仪表盘数据聚合
# 2. 配置项分组读取与更新(含变更日志)
# 3. 坐席 CRUD 管理
# 4. 外部系统集成配置
# 5. 快速回复审核
# 6. 分配模式管理
# 7. 会话监控
# 8. 全局搜索
# =============================================================================
import json
import logging
from datetime import datetime, date, timedelta
from typing import Any, Dict, List, Optional
from sqlalchemy import func, select, or_, and_, case, literal_column
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.agent import Agent
from app.models.config_change_log import ConfigChangeLog
from app.models.conversation import Conversation
from app.models.message import Message
from app.models.quick_reply_template import QuickReplyTemplate
from app.models.system_config import SystemConfig
from app.schemas.admin import (
AdminAgentResponse,
AdminQuickReplyResponse,
AssignmentModeItem,
AssignmentModeResponse,
ConfigGroupResponse,
ConfigHistoryItem,
ConfigItemResponse,
DashboardOverviewResponse,
IntegrationConfig,
IntegrationHealthItem,
IntegrationResponse,
MonitorSessionsResponse,
SearchItem,
SessionItem,
SessionStats,
SystemAlertItem,
)
from app.utils.response import AppException, ERR_NOT_FOUND, ERR_PARAMS
logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------
# 配置分组映射(前缀 → 分组名称)
# --------------------------------------------------------------------------
CONFIG_GROUP_MAP: Dict[str, str] = {
"ai_": "AI 对话引擎",
"emergency_": "应急模式",
"assign_": "消息分配",
"polling_": "轮询配置",
"emotion_": "情绪检测",
"integration_": "外部集成",
"queue_": "排队策略",
"satisfaction_": "满意度评价",
"invite_": "邀请功能",
"notification_": "通知推送",
"security_": "安全策略",
}
# --------------------------------------------------------------------------
# 集成系统定义(硬编码,阶段一不增加表)
# --------------------------------------------------------------------------
INTEGRATION_DEFINITIONS = [
{
"id": "dify",
"name": "Dify AI",
"key_prefix": "integration_dify_",
"configurable": True,
"config_type": "url_key", # api_url + api_key
},
{
"id": "ragflow",
"name": "RAGFlow",
"key_prefix": "integration_ragflow_",
"configurable": True,
"config_type": "url_key", # api_url + api_key
},
{
"id": "huorong",
"name": "火绒安全",
"key_prefix": "integration_huorong_",
"configurable": True,
"config_type": "access_key", # access_key_id + access_key_secret + base_url
},
{
"id": "lianruan",
"name": "联软LV7000",
"key_prefix": "integration_lianruan_",
"configurable": True,
"config_type": "account_password", # api_account + api_password + base_url + validate_key
},
{
"id": "data_platform",
"name": "数据平台",
"key_prefix": None,
"configurable": False,
"config_type": None,
},
{
"id": "beisen",
"name": "北森 eHR",
"key_prefix": None,
"configurable": False,
"config_type": None,
},
]
# --------------------------------------------------------------------------
# 分配模式定义(硬编码,阶段一仅手动接单可用)
# --------------------------------------------------------------------------
ASSIGNMENT_MODES = [
{"id": "manual", "name": "手动接单", "locked": False, "unlock_at": ""},
{"id": "round_robin", "name": "轮询分配", "locked": True, "unlock_at": "阶段二"},
{"id": "least_active", "name": "最少活跃优先", "locked": True, "unlock_at": "阶段二"},
{"id": "weighted", "name": "加权比例分配", "locked": True, "unlock_at": "阶段三"},
{"id": "skill_match", "name": "技能匹配分配", "locked": True, "unlock_at": "阶段三"},
{"id": "priority_queue", "name": "优先队列", "locked": True, "unlock_at": "阶段三"},
]
# ==========================================================================
# 仪表盘
# ==========================================================================
async def get_dashboard_overview(db: AsyncSession) -> DashboardOverviewResponse:
"""获取仪表盘统计数据。
聚合查询在线坐席数、今日会话数、待审核数、集成健康状态等。
Args:
db: 数据库会话
Returns:
DashboardOverviewResponse: 仪表盘统计数据
"""
# 在线坐席数
online_count_result = await db.execute(
select(func.count(Agent.id)).where(Agent.status == "online")
)
online_agents = online_count_result.scalar() or 0
# 今日会话数(今天创建的所有会话)
today_start = datetime.combine(date.today(), datetime.min.time())
today_conv_result = await db.execute(
select(func.count(Conversation.id)).where(
Conversation.created_at >= today_start
)
)
today_conversations = today_conv_result.scalar() or 0
# 待审核快速回复数
pending_result = await db.execute(
select(func.count(QuickReplyTemplate.id)).where(
QuickReplyTemplate.status == "pending_review"
)
)
pending_reviews = pending_result.scalar() or 0
# 系统告警 — 阶段一仅基于待审核快速回复生成告警,后续阶段接入更多告警源
system_alerts: List[SystemAlertItem] = []
if pending_reviews > 0:
# 查询待审核模板,用于告警详情
pending_templates_result = await db.execute(
select(QuickReplyTemplate)
.where(QuickReplyTemplate.status == "pending_review")
.order_by(QuickReplyTemplate.updated_at.desc())
.limit(5) # 最多展示5条告警
)
pending_templates = list(pending_templates_result.scalars().all())
for t in pending_templates:
system_alerts.append(
SystemAlertItem(
type="quick_reply_pending",
content=f"快速回复待审核:{t.content[:50]}{'...' if len(t.content) > 50 else ''}",
submitter=t.submitted_by or None,
time=t.updated_at.isoformat() if t.updated_at else "",
severity="warning",
)
)
# 集成系统健康状态(通用检查:按config_type判断连接状态)
integrations_health: List[IntegrationHealthItem] = []
for integ_def in INTEGRATION_DEFINITIONS:
if integ_def["configurable"] and integ_def["key_prefix"]:
prefix = integ_def["key_prefix"]
config_type = integ_def.get("config_type", "url_key")
status = "disconnected"
if config_type == "url_key":
# Dify/RAGFlow: 检查 api_url + api_key
au = await _get_config_value(db, f"{prefix}api_url")
ak = await _get_config_value(db, f"{prefix}api_key")
if au and ak:
status = "connected"
elif au:
status = "partial"
elif config_type == "access_key":
# 火绒: 检查 access_key_id + access_key_secret + base_url
aki = await _get_config_value(db, f"{prefix}access_key_id")
aks = await _get_config_value(db, f"{prefix}access_key_secret")
bu = await _get_config_value(db, f"{prefix}base_url")
if aki and aks and bu:
status = "connected"
elif bu:
status = "partial"
elif config_type == "account_password":
# 联软: 检查 api_account + api_password + base_url
aa = await _get_config_value(db, f"{prefix}api_account")
ap = await _get_config_value(db, f"{prefix}api_password")
bu = await _get_config_value(db, f"{prefix}base_url")
if aa and ap and bu:
status = "connected"
elif bu:
status = "partial"
integrations_health.append(
IntegrationHealthItem(system=integ_def["name"], status=status)
)
else:
integrations_health.append(
IntegrationHealthItem(system=integ_def["name"], status="disconnected")
)
# 平均响应时间(首次人工回复距首条员工消息的时间差)
avg_response_time_str = ""
try:
# 取今日已结单或服务中的会话,计算平均首次响应时间
conv_ids_result = await db.execute(
select(Conversation.id).where(
Conversation.created_at >= today_start,
Conversation.assigned_agent_id.isnot(None),
)
)
conv_ids = [row[0] for row in conv_ids_result.all()]
if conv_ids:
response_times = []
for cid in conv_ids[:50]: # 最多统计50个会话,避免性能问题
# 找该会话首条员工消息
first_emp_msg = await db.execute(
select(Message.created_at).where(
Message.conversation_id == cid,
Message.sender_type == "employee",
).order_by(Message.created_at.asc()).limit(1)
)
first_emp_time = first_emp_msg.scalar()
# 找该会话首条坐席/AI回复
first_reply = await db.execute(
select(Message.created_at).where(
Message.conversation_id == cid,
Message.sender_type.in_(["agent", "ai"]),
).order_by(Message.created_at.asc()).limit(1)
)
first_reply_time = first_reply.scalar()
if first_emp_time and first_reply_time:
delta = (first_reply_time - first_emp_time).total_seconds()
if 0 < delta < 3600: # 合理范围内(1小时内)
response_times.append(delta)
if response_times:
avg_seconds = sum(response_times) / len(response_times)
if avg_seconds < 60:
avg_response_time_str = f"{avg_seconds:.0f}"
else:
avg_response_time_str = f"{avg_seconds / 60:.1f}分钟"
except Exception as e:
logger.warning(f"计算平均响应时间失败: {e}")
# AI 命中率(有AI实质性回复的会话占比)
ai_hit_rate_str = ""
try:
total_conv_result = await db.execute(
select(func.count(Conversation.id)).where(
Conversation.created_at >= today_start
)
)
total_conv = total_conv_result.scalar() or 0
if total_conv > 0:
ai_conv_result = await db.execute(
select(func.count(Conversation.id)).where(
Conversation.created_at >= today_start,
Conversation.ai_substantive_reply_count > 0,
)
)
ai_conv = ai_conv_result.scalar() or 0
ai_hit_rate_str = f"{(ai_conv / total_conv) * 100:.0f}%"
except Exception as e:
logger.warning(f"计算AI命中率失败: {e}")
return DashboardOverviewResponse(
online_agents=online_agents,
today_conversations=today_conversations,
avg_response_time=avg_response_time_str,
ai_hit_rate=ai_hit_rate_str,
pending_reviews=pending_reviews,
system_alerts=system_alerts,
integrations_health=integrations_health,
)
# ==========================================================================
# 配置管理
# ==========================================================================
async def get_config_groups(db: AsyncSession) -> List[ConfigGroupResponse]:
"""获取全部配置项(按功能分组)。
从 system_configs 表读取所有配置,按前缀分组返回。
Args:
db: 数据库会话
Returns:
List[ConfigGroupResponse]: 配置分组列表
"""
# 查询所有非 integration_ 前缀的配置项
result = await db.execute(
select(SystemConfig).order_by(SystemConfig.config_key)
)
all_configs = list(result.scalars().all())
# 按 key 前缀分组(排除 integration_ 前缀)
groups_dict: Dict[str, List[ConfigItemResponse]] = {}
other_items: List[ConfigItemResponse] = []
for cfg in all_configs:
# 跳过 integration_ 前缀的配置(在集成管理中单独展示)
if cfg.config_key.startswith("integration_"):
continue
# 推断值类型
value_type = _infer_value_type(cfg.config_value)
item = ConfigItemResponse(
key=cfg.config_key,
value=cfg.config_value,
description=cfg.description or "",
value_type=value_type,
)
# 查找匹配的前缀分组
matched = False
for prefix, group_name in CONFIG_GROUP_MAP.items():
if cfg.config_key.startswith(prefix):
if group_name not in groups_dict:
groups_dict[group_name] = []
groups_dict[group_name].append(item)
matched = True
break
if not matched:
other_items.append(item)
# 构建分组响应
groups: List[ConfigGroupResponse] = []
for prefix, group_name in CONFIG_GROUP_MAP.items():
if group_name in groups_dict:
groups.append(
ConfigGroupResponse(
name=group_name,
key_prefix=prefix,
items=groups_dict[group_name],
)
)
# 未匹配前缀的配置项放入"其他"分组
if other_items:
groups.append(
ConfigGroupResponse(
name="其他配置",
key_prefix="",
items=other_items,
)
)
return groups
def _infer_value_type(value: str) -> str:
"""推断配置值的类型。
Args:
value: 配置值字符串
Returns:
str: 值类型标识
"""
if value.lower() in ("true", "false"):
return "boolean"
try:
float(value)
return "number"
except (ValueError, TypeError):
pass
try:
parsed = json.loads(value)
if isinstance(parsed, list):
return "json_array"
if isinstance(parsed, dict):
return "json_object"
except (json.JSONDecodeError, TypeError):
pass
return "string"
async def update_config(
db: AsyncSession,
key: str,
value: str,
agent_id: str,
) -> Dict[str, Any]:
"""更新单个配置项,并记录变更日志。
Args:
db: 数据库会话
key: 配置键
value: 新的配置值
agent_id: 操作人 agent_id
Returns:
Dict: 包含 key, old_value, new_value, changed_at
Raises:
AppException: 配置项不存在
"""
# 查找配置项
result = await db.execute(
select(SystemConfig).where(SystemConfig.config_key == key)
)
config = result.scalars().first()
if not config:
raise AppException(1003, f"配置项不存在: {key}")
old_value = config.config_value
# 写入变更日志
change_log = ConfigChangeLog(
config_key=key,
old_value=old_value,
new_value=value,
changed_by=agent_id,
)
db.add(change_log)
# 更新配置值
config.config_value = value
config.updated_at = datetime.now()
db.add(config)
logger.info(f"配置更新: key={key}, old={old_value}, new={value}, by={agent_id}")
return {
"key": key,
"old_value": old_value,
"new_value": value,
"changed_at": datetime.now().isoformat(),
}
async def get_config_history(
db: AsyncSession,
key: str,
limit: int = 20,
) -> List[ConfigHistoryItem]:
"""获取指定配置项的变更历史。
Args:
db: 数据库会话
key: 配置键
limit: 返回条数上限
Returns:
List[ConfigHistoryItem]: 变更历史列表
"""
result = await db.execute(
select(ConfigChangeLog)
.where(ConfigChangeLog.config_key == key)
.order_by(ConfigChangeLog.changed_at.desc())
.limit(limit)
)
logs = list(result.scalars().all())
# 批量查询操作人姓名
agent_ids = list({log.changed_by for log in logs})
agent_names = {}
if agent_ids:
agents_result = await db.execute(
select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
)
agent_names = {row[0]: row[1] for row in agents_result.all()}
items = []
for log in logs:
items.append(
ConfigHistoryItem(
id=log.id,
config_key=log.config_key,
old_value=log.old_value,
new_value=log.new_value,
changed_by=log.changed_by,
changed_by_name=agent_names.get(log.changed_by, ""),
changed_at=log.changed_at,
)
)
return items
# ==========================================================================
# 坐席管理
# ==========================================================================
async def list_admin_agents(
db: AsyncSession,
status: Optional[str] = None,
) -> List[AdminAgentResponse]:
"""获取坐席列表(管理视图,含角色/技能标签/今日结单数)。
Args:
db: 数据库会话
status: 按状态筛选(可选)
Returns:
List[AdminAgentResponse]: 坐席列表
"""
stmt = select(Agent).order_by(Agent.name)
if status:
stmt = stmt.where(Agent.status == status)
result = await db.execute(stmt)
agents = list(result.scalars().all())
# 批量查询今日结单数
today_start = datetime.combine(date.today(), datetime.min.time())
agent_ids = [a.id for a in agents]
today_resolved_map: Dict[str, int] = {}
if agent_ids:
resolved_result = await db.execute(
select(
Conversation.assigned_agent_id,
func.count(Conversation.id),
)
.where(
Conversation.assigned_agent_id.in_(agent_ids),
Conversation.status == "resolved",
Conversation.updated_at >= today_start,
)
.group_by(Conversation.assigned_agent_id)
)
today_resolved_map = dict(resolved_result.all())
items = []
for a in agents:
resp = AdminAgentResponse(
id=a.id,
user_id=a.user_id,
name=a.name,
status=a.status,
role=a.role,
skill_tags=a.skill_tags or [],
current_load=a.current_load,
max_load=a.max_load,
today_resolved=today_resolved_map.get(a.id, 0),
created_at=a.created_at,
updated_at=a.updated_at,
)
items.append(resp)
return items
async def create_agent(
db: AsyncSession,
user_id: str,
name: str,
role: str = "agent",
skill_tags: Optional[List[str]] = None,
max_load: int = 5,
) -> AdminAgentResponse:
"""创建坐席。
Args:
db: 数据库会话
user_id: 企微用户ID
name: 坐席姓名
role: 角色(仅允许 admin / agent
skill_tags: 技能标签列表
max_load: 最大同时服务数
Returns:
AdminAgentResponse: 创建的坐席信息
Raises:
AppException: user_id 已存在 或 role 值非法
"""
# 校验 role 白名单,防止非法角色值入库
if role not in ("admin", "agent"):
raise AppException(1001, f"角色值非法: {role},仅允许 admin 或 agent")
# 检查 user_id 是否已存在
existing = await db.execute(
select(Agent).where(Agent.user_id == user_id)
)
if existing.scalars().first():
raise AppException(1001, f"坐席 user_id 已存在: {user_id}")
agent = Agent(
user_id=user_id,
name=name,
role=role,
skill_tags=skill_tags or [],
max_load=max_load,
status="offline",
current_load=0,
)
db.add(agent)
await db.flush()
logger.info(f"创建坐席: user_id={user_id}, name={name}, role={role}")
return AdminAgentResponse(
id=agent.id,
user_id=agent.user_id,
name=agent.name,
status=agent.status,
role=agent.role,
skill_tags=agent.skill_tags or [],
current_load=agent.current_load,
max_load=agent.max_load,
today_resolved=0,
created_at=agent.created_at,
updated_at=agent.updated_at,
)
async def update_agent(
db: AsyncSession,
agent_id: str,
role: Optional[str] = None,
skill_tags: Optional[List[str]] = None,
max_load: Optional[int] = None,
) -> Dict[str, Any]:
"""更新坐席信息(角色/技能标签/负载上限)。
Args:
db: 数据库会话
agent_id: 坐席ID
role: 角色(可选)
skill_tags: 技能标签列表(可选)
max_load: 最大同时服务数(可选)
Returns:
Dict: 更新后的坐席关键字段
Raises:
AppException: 坐席不存在
"""
result = await db.execute(
select(Agent).where(Agent.id == agent_id)
)
agent = result.scalars().first()
if not agent:
raise AppException(3004, "坐席不存在")
# 校验 role 白名单,防止非法角色值入库
if role is not None and role not in ("admin", "agent"):
raise AppException(1001, f"角色值非法: {role},仅允许 admin 或 agent")
if role is not None:
agent.role = role
if skill_tags is not None:
agent.skill_tags = skill_tags
if max_load is not None:
agent.max_load = max_load
agent.updated_at = datetime.now()
db.add(agent)
logger.info(f"更新坐席: id={agent_id}, role={role}, skill_tags={skill_tags}, max_load={max_load}")
return {
"id": agent.id,
"role": agent.role,
"skill_tags": agent.skill_tags or [],
"max_load": agent.max_load,
}
async def delete_agent(db: AsyncSession, agent_id: str) -> None:
"""移除坐席。
Args:
db: 数据库会话
agent_id: 坐席ID
Raises:
AppException: 坐席不存在
"""
result = await db.execute(
select(Agent).where(Agent.id == agent_id)
)
agent = result.scalars().first()
if not agent:
raise AppException(3004, "坐席不存在")
await db.delete(agent)
logger.info(f"移除坐席: id={agent_id}, user_id={agent.user_id}")
# ==========================================================================
# 集成配置管理
# ==========================================================================
async def get_integrations(db: AsyncSession) -> List[IntegrationResponse]:
"""获取集成系统列表及配置状态。
从 system_configs 表读取 integration_ 前缀的配置。
Args:
db: 数据库会话
Returns:
List[IntegrationResponse]: 集成系统列表
"""
# 查询所有 integration_ 前缀的配置
result = await db.execute(
select(SystemConfig).where(
SystemConfig.config_key.startswith("integration_")
)
)
integ_configs = list(result.scalars().all())
# 构建 {prefix: {api_url: ..., api_key: ...}} 映射
config_map: Dict[str, Dict[str, str]] = {}
for cfg in integ_configs:
# integration_dify_api_url → 前缀 integration_dify_
# 找到对应的 key_prefix
for integ_def in INTEGRATION_DEFINITIONS:
prefix = integ_def.get("key_prefix")
if prefix and cfg.config_key.startswith(prefix):
if prefix not in config_map:
config_map[prefix] = {}
# 去掉前缀得到子键名(如 api_url, api_key
sub_key = cfg.config_key[len(prefix):]
config_map[prefix][sub_key] = cfg.config_value
break
items: List[IntegrationResponse] = []
for integ_def in INTEGRATION_DEFINITIONS:
if integ_def["configurable"]:
prefix = integ_def["key_prefix"]
config_type = integ_def.get("config_type", "url_key")
cfg_data = config_map.get(prefix, {})
if config_type == "url_key":
# Dify / RAGFlow 模式:api_url + api_key
api_url = cfg_data.get("api_url", "")
api_key = cfg_data.get("api_key", "")
if api_url and api_key:
status = "connected"
elif api_url:
status = "partial"
else:
status = "disconnected"
items.append(
IntegrationResponse(
id=integ_def["id"],
name=integ_def["name"],
status=status,
configurable=True,
config_type="url_key",
config=IntegrationConfig(
api_url=api_url,
api_key_set=bool(api_key),
),
)
)
elif config_type == "access_key":
# 火绒模式:access_key_id + access_key_secret + base_url
access_key_id = cfg_data.get("access_key_id", "")
access_key_secret = cfg_data.get("access_key_secret", "")
base_url = cfg_data.get("base_url", "")
if access_key_id and access_key_secret and base_url:
status = "connected"
elif base_url:
status = "partial"
else:
status = "disconnected"
items.append(
IntegrationResponse(
id=integ_def["id"],
name=integ_def["name"],
status=status,
configurable=True,
config_type="access_key",
config=IntegrationConfig(
# url_key 模式字段(火绒不需要,但前端卡片复用展示)
api_url=base_url,
api_key_set=bool(access_key_id),
# access_key 模式专属字段
access_key_id_set=bool(access_key_id),
access_key_secret_set=bool(access_key_secret),
base_url=base_url or None,
),
)
)
elif config_type == "account_password":
# 联软模式:api_account + api_password + base_url + validate_key
api_account = cfg_data.get("api_account", "")
api_password = cfg_data.get("api_password", "")
base_url = cfg_data.get("base_url", "")
if api_account and api_password and base_url:
status = "connected"
elif base_url:
status = "partial"
else:
status = "disconnected"
items.append(
IntegrationResponse(
id=integ_def["id"],
name=integ_def["name"],
status=status,
configurable=True,
config_type="account_password",
config=IntegrationConfig(
# 复用字段(前端展示用)
api_url=base_url,
api_key_set=bool(api_account),
# account_password 模式专属字段
base_url=base_url or None,
api_account_set=bool(api_account),
api_password_set=bool(api_password),
),
)
)
else:
items.append(
IntegrationResponse(
id=integ_def["id"],
name=integ_def["name"],
status="disconnected",
configurable=False,
config=None,
)
)
return items
async def update_integration(
db: AsyncSession,
integration_id: str,
# url_key 模式(Dify / RAGFlow
api_url: str = "",
api_key: str = "",
# access_key 模式(火绒安全)
access_key_id: str = "",
access_key_secret: str = "",
base_url: str = "",
# account_password 模式(联软LV7000
api_account: str = "",
api_password: str = "",
validate_key: str = "",
agent_id: str = "",
) -> IntegrationResponse:
"""更新集成系统配置。
支持三种模式:
- url_key 模式(Dify / RAGFlow):传入 api_url + api_key
- access_key 模式(火绒安全):传入 access_key_id + access_key_secret + base_url
- account_password 模式(联软LV7000):传入 api_account + api_password + base_url + validate_key
Args:
db: 数据库会话
integration_id: 集成系统ID(如 dify/ragflow/huorong/lianruan
api_url: API 地址(url_key 模式)
api_key: API Keyurl_key 模式)
access_key_id: AccessKey IDaccess_key 模式)
access_key_secret: AccessKey Secretaccess_key 模式)
base_url: 内网 Base URLaccess_key/account_password 模式)
api_account: API账号(account_password 模式)
api_password: API密码(account_password 模式)
validate_key: 验证密钥(account_password 模式,可选)
agent_id: 操作人 agent_id
Returns:
IntegrationResponse: 更新后的集成系统信息
Raises:
AppException: 集成系统不存在或不可配置
"""
# 查找集成定义
integ_def = None
for d in INTEGRATION_DEFINITIONS:
if d["id"] == integration_id:
integ_def = d
break
if not integ_def:
raise AppException(1003, f"集成系统不存在: {integration_id}")
if not integ_def["configurable"]:
raise AppException(1001, f"集成系统不可配置: {integ_def['name']}")
config_type = integ_def.get("config_type", "url_key")
prefix = integ_def["key_prefix"]
# ---------- url_key 模式(Dify / RAGFlow----------
if config_type == "url_key":
await _upsert_system_config(db, f"{prefix}api_url", api_url, f"{integ_def['name']} API 地址", agent_id)
await _upsert_system_config(db, f"{prefix}api_key", api_key, f"{integ_def['name']} API Key", agent_id)
if api_url and api_key:
status = "connected"
elif api_url:
status = "partial"
else:
status = "disconnected"
return IntegrationResponse(
id=integration_id,
name=integ_def["name"],
status=status,
configurable=True,
config_type="url_key",
config=IntegrationConfig(
api_url=api_url,
api_key_set=bool(api_key),
),
)
# ---------- access_key 模式(火绒安全)----------
elif config_type == "access_key":
await _upsert_system_config(db, f"{prefix}access_key_id", access_key_id, f"{integ_def['name']} AccessKey ID", agent_id)
await _upsert_system_config(db, f"{prefix}access_key_secret", access_key_secret, f"{integ_def['name']} AccessKey Secret", agent_id)
await _upsert_system_config(db, f"{prefix}base_url", base_url, f"{integ_def['name']} Base URL", agent_id)
if access_key_id and access_key_secret and base_url:
status = "connected"
elif base_url:
status = "partial"
else:
status = "disconnected"
return IntegrationResponse(
id=integration_id,
name=integ_def["name"],
status=status,
configurable=True,
config_type="access_key",
config=IntegrationConfig(
# 前端复用字段(url_key 模式展示用)
api_url=base_url,
api_key_set=bool(access_key_id),
# access_key 模式专属字段
access_key_id_set=bool(access_key_id),
access_key_secret_set=bool(access_key_secret),
base_url=base_url or None,
),
)
# ---------- account_password 模式(联软LV7000----------
elif config_type == "account_password":
await _upsert_system_config(db, f"{prefix}api_account", api_account, f"{integ_def['name']} API账号", agent_id)
await _upsert_system_config(db, f"{prefix}api_password", api_password, f"{integ_def['name']} API密码", agent_id)
await _upsert_system_config(db, f"{prefix}base_url", base_url, f"{integ_def['name']} Base URL", agent_id)
await _upsert_system_config(db, f"{prefix}validate_key", validate_key, f"{integ_def['name']} 验证密钥", agent_id)
if api_account and api_password and base_url:
status = "connected"
elif base_url:
status = "partial"
else:
status = "disconnected"
return IntegrationResponse(
id=integration_id,
name=integ_def["name"],
status=status,
configurable=True,
config_type="account_password",
config=IntegrationConfig(
api_url=base_url,
api_key_set=bool(api_account),
base_url=base_url or None,
api_account_set=bool(api_account),
api_password_set=bool(api_password),
),
)
raise AppException(1001, f"未知配置类型: {config_type}")
async def _get_config_value(db: AsyncSession, key: str) -> str:
"""快速读取单个配置值。
Args:
db: 数据库会话
key: 配置键
Returns:
str: 配置值,不存在返回空字符串
"""
result = await db.execute(
select(SystemConfig.config_value).where(
SystemConfig.config_key == key
)
)
row = result.scalar()
return row if row else ""
async def _upsert_system_config(
db: AsyncSession,
key: str,
value: str,
description: str,
agent_id: str,
) -> None:
"""插入或更新 system_configs 记录,并记录变更日志。
Args:
db: 数据库会话
key: 配置键
value: 配置值
description: 配置说明
agent_id: 操作人
"""
result = await db.execute(
select(SystemConfig).where(SystemConfig.config_key == key)
)
config = result.scalars().first()
if config:
old_value = config.config_value
config.config_value = value
config.updated_at = datetime.now()
db.add(config)
# 记录变更日志
change_log = ConfigChangeLog(
config_key=key,
old_value=old_value,
new_value=value,
changed_by=agent_id,
)
db.add(change_log)
else:
new_config = SystemConfig(
config_key=key,
config_value=value,
description=description,
)
db.add(new_config)
# 记录新增日志
change_log = ConfigChangeLog(
config_key=key,
old_value="",
new_value=value,
changed_by=agent_id,
)
db.add(change_log)
# ==========================================================================
# 快速回复审核
# ==========================================================================
async def list_pending_quick_replies(
db: AsyncSession,
category: Optional[str] = None,
) -> List[AdminQuickReplyResponse]:
"""获取待审核快速回复模板列表。
Args:
db: 数据库会话
category: 按分类筛选(可选)
Returns:
List[AdminQuickReplyResponse]: 待审核模板列表
"""
stmt = (
select(QuickReplyTemplate)
.where(QuickReplyTemplate.status == "pending_review")
.order_by(QuickReplyTemplate.updated_at.desc())
)
if category:
stmt = stmt.where(QuickReplyTemplate.category == category)
result = await db.execute(stmt)
templates = list(result.scalars().all())
# 批量查询提交人姓名
submitted_by_ids = list({t.submitted_by for t in templates if t.submitted_by})
agent_names: Dict[str, str] = {}
if submitted_by_ids:
agents_result = await db.execute(
select(Agent.id, Agent.name).where(Agent.id.in_(submitted_by_ids))
)
agent_names = dict(agents_result.all())
items = []
for t in templates:
items.append(
AdminQuickReplyResponse(
id=t.id,
category=t.category,
title=t.title,
content=t.content,
variables=t.variables or [],
status=t.status,
version=t.version,
submitted_by=t.submitted_by,
submitted_by_name=agent_names.get(t.submitted_by or "", ""),
sort_order=t.sort_order,
created_at=t.created_at,
updated_at=t.updated_at,
)
)
return items
async def review_quick_reply(
db: AsyncSession,
template_id: str,
action: str,
reason: str,
agent_id: str,
) -> Dict[str, Any]:
"""审核快速回复模板(通过/驳回)。
Args:
db: 数据库会话
template_id: 模板ID
action: 审核动作(approve/reject
reason: 审核原因
agent_id: 审核人 agent_id
Returns:
Dict: 包含 id, status, version
Raises:
AppException: 模板不存在或动作非法
"""
if action not in ("approve", "reject"):
raise AppException(1001, "审核动作只能是 approve 或 reject")
result = await db.execute(
select(QuickReplyTemplate).where(QuickReplyTemplate.id == template_id)
)
template = result.scalars().first()
if not template:
raise ERR_NOT_FOUND
# 校验模板状态:仅允许审核 pending_review 状态的模板
if template.status != "pending_review":
raise AppException(
1001,
f"当前模板状态为 {template.status},仅 pending_review 状态可审核"
)
if action == "approve":
template.status = "approved"
template.version = (template.version or 1) + 1
else:
template.status = "rejected"
template.updated_at = datetime.now()
db.add(template)
logger.info(
f"快速回复审核: id={template_id}, action={action}, "
f"by={agent_id}, reason={reason}"
)
return {
"id": template.id,
"status": template.status,
"version": template.version,
}
# ==========================================================================
# 分配模式
# ==========================================================================
async def get_assignment_mode(db: AsyncSession) -> AssignmentModeResponse:
"""获取当前分配模式。
从 system_configs 表读取 assign_mode 配置。
Args:
db: 数据库会话
Returns:
AssignmentModeResponse: 分配模式信息
"""
# 读取当前分配模式
result = await db.execute(
select(SystemConfig).where(SystemConfig.config_key == "assign_mode")
)
config = result.scalars().first()
current_mode = config.config_value if config else "manual"
# 构建模式列表
modes = [
AssignmentModeItem(
id=m["id"],
name=m["name"],
enabled=(m["id"] == current_mode),
locked=m["locked"],
unlock_at=m["unlock_at"],
)
for m in ASSIGNMENT_MODES
]
return AssignmentModeResponse(
current_mode=current_mode,
modes=modes,
)
async def update_assignment_mode(
db: AsyncSession,
mode: str,
agent_id: str,
) -> AssignmentModeResponse:
"""切换分配模式(阶段一仅允许手动接单)。
Args:
db: 数据库会话
mode: 分配模式ID
agent_id: 操作人 agent_id
Returns:
AssignmentModeResponse: 更新后的分配模式信息
Raises:
AppException: 模式不存在或已锁定
"""
# 验证模式是否合法
mode_def = None
for m in ASSIGNMENT_MODES:
if m["id"] == mode:
mode_def = m
break
if not mode_def:
raise AppException(1001, f"无效的分配模式: {mode}")
if mode_def["locked"]:
raise AppException(1001, f"分配模式已锁定,将在{mode_def['unlock_at']}解锁")
# 更新或创建 assign_mode 配置
await _upsert_system_config(db, "assign_mode", mode, "消息分配模式", agent_id)
logger.info(f"切换分配模式: mode={mode}, by={agent_id}")
return await get_assignment_mode(db)
# ==========================================================================
# 会话监控
# ==========================================================================
async def get_monitor_sessions(
db: AsyncSession,
status: Optional[str] = None,
) -> MonitorSessionsResponse:
"""获取实时会话列表(Demo预览)。
Args:
db: 数据库会话
status: 按状态筛选(可选,默认非 resolved)
Returns:
MonitorSessionsResponse: 会话监控数据
"""
today_start = datetime.combine(date.today(), datetime.min.time())
# 统计数据
in_progress_result = await db.execute(
select(func.count(Conversation.id)).where(
Conversation.status == "serving"
)
)
in_progress = in_progress_result.scalar() or 0
queued_result = await db.execute(
select(func.count(Conversation.id)).where(
Conversation.status == "queued"
)
)
queued = queued_result.scalar() or 0
resolved_today_result = await db.execute(
select(func.count(Conversation.id)).where(
Conversation.status == "resolved",
Conversation.updated_at >= today_start,
)
)
resolved_today = resolved_today_result.scalar() or 0
stats = SessionStats(
in_progress=in_progress,
queued=queued,
resolved_today=resolved_today,
alerts=0,
)
# 会话列表(默认查询非 resolved 的会话)
stmt = select(Conversation).order_by(Conversation.created_at.desc())
if status:
stmt = stmt.where(Conversation.status == status)
else:
stmt = stmt.where(Conversation.status != "resolved")
result = await db.execute(stmt.limit(50))
conversations = list(result.scalars().all())
# 批量查询坐席姓名
agent_ids = list({c.assigned_agent_id for c in conversations if c.assigned_agent_id})
agent_names: Dict[str, str] = {}
if agent_ids:
agents_result = await db.execute(
select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
)
agent_names = dict(agents_result.all())
items = []
for c in conversations:
items.append(
SessionItem(
id=c.id,
employee_name=c.employee_name,
status=c.status,
assigned_agent_name=agent_names.get(c.assigned_agent_id or "", ""),
urgency_score=c.urgency_score,
created_at=c.created_at,
last_message_summary=c.last_message_summary,
)
)
return MonitorSessionsResponse(stats=stats, items=items)
# ==========================================================================
# 全局搜索
# ==========================================================================
async def global_search(
db: AsyncSession,
query: str,
) -> List[SearchItem]:
"""全局搜索配置项、坐席、快速回复。
按类型优先级排序:配置项 > 坐席 > 快速回复,同类型按名称排序。
Args:
db: 数据库会话
query: 搜索关键词
Returns:
List[SearchItem]: 搜索结果列表
"""
items: List[SearchItem] = []
keyword = f"%{query}%"
# 搜索配置项
config_result = await db.execute(
select(SystemConfig).where(
or_(
SystemConfig.config_key.ilike(keyword),
SystemConfig.description.ilike(keyword),
)
).limit(10)
)
for cfg in config_result.scalars().all():
items.append(
SearchItem(
type="config",
id=cfg.config_key,
name=cfg.description or cfg.config_key,
route="/admin/configs",
)
)
# 搜索坐席
agent_result = await db.execute(
select(Agent).where(
or_(
Agent.name.ilike(keyword),
Agent.user_id.ilike(keyword),
)
).limit(10)
)
for a in agent_result.scalars().all():
items.append(
SearchItem(
type="agent",
id=a.id,
name=a.name,
route="/admin/agents",
)
)
# 搜索快速回复
qr_result = await db.execute(
select(QuickReplyTemplate).where(
or_(
QuickReplyTemplate.title.ilike(keyword),
QuickReplyTemplate.content.ilike(keyword),
QuickReplyTemplate.category.ilike(keyword),
)
).limit(10)
)
for qr in qr_result.scalars().all():
items.append(
SearchItem(
type="quick_reply",
id=qr.id,
name=qr.title,
route="/admin/quick-replies",
)
)
return items
# ==========================================================================
# P2: 会话审计
# ==========================================================================
async def list_audit_conversations(
db: AsyncSession,
status: Optional[str] = None,
agent_id: Optional[str] = None,
keyword: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
page: int = 1,
page_size: int = 20,
) -> Dict[str, Any]:
"""获取会话审计列表(支持分页+多条件筛选)。"""
stmt = select(Conversation)
count_stmt = select(func.count(Conversation.id))
filters = []
if status:
filters.append(Conversation.status == status)
if agent_id:
filters.append(Conversation.assigned_agent_id == agent_id)
if keyword:
like_pattern = f"%{keyword}%"
filters.append(
or_(
Conversation.employee_name.ilike(like_pattern),
Conversation.last_message_summary.ilike(like_pattern),
)
)
if date_from:
try:
dt_from = datetime.strptime(date_from, "%Y-%m-%d")
filters.append(Conversation.created_at >= dt_from)
except ValueError:
pass
if date_to:
try:
dt_to = datetime.strptime(date_to, "%Y-%m-%d") + timedelta(days=1)
filters.append(Conversation.created_at < dt_to)
except ValueError:
pass
if filters:
stmt = stmt.where(and_(*filters))
count_stmt = count_stmt.where(and_(*filters))
total_result = await db.execute(count_stmt)
total = total_result.scalar() or 0
offset = (page - 1) * page_size
stmt = stmt.order_by(Conversation.created_at.desc()).offset(offset).limit(page_size)
result = await db.execute(stmt)
conversations = list(result.scalars().all())
agent_ids = list({c.assigned_agent_id for c in conversations if c.assigned_agent_id})
agent_names: Dict[str, str] = {}
if agent_ids:
agents_result = await db.execute(
select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
)
agent_names = dict(agents_result.all())
items = []
for c in conversations:
items.append({
"id": c.id,
"employee_name": c.employee_name or "",
"department": c.department or "",
"status": c.status,
"assigned_agent_name": agent_names.get(c.assigned_agent_id or "", ""),
"urgency_score": c.urgency_score or 1,
"created_at": c.created_at.isoformat() if c.created_at else "",
"updated_at": c.updated_at.isoformat() if c.updated_at else "",
"last_message_summary": c.last_message_summary or "",
})
return {"items": items, "total": total, "page": page, "page_size": page_size}
async def get_audit_conversation_detail(
db: AsyncSession,
conversation_id: str,
) -> Optional[Dict[str, Any]]:
"""获取会话审计详情(含消息列表)。"""
result = await db.execute(
select(Conversation).where(Conversation.id == conversation_id)
)
conv = result.scalars().first()
if not conv:
return None
msgs_result = await db.execute(
select(Message)
.where(Message.conversation_id == conversation_id)
.order_by(Message.created_at.asc())
.limit(200)
)
messages = list(msgs_result.scalars().all())
agent_name = ""
if conv.assigned_agent_id:
agent_result = await db.execute(
select(Agent.name).where(Agent.id == conv.assigned_agent_id)
)
agent_name = agent_result.scalar() or ""
return {
"id": conv.id,
"employee_name": conv.employee_name or "",
"employee_id": conv.employee_id or "",
"department": conv.department or "",
"position": conv.position or "",
"status": conv.status,
"assigned_agent_name": agent_name,
"urgency_score": conv.urgency_score or 1,
"tags": conv.tags or [],
"created_at": conv.created_at.isoformat() if conv.created_at else "",
"updated_at": conv.updated_at.isoformat() if conv.updated_at else "",
"last_message_summary": conv.last_message_summary or "",
"messages": [
{
"id": m.id,
"sender_type": m.sender_type,
"sender_name": m.sender_name or "",
"content": m.content or "",
"msg_type": m.msg_type or "text",
"created_at": m.created_at.isoformat() if m.created_at else "",
}
for m in messages
],
}
# ==========================================================================
# P2: 坐席绩效统计
# ==========================================================================
async def get_agent_performance(
db: AsyncSession,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""获取坐席绩效统计。"""
today = date.today()
if date_from:
try:
dt_from = datetime.strptime(date_from, "%Y-%m-%d")
except ValueError:
dt_from = datetime(today.year, today.month, 1)
else:
dt_from = datetime(today.year, today.month, 1)
if date_to:
try:
dt_to = datetime.strptime(date_to, "%Y-%m-%d") + timedelta(days=1)
except ValueError:
dt_to = datetime.now()
else:
dt_to = datetime.now()
agents_result = await db.execute(select(Agent).order_by(Agent.name))
agents = list(agents_result.scalars().all())
if not agents:
return []
agent_ids = [a.id for a in agents]
total_conv_result = await db.execute(
select(Conversation.assigned_agent_id, func.count(Conversation.id))
.where(
Conversation.assigned_agent_id.in_(agent_ids),
Conversation.created_at >= dt_from,
Conversation.created_at < dt_to,
)
.group_by(Conversation.assigned_agent_id)
)
total_conv_map = dict(total_conv_result.all())
resolved_result = await db.execute(
select(Conversation.assigned_agent_id, func.count(Conversation.id))
.where(
Conversation.assigned_agent_id.in_(agent_ids),
Conversation.status == "resolved",
Conversation.created_at >= dt_from,
Conversation.created_at < dt_to,
)
.group_by(Conversation.assigned_agent_id)
)
resolved_map = dict(resolved_result.all())
today_start = datetime.combine(today, datetime.min.time())
today_conv_result = await db.execute(
select(Conversation.assigned_agent_id, func.count(Conversation.id))
.where(
Conversation.assigned_agent_id.in_(agent_ids),
Conversation.created_at >= today_start,
)
.group_by(Conversation.assigned_agent_id)
)
today_conv_map = dict(today_conv_result.all())
items = []
for a in agents:
total = total_conv_map.get(a.id, 0)
resolved = resolved_map.get(a.id, 0)
today_count = today_conv_map.get(a.id, 0)
resolution_rate = f"{(resolved / total * 100):.0f}%" if total > 0 else ""
items.append({
"id": a.id,
"user_id": a.user_id,
"name": a.name,
"status": a.status,
"role": a.role,
"skill_tags": a.skill_tags or [],
"current_load": a.current_load or 0,
"max_load": a.max_load or 5,
"total_conversations": total,
"resolved_conversations": resolved,
"today_conversations": today_count,
"resolution_rate": resolution_rate,
})
return items
# ==========================================================================
# P2: 系统日志
# ==========================================================================
async def get_system_logs(
db: AsyncSession,
page: int = 1,
page_size: int = 50,
) -> Dict[str, Any]:
"""获取系统日志(配置变更日志)。"""
count_result = await db.execute(select(func.count(ConfigChangeLog.id)))
total = count_result.scalar() or 0
offset = (page - 1) * page_size
result = await db.execute(
select(ConfigChangeLog)
.order_by(ConfigChangeLog.changed_at.desc())
.offset(offset)
.limit(page_size)
)
logs = list(result.scalars().all())
agent_ids = list({log.changed_by for log in logs if log.changed_by})
agent_names: Dict[str, str] = {}
if agent_ids:
agents_result = await db.execute(
select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
)
agent_names = dict(agents_result.all())
items = []
for log in logs:
items.append({
"id": log.id,
"log_type": "config_change",
"config_key": log.config_key,
"old_value": log.old_value or "",
"new_value": log.new_value or "",
"changed_by": log.changed_by or "",
"changed_by_name": agent_names.get(log.changed_by or "", ""),
"changed_at": log.changed_at.isoformat() if log.changed_at else "",
})
return {"items": items, "total": total, "page": page, "page_size": page_size}