"""WeCom IT smart service desk: start the backend on port 8001 and smoke-test it.

Port 8001 is used to work around the zombie-socket problem on port 8000.

Run as a script, this module:
  1. kills whatever is already listening on PORT,
  2. starts uvicorn with its output redirected to LOG_PATH,
  3. polls /health until the backend answers 200,
  4. calls a fixed list of endpoints and prints each status and body,
  5. prints the tail of the backend log.

The netstat/taskkill calls and the hard-coded paths make it Windows-specific.
"""
import http.client
import json
import subprocess
import sys
import time
import urllib.error
import urllib.request

PYTHON = r"C:\Users\simon\AppData\Local\Programs\Python\Python312\python.exe"
BACKEND_DIR = r"C:\Users\simon\wecom_it_smart_desk\backend"
PORT = 8001  # switched port: 8000 has a zombie socket
# Backend stdout/stderr is redirected here; the name follows PORT.
LOG_PATH = rf"C:\Users\simon\wecom_it_smart_desk\backend_log_{PORT}.txt"
# Number of trailing log lines printed in Step 5.
LOG_TAIL_LINES = 60


def wait_backend_ready(max_wait=20, proc=None):
    """Poll the backend's /health endpoint until it returns HTTP 200.

    Args:
        max_wait: Maximum number of seconds to keep polling.
        proc: Optional ``subprocess.Popen`` of the backend. When given and the
            process has already exited, polling stops immediately instead of
            waiting out the whole timeout.

    Returns:
        True once /health answered 200; False on timeout or early exit.
    """
    start = time.time()
    while time.time() - start < max_wait:
        if proc is not None and proc.poll() is not None:
            print(f" ⚠️ 后端进程已提前退出(退出码 {proc.returncode})")
            return False
        try:
            req = urllib.request.Request(f"http://localhost:{PORT}/health")
            with urllib.request.urlopen(req, timeout=3) as resp:
                if resp.status == 200:
                    print(f" 后端已就绪(耗时 {time.time() - start:.1f} 秒)")
                    return True
        except (OSError, http.client.HTTPException):
            # Not reachable / not healthy yet (URLError and HTTPError are
            # OSError subclasses): fall through and retry.
            pass
        time.sleep(1)
    print(f" ⚠️ 后端就绪等待超时({max_wait} 秒)")
    return False


def build_request(method, path, data=None):
    """Build the urllib request for *path* on the local backend.

    Args:
        method: HTTP method, e.g. "GET" or "POST".
        path: URL path starting with "/".
        data: Optional JSON-serializable payload. Any value other than None
            (including an empty dict or list) is sent as a JSON body.

    Returns:
        A ``urllib.request.Request`` ready to be opened.
    """
    url = f"http://localhost:{PORT}{path}"
    if data is None:
        return urllib.request.Request(url, method=method)
    return urllib.request.Request(
        url,
        data=json.dumps(data).encode(),
        headers={"Content-Type": "application/json"},
        method=method,
    )


def test_endpoint(method, path, data=None):
    """Call a single HTTP endpoint and report the outcome.

    Args:
        method: HTTP method.
        path: URL path starting with "/".
        data: Optional JSON-serializable request payload.

    Returns:
        A ``(status, body)`` tuple. ``body`` is at most 500 characters of the
        response (or of the HTTP error response). When the request fails
        without an HTTP response, ``status`` is 0 and ``body`` is the error
        text.
    """
    req = build_request(method, path, data)
    try:
        with urllib.request.urlopen(req, timeout=5) as resp:
            return resp.status, resp.read().decode("utf-8", errors="replace")[:500]
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode("utf-8", errors="replace")[:500]
    except Exception as e:
        return 0, str(e)


# Despite its name this is a helper, not a pytest test; the marker keeps pytest
# from collecting it when this module is imported into a test module.
test_endpoint.__test__ = False


def find_listening_pids(netstat_output, port):
    """Extract the PIDs listening on *port* from ``netstat -ano`` output.

    Only rows in LISTENING state whose local-address port equals *port*
    exactly are considered, so neighbouring ports such as 80010 and foreign
    addresses do not match.

    Args:
        netstat_output: Text printed by ``netstat -ano``.
        port: TCP port number to look for.

    Returns:
        PIDs as strings, de-duplicated, in order of first appearance.
    """
    wanted = str(port)
    pids = []
    for line in netstat_output.splitlines():
        fields = line.split()
        if len(fields) < 4 or "LISTENING" not in fields:
            continue
        # fields[1] is the local address: "0.0.0.0:8001" or "[::]:8001".
        local_port = fields[1].rpartition(":")[2]
        pid = fields[-1]
        if local_port == wanted and pid.isdigit() and pid not in pids:
            pids.append(pid)
    return pids


def free_port(port):
    """Force-kill every process currently listening on *port*."""
    result = subprocess.run(["netstat", "-ano"], capture_output=True, text=True)
    for pid in find_listening_pids(result.stdout, port):
        print(f" 杀掉 PID={pid}")
        subprocess.run(["taskkill", "/F", "/PID", pid], capture_output=True)


def stop_backend(proc):
    """Terminate the backend process if it is still running, and reap it."""
    if proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def print_log_tail():
    """Print the last LOG_TAIL_LINES lines of the backend log."""
    try:
        with open(LOG_PATH, "r", encoding="utf-8", errors="replace") as f:
            lines = f.read().strip().splitlines()
    except OSError as e:
        print(f" 读取日志失败: {e}")
        return
    if len(lines) > LOG_TAIL_LINES:
        print(f" (日志共 {len(lines)} 行,显示最后 {LOG_TAIL_LINES} 行)")
        lines = lines[-LOG_TAIL_LINES:]
    for line in lines:
        print(f" {line}")


def banner(title):
    """Print *title* framed by two 60-character rules."""
    print("=" * 60)
    print(title)
    print("=" * 60)


def main():
    """Run the start-and-smoke-test sequence; exit with status 1 if the backend never becomes ready."""
    # Step 1: kill anything already bound to the port (left over from earlier runs).
    banner(f"Step 1: 清理端口 {PORT}")
    free_port(PORT)
    print(" OK")

    # Step 2: start the backend.
    print()
    banner(f"Step 2: 在端口 {PORT} 启动后端")
    # The child receives its own copy of the log handle, so the parent's copy
    # can be closed as soon as Popen returns (also when Popen raises).
    with open(LOG_PATH, "w", encoding="utf-8") as log_file:
        backend_proc = subprocess.Popen(
            [PYTHON, "-m", "uvicorn", "app.main:app",
             "--host", "0.0.0.0", "--port", str(PORT)],
            cwd=BACKEND_DIR,
            stdout=log_file,
            stderr=subprocess.STDOUT,
        )
    print(f" 后端进程已启动 (PID={backend_proc.pid})")

    # Step 3: wait until the backend is healthy.
    print()
    banner("Step 3: 等待后端就绪")
    if not wait_backend_ready(proc=backend_proc):
        # Do not leave a half-started backend behind holding the port.
        stop_backend(backend_proc)
        # Show the end of the log so the failure can be diagnosed.
        try:
            with open(LOG_PATH, "r", encoding="utf-8", errors="replace") as f:
                print(f.read()[-2000:])
        except OSError as e:
            print(f" 读取日志失败: {e}")
        sys.exit(1)

    # Step 4: exercise the endpoints.
    print()
    banner("Step 4: 测试所有端点")
    tests = [
        ("GET", "/health", None, "健康检查"),
        ("GET", "/api/test-ping", None, "诊断 Ping"),
        ("GET", "/api/test-error", None, "诊断 Error(测试异常捕获)"),
        ("POST", "/api/agents/login", {"user_id": "test_diag", "name": "诊断用户"}, "坐席登录"),
        ("GET", "/api/agents", None, "坐席列表"),
    ]
    for method, path, data, _desc in tests:
        status, body = test_endpoint(method, path, data)
        icon = "✅" if (200 <= status < 300) else "❌"
        print(f" {icon} {method} {path} => {status}")
        for line in body.strip().splitlines():
            print(f" {line}")

    # Step 5: show the backend log (including error tracebacks).
    print()
    banner("Step 5: 后端日志(含错误堆栈)")
    time.sleep(1)  # give the backend a moment to write out its log
    print_log_tail()

    # The backend is intentionally left running; tell the user how to stop it.
    print()
    print("=" * 60)
    print("🎉 测试完成!")
    print(f" 后端地址: http://localhost:{PORT}")
    print(f" 后端 PID: {backend_proc.pid}")
    print(f" 停止命令: taskkill /F /PID {backend_proc.pid}")
    print("")
    print(f" ⚠️ 前端需要更新代理端口到 {PORT}")
    print(" 请在 PowerShell 执行以下命令更新前端:")
    print(" (见下方提示)")
    print("=" * 60)


if __name__ == "__main__":
    main()