#!/usr/bin/env python3 """测试 Dify AI 连通性 + 同步管理后台配置""" import urllib.request, json, ssl ctx = ssl.create_default_context() BASE = "https://itsupport.servyou.com.cn" def api(method, path, data=None, token=None): url = f"{BASE}{path}" body = json.dumps(data).encode() if data else None req = urllib.request.Request(url, data=body, method=method) req.add_header("Content-Type", "application/json") if token: req.add_header("Authorization", f"Bearer {token}") try: resp = urllib.request.urlopen(req, context=ctx, timeout=60) raw = resp.read().decode() try: return resp.status, json.loads(raw) except json.JSONDecodeError: return resp.status, {"raw": raw} except urllib.request.HTTPError as e: raw = e.read().decode() try: return e.code, json.loads(raw) except json.JSONDecodeError: return e.code, {"raw": raw} # Step 1: Admin login print("=== Step 1: Admin login ===") code, body = api("POST", "/api/agents/login", {"user_id": "admin001", "name": "\u5b8b\u732e"}) admin_token = body.get("data", {}).get("token", "") print(f" Code: {body.get('code')}, Token: {'OK' if admin_token else 'NONE'}") # Step 2: Update Dify integration config in DB print("\n=== Step 2: Update Dify integration config ===") code, body = api("PUT", "/api/admin/integrations/dify", { "api_url": "http://yw-dify.dc.servyou-it.com/dify2openai/v1/chat/completions", "api_key": "http://yw-dify.dc.servyou-it.com/v1|app-UaTWYdBSwN6VktKQlbh5YN5H|Chat", }, admin_token) print(f" Code: {body.get('code')}") data = body.get("data", {}) if data: print(f" ID: {data.get('id')}") print(f" Name: {data.get('name')}") print(f" Status: {data.get('status')}") config = data.get("config", {}) print(f" Config: {json.dumps(config, ensure_ascii=False)}") else: print(f" Data: {json.dumps(body, ensure_ascii=False)[:300]}") # Step 3: Verify integration status print("\n=== Step 3: Verify integration status ===") code, body = api("GET", "/api/admin/integrations", token=admin_token) items = body.get("data", {}).get("items", []) if isinstance(body.get("data"), dict) else [] for i in items: name = i.get("name", "?") status = i.get("status", "?") print(f" {name}: {status}") # Step 4: Test Dify API directly from server (via H5 mock-login + send message) print("\n=== Step 4: Test Dify AI via H5 message ===") code, body = api("POST", "/api/h5/mock-login", {"employee_id": "emp001", "employee_name": "\u6d4b\u8bd5\u5458\u5de5"}) h5_token = body.get("data", {}).get("token", "") print(f" H5 Login: code={body.get('code')}") if h5_token: # Get or create conversation code, body = api("GET", "/api/h5/conversations/current", token=h5_token) conv_id = body.get("data", {}).get("id") if isinstance(body.get("data"), dict) else None print(f" Conversation: code={body.get('code')}, id={conv_id}") if conv_id: # Send a test message to trigger AI print(f" Sending test message to trigger Dify AI...") code, body = api("POST", f"/api/h5/conversations/{conv_id}/messages", { "content": "hello, this is a test" }, h5_token) print(f" Send message: code={body.get('code')}") msg_data = body.get("data", {}) if msg_data: print(f" Message ID: {msg_data.get('message_id', msg_data.get('id', 'N/A'))}") print(f" AI Reply: {json.dumps(msg_data.get('ai_reply', msg_data.get('content', 'N/A')), ensure_ascii=False)[:200]}") else: print(f" Response: {json.dumps(body, ensure_ascii=False)[:300]}")