#!/usr/bin/env python3 """Gateway 存活监控脚本 每 10 秒探测一次 Gateway WebSocket 端口,记录状态到日志文件。 用于长期验证 Gateway 稳定性,确认 Phase -1 方案可行。 用法: python3 scripts/gateway_monitor.py # 默认 localhost:18789 python3 scripts/gateway_monitor.py --interval 5 # 5 秒间隔 python3 scripts/gateway_monitor.py --host 192.168.2.153 --port 18789 输出: logs/gateway_monitor.log (每行: timestamp | status | latency_ms | detail) 停止: Ctrl+C """ import asyncio import argparse import time import signal import sys from pathlib import Path from datetime import datetime # 日志目录 LOG_DIR = Path(__file__).parent.parent / "logs" LOG_FILE = LOG_DIR / "gateway_monitor.log" # 统计 stats = { "total": 0, "ok": 0, "fail": 0, "consecutive_fail": 0, "max_consecutive_fail": 0, "min_ms": float("inf"), "max_ms": 0, "total_ms": 0, } async def probe_gateway(host: str, port: int, timeout: float = 3.0) -> dict: """探测 Gateway WebSocket 端口""" start = time.monotonic() try: reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=timeout ) # 发 WebSocket Upgrade 请求 upgrade_req = ( "GET /ws HTTP/1.1\r\n" f"Host: {host}:{port}\r\n" "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" "Sec-WebSocket-Version: 13\r\n" "\r\n" ) writer.write(upgrade_req.encode()) await asyncio.wait_for(writer.drain(), timeout=timeout) resp = await asyncio.wait_for(reader.read(256), timeout=timeout) elapsed_ms = (time.monotonic() - start) * 1000 writer.close() try: await asyncio.wait_for(writer.wait_closed(), timeout=1) except Exception: pass resp_text = resp.decode(errors="replace") is_ws = "101" in resp_text and "websocket" in resp_text.lower() if is_ws: return {"status": "ok", "latency_ms": round(elapsed_ms, 1)} else: return { "status": "unexpected", "latency_ms": round(elapsed_ms, 1), "detail": resp_text.split("\r\n")[0][:80], } except asyncio.TimeoutError: elapsed_ms = (time.monotonic() - start) * 1000 return {"status": "timeout", "latency_ms": round(elapsed_ms, 1)} except ConnectionRefusedError: elapsed_ms = (time.monotonic() - start) * 1000 return {"status": "refused", "latency_ms": round(elapsed_ms, 1)} except OSError as e: elapsed_ms = (time.monotonic() - start) * 1000 return {"status": "error", "latency_ms": round(elapsed_ms, 1), "detail": str(e)[:80]} def write_log(result: dict): """写一行日志""" LOG_DIR.mkdir(parents=True, exist_ok=True) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") detail = result.get("detail", "") line = f"{now} | {result['status']:12s} | {result['latency_ms']:8.1f}ms | {detail}\n" with open(LOG_FILE, "a") as f: f.write(line) # 也打到 stdout print(line.rstrip()) def print_summary(): """打印统计摘要""" if stats["total"] == 0: return avg_ms = stats["total_ms"] / stats["total"] ok_rate = stats["ok"] / stats["total"] * 100 print(f"\n{'='*60}") print(f"Gateway Monitor Summary") print(f"{'='*60}") print(f"Total probes: {stats['total']}") print(f"OK: {stats['ok']} ({ok_rate:.1f}%)") print(f"Fail: {stats['fail']}") print(f"Max consecutive: {stats['max_consecutive_fail']}") print(f"Latency: min={stats['min_ms']:.1f}ms avg={avg_ms:.1f}ms max={stats['max_ms']:.1f}ms") print(f"Log: {LOG_FILE}") async def main(host: str, port: int, interval: int): print(f"Gateway Monitor started") print(f" Target: {host}:{port}") print(f" Interval: {interval}s") print(f" Log: {LOG_FILE}") print(f" Ctrl+C to stop\n") running = True def handle_signal(sig, frame): nonlocal running running = False print("\nStopping...") signal.signal(signal.SIGINT, handle_signal) signal.signal(signal.SIGTERM, handle_signal) while running: result = await probe_gateway(host, port) stats["total"] += 1 stats["total_ms"] += result["latency_ms"] if result["status"] == "ok": stats["ok"] += 1 stats["consecutive_fail"] = 0 else: stats["fail"] += 1 stats["consecutive_fail"] += 1 if stats["consecutive_fail"] > stats["max_consecutive_fail"]: stats["max_consecutive_fail"] = stats["consecutive_fail"] if result["latency_ms"] < stats["min_ms"]: stats["min_ms"] = result["latency_ms"] if result["latency_ms"] > stats["max_ms"]: stats["max_ms"] = result["latency_ms"] write_log(result) # 等 interval 秒,但提前退出 try: await asyncio.sleep(interval) except asyncio.CancelledError: break print_summary() if __name__ == "__main__": parser = argparse.ArgumentParser(description="Gateway 存活监控") parser.add_argument("--host", default="127.0.0.1", help="Gateway host") parser.add_argument("--port", type=int, default=18789, help="Gateway port") parser.add_argument("--interval", type=int, default=10, help="探测间隔(秒)") args = parser.parse_args() try: asyncio.run(main(args.host, args.port, args.interval)) except KeyboardInterrupt: print_summary()