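# NOTE: repository-viewer page chrome ("Files / 1729 lines / 55 KiB / Python /
# Raw / Permalink / Normal View / History") was captured here; it is not part
# of the module and is kept only as a comment.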

# =============================================================================
# 企微IT智能服务台 — 管理后台业务逻辑层
# =============================================================================
# 说明:管理后台的核心业务逻辑,包括:
# 1. 仪表盘数据聚合
# 2. 配置项分组读取与更新(含变更日志)
# 3. 坐席 CRUD 管理
# 4. 外部系统集成配置
# 5. 快速回复审核
# 6. 分配模式管理
# 7. 会话监控
# 8. 全局搜索
# =============================================================================
import json
import logging
from datetime import datetime, date, timedelta
from typing import Any, Dict, List, Optional
from sqlalchemy import func, select, or_, and_, case, literal_column
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.agent import Agent
from app.models.config_change_log import ConfigChangeLog
from app.models.conversation import Conversation
from app.models.message import Message
from app.models.quick_reply_template import QuickReplyTemplate
from app.models.system_config import SystemConfig
from app.schemas.admin import (
AdminAgentResponse,
AdminQuickReplyResponse,
AssignmentModeItem,
AssignmentModeResponse,
ConfigGroupResponse,
ConfigHistoryItem,
ConfigItemResponse,
DashboardOverviewResponse,
IntegrationConfig,
IntegrationHealthItem,
IntegrationResponse,
MonitorSessionsResponse,
SearchItem,
SessionItem,
SessionStats,
SystemAlertItem,
)
from app.utils.response import AppException, ERR_NOT_FOUND, ERR_PARAMS
logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------
# Config grouping: config-key prefix -> group display name.
# Insertion order is significant: prefixes are matched, and groups are
# returned, in the order they are declared here.
# --------------------------------------------------------------------------
CONFIG_GROUP_MAP: Dict[str, str] = dict(
    ai_="AI 对话引擎",
    emergency_="应急模式",
    assign_="消息分配",
    polling_="轮询配置",
    emotion_="情绪检测",
    integration_="外部集成",
    queue_="排队策略",
    satisfaction_="满意度评价",
    invite_="邀请功能",
    notification_="通知推送",
    security_="安全策略",
)
# --------------------------------------------------------------------------
# Integration catalogue (hard-coded; phase one adds no dedicated table).
# Each row is (id, display name, config-key prefix, config type). An entry is
# configurable exactly when it has a key prefix.
# --------------------------------------------------------------------------
INTEGRATION_DEFINITIONS = [
    {
        "id": integ_id,
        "name": display_name,
        "key_prefix": key_prefix,
        "configurable": key_prefix is not None,
        "config_type": config_type,
    }
    for integ_id, display_name, key_prefix, config_type in (
        # url_key: api_url + api_key
        ("dify", "Dify AI", "integration_dify_", "url_key"),
        ("ragflow", "RAGFlow", "integration_ragflow_", "url_key"),
        # access_key: access_key_id + access_key_secret + base_url
        ("huorong", "火绒安全", "integration_huorong_", "access_key"),
        # account_password: api_account + api_password + base_url + validate_key
        ("lianruan", "联软LV7000", "integration_lianruan_", "account_password"),
        # Listed for display only; nothing is stored for these.
        ("data_platform", "数据平台", None, None),
        ("beisen", "北森 eHR", None, None),
    )
]
# --------------------------------------------------------------------------
# Assignment modes (hard-coded; in phase one only manual pickup is unlocked).
# Each row is (id, display name, phase in which the mode unlocks). A mode is
# locked exactly when it names an unlock phase.
# --------------------------------------------------------------------------
ASSIGNMENT_MODES = [
    {
        "id": mode_id,
        "name": label,
        "locked": unlock_phase != "",
        "unlock_at": unlock_phase,
    }
    for mode_id, label, unlock_phase in (
        ("manual", "手动接单", ""),
        ("round_robin", "轮询分配", "阶段二"),
        ("least_active", "最少活跃优先", "阶段二"),
        ("weighted", "加权比例分配", "阶段三"),
        ("skill_match", "技能匹配分配", "阶段三"),
        ("priority_queue", "优先队列", "阶段三"),
    )
]
# ==========================================================================
# 仪表盘
# ==========================================================================
async def _dashboard_integration_status(
    db: AsyncSession,
    prefix: str,
    config_type: Optional[str],
) -> str:
    """Derive the dashboard health status of one configurable integration.

    Args:
        db: Database session.
        prefix: Config-key prefix of the integration.
        config_type: Credential scheme of the integration.

    Returns:
        str: "connected" when every required setting is non-empty, "partial"
        when only the endpoint URL is set, otherwise "disconnected" (also for
        an unrecognised config type).
    """
    # The first entry of each tuple is the endpoint URL, which alone decides
    # between "partial" and "disconnected".
    required_keys = {
        "url_key": ("api_url", "api_key"),
        "access_key": ("base_url", "access_key_id", "access_key_secret"),
        "account_password": ("base_url", "api_account", "api_password"),
    }.get(config_type)
    if required_keys is None:
        return "disconnected"
    values = []
    for sub_key in required_keys:
        values.append(await _get_config_value(db, f"{prefix}{sub_key}"))
    if all(values):
        return "connected"
    return "partial" if values[0] else "disconnected"


async def _dashboard_first_message_times(
    db: AsyncSession,
    conv_ids: List[Any],
    sender_types: List[str],
) -> Dict[Any, Any]:
    """Map each conversation id to its earliest message time for the given sender types."""
    result = await db.execute(
        select(Message.conversation_id, func.min(Message.created_at))
        .where(
            Message.conversation_id.in_(conv_ids),
            Message.sender_type.in_(sender_types),
        )
        .group_by(Message.conversation_id)
    )
    return {row[0]: row[1] for row in result.all()}


async def _dashboard_avg_response_time(db: AsyncSession, today_start: datetime) -> str:
    """Format today's average first-response time, or "" when nothing is measurable.

    Considers at most 50 of today's conversations that have an assigned agent
    and measures, per conversation, the gap between the first employee message
    and the first agent/AI message. Only gaps strictly between 0 and 3600
    seconds are counted.
    """
    conv_ids_result = await db.execute(
        select(Conversation.id)
        .where(
            Conversation.created_at >= today_start,
            Conversation.assigned_agent_id.isnot(None),
        )
        .limit(50)  # cap the sample to bound the cost of the statistic
    )
    conv_ids = [row[0] for row in conv_ids_result.all()]
    if not conv_ids:
        return ""

    # Two grouped queries instead of two queries per conversation.
    first_asked = await _dashboard_first_message_times(db, conv_ids, ["employee"])
    first_replied = await _dashboard_first_message_times(db, conv_ids, ["agent", "ai"])

    response_times = []
    for cid, asked_at in first_asked.items():
        replied_at = first_replied.get(cid)
        if asked_at and replied_at:
            delta = (replied_at - asked_at).total_seconds()
            if 0 < delta < 3600:
                response_times.append(delta)
    if not response_times:
        return ""

    avg_seconds = sum(response_times) / len(response_times)
    if avg_seconds < 60:
        # Carry an explicit unit, like the minutes branch does.
        return f"{avg_seconds:.0f}秒"
    return f"{avg_seconds / 60:.1f}分钟"


async def get_dashboard_overview(db: AsyncSession) -> DashboardOverviewResponse:
    """Collect the statistics shown on the admin dashboard.

    Aggregates the number of online agents, today's conversation count, pending
    quick-reply reviews (also surfaced as alerts), integration health, the
    average first-response time and the AI hit rate.

    Args:
        db: Database session.

    Returns:
        DashboardOverviewResponse: The aggregated dashboard data. The two
        derived metrics are best-effort: if computing one fails, the failure is
        logged and the metric is returned as an empty string.
    """
    # Online agents.
    online_count_result = await db.execute(
        select(func.count(Agent.id)).where(Agent.status == "online")
    )
    online_agents = online_count_result.scalar() or 0

    # Conversations created since the start of the current (naive, local) day.
    today_start = datetime.combine(date.today(), datetime.min.time())
    today_conv_result = await db.execute(
        select(func.count(Conversation.id)).where(
            Conversation.created_at >= today_start
        )
    )
    today_conversations = today_conv_result.scalar() or 0

    # Quick replies waiting for review.
    pending_result = await db.execute(
        select(func.count(QuickReplyTemplate.id)).where(
            QuickReplyTemplate.status == "pending_review"
        )
    )
    pending_reviews = pending_result.scalar() or 0

    # Alerts: in phase one the only source is pending quick-reply reviews.
    system_alerts: List[SystemAlertItem] = []
    if pending_reviews > 0:
        pending_templates_result = await db.execute(
            select(QuickReplyTemplate)
            .where(QuickReplyTemplate.status == "pending_review")
            .order_by(QuickReplyTemplate.updated_at.desc())
            .limit(5)  # show at most five alerts
        )
        for t in pending_templates_result.scalars().all():
            content = t.content or ""  # tolerate a NULL content column
            preview = f"{content[:50]}{'...' if len(content) > 50 else ''}"
            system_alerts.append(
                SystemAlertItem(
                    type="quick_reply_pending",
                    content=f"快速回复待审核:{preview}",
                    submitter=t.submitted_by or None,
                    time=t.updated_at.isoformat() if t.updated_at else "",
                    severity="warning",
                )
            )

    # Integration health: non-configurable integrations always report
    # "disconnected".
    integrations_health: List[IntegrationHealthItem] = []
    for integ_def in INTEGRATION_DEFINITIONS:
        status = "disconnected"
        if integ_def["configurable"] and integ_def["key_prefix"]:
            status = await _dashboard_integration_status(
                db,
                integ_def["key_prefix"],
                integ_def.get("config_type", "url_key"),
            )
        integrations_health.append(
            IntegrationHealthItem(system=integ_def["name"], status=status)
        )

    # Average first-response time (best-effort).
    avg_response_time_str = ""
    try:
        avg_response_time_str = await _dashboard_avg_response_time(db, today_start)
    except Exception as e:
        logger.warning("计算平均响应时间失败: %s", e)

    # AI hit rate: share of today's conversations with at least one
    # substantive AI reply (best-effort). Reuses the count fetched above.
    ai_hit_rate_str = ""
    try:
        if today_conversations > 0:
            ai_conv_result = await db.execute(
                select(func.count(Conversation.id)).where(
                    Conversation.created_at >= today_start,
                    Conversation.ai_substantive_reply_count > 0,
                )
            )
            ai_conv = ai_conv_result.scalar() or 0
            ai_hit_rate_str = f"{(ai_conv / today_conversations) * 100:.0f}%"
    except Exception as e:
        logger.warning("计算AI命中率失败: %s", e)

    return DashboardOverviewResponse(
        online_agents=online_agents,
        today_conversations=today_conversations,
        avg_response_time=avg_response_time_str,
        ai_hit_rate=ai_hit_rate_str,
        pending_reviews=pending_reviews,
        system_alerts=system_alerts,
        integrations_health=integrations_health,
    )
# ==========================================================================
# 配置管理
# ==========================================================================
async def get_config_groups(db: AsyncSession) -> List[ConfigGroupResponse]:
    """Return the non-integration config items bucketed by key prefix.

    All rows of the system config table are read in key order. Keys starting
    with "integration_" are skipped here; every other key goes into the group
    of the first matching prefix in CONFIG_GROUP_MAP, or into a trailing
    catch-all group when no prefix matches.

    Args:
        db: Database session.

    Returns:
        List[ConfigGroupResponse]: Non-empty groups in CONFIG_GROUP_MAP order,
        followed by the catch-all group if it has any items.
    """
    rows = await db.execute(select(SystemConfig).order_by(SystemConfig.config_key))

    by_group: Dict[str, List[ConfigItemResponse]] = {}
    ungrouped: List[ConfigItemResponse] = []
    for record in rows.scalars().all():
        config_key = record.config_key
        if config_key.startswith("integration_"):
            continue  # skipped in this listing

        entry = ConfigItemResponse(
            key=config_key,
            value=record.config_value,
            description=record.description or "",
            value_type=_infer_value_type(record.config_value),
        )
        target_group = next(
            (
                label
                for key_prefix, label in CONFIG_GROUP_MAP.items()
                if config_key.startswith(key_prefix)
            ),
            None,
        )
        if target_group is None:
            ungrouped.append(entry)
        else:
            by_group.setdefault(target_group, []).append(entry)

    grouped = [
        ConfigGroupResponse(name=label, key_prefix=key_prefix, items=by_group[label])
        for key_prefix, label in CONFIG_GROUP_MAP.items()
        if label in by_group
    ]
    if ungrouped:
        grouped.append(
            ConfigGroupResponse(name="其他配置", key_prefix="", items=ungrouped)
        )
    return grouped
def _infer_value_type(value: str) -> str:
"""推断配置值的类型。
Args:
value: 配置值字符串
Returns:
str: 值类型标识
"""
if value.lower() in ("true", "false"):
return "boolean"
try:
float(value)
return "number"
except (ValueError, TypeError):
pass
try:
parsed = json.loads(value)
if isinstance(parsed, list):
return "json_array"
if isinstance(parsed, dict):
return "json_object"
except (json.JSONDecodeError, TypeError):
pass
return "string"
async def update_config(
    db: AsyncSession,
    key: str,
    value: str,
    agent_id: str,
) -> Dict[str, Any]:
    """Update one existing config item and record the change.

    The new value and a ConfigChangeLog entry are added to the session; this
    function does not commit.

    Args:
        db: Database session.
        key: Config key to update.
        value: New config value.
        agent_id: Id of the agent performing the change.

    Returns:
        Dict: ``key``, ``old_value``, ``new_value`` and ``changed_at`` (ISO
        string, identical to the ``updated_at`` written to the config row).

    Raises:
        AppException: The config key does not exist.
    """
    result = await db.execute(
        select(SystemConfig).where(SystemConfig.config_key == key)
    )
    config = result.scalars().first()
    if not config:
        raise AppException(1003, f"配置项不存在: {key}")

    old_value = config.config_value
    # One timestamp for both the row and the response, so callers see the
    # same moment that is persisted.
    changed_at = datetime.now()

    # Audit trail entry.
    db.add(
        ConfigChangeLog(
            config_key=key,
            old_value=old_value,
            new_value=value,
            changed_by=agent_id,
        )
    )

    # Apply the new value.
    config.config_value = value
    config.updated_at = changed_at
    db.add(config)

    # NOTE(review): old and new values are logged verbatim; confirm no
    # secret-bearing keys are routed through this function.
    logger.info(f"配置更新: key={key}, old={old_value}, new={value}, by={agent_id}")
    return {
        "key": key,
        "old_value": old_value,
        "new_value": value,
        "changed_at": changed_at.isoformat(),
    }
async def get_config_history(
    db: AsyncSession,
    key: str,
    limit: int = 20,
) -> List[ConfigHistoryItem]:
    """Return the most recent change-log entries of one config key.

    Args:
        db: Database session.
        key: Config key whose history is requested.
        limit: Maximum number of entries to return.

    Returns:
        List[ConfigHistoryItem]: Entries ordered newest first, each enriched
        with the name of the changing agent ("" when no agent matches).
    """
    history_query = (
        select(ConfigChangeLog)
        .where(ConfigChangeLog.config_key == key)
        .order_by(ConfigChangeLog.changed_at.desc())
        .limit(limit)
    )
    entries = list((await db.execute(history_query)).scalars().all())

    # Resolve all editor names with a single lookup.
    editor_ids = list({entry.changed_by for entry in entries})
    editor_names = {}
    if editor_ids:
        name_rows = await db.execute(
            select(Agent.id, Agent.name).where(Agent.id.in_(editor_ids))
        )
        editor_names = {row[0]: row[1] for row in name_rows.all()}

    return [
        ConfigHistoryItem(
            id=entry.id,
            config_key=entry.config_key,
            old_value=entry.old_value,
            new_value=entry.new_value,
            changed_by=entry.changed_by,
            changed_by_name=editor_names.get(entry.changed_by, ""),
            changed_at=entry.changed_at,
        )
        for entry in entries
    ]
# ==========================================================================
# 坐席管理
# ==========================================================================
async def list_admin_agents(
    db: AsyncSession,
    status: Optional[str] = None,
) -> List[AdminAgentResponse]:
    """List agents for the admin view, including today's resolved count.

    Args:
        db: Database session.
        status: Optional status filter; a falsy value lists every agent.

    Returns:
        List[AdminAgentResponse]: Agents ordered by name. ``today_resolved``
        counts conversations assigned to the agent with status "resolved" and
        ``updated_at`` at or after the start of the current day.
    """
    agent_query = select(Agent).order_by(Agent.name)
    if status:
        agent_query = agent_query.where(Agent.status == status)
    roster = list((await db.execute(agent_query)).scalars().all())

    # One grouped query for every listed agent's resolved count.
    resolved_counts: Dict[str, int] = {}
    roster_ids = [member.id for member in roster]
    if roster_ids:
        midnight = datetime.combine(date.today(), datetime.min.time())
        count_rows = await db.execute(
            select(
                Conversation.assigned_agent_id,
                func.count(Conversation.id),
            )
            .where(
                Conversation.assigned_agent_id.in_(roster_ids),
                Conversation.status == "resolved",
                Conversation.updated_at >= midnight,
            )
            .group_by(Conversation.assigned_agent_id)
        )
        resolved_counts = dict(count_rows.all())

    return [
        AdminAgentResponse(
            id=member.id,
            user_id=member.user_id,
            name=member.name,
            status=member.status,
            role=member.role,
            skill_tags=member.skill_tags or [],
            current_load=member.current_load,
            max_load=member.max_load,
            today_resolved=resolved_counts.get(member.id, 0),
            created_at=member.created_at,
            updated_at=member.updated_at,
        )
        for member in roster
    ]
async def create_agent(
    db: AsyncSession,
    user_id: str,
    name: str,
    role: str = "agent",
    skill_tags: Optional[List[str]] = None,
    max_load: int = 5,
) -> AdminAgentResponse:
    """Create a new agent.

    The agent starts with status "offline" and a current load of 0. The
    session is flushed but not committed.

    Args:
        db: Database session.
        user_id: User id of the agent; must not already be in use.
        name: Agent display name.
        role: Role; only "admin" or "agent" is accepted.
        skill_tags: Skill tags; ``None`` is stored as an empty list.
        max_load: Maximum number of concurrent conversations.

    Returns:
        AdminAgentResponse: The created agent, with ``today_resolved`` set to 0.

    Raises:
        AppException: ``role`` is not allowed, or ``user_id`` already exists.
    """
    # Whitelist check first so an invalid role never reaches the database.
    if role not in ("admin", "agent"):
        raise AppException(1001, f"角色值非法: {role},仅允许 admin 或 agent")

    duplicate_lookup = await db.execute(
        select(Agent).where(Agent.user_id == user_id)
    )
    if duplicate_lookup.scalars().first():
        raise AppException(1001, f"坐席 user_id 已存在: {user_id}")

    new_agent = Agent(
        user_id=user_id,
        name=name,
        role=role,
        skill_tags=skill_tags or [],
        max_load=max_load,
        status="offline",
        current_load=0,
    )
    db.add(new_agent)
    # Presumably flushed so DB-generated fields (e.g. id) are populated before
    # the response is built - confirm against the Agent model.
    await db.flush()
    logger.info(f"创建坐席: user_id={user_id}, name={name}, role={role}")

    return AdminAgentResponse(
        id=new_agent.id,
        user_id=new_agent.user_id,
        name=new_agent.name,
        status=new_agent.status,
        role=new_agent.role,
        skill_tags=new_agent.skill_tags or [],
        current_load=new_agent.current_load,
        max_load=new_agent.max_load,
        today_resolved=0,
        created_at=new_agent.created_at,
        updated_at=new_agent.updated_at,
    )
async def update_agent(
    db: AsyncSession,
    agent_id: str,
    role: Optional[str] = None,
    skill_tags: Optional[List[str]] = None,
    max_load: Optional[int] = None,
) -> Dict[str, Any]:
    """Update an agent's role, skill tags and/or load limit.

    Arguments left as ``None`` are not modified; ``updated_at`` is always
    refreshed. The session is not committed here.

    Args:
        db: Database session.
        agent_id: Id of the agent to update.
        role: New role ("admin" or "agent"), optional.
        skill_tags: New skill tag list, optional.
        max_load: New maximum number of concurrent conversations, optional.

    Returns:
        Dict: ``id``, ``role``, ``skill_tags`` and ``max_load`` after the update.

    Raises:
        AppException: The agent does not exist, or ``role`` is not allowed.
    """
    lookup = await db.execute(select(Agent).where(Agent.id == agent_id))
    target = lookup.scalars().first()
    if not target:
        raise AppException(3004, "坐席不存在")

    if role is not None:
        # Whitelist check before anything is modified.
        if role not in ("admin", "agent"):
            raise AppException(1001, f"角色值非法: {role},仅允许 admin 或 agent")
        target.role = role
    if skill_tags is not None:
        target.skill_tags = skill_tags
    if max_load is not None:
        target.max_load = max_load
    target.updated_at = datetime.now()
    db.add(target)

    logger.info(f"更新坐席: id={agent_id}, role={role}, skill_tags={skill_tags}, max_load={max_load}")
    return dict(
        id=target.id,
        role=target.role,
        skill_tags=target.skill_tags or [],
        max_load=target.max_load,
    )
async def delete_agent(db: AsyncSession, agent_id: str) -> None:
    """Remove an agent.

    The deletion is registered on the session; this function does not commit.

    Args:
        db: Database session.
        agent_id: Id of the agent to remove.

    Raises:
        AppException: The agent does not exist.
    """
    lookup = await db.execute(select(Agent).where(Agent.id == agent_id))
    doomed = lookup.scalars().first()
    if not doomed:
        raise AppException(3004, "坐席不存在")

    removed_user_id = doomed.user_id  # captured for the log line below
    await db.delete(doomed)
    logger.info(f"移除坐席: id={agent_id}, user_id={removed_user_id}")
# ==========================================================================
# 集成配置管理
# ==========================================================================
async def get_integrations(db: AsyncSession) -> List[IntegrationResponse]:
    """List every known integration with its configuration status.

    Stored settings are the system config rows whose key starts with
    "integration_". Secrets are never returned; only ``*_set`` flags are.

    Status rules for configurable integrations: "connected" when every
    required setting is non-empty, "partial" when only the endpoint URL is
    set, otherwise "disconnected". Non-configurable integrations are always
    reported as "disconnected" with no config.

    Args:
        db: Database session.

    Returns:
        List[IntegrationResponse]: One entry per integration definition, in
        definition order.
    """
    stored = await db.execute(
        select(SystemConfig).where(
            SystemConfig.config_key.startswith("integration_")
        )
    )

    # Bucket rows as {key_prefix: {sub_key: value}}, e.g.
    # integration_dify_api_url -> ["integration_dify_"]["api_url"].
    known_prefixes = [
        definition.get("key_prefix")
        for definition in INTEGRATION_DEFINITIONS
        if definition.get("key_prefix")
    ]
    settings_by_prefix: Dict[str, Dict[str, str]] = {}
    for row in stored.scalars().all():
        owner = next(
            (p for p in known_prefixes if row.config_key.startswith(p)),
            None,
        )
        if owner is None:
            continue
        sub_key = row.config_key[len(owner):]
        settings_by_prefix.setdefault(owner, {})[sub_key] = row.config_value

    def _status(complete: bool, endpoint: str) -> str:
        """Apply the connected / partial / disconnected rule."""
        if complete:
            return "connected"
        return "partial" if endpoint else "disconnected"

    responses: List[IntegrationResponse] = []
    for definition in INTEGRATION_DEFINITIONS:
        if not definition["configurable"]:
            responses.append(
                IntegrationResponse(
                    id=definition["id"],
                    name=definition["name"],
                    status="disconnected",
                    configurable=False,
                    config=None,
                )
            )
            continue

        mode = definition.get("config_type", "url_key")
        settings = settings_by_prefix.get(definition["key_prefix"], {})

        if mode == "url_key":
            # Dify / RAGFlow: api_url + api_key
            api_url = settings.get("api_url", "")
            api_key = settings.get("api_key", "")
            responses.append(
                IntegrationResponse(
                    id=definition["id"],
                    name=definition["name"],
                    status=_status(bool(api_url and api_key), api_url),
                    configurable=True,
                    config_type="url_key",
                    config=IntegrationConfig(
                        api_url=api_url,
                        api_key_set=bool(api_key),
                    ),
                )
            )
        elif mode == "access_key":
            # Huorong: access_key_id + access_key_secret + base_url
            access_key_id = settings.get("access_key_id", "")
            access_key_secret = settings.get("access_key_secret", "")
            base_url = settings.get("base_url", "")
            responses.append(
                IntegrationResponse(
                    id=definition["id"],
                    name=definition["name"],
                    status=_status(
                        bool(access_key_id and access_key_secret and base_url),
                        base_url,
                    ),
                    configurable=True,
                    config_type="access_key",
                    config=IntegrationConfig(
                        # url_key-style fields are filled in as well
                        # (per the original note: reused by the frontend card).
                        api_url=base_url,
                        api_key_set=bool(access_key_id),
                        # access_key-specific fields
                        access_key_id_set=bool(access_key_id),
                        access_key_secret_set=bool(access_key_secret),
                        base_url=base_url or None,
                    ),
                )
            )
        elif mode == "account_password":
            # Lianruan: api_account + api_password + base_url (+ validate_key,
            # which does not influence the status)
            api_account = settings.get("api_account", "")
            api_password = settings.get("api_password", "")
            base_url = settings.get("base_url", "")
            responses.append(
                IntegrationResponse(
                    id=definition["id"],
                    name=definition["name"],
                    status=_status(
                        bool(api_account and api_password and base_url),
                        base_url,
                    ),
                    configurable=True,
                    config_type="account_password",
                    config=IntegrationConfig(
                        # url_key-style fields are filled in as well
                        api_url=base_url,
                        api_key_set=bool(api_account),
                        # account_password-specific fields
                        base_url=base_url or None,
                        api_account_set=bool(api_account),
                        api_password_set=bool(api_password),
                    ),
                )
            )
    return responses
async def update_integration(
    db: AsyncSession,
    integration_id: str,
    # url_key mode (Dify / RAGFlow)
    api_url: str = "",
    api_key: str = "",
    # access_key mode (Huorong)
    access_key_id: str = "",
    access_key_secret: str = "",
    base_url: str = "",
    # account_password mode (Lianruan LV7000)
    api_account: str = "",
    api_password: str = "",
    validate_key: str = "",
    agent_id: str = "",
) -> IntegrationResponse:
    """Store the settings of one configurable integration.

    Which arguments are used depends on the integration's config type:
      - url_key: ``api_url`` + ``api_key``
      - access_key: ``access_key_id`` + ``access_key_secret`` + ``base_url``
      - account_password: ``api_account`` + ``api_password`` + ``base_url``
        + ``validate_key``

    Every setting of the selected mode is written exactly as passed, so an
    argument left at its default "" overwrites the stored value.
    NOTE(review): confirm callers always resend secrets, since reads only
    expose ``*_set`` flags.

    Args:
        db: Database session.
        integration_id: Integration id (e.g. dify / ragflow / huorong / lianruan).
        api_url: API address (url_key mode).
        api_key: API key (url_key mode).
        access_key_id: Access key id (access_key mode).
        access_key_secret: Access key secret (access_key mode).
        base_url: Base URL (access_key and account_password modes).
        api_account: API account (account_password mode).
        api_password: API password (account_password mode).
        validate_key: Validation key (account_password mode).
        agent_id: Id of the agent performing the change.

    Returns:
        IntegrationResponse: The integration with its recomputed status.

    Raises:
        AppException: Unknown integration, integration not configurable, or
            unknown config type.
    """
    definition = next(
        (d for d in INTEGRATION_DEFINITIONS if d["id"] == integration_id),
        None,
    )
    if not definition:
        raise AppException(1003, f"集成系统不存在: {integration_id}")
    if not definition["configurable"]:
        raise AppException(1001, f"集成系统不可配置: {definition['name']}")

    mode = definition.get("config_type", "url_key")
    prefix = definition["key_prefix"]
    label = definition["name"]

    def _status(complete: bool, endpoint: str) -> str:
        """connected if everything is set, partial if only the endpoint is."""
        if complete:
            return "connected"
        return "partial" if endpoint else "disconnected"

    async def _store(entries) -> None:
        """Upsert (sub_key, value, description suffix) triples in order."""
        for sub_key, new_value, suffix in entries:
            await _upsert_system_config(
                db, f"{prefix}{sub_key}", new_value, f"{label} {suffix}", agent_id
            )

    if mode == "url_key":
        await _store(
            [
                ("api_url", api_url, "API 地址"),
                ("api_key", api_key, "API Key"),
            ]
        )
        return IntegrationResponse(
            id=integration_id,
            name=label,
            status=_status(bool(api_url and api_key), api_url),
            configurable=True,
            config_type="url_key",
            config=IntegrationConfig(
                api_url=api_url,
                api_key_set=bool(api_key),
            ),
        )

    if mode == "access_key":
        await _store(
            [
                ("access_key_id", access_key_id, "AccessKey ID"),
                ("access_key_secret", access_key_secret, "AccessKey Secret"),
                ("base_url", base_url, "Base URL"),
            ]
        )
        return IntegrationResponse(
            id=integration_id,
            name=label,
            status=_status(
                bool(access_key_id and access_key_secret and base_url),
                base_url,
            ),
            configurable=True,
            config_type="access_key",
            config=IntegrationConfig(
                # url_key-style fields are filled in as well
                api_url=base_url,
                api_key_set=bool(access_key_id),
                # access_key-specific fields
                access_key_id_set=bool(access_key_id),
                access_key_secret_set=bool(access_key_secret),
                base_url=base_url or None,
            ),
        )

    if mode == "account_password":
        await _store(
            [
                ("api_account", api_account, "API账号"),
                ("api_password", api_password, "API密码"),
                ("base_url", base_url, "Base URL"),
                ("validate_key", validate_key, "验证密钥"),
            ]
        )
        return IntegrationResponse(
            id=integration_id,
            name=label,
            status=_status(
                bool(api_account and api_password and base_url),
                base_url,
            ),
            configurable=True,
            config_type="account_password",
            config=IntegrationConfig(
                api_url=base_url,
                api_key_set=bool(api_account),
                base_url=base_url or None,
                api_account_set=bool(api_account),
                api_password_set=bool(api_password),
            ),
        )

    raise AppException(1001, f"未知配置类型: {mode}")
async def _get_config_value(db: AsyncSession, key: str) -> str:
    """Read a single config value by key.

    Args:
        db: Database session.
        key: Config key to look up.

    Returns:
        str: The stored value, or "" when the key is missing or its value is
        empty/NULL.
    """
    lookup = select(SystemConfig.config_value).where(SystemConfig.config_key == key)
    stored = (await db.execute(lookup)).scalar()
    return stored or ""
async def _upsert_system_config(
    db: AsyncSession,
    key: str,
    value: str,
    description: str,
    agent_id: str,
) -> None:
    """Insert or update a system config row and log the change.

    A ConfigChangeLog entry is added in both cases; for a newly created key
    the logged old value is "". ``description`` is only used when the row is
    created. Nothing is committed here.

    Args:
        db: Database session.
        key: Config key.
        value: Value to store.
        description: Description used for a newly created row.
        agent_id: Id of the agent performing the change.
    """
    lookup = await db.execute(
        select(SystemConfig).where(SystemConfig.config_key == key)
    )
    record = lookup.scalars().first()

    if record:
        previous_value = record.config_value
        record.config_value = value
        record.updated_at = datetime.now()
    else:
        previous_value = ""
        record = SystemConfig(
            config_key=key,
            config_value=value,
            description=description,
        )
    db.add(record)

    # Audit entry, written for inserts and updates alike.
    db.add(
        ConfigChangeLog(
            config_key=key,
            old_value=previous_value,
            new_value=value,
            changed_by=agent_id,
        )
    )
# ==========================================================================
# 快速回复审核
# ==========================================================================
async def list_pending_quick_replies(
    db: AsyncSession,
    category: Optional[str] = None,
) -> List[AdminQuickReplyResponse]:
    """List quick-reply templates that are waiting for review.

    Args:
        db: Database session.
        category: Optional category filter; a falsy value lists all categories.

    Returns:
        List[AdminQuickReplyResponse]: Templates with status "pending_review",
        most recently updated first, each with the submitter's name ("" when
        unknown).
    """
    conditions = [QuickReplyTemplate.status == "pending_review"]
    if category:
        conditions.append(QuickReplyTemplate.category == category)
    pending_query = (
        select(QuickReplyTemplate)
        .where(*conditions)
        .order_by(QuickReplyTemplate.updated_at.desc())
    )
    pending = list((await db.execute(pending_query)).scalars().all())

    # Resolve submitter names with one lookup.
    submitter_ids = list({tpl.submitted_by for tpl in pending if tpl.submitted_by})
    submitter_names: Dict[str, str] = {}
    if submitter_ids:
        name_rows = await db.execute(
            select(Agent.id, Agent.name).where(Agent.id.in_(submitter_ids))
        )
        submitter_names = dict(name_rows.all())

    return [
        AdminQuickReplyResponse(
            id=tpl.id,
            category=tpl.category,
            title=tpl.title,
            content=tpl.content,
            variables=tpl.variables or [],
            status=tpl.status,
            version=tpl.version,
            submitted_by=tpl.submitted_by,
            submitted_by_name=submitter_names.get(tpl.submitted_by or "", ""),
            sort_order=tpl.sort_order,
            created_at=tpl.created_at,
            updated_at=tpl.updated_at,
        )
        for tpl in pending
    ]
async def review_quick_reply(
    db: AsyncSession,
    template_id: str,
    action: str,
    reason: str,
    agent_id: str,
) -> Dict[str, Any]:
    """Approve or reject a quick-reply template that is pending review.

    Approving sets the status to "approved" and increments the version (a
    missing version counts as 1); rejecting sets the status to "rejected".
    ``reason`` is written to the log only. Nothing is committed here.

    Args:
        db: Database session.
        template_id: Id of the template under review.
        action: "approve" or "reject".
        reason: Reviewer's reason.
        agent_id: Id of the reviewing agent.

    Returns:
        Dict: ``id``, ``status`` and ``version`` after the review.

    Raises:
        AppException: Invalid action, template not found, or template not in
            "pending_review" status.
    """
    if action not in ("approve", "reject"):
        raise AppException(1001, "审核动作只能是 approve 或 reject")

    lookup = await db.execute(
        select(QuickReplyTemplate).where(QuickReplyTemplate.id == template_id)
    )
    reviewed = lookup.scalars().first()
    if not reviewed:
        raise ERR_NOT_FOUND
    # Only templates still waiting for review may be decided.
    if reviewed.status != "pending_review":
        raise AppException(
            1001,
            f"当前模板状态为 {reviewed.status},仅 pending_review 状态可审核"
        )

    approved = action == "approve"
    reviewed.status = "approved" if approved else "rejected"
    if approved:
        reviewed.version = (reviewed.version or 1) + 1
    reviewed.updated_at = datetime.now()
    db.add(reviewed)

    logger.info(
        f"快速回复审核: id={template_id}, action={action}, "
        f"by={agent_id}, reason={reason}"
    )
    return dict(id=reviewed.id, status=reviewed.status, version=reviewed.version)
# ==========================================================================
# 分配模式
# ==========================================================================
async def get_assignment_mode(db: AsyncSession) -> AssignmentModeResponse:
    """Return the active assignment mode together with all known modes.

    The active mode is the value of the "assign_mode" config key; when that
    key does not exist, "manual" is used.

    Args:
        db: Database session.

    Returns:
        AssignmentModeResponse: The current mode id and one item per entry of
        ASSIGNMENT_MODES, with ``enabled`` set on the entry matching the
        current mode.
    """
    lookup = await db.execute(
        select(SystemConfig).where(SystemConfig.config_key == "assign_mode")
    )
    stored = lookup.scalars().first()
    active_mode = stored.config_value if stored else "manual"

    mode_items = []
    for definition in ASSIGNMENT_MODES:
        mode_items.append(
            AssignmentModeItem(
                id=definition["id"],
                name=definition["name"],
                enabled=(definition["id"] == active_mode),
                locked=definition["locked"],
                unlock_at=definition["unlock_at"],
            )
        )
    return AssignmentModeResponse(current_mode=active_mode, modes=mode_items)
async def update_assignment_mode(
    db: AsyncSession,
    mode: str,
    agent_id: str,
) -> AssignmentModeResponse:
    """Switch the assignment mode.

    Only modes listed in ASSIGNMENT_MODES that are not locked can be selected.
    The choice is stored under the "assign_mode" config key.

    Args:
        db: Database session.
        mode: Id of the mode to activate.
        agent_id: Id of the agent performing the change.

    Returns:
        AssignmentModeResponse: The assignment mode state after the change.

    Raises:
        AppException: The mode id is unknown or the mode is locked.
    """
    requested = next((m for m in ASSIGNMENT_MODES if m["id"] == mode), None)
    if not requested:
        raise AppException(1001, f"无效的分配模式: {mode}")
    if requested["locked"]:
        raise AppException(1001, f"分配模式已锁定,将在{requested['unlock_at']}解锁")

    # Creates the config row if it does not exist yet.
    await _upsert_system_config(db, "assign_mode", mode, "消息分配模式", agent_id)
    logger.info(f"切换分配模式: mode={mode}, by={agent_id}")
    return await get_assignment_mode(db)
# ==========================================================================
# 会话监控
# ==========================================================================
async def get_monitor_sessions(
    db: AsyncSession,
    status: Optional[str] = None,
) -> MonitorSessionsResponse:
    """Return session counters and the latest conversations for monitoring.

    Args:
        db: Database session.
        status: Optional status filter for the list; when falsy, every
            conversation that is not "resolved" is listed.

    Returns:
        MonitorSessionsResponse: Counters (serving, queued, resolved since the
        start of the current day; ``alerts`` is always 0) and up to 50
        conversations, newest first.
    """

    async def _count(*criteria) -> int:
        """Count conversations matching all given criteria."""
        counted = await db.execute(
            select(func.count(Conversation.id)).where(*criteria)
        )
        return counted.scalar() or 0

    midnight = datetime.combine(date.today(), datetime.min.time())
    in_progress = await _count(Conversation.status == "serving")
    queued = await _count(Conversation.status == "queued")
    resolved_today = await _count(
        Conversation.status == "resolved",
        Conversation.updated_at >= midnight,
    )
    stats = SessionStats(
        in_progress=in_progress,
        queued=queued,
        resolved_today=resolved_today,
        alerts=0,
    )

    # Conversation list: explicit status filter, or everything unresolved.
    if status:
        status_filter = Conversation.status == status
    else:
        status_filter = Conversation.status != "resolved"
    listing = await db.execute(
        select(Conversation)
        .order_by(Conversation.created_at.desc())
        .where(status_filter)
        .limit(50)
    )
    sessions = list(listing.scalars().all())

    # Resolve assigned agent names with one lookup.
    assignee_ids = list(
        {s.assigned_agent_id for s in sessions if s.assigned_agent_id}
    )
    assignee_names: Dict[str, str] = {}
    if assignee_ids:
        name_rows = await db.execute(
            select(Agent.id, Agent.name).where(Agent.id.in_(assignee_ids))
        )
        assignee_names = dict(name_rows.all())

    session_items = [
        SessionItem(
            id=s.id,
            employee_name=s.employee_name,
            status=s.status,
            assigned_agent_name=assignee_names.get(s.assigned_agent_id or "", ""),
            urgency_score=s.urgency_score,
            created_at=s.created_at,
            last_message_summary=s.last_message_summary,
        )
        for s in sessions
    ]
    return MonitorSessionsResponse(stats=stats, items=session_items)
# ==========================================================================
# 全局搜索
# ==========================================================================
async def global_search(
    db: AsyncSession,
    query: str,
) -> List[SearchItem]:
    """Search config items, agents and quick replies by keyword.

    Matching is a case-insensitive substring match. ``%`` and ``_`` in the
    keyword are matched literally instead of acting as LIKE wildcards. Results
    are grouped by type in the order config > agent > quick reply, sorted by
    name within each type, with at most 10 hits per type.

    Args:
        db: Database session.
        query: Search keyword. A blank keyword yields no results.

    Returns:
        List[SearchItem]: Matching items.
    """
    if not query or not query.strip():
        # A blank keyword would otherwise turn into "%%" and match every row.
        return []

    # Escape LIKE metacharacters; "/" is the escape character passed to ilike.
    escaped = query.replace("/", "//").replace("%", "/%").replace("_", "/_")
    keyword = f"%{escaped}%"

    def _matches(column):
        """Case-insensitive literal substring match on a column."""
        return column.ilike(keyword, escape="/")

    items: List[SearchItem] = []

    # Config items: ordered by key so the LIMIT is deterministic, then sorted
    # by the displayed name.
    config_result = await db.execute(
        select(SystemConfig)
        .where(
            or_(
                _matches(SystemConfig.config_key),
                _matches(SystemConfig.description),
            )
        )
        .order_by(SystemConfig.config_key)
        .limit(10)
    )
    config_hits = sorted(
        config_result.scalars().all(),
        key=lambda cfg: cfg.description or cfg.config_key,
    )
    for cfg in config_hits:
        items.append(
            SearchItem(
                type="config",
                id=cfg.config_key,
                name=cfg.description or cfg.config_key,
                route="/admin/configs",
            )
        )

    # Agents.
    agent_result = await db.execute(
        select(Agent)
        .where(
            or_(
                _matches(Agent.name),
                _matches(Agent.user_id),
            )
        )
        .order_by(Agent.name)
        .limit(10)
    )
    for a in agent_result.scalars().all():
        items.append(
            SearchItem(
                type="agent",
                id=a.id,
                name=a.name,
                route="/admin/agents",
            )
        )

    # Quick replies.
    qr_result = await db.execute(
        select(QuickReplyTemplate)
        .where(
            or_(
                _matches(QuickReplyTemplate.title),
                _matches(QuickReplyTemplate.content),
                _matches(QuickReplyTemplate.category),
            )
        )
        .order_by(QuickReplyTemplate.title)
        .limit(10)
    )
    for qr in qr_result.scalars().all():
        items.append(
            SearchItem(
                type="quick_reply",
                id=qr.id,
                name=qr.title,
                route="/admin/quick-replies",
            )
        )
    return items
# ==========================================================================
# P2: 会话审计
# ==========================================================================
async def list_audit_conversations(
    db: AsyncSession,
    status: Optional[str] = None,
    agent_id: Optional[str] = None,
    keyword: Optional[str] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
) -> Dict[str, Any]:
    """List conversations for the audit view with filtering and pagination.

    Args:
        db: Database session.
        status: Keep only conversations with exactly this status.
        agent_id: Keep only conversations assigned to this agent id.
        keyword: Case-insensitive substring matched against the employee name
            and the last message summary. ``%`` and ``_`` are matched
            literally rather than as LIKE wildcards.
        date_from: Inclusive lower bound on ``created_at`` (``YYYY-MM-DD``).
        date_to: Inclusive upper bound on ``created_at`` (``YYYY-MM-DD``); the
            whole of that day is included.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1.

    Returns:
        A dict with ``items`` (one dict per conversation), ``total`` (number
        of rows matching the filters), and the effective ``page`` and
        ``page_size``.

    A malformed ``date_from`` / ``date_to`` does not fail the request: that
    bound is skipped and a warning is logged.
    """
    # A negative OFFSET or LIMIT is rejected by several databases, so clamp
    # instead of letting an out-of-range page surface as a database error.
    page = max(page, 1)
    page_size = max(page_size, 1)

    filters = []
    if status:
        filters.append(Conversation.status == status)
    if agent_id:
        filters.append(Conversation.assigned_agent_id == agent_id)
    if keyword:
        # "/" is the declared escape character; escape it first so the
        # escapes added for "%" and "_" are not themselves doubled.
        escaped = keyword.replace("/", "//").replace("%", "/%").replace("_", "/_")
        like_pattern = f"%{escaped}%"
        filters.append(
            or_(
                Conversation.employee_name.ilike(like_pattern, escape="/"),
                Conversation.last_message_summary.ilike(like_pattern, escape="/"),
            )
        )
    if date_from:
        try:
            dt_from = datetime.strptime(date_from, "%Y-%m-%d")
        except ValueError:
            logger.warning(
                "list_audit_conversations: ignoring invalid date_from=%r",
                date_from,
            )
        else:
            filters.append(Conversation.created_at >= dt_from)
    if date_to:
        try:
            # Exclusive bound at the start of the next day covers all of date_to.
            dt_to = datetime.strptime(date_to, "%Y-%m-%d") + timedelta(days=1)
        except ValueError:
            logger.warning(
                "list_audit_conversations: ignoring invalid date_to=%r",
                date_to,
            )
        else:
            filters.append(Conversation.created_at < dt_to)

    stmt = select(Conversation)
    count_stmt = select(func.count(Conversation.id))
    if filters:
        condition = and_(*filters)
        stmt = stmt.where(condition)
        count_stmt = count_stmt.where(condition)

    total_result = await db.execute(count_stmt)
    total = total_result.scalar() or 0

    offset = (page - 1) * page_size
    # The id tie-breaker keeps OFFSET pagination stable when several
    # conversations share the same created_at value.
    stmt = (
        stmt.order_by(Conversation.created_at.desc(), Conversation.id.desc())
        .offset(offset)
        .limit(page_size)
    )
    result = await db.execute(stmt)
    conversations = list(result.scalars().all())

    # Resolve agent names for the current page with a single query.
    agent_ids = list(
        {c.assigned_agent_id for c in conversations if c.assigned_agent_id}
    )
    agent_names: Dict[str, str] = {}
    if agent_ids:
        agents_result = await db.execute(
            select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
        )
        agent_names = dict(agents_result.all())

    items = [
        {
            "id": c.id,
            "employee_name": c.employee_name or "",
            "department": c.department or "",
            "status": c.status,
            "assigned_agent_name": agent_names.get(c.assigned_agent_id or "", ""),
            "urgency_score": c.urgency_score or 1,
            "created_at": c.created_at.isoformat() if c.created_at else "",
            "updated_at": c.updated_at.isoformat() if c.updated_at else "",
            "last_message_summary": c.last_message_summary or "",
        }
        for c in conversations
    ]
    return {"items": items, "total": total, "page": page, "page_size": page_size}
async def get_audit_conversation_detail(
    db: AsyncSession,
    conversation_id: str,
) -> Optional[Dict[str, Any]]:
    """Return one conversation together with its messages for auditing.

    Args:
        db: Database session.
        conversation_id: Id of the conversation to load.

    Returns:
        A dict with the conversation fields, the assigned agent's name and a
        ``messages`` list ordered by creation time ascending (at most 200
        entries), or ``None`` when no conversation has that id.
    """

    def _iso(moment) -> str:
        """Format a timestamp as ISO-8601, or "" when it is missing."""
        return moment.isoformat() if moment else ""

    conversation_query = select(Conversation).where(
        Conversation.id == conversation_id
    )
    conversation = (await db.execute(conversation_query)).scalars().first()
    if not conversation:
        return None

    message_query = (
        select(Message)
        .where(Message.conversation_id == conversation_id)
        .order_by(Message.created_at.asc())
        .limit(200)
    )
    message_rows = list((await db.execute(message_query)).scalars().all())

    # The agent name is looked up only when the conversation is assigned.
    assigned_name = ""
    if conversation.assigned_agent_id:
        name_query = select(Agent.name).where(
            Agent.id == conversation.assigned_agent_id
        )
        assigned_name = (await db.execute(name_query)).scalar() or ""

    detail: Dict[str, Any] = {
        "id": conversation.id,
        "employee_name": conversation.employee_name or "",
        "employee_id": conversation.employee_id or "",
        "department": conversation.department or "",
        "position": conversation.position or "",
        "status": conversation.status,
        "assigned_agent_name": assigned_name,
        "urgency_score": conversation.urgency_score or 1,
        "tags": conversation.tags or [],
        "created_at": _iso(conversation.created_at),
        "updated_at": _iso(conversation.updated_at),
        "last_message_summary": conversation.last_message_summary or "",
    }

    message_payload = []
    for msg in message_rows:
        message_payload.append(
            {
                "id": msg.id,
                "sender_type": msg.sender_type,
                "sender_name": msg.sender_name or "",
                "content": msg.content or "",
                "msg_type": msg.msg_type or "text",
                "created_at": _iso(msg.created_at),
            }
        )
    detail["messages"] = message_payload
    return detail
# ==========================================================================
# P2: 坐席绩效统计
# ==========================================================================
async def get_agent_performance(
    db: AsyncSession,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Compute per-agent conversation statistics for a date range.

    Args:
        db: Database session.
        date_from: Inclusive start day (``YYYY-MM-DD``). Missing or malformed
            values fall back to the first day of the current month.
        date_to: Inclusive end day (``YYYY-MM-DD``). Missing or malformed
            values fall back to the current time.

    Returns:
        One dict per agent, ordered by agent name, holding the agent's basic
        fields plus ``total_conversations`` and ``resolved_conversations``
        for the range, ``today_conversations`` (created since midnight today,
        independent of the range) and ``resolution_rate`` as a percentage
        string ("" when the agent has no conversations in the range).
        An empty list when there are no agents.
    """
    today = date.today()

    def _parse_day(raw: Optional[str]) -> Optional[datetime]:
        """Parse a YYYY-MM-DD string; None when absent or malformed."""
        if not raw:
            return None
        try:
            return datetime.strptime(raw, "%Y-%m-%d")
        except ValueError:
            return None

    range_start = _parse_day(date_from)
    if range_start is None:
        range_start = datetime(today.year, today.month, 1)

    last_day = _parse_day(date_to)
    if last_day is None:
        range_end = datetime.now()
    else:
        # Exclusive bound at the start of the next day covers all of date_to.
        range_end = last_day + timedelta(days=1)

    agent_rows = await db.execute(select(Agent).order_by(Agent.name))
    agents = list(agent_rows.scalars().all())
    if not agents:
        return []
    agent_ids = [agent.id for agent in agents]

    async def _count_per_agent(*conditions) -> Dict[Any, int]:
        """Count conversations per assigned agent under extra conditions."""
        counted = await db.execute(
            select(Conversation.assigned_agent_id, func.count(Conversation.id))
            .where(Conversation.assigned_agent_id.in_(agent_ids), *conditions)
            .group_by(Conversation.assigned_agent_id)
        )
        return dict(counted.all())

    totals = await _count_per_agent(
        Conversation.created_at >= range_start,
        Conversation.created_at < range_end,
    )
    resolved_totals = await _count_per_agent(
        Conversation.status == "resolved",
        Conversation.created_at >= range_start,
        Conversation.created_at < range_end,
    )
    midnight = datetime.combine(today, datetime.min.time())
    today_totals = await _count_per_agent(Conversation.created_at >= midnight)

    report: List[Dict[str, Any]] = []
    for agent in agents:
        handled = totals.get(agent.id, 0)
        closed = resolved_totals.get(agent.id, 0)
        if handled > 0:
            rate = f"{(closed / handled * 100):.0f}%"
        else:
            rate = ""
        report.append(
            {
                "id": agent.id,
                "user_id": agent.user_id,
                "name": agent.name,
                "status": agent.status,
                "role": agent.role,
                "skill_tags": agent.skill_tags or [],
                "current_load": agent.current_load or 0,
                "max_load": agent.max_load or 5,
                "total_conversations": handled,
                "resolved_conversations": closed,
                "today_conversations": today_totals.get(agent.id, 0),
                "resolution_rate": rate,
            }
        )
    return report
# ==========================================================================
# P2: 系统日志
# ==========================================================================
async def get_system_logs(
    db: AsyncSession,
    page: int = 1,
    page_size: int = 50,
) -> Dict[str, Any]:
    """Return a page of system logs (configuration change log entries).

    Args:
        db: Database session.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1.

    Returns:
        A dict with ``items`` (newest change first), ``total`` (number of log
        rows), and the effective ``page`` and ``page_size``.
    """
    # A negative OFFSET or LIMIT is rejected by several databases, so clamp
    # instead of letting an out-of-range page surface as a database error.
    page = max(page, 1)
    page_size = max(page_size, 1)

    count_result = await db.execute(select(func.count(ConfigChangeLog.id)))
    total = count_result.scalar() or 0

    offset = (page - 1) * page_size
    result = await db.execute(
        select(ConfigChangeLog)
        # The id tie-breaker keeps OFFSET pagination stable when several
        # entries share the same changed_at value.
        .order_by(ConfigChangeLog.changed_at.desc(), ConfigChangeLog.id.desc())
        .offset(offset)
        .limit(page_size)
    )
    logs = list(result.scalars().all())

    # changed_by is looked up against Agent.id to obtain display names; ids
    # with no matching agent produce an empty name.
    agent_ids = list({log.changed_by for log in logs if log.changed_by})
    agent_names: Dict[str, str] = {}
    if agent_ids:
        agents_result = await db.execute(
            select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
        )
        agent_names = dict(agents_result.all())

    items = [
        {
            "id": log.id,
            "log_type": "config_change",
            "config_key": log.config_key,
            "old_value": log.old_value or "",
            "new_value": log.new_value or "",
            "changed_by": log.changed_by or "",
            "changed_by_name": agent_names.get(log.changed_by or "", ""),
            "changed_at": log.changed_at.isoformat() if log.changed_at else "",
        }
        for log in logs
    ]
    return {"items": items, "total": total, "page": page, "page_size": page_size}