diff --git a/scripts/gateway_monitor.py b/scripts/gateway_monitor.py new file mode 100644 index 0000000..ba35e85 --- /dev/null +++ b/scripts/gateway_monitor.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +"""Gateway 存活监控脚本 + +每 10 秒探测一次 Gateway WebSocket 端口,记录状态到日志文件。 +用于长期验证 Gateway 稳定性,确认 Phase -1 方案可行。 + +用法: + python3 scripts/gateway_monitor.py # 默认 localhost:18789 + python3 scripts/gateway_monitor.py --interval 5 # 5 秒间隔 + python3 scripts/gateway_monitor.py --host 192.168.2.153 --port 18789 + +输出: + logs/gateway_monitor.log (每行: timestamp | status | latency_ms | detail) + +停止: + Ctrl+C +""" + +import asyncio +import argparse +import time +import signal +import sys +from pathlib import Path +from datetime import datetime + +# 日志目录 +LOG_DIR = Path(__file__).parent.parent / "logs" +LOG_FILE = LOG_DIR / "gateway_monitor.log" + +# 统计 +stats = { + "total": 0, + "ok": 0, + "fail": 0, + "consecutive_fail": 0, + "max_consecutive_fail": 0, + "min_ms": float("inf"), + "max_ms": 0, + "total_ms": 0, +} + + +async def probe_gateway(host: str, port: int, timeout: float = 3.0) -> dict: + """探测 Gateway WebSocket 端口""" + start = time.monotonic() + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), timeout=timeout + ) + # 发 WebSocket Upgrade 请求 + upgrade_req = ( + "GET /ws HTTP/1.1\r\n" + f"Host: {host}:{port}\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + "Sec-WebSocket-Version: 13\r\n" + "\r\n" + ) + writer.write(upgrade_req.encode()) + await asyncio.wait_for(writer.drain(), timeout=timeout) + + resp = await asyncio.wait_for(reader.read(256), timeout=timeout) + elapsed_ms = (time.monotonic() - start) * 1000 + + writer.close() + try: + await asyncio.wait_for(writer.wait_closed(), timeout=1) + except Exception: + pass + + resp_text = resp.decode(errors="replace") + is_ws = "101" in resp_text and "websocket" in resp_text.lower() + + if is_ws: + return {"status": "ok", "latency_ms": round(elapsed_ms, 1)} + else: + return { + "status": "unexpected", + "latency_ms": round(elapsed_ms, 1), + "detail": resp_text.split("\r\n")[0][:80], + } + + except asyncio.TimeoutError: + elapsed_ms = (time.monotonic() - start) * 1000 + return {"status": "timeout", "latency_ms": round(elapsed_ms, 1)} + except ConnectionRefusedError: + elapsed_ms = (time.monotonic() - start) * 1000 + return {"status": "refused", "latency_ms": round(elapsed_ms, 1)} + except OSError as e: + elapsed_ms = (time.monotonic() - start) * 1000 + return {"status": "error", "latency_ms": round(elapsed_ms, 1), "detail": str(e)[:80]} + + +def write_log(result: dict): + """写一行日志""" + LOG_DIR.mkdir(parents=True, exist_ok=True) + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + detail = result.get("detail", "") + line = f"{now} | {result['status']:12s} | {result['latency_ms']:8.1f}ms | {detail}\n" + with open(LOG_FILE, "a") as f: + f.write(line) + # 也打到 stdout + print(line.rstrip()) + + +def print_summary(): + """打印统计摘要""" + if stats["total"] == 0: + return + avg_ms = stats["total_ms"] / stats["total"] + ok_rate = stats["ok"] / stats["total"] * 100 + print(f"\n{'='*60}") + print(f"Gateway Monitor Summary") + print(f"{'='*60}") + print(f"Total probes: {stats['total']}") + print(f"OK: {stats['ok']} ({ok_rate:.1f}%)") + print(f"Fail: {stats['fail']}") + print(f"Max consecutive: {stats['max_consecutive_fail']}") + print(f"Latency: min={stats['min_ms']:.1f}ms avg={avg_ms:.1f}ms max={stats['max_ms']:.1f}ms") + print(f"Log: {LOG_FILE}") + + +async def main(host: str, port: int, interval: int): + print(f"Gateway Monitor started") + print(f" Target: {host}:{port}") + print(f" Interval: {interval}s") + print(f" Log: {LOG_FILE}") + print(f" Ctrl+C to stop\n") + + running = True + + def handle_signal(sig, frame): + nonlocal running + running = False + print("\nStopping...") + + signal.signal(signal.SIGINT, handle_signal) + signal.signal(signal.SIGTERM, handle_signal) + + while running: + result = await probe_gateway(host, port) + + stats["total"] += 1 + stats["total_ms"] += result["latency_ms"] + + if result["status"] == "ok": + stats["ok"] += 1 + stats["consecutive_fail"] = 0 + else: + stats["fail"] += 1 + stats["consecutive_fail"] += 1 + if stats["consecutive_fail"] > stats["max_consecutive_fail"]: + stats["max_consecutive_fail"] = stats["consecutive_fail"] + + if result["latency_ms"] < stats["min_ms"]: + stats["min_ms"] = result["latency_ms"] + if result["latency_ms"] > stats["max_ms"]: + stats["max_ms"] = result["latency_ms"] + + write_log(result) + + # 等 interval 秒,但提前退出 + try: + await asyncio.sleep(interval) + except asyncio.CancelledError: + break + + print_summary() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Gateway 存活监控") + parser.add_argument("--host", default="127.0.0.1", help="Gateway host") + parser.add_argument("--port", type=int, default=18789, help="Gateway port") + parser.add_argument("--interval", type=int, default=10, help="探测间隔(秒)") + args = parser.parse_args() + + try: + asyncio.run(main(args.host, args.port, args.interval)) + except KeyboardInterrupt: + print_summary()