Files

86 lines
3.6 KiB
Python

#!/usr/bin/env python3
"""测试 Dify AI 连通性 + 同步管理后台配置"""
import urllib.request, json, ssl
# Default SSL context; api() hands it to every urlopen() call.
ctx = ssl.create_default_context()
# Scheme + host prefix that api() prepends to each request path.
BASE = "https://itsupport.servyou.com.cn"
def _parse_payload(raw_bytes):
    """Decode a response body as JSON, falling back to ``{"raw": text}``."""
    # errors="replace" keeps a non-UTF-8 body from raising; the caller still
    # gets something printable through the "raw" fallback.
    text = raw_bytes.decode("utf-8", errors="replace")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"raw": text}


def api(method, path, data=None, token=None):
    """Send a JSON request to ``BASE + path`` and return ``(status, payload)``.

    Args:
        method: HTTP method name, e.g. ``"GET"``, ``"POST"`` or ``"PUT"``.
        path: Request path, appended verbatim to ``BASE``.
        data: Optional JSON-serializable request body. Any value other than
            ``None`` is sent, including empty containers such as ``{}``.
        token: Optional bearer token; when truthy it is sent in the
            ``Authorization`` header.

    Returns:
        A ``(status_code, payload)`` tuple. ``payload`` is the decoded JSON
        response, or ``{"raw": text}`` when the body is not valid JSON.
        HTTP error responses (``HTTPError``) are returned in the same shape
        instead of being raised.

    Raises:
        Connection-level failures raised by ``urlopen`` (for example
        ``urllib.error.URLError``) are not caught here and propagate.
    """
    # Local import: HTTPError is documented in urllib.error; importing the
    # name directly avoids relying on urllib.request's re-export.
    from urllib.error import HTTPError

    url = f"{BASE}{path}"
    # "is not None" (not truthiness) so an explicit empty body is still sent.
    payload = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(url, data=payload, method=method)
    req.add_header("Content-Type", "application/json")
    if token:
        req.add_header("Authorization", f"Bearer {token}")
    try:
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(req, context=ctx, timeout=60) as resp:
            return resp.status, _parse_payload(resp.read())
    except HTTPError as err:
        # An HTTPError also carries a response body; read it, then release it.
        try:
            return err.code, _parse_payload(err.read())
        finally:
            err.close()
# Step 1: Admin login
print("=== Step 1: Admin login ===")
_status, body = api("POST", "/api/agents/login", {"user_id": "admin001", "name": "\u5b8b\u732e"})
# "data" can be null or missing on a failed login; only look for the token
# when it is actually a dict, so the summary line below is still printed.
login_data = body.get("data")
admin_token = login_data.get("token", "") if isinstance(login_data, dict) else ""
print(f" Code: {body.get('code')}, Token: {'OK' if admin_token else 'NONE'}")
# Step 2: Update Dify integration config
print("\n=== Step 2: Update Dify integration config ===")
# NOTE(security): the Dify endpoint and app key are hard-coded below, so
# anyone who can read this file can read the key. Consider loading them from
# the environment or a secrets store; the values are left unchanged here so
# the request sent by this script stays identical.
_status, body = api("PUT", "/api/admin/integrations/dify", {
    "api_url": "http://yw-dify.dc.servyou-it.com/dify2openai/v1/chat/completions",
    "api_key": "http://yw-dify.dc.servyou-it.com/v1|app-UaTWYdBSwN6VktKQlbh5YN5H|Chat",
}, admin_token)
print(f" Code: {body.get('code')}")
data = body.get("data")
# Only a non-empty dict is unpacked field by field; null, empty or
# unexpectedly shaped data falls through to dumping the whole response.
if isinstance(data, dict) and data:
    print(f" ID: {data.get('id')}")
    print(f" Name: {data.get('name')}")
    print(f" Status: {data.get('status')}")
    config = data.get("config", {})
    print(f" Config: {json.dumps(config, ensure_ascii=False)}")
else:
    # Truncated to 300 characters to keep the console output readable.
    print(f" Data: {json.dumps(body, ensure_ascii=False)[:300]}")
# Step 3: List every integration together with its reported status
print("\n=== Step 3: Verify integration status ===")
code, body = api("GET", "/api/admin/integrations", token=admin_token)
payload = body.get("data")
# Only a dict-shaped "data" is searched for "items"; anything else yields
# no rows.
if isinstance(payload, dict):
    integrations = payload.get("items", [])
else:
    integrations = []
for entry in integrations:
    label = entry.get("name", "?")
    state = entry.get("status", "?")
    print(f" {label}: {state}")
# Step 4: Test Dify API directly from server (via H5 mock-login + send message)
print("\n=== Step 4: Test Dify AI via H5 message ===")
_status, body = api("POST", "/api/h5/mock-login", {"employee_id": "emp001", "employee_name": "\u6d4b\u8bd5\u5458\u5de5"})
# "data" can be null or missing when the mock login fails; guard it the same
# way the conversation lookup below does.
login_data = body.get("data")
h5_token = login_data.get("token", "") if isinstance(login_data, dict) else ""
print(f" H5 Login: code={body.get('code')}")
if h5_token:
    # Fetch the current conversation (presumably created server-side when
    # absent, per the original comment - not verifiable from this script).
    _status, body = api("GET", "/api/h5/conversations/current", token=h5_token)
    conv_data = body.get("data")
    conv_id = conv_data.get("id") if isinstance(conv_data, dict) else None
    print(f" Conversation: code={body.get('code')}, id={conv_id}")
    if conv_id:
        # Send a test message to trigger AI
        print(" Sending test message to trigger Dify AI...")
        _status, body = api("POST", f"/api/h5/conversations/{conv_id}/messages", {
            "content": "hello, this is a test"
        }, h5_token)
        print(f" Send message: code={body.get('code')}")
        msg_data = body.get("data")
        # Only a non-empty dict is unpacked; any other shape falls through to
        # dumping the whole response.
        if isinstance(msg_data, dict) and msg_data:
            # The id and reply may be reported under either of two keys.
            print(f" Message ID: {msg_data.get('message_id', msg_data.get('id', 'N/A'))}")
            print(f" AI Reply: {json.dumps(msg_data.get('ai_reply', msg_data.get('content', 'N/A')), ensure_ascii=False)[:200]}")
        else:
            print(f" Response: {json.dumps(body, ensure_ascii=False)[:300]}")