Files

1015 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# 企微IT智能服务台 — 管理后台 API 路由
# =============================================================================
# 说明:管理后台的全部路由端点,统一 /admin/ 前缀
# 包含 7 组 API
# 1. 运营总览仪表盘
# 2. 功能开关/参数管理
# 3. 坐席人员管理
# 4. 外部系统集成配置
# 5. 快速回复管理(审核)
# 6. 消息分配模式
# 7. 会话监控
# 8. 全局搜索
# 所有接口需要 admin 角色权限
# =============================================================================
import logging
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.agents import get_current_agent
from app.database import get_db
from app.models.agent import Agent
from app.schemas.admin import (
AgentCreateRequest,
AgentUpdateRequest,
AssignmentModeUpdateRequest,
ConfigUpdateRequest,
IntegrationUpdateRequest,
QuickReplyReviewRequest,
)
from app.services import admin_service
from app.utils.response import AppException, success_response
logger = logging.getLogger(__name__)
# 创建路由器
router = APIRouter(prefix="/admin")
# --------------------------------------------------------------------------
# 管理后台权限校验依赖
# --------------------------------------------------------------------------
async def require_admin(
agent: Agent = Depends(get_current_agent),
) -> Agent:
"""管理后台权限校验:仅 role='admin' 可访问。
Args:
agent: 当前坐席(通过认证依赖注入)
Returns:
Agent: 具有管理权限的坐席对象
Raises:
AppException: 非管理员角色(错误码 1004)
"""
if agent.role != "admin":
raise AppException(1004, "无管理权限")
return agent
# ==========================================================================
# 1. 运营总览仪表盘
# ==========================================================================
# ---------- GET /api/admin/dashboard/overview ----------
@router.get("/dashboard/overview")
async def get_dashboard_overview(
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取仪表盘统计数据。
返回在线坐席数、今日会话数、待审核数、集成健康状态等。
Args:
admin: 管理员(权限校验)
db: 数据库会话
Returns:
Dict: 统一响应格式,包含仪表盘数据
"""
overview = await admin_service.get_dashboard_overview(db)
return success_response(data=overview.model_dump())
# ==========================================================================
# 2. 功能开关/参数管理
# ==========================================================================
# ---------- GET /api/admin/configs ----------
@router.get("/configs")
async def get_configs(
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取全部配置项(按功能分组)。
Args:
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含配置分组列表
"""
groups = await admin_service.get_config_groups(db)
return success_response(data={
"groups": [g.model_dump() for g in groups]
})
# ---------- PUT /api/admin/configs/{key} ----------
@router.put("/configs/{key}")
async def update_config(
key: str,
body: ConfigUpdateRequest,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""更新单个配置项(同时记录变更日志)。
Args:
key: 配置键
body: 更新请求体
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含变更信息
"""
result = await admin_service.update_config(db, key, body.value, admin.id)
return success_response(data=result)
# ---------- GET /api/admin/configs/{key}/history ----------
@router.get("/configs/{key}/history")
async def get_config_history(
key: str,
limit: int = Query(20, ge=1, le=100, description="返回条数上限"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取指定配置项的变更历史。
Args:
key: 配置键
limit: 返回条数上限
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含变更历史列表
"""
items = await admin_service.get_config_history(db, key, limit)
return success_response(data={
"items": [item.model_dump() for item in items]
})
# ==========================================================================
# 3. 坐席人员管理
# ==========================================================================
# ---------- GET /api/admin/agents ----------
@router.get("/agents")
async def list_admin_agents(
status: Optional[str] = Query(None, description="按状态筛选: online/offline/busy"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取坐席列表(管理视图,含角色/技能标签)。
Args:
status: 按状态筛选(可选)
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含坐席列表
"""
items = await admin_service.list_admin_agents(db, status)
return success_response(data={
"items": [item.model_dump() for item in items]
})
# ---------- POST /api/admin/agents ----------
@router.post("/agents")
async def create_agent(
body: AgentCreateRequest,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""添加坐席。
Args:
body: 创建请求体
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含创建的坐席信息
"""
agent = await admin_service.create_agent(
db,
user_id=body.user_id,
name=body.name,
role=body.role,
skill_tags=body.skill_tags,
max_load=body.max_load,
)
return success_response(data=agent.model_dump())
# ---------- PUT /api/admin/agents/{id} ----------
@router.put("/agents/{agent_id}")
async def update_agent(
agent_id: str,
body: AgentUpdateRequest,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""编辑坐席(角色/技能标签/负载上限)。
Args:
agent_id: 坐席ID
body: 更新请求体
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含更新后的坐席信息
"""
result = await admin_service.update_agent(
db,
agent_id=agent_id,
role=body.role,
skill_tags=body.skill_tags,
max_load=body.max_load,
)
return success_response(data=result)
# ---------- DELETE /api/admin/agents/{id} ----------
@router.delete("/agents/{agent_id}")
async def delete_agent(
agent_id: str,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""移除坐席。
Args:
agent_id: 坐席ID
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式
"""
await admin_service.delete_agent(db, agent_id)
return success_response(data=None, message="坐席已移除")
# ---------- POST /api/admin/agents/{id}/otp-unbind ----------
@router.post("/agents/{agent_id}/otp-unbind")
async def admin_unbind_agent_otp(
agent_id: str,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""强制解绑坐席的OTP。
Args:
agent_id: 坐席ID
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式
"""
from app.models.agent import Agent as AgentModel
from sqlalchemy import select
from datetime import datetime
stmt = select(AgentModel).where(AgentModel.id == agent_id)
result = await db.execute(stmt)
agent = result.scalars().first()
if not agent:
raise AppException(1001, "坐席不存在")
agent.otp_secret = None
agent.otp_enabled = 0
agent.updated_at = datetime.now()
db.add(agent)
await db.flush()
logger.info(f"管理员强制解绑OTP: agent={agent.user_id}")
return success_response(data={"message": "OTP已解绑"})
# ==========================================================================
# 4. 外部系统集成配置
# ==========================================================================
# ---------- GET /api/admin/integrations ----------
@router.get("/integrations")
async def get_integrations(
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取集成系统列表及配置状态。
Args:
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含集成系统列表
"""
items = await admin_service.get_integrations(db)
return success_response(data={
"items": [item.model_dump() for item in items]
})
# ---------- PUT /api/admin/integrations/{id} ----------
@router.put("/integrations/{integration_id}")
async def update_integration(
integration_id: str,
body: IntegrationUpdateRequest,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""更新集成配置(支持 url_key 和 access_key 两种模式)。
- url_key 模式(Dify / RAGFlow):传入 api_url + api_key
- access_key 模式(火绒安全):传入 access_key_id + access_key_secret + base_url
Args:
integration_id: 集成系统ID(如 dify/ragflow/huorong
body: 更新请求体
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含更新后的集成信息
"""
result = await admin_service.update_integration(
db,
integration_id=integration_id,
# url_key 模式字段
api_url=body.api_url or "",
api_key=body.api_key or "",
# access_key 模式字段(火绒)
access_key_id=body.access_key_id or "",
access_key_secret=body.access_key_secret or "",
# account_password 模式字段(联软)
api_account=body.api_account or "",
api_password=body.api_password or "",
validate_key=body.validate_key or "",
base_url=body.base_url or "",
agent_id=admin.id,
)
return success_response(data=result.model_dump())
# ==========================================================================
# 5. 快速回复管理
# ==========================================================================
# ---------- GET /api/admin/quick-replies/pending ----------
@router.get("/quick-replies/pending")
async def list_pending_quick_replies(
category: Optional[str] = Query(None, description="按分类筛选"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取待审核快速回复模板列表。
Args:
category: 按分类筛选(可选)
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含待审核模板列表
"""
items = await admin_service.list_pending_quick_replies(db, category)
return success_response(data={
"items": [item.model_dump() for item in items]
})
# ---------- PUT /api/admin/quick-replies/{id}/review ----------
@router.put("/quick-replies/{template_id}/review")
async def review_quick_reply(
template_id: str,
body: QuickReplyReviewRequest,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""审核快速回复模板(通过/驳回)。
Args:
template_id: 模板ID
body: 审核请求体
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含审核结果
"""
result = await admin_service.review_quick_reply(
db,
template_id=template_id,
action=body.action,
reason=body.reason,
agent_id=admin.id,
)
return success_response(data=result)
# ==========================================================================
# 6. 消息分配模式
# ==========================================================================
# ---------- GET /api/admin/assignment-mode ----------
@router.get("/assignment-mode")
async def get_assignment_mode(
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取当前分配模式。
Args:
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含分配模式信息
"""
result = await admin_service.get_assignment_mode(db)
return success_response(data=result.model_dump())
# ---------- PUT /api/admin/assignment-mode ----------
@router.put("/assignment-mode")
async def update_assignment_mode(
body: AssignmentModeUpdateRequest,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""切换分配模式(阶段一仅允许手动接单)。
Args:
body: 更新请求体
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含更新后的分配模式
"""
result = await admin_service.update_assignment_mode(db, body.mode, admin.id)
return success_response(data=result.model_dump())
# ==========================================================================
# 7. 会话监控
# ==========================================================================
# ---------- GET /api/admin/monitor/sessions ----------
@router.get("/monitor/sessions")
async def get_monitor_sessions(
status: Optional[str] = Query(None, description="按会话状态筛选"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取实时会话列表(Demo预览)。
Args:
status: 按会话状态筛选(可选)
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含会话监控数据
"""
result = await admin_service.get_monitor_sessions(db, status)
return success_response(data=result.model_dump())
# ==========================================================================
# 8. 全局搜索
# ==========================================================================
# ---------- GET /api/admin/search ----------
@router.get("/search")
async def global_search(
q: str = Query(..., min_length=1, description="搜索关键词"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""搜索配置项、坐席、快速回复。
Args:
q: 搜索关键词
admin: 管理员
db: 数据库会话
Returns:
Dict: 统一响应格式,包含搜索结果
"""
items = await admin_service.global_search(db, q)
return success_response(data={
"items": [item.model_dump() for item in items]
})
# ==========================================================================
# 9. 火绒安全集成 API
# ==========================================================================
# 说明:火绒终端安全系统的数据代理端点
# - 测试连接:验证 AccessKey 签名是否正确
# - 终端列表:代理查询火绒终端
# - 终端详情:代理查询终端详细信息
# - 漏洞信息:代理查询高危漏洞
# - 病毒事件:代理查询病毒事件统计
# ==========================================================================
# ---------- POST /api/admin/integrations/huorong/test ----------
@router.post("/integrations/huorong/test")
async def test_huorong_connection(
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""测试火绒API连接。
使用当前保存的 AccessKey 配置调用火绒 _list 接口验证连接。
仅请求1条数据,最小化对火绒系统的影响。
失败时返回调试信息帮助排查凭据问题。
Args:
admin: 管理员
db: 数据库会话
Returns:
Dict: 包含 success(bool) 和 message(str),失败时含 debug 信息
"""
from app.integrations.huorong.config import get_huorong_client
from app.integrations.huorong.exceptions import HuorongConfigError, HuorongError
try:
client = await get_huorong_client(db)
result = await client.test_connection()
# 附带凭据摘要(脱敏),方便排查
result["debug"] = {
"base_url": client.base_url,
"access_key_id": client.access_key_id,
"key_length": len(client.access_key_secret),
}
# 失败时附加排查提示
if not result.get("success"):
result["debug_hint"] = (
"请检查: 1) 登录火绒控制中心 → 中心设置 → 通用设置 → API接口 → 管理密钥,"
"确认 AccessKey ID 和 Secret 正确; 2) 确认 AccessKey 未过期或被撤销; "
"3) 确认当前网络可访问火绒控制中心内网地址"
)
return success_response(data=result)
except HuorongConfigError as e:
return success_response(data={
"success": False,
"message": e.message,
})
except HuorongError as e:
# 认证失败时返回调试信息帮助用户排查
return success_response(data={
"success": False,
"message": f"测试失败: {e.message}",
"debug_hint": "请检查: 1) 火绒控制中心 > 中心设置 > API接口 > 管理密钥 确认凭据正确; 2) AccessKey 是否已过期或被撤销",
})
# ---------- GET /api/admin/integrations/huorong/terminals ----------
@router.get("/integrations/huorong/terminals")
async def list_huorong_terminals(
group_id: Optional[str] = Query(None, description="分组ID"),
page: int = Query(1, ge=1, description="页码"),
per_page: int = Query(20, ge=1, le=100, description="每页条数"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""查询火绒终端列表。
代理火绒 /api/clnts/_list 接口,返回终端基本信息。
Args:
group_id: 分组ID(可选)
page: 页码
per_page: 每页条数
admin: 管理员
db: 数据库会话
Returns:
Dict: 终端列表数据
"""
from app.integrations.huorong.config import get_huorong_client
from app.integrations.huorong.exceptions import HuorongConfigError, HuorongError
try:
client = await get_huorong_client(db)
result = await client.list_terminals(group_id=group_id, page=page, per_page=per_page)
# 序列化 Pydantic 模型
result["items"] = [item.model_dump() for item in result["items"]]
return success_response(data=result)
except HuorongConfigError as e:
return success_response(data={"error": e.message, "error_code": "config_missing"})
except HuorongError as e:
return success_response(data={"error": e.message, "error_code": "api_error"})
# ---------- GET /api/admin/integrations/huorong/terminals/{client_id} ----------
@router.get("/integrations/huorong/terminals/{client_id}")
async def get_huorong_terminal_detail(
client_id: str,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""查询火绒终端详细信息。
代理火绒 /api/clnts/_info2 接口,返回终端硬件/软件/资产/网络配置。
Args:
client_id: 终端唯一ID
admin: 管理员
db: 数据库会话
Returns:
Dict: 终端详细信息
"""
from app.integrations.huorong.config import get_huorong_client
from app.integrations.huorong.exceptions import HuorongConfigError, HuorongError
try:
client = await get_huorong_client(db)
detail = await client.get_terminal_detail(client_id)
return success_response(data=detail.model_dump())
except HuorongConfigError as e:
return success_response(data={"error": e.message, "error_code": "config_missing"})
except HuorongError as e:
return success_response(data={"error": e.message, "error_code": "api_error"})
# ---------- GET /api/admin/integrations/huorong/leaks ----------
@router.get("/integrations/huorong/leaks")
async def list_huorong_leaks(
group_id: Optional[str] = Query(None, description="分组ID"),
page: int = Query(1, ge=1, description="页码"),
per_page: int = Query(20, ge=1, le=100, description="每页条数"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""查询火绒高危漏洞终端。
代理火绒 /api/clnts/_leak 接口。
Args:
group_id: 分组ID(可选)
page: 页码
per_page: 每页条数
admin: 管理员
db: 数据库会话
Returns:
Dict: 漏洞终端列表
"""
from app.integrations.huorong.config import get_huorong_client
from app.integrations.huorong.exceptions import HuorongConfigError, HuorongError
try:
client = await get_huorong_client(db)
result = await client.list_terminal_leaks(group_id=group_id, page=page, per_page=per_page)
result["items"] = [item.model_dump() for item in result["items"]]
return success_response(data=result)
except HuorongConfigError as e:
return success_response(data={"error": e.message, "error_code": "config_missing"})
except HuorongError as e:
return success_response(data={"error": e.message, "error_code": "api_error"})
# ---------- GET /api/admin/integrations/huorong/virus-events ----------
@router.get("/integrations/huorong/virus-events")
async def list_huorong_virus_events(
client_id: Optional[str] = Query(None, description="终端IDtype=0时需提供)"),
group_id: Optional[str] = Query(None, description="分组IDtype=1时需提供)"),
query_type: int = Query(2, ge=0, le=2, description="查询类型: 0=按终端ID, 1=按分组ID, 2=全部"),
page: int = Query(1, ge=1, description="页码"),
per_page: int = Query(20, ge=1, le=100, description="每页条数"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""查询火绒病毒事件统计。
代理火绒 /api/clnts/_virus_events 接口。
火绒API要求 type 参数: 0=按client_id查, 1=按group_id查, 2=查全部。
Args:
client_id: 终端IDtype=0时需提供)
group_id: 分组IDtype=1时需提供)
query_type: 查询类型,默认2(全部)
page: 页码
per_page: 每页条数
admin: 管理员
db: 数据库会话
Returns:
Dict: 病毒事件统计数据
"""
from app.integrations.huorong.config import get_huorong_client
from app.integrations.huorong.exceptions import HuorongConfigError, HuorongError
try:
client = await get_huorong_client(db)
result = await client.get_virus_events(
client_id=client_id,
group_id=group_id,
query_type=query_type,
page=page,
per_page=per_page,
)
result["items"] = [item.model_dump() for item in result["items"]]
return success_response(data=result)
except HuorongConfigError as e:
return success_response(data={"error": e.message, "error_code": "config_missing"})
except HuorongError as e:
return success_response(data={"error": e.message, "error_code": "api_error"})
# ==========================================================================
# 10. 联软LV7000 安全集成 API
# ==========================================================================
# 说明:联软终端管理系统的数据代理端点
# - 测试连接:验证账号密码+Token获取
# - 终端查询:按员工账号/IP/MAC查终端(核心映射接口)
# - 终端详情:极详细硬件+软件+资产+网络信息
# ==========================================================================
# ---------- POST /api/admin/integrations/lianruan/test ----------
@router.post("/integrations/lianruan/test")
async def test_lianruan_connection(
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""测试联软API连接。
通过获取Token验证:IP白名单 + 账号密码 + Token。
Args:
admin: 管理员
db: 数据库会话
Returns:
Dict: 包含 success(bool) 和 message(str)
"""
from app.integrations.lianruan.config import get_lianruan_client
from app.integrations.lianruan.exceptions import LianruanConfigError
try:
client = await get_lianruan_client(db)
result = await client.test_connection()
return success_response(data=result)
except LianruanConfigError as e:
return success_response(data={
"success": False,
"message": e.message,
})
# ---------- GET /api/admin/integrations/lianruan/terminals ----------
@router.get("/integrations/lianruan/terminals")
async def query_lianruan_terminals(
strusername: Optional[str] = Query(None, description="员工账号(核心映射字段)"),
strdevname: Optional[str] = Query(None, description="计算机名"),
strdevip: Optional[str] = Query(None, description="IP地址"),
page: int = Query(1, ge=1, description="页码"),
per_page: int = Query(20, ge=1, le=100, description="每页条数"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""查询联软终端设备(核心映射接口)。
⭐ strusername 参数可直接按员工账号查终端!这是联软最大的优势。
Args:
strusername: 员工账号(映射金钥匙)
strdevname: 计算机名
strdevip: IP地址
page: 页码
per_page: 每页条数
admin: 管理员
db: 数据库会话
Returns:
Dict: 终端列表数据
"""
from app.integrations.lianruan.config import get_lianruan_client
from app.integrations.lianruan.exceptions import LianruanConfigError, LianruanError
try:
client = await get_lianruan_client(db)
result = await client.query_dev_by_params(
strusername=strusername or "",
strdevname=strdevname or "",
strdevip=strdevip or "",
page=page,
per_page=per_page,
)
result["items"] = [item.model_dump() for item in result["items"]]
return success_response(data=result)
except LianruanConfigError as e:
return success_response(data={"error": e.message, "error_code": "config_missing"})
except LianruanError as e:
return success_response(data={"error": e.message, "error_code": "api_error"})
# ==========================================================================
# 11. P2: 会话审计
# ==========================================================================
# ---------- GET /api/admin/audit/conversations ----------
@router.get("/audit/conversations")
async def list_audit_conversations(
status: Optional[str] = Query(None, description="按状态筛选"),
agent_id: Optional[str] = Query(None, description="按坐席筛选"),
keyword: Optional[str] = Query(None, description="按员工姓名/消息摘要搜索"),
date_from: Optional[str] = Query(None, description="开始日期 YYYY-MM-DD"),
date_to: Optional[str] = Query(None, description="结束日期 YYYY-MM-DD"),
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页条数"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取会话审计列表(支持分页+多条件筛选)。"""
result = await admin_service.list_audit_conversations(
db, status=status, agent_id=agent_id, keyword=keyword,
date_from=date_from, date_to=date_to, page=page, page_size=page_size,
)
return success_response(data=result)
# ---------- GET /api/admin/audit/conversations/{id} ----------
@router.get("/audit/conversations/{conversation_id}")
async def get_audit_conversation_detail(
conversation_id: str,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取会话审计详情(含消息列表)。"""
result = await admin_service.get_audit_conversation_detail(db, conversation_id)
if not result:
raise AppException(3004, "会话不存在")
return success_response(data=result)
# ==========================================================================
# 12. P2: 坐席绩效统计
# ==========================================================================
# ---------- GET /api/admin/agent-performance ----------
@router.get("/agent-performance")
async def get_agent_performance(
date_from: Optional[str] = Query(None, description="开始日期 YYYY-MM-DD"),
date_to: Optional[str] = Query(None, description="结束日期 YYYY-MM-DD"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取坐席绩效统计。"""
items = await admin_service.get_agent_performance(db, date_from=date_from, date_to=date_to)
return success_response(data={"items": items})
# ==========================================================================
# 13. P2: 系统日志
# ==========================================================================
# ---------- GET /api/admin/system-logs ----------
@router.get("/system-logs")
async def get_system_logs(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(50, ge=1, le=200, description="每页条数"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""获取系统日志(配置变更日志)。"""
result = await admin_service.get_system_logs(db, page=page, page_size=page_size)
return success_response(data=result)# ---------- GET /api/admin/integrations/lianruan/terminals/{devname}/detail ----------
@router.get("/integrations/lianruan/terminals/{devname}/detail")
async def get_lianruan_terminal_detail(
devname: str,
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""查询联软终端详细信息(极详细)。
比火绒_info2更丰富,含逻辑磁盘使用率、显示器、内存条详情。
Args:
devname: 计算机名
admin: 管理员
db: 数据库会话
Returns:
Dict: 终端详细信息
"""
from app.integrations.lianruan.config import get_lianruan_client
from app.integrations.lianruan.exceptions import LianruanConfigError, LianruanError
try:
client = await get_lianruan_client(db)
detail = await client.get_dev_all_info(strdevname=devname)
return success_response(data=detail.model_dump())
except LianruanConfigError as e:
return success_response(data={"error": e.message, "error_code": "config_missing"})
except LianruanError as e:
return success_response(data={"error": e.message, "error_code": "api_error"})
# ==========================================================================
# 14. RAGFlow 知识检索集成 API
# ==========================================================================
# ---------- POST /api/admin/integrations/ragflow/test ----------
@router.post("/integrations/ragflow/test")
async def test_ragflow_connection(
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""测试 RAGFlow API 连接。"""
from app.integrations.ragflow.config import get_ragflow_client
from app.integrations.ragflow.exceptions import RagflowConfigError
try:
client = await get_ragflow_client(db)
result = await client.test_connection()
return success_response(data=result)
except RagflowConfigError as e:
return success_response(data={
"success": False,
"message": e.message,
})
# ---------- GET /api/admin/integrations/ragflow/datasets ----------
@router.get("/integrations/ragflow/datasets")
async def list_ragflow_datasets(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页条数"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""列出 RAGFlow 知识库(数据集)。"""
from app.integrations.ragflow.config import get_ragflow_client
from app.integrations.ragflow.exceptions import RagflowConfigError, RagflowError
try:
client = await get_ragflow_client(db)
result = await client.list_datasets(page=page, page_size=page_size)
result["items"] = [item.model_dump() for item in result["items"]]
return success_response(data=result)
except RagflowConfigError as e:
return success_response(data={"error": e.message, "error_code": "config_missing"})
except RagflowError as e:
return success_response(data={"error": e.message, "error_code": "api_error"})
# ---------- POST /api/admin/integrations/ragflow/retrieval ----------
@router.post("/integrations/ragflow/retrieval")
async def ragflow_retrieval(
question: str = Query(..., min_length=1, description="查询问题"),
dataset_ids: Optional[str] = Query(None, description="数据集ID列表(逗号分隔)"),
top_k: int = Query(5, ge=1, le=20, description="返回结果数量"),
admin: Agent = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
"""RAGFlow 知识检索测试。"""
from app.integrations.ragflow.config import get_ragflow_client
from app.integrations.ragflow.exceptions import RagflowConfigError, RagflowError
try:
client = await get_ragflow_client(db)
ds_ids = None
if dataset_ids:
ds_ids = [id.strip() for id in dataset_ids.split(",") if id.strip()]
result = await client.retrieval(
question=question,
dataset_ids=ds_ids,
top_k=top_k,
)
return success_response(data={
"chunks": [chunk.model_dump() for chunk in result.chunks],
"doc_aggs": [agg.model_dump() for agg in result.doc_aggs],
"total": result.total,
})
except RagflowConfigError as e:
return success_response(data={"error": e.message, "error_code": "config_missing"})
except RagflowError as e:
return success_response(data={"error": e.message, "error_code": "api_error"})