Files
sanguo_moziplus_v2/scripts/gateway_monitor.py
T
2026-06-02 14:43:43 +08:00

184 lines
5.7 KiB
Python

#!/usr/bin/env python3
"""Gateway 存活监控脚本
每 10 秒探测一次 Gateway WebSocket 端口,记录状态到日志文件。
用于长期验证 Gateway 稳定性,确认 Phase -1 方案可行。
用法:
python3 scripts/gateway_monitor.py # 默认 localhost:18789
python3 scripts/gateway_monitor.py --interval 5 # 5 秒间隔
python3 scripts/gateway_monitor.py --host 192.168.2.153 --port 18789
输出:
logs/gateway_monitor.log (每行: timestamp | status | latency_ms | detail)
停止:
Ctrl+C
"""
import asyncio
import argparse
import time
import signal
import sys
from pathlib import Path
from datetime import datetime
# Log location: a "logs" directory two levels above this file, holding a
# single append-only log file.
LOG_DIR = Path(__file__).parent.parent.joinpath("logs")
LOG_FILE = LOG_DIR.joinpath("gateway_monitor.log")

# Running counters for the whole monitoring session; updated after every
# probe and read when the summary is printed.
stats = dict(
    total=0,
    ok=0,
    fail=0,
    consecutive_fail=0,
    max_consecutive_fail=0,
    min_ms=float("inf"),  # starts at +inf so the first sample always becomes the minimum
    max_ms=0,
    total_ms=0,
)
async def probe_gateway(host: str, port: int, timeout: float = 3.0) -> dict:
    """Probe the Gateway WebSocket port with one HTTP Upgrade handshake.

    Opens a TCP connection, sends a WebSocket upgrade request for ``/ws`` and
    reads up to 256 bytes of the reply. The connection is always closed before
    returning, including when the send or the read times out or fails.

    Args:
        host: Gateway host name or IP address.
        port: Gateway TCP port.
        timeout: Timeout in seconds, applied separately to the connect, the
            send and the read step.

    Returns:
        A dict with:
          * ``status`` - ``"ok"`` (101 upgrade reply mentioning websocket),
            ``"unexpected"`` (any other reply), ``"timeout"``, ``"refused"``
            or ``"error"`` (any other ``OSError``).
          * ``latency_ms`` - elapsed milliseconds, rounded to one decimal.
          * ``detail`` - only for ``"unexpected"`` (first reply line) and
            ``"error"`` (exception text), truncated to 80 characters.
    """
    start = time.monotonic()

    def elapsed_ms() -> float:
        return round((time.monotonic() - start) * 1000, 1)

    writer = None
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
        # Send the WebSocket Upgrade request.
        upgrade_req = (
            "GET /ws HTTP/1.1\r\n"
            f"Host: {host}:{port}\r\n"
            "Upgrade: websocket\r\n"
            "Connection: Upgrade\r\n"
            "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
            "Sec-WebSocket-Version: 13\r\n"
            "\r\n"
        )
        writer.write(upgrade_req.encode())
        await asyncio.wait_for(writer.drain(), timeout=timeout)
        resp = await asyncio.wait_for(reader.read(256), timeout=timeout)
        # Latency is taken before teardown so closing the socket is not counted.
        latency_ms = elapsed_ms()
    except asyncio.TimeoutError:
        return {"status": "timeout", "latency_ms": elapsed_ms()}
    except ConnectionRefusedError:
        return {"status": "refused", "latency_ms": elapsed_ms()}
    except OSError as e:
        return {"status": "error", "latency_ms": elapsed_ms(), "detail": str(e)[:80]}
    finally:
        # Close on every path: a stalled gateway would otherwise leave one
        # open socket behind per probe for as long as the monitor runs.
        if writer is not None:
            writer.close()
            try:
                await asyncio.wait_for(writer.wait_closed(), timeout=1)
            except Exception:
                # Best-effort teardown; a failure here must not replace the
                # probe result.
                pass

    resp_text = resp.decode(errors="replace")
    status_line = resp_text.split("\r\n")[0]
    status_parts = status_line.split()
    # Require 101 as the status code on the status line itself; a bare
    # substring test would also match "101" inside an unrelated header or body.
    is_ws = (
        len(status_parts) >= 2
        and status_parts[0].startswith("HTTP/")
        and status_parts[1] == "101"
        and "websocket" in resp_text.lower()
    )
    if is_ws:
        return {"status": "ok", "latency_ms": latency_ms}
    return {
        "status": "unexpected",
        "latency_ms": latency_ms,
        "detail": status_line[:80],
    }
def write_log(result: dict) -> None:
    """Append one probe result to the log file and echo it to stdout.

    The log directory is created on demand. Each line has the form
    ``timestamp | status | latency | detail``.

    Args:
        result: Probe result with ``status`` and ``latency_ms`` keys and an
            optional ``detail`` key.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    detail = result.get("detail", "")
    line = f"{now} | {result['status']:12s} | {result['latency_ms']:8.1f}ms | {detail}\n"
    # Explicit UTF-8: detail may hold non-ASCII text (U+FFFD from a lossy
    # decode, localized OS error messages), which the locale default encoding
    # may be unable to encode.
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line)
    # Also echo to stdout.
    print(line.rstrip())
def print_summary():
    """Print the session statistics collected in ``stats``.

    Prints nothing when no probe has been recorded yet, which also avoids
    dividing by zero in the average and success-rate calculations.
    """
    probes = stats["total"]
    if probes == 0:
        return

    avg_ms = stats["total_ms"] / probes
    ok_rate = stats["ok"] / probes * 100

    report = (
        f"\n{'='*60}",
        f"Gateway Monitor Summary",
        f"{'='*60}",
        f"Total probes: {stats['total']}",
        f"OK: {stats['ok']} ({ok_rate:.1f}%)",
        f"Fail: {stats['fail']}",
        f"Max consecutive: {stats['max_consecutive_fail']}",
        f"Latency: min={stats['min_ms']:.1f}ms avg={avg_ms:.1f}ms max={stats['max_ms']:.1f}ms",
        f"Log: {LOG_FILE}",
    )
    for row in report:
        print(row)
async def main(host: str, port: int, interval: int) -> None:
    """Probe the gateway in a loop until SIGINT or SIGTERM, then print a summary.

    Every iteration runs one probe, folds the result into ``stats`` and
    appends it to the log. The pause between probes ends as soon as a stop
    signal arrives, so shutdown does not lag by up to ``interval`` seconds.

    Args:
        host: Gateway host name or IP address.
        port: Gateway TCP port.
        interval: Seconds to wait between probes.
    """
    print(f"Gateway Monitor started")
    print(f" Target: {host}:{port}")
    print(f" Interval: {interval}s")
    print(f" Log: {LOG_FILE}")
    print(f" Ctrl+C to stop\n")

    loop = asyncio.get_running_loop()
    stop_requested = asyncio.Event()

    def request_stop():
        # Runs inside the event loop; setting the event wakes the wait below.
        if not stop_requested.is_set():
            print("\nStopping...")
            stop_requested.set()

    def handle_signal(sig, frame):
        # Plain signal handlers run outside the loop's callback machinery, so
        # hand the stop request over to the loop instead of touching the event.
        loop.call_soon_threadsafe(request_stop)

    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(signum, request_stop)
        except NotImplementedError:
            # Event loops without signal support (e.g. on Windows).
            signal.signal(signum, handle_signal)

    while not stop_requested.is_set():
        result = await probe_gateway(host, port)
        latency = result["latency_ms"]

        stats["total"] += 1
        stats["total_ms"] += latency
        if result["status"] == "ok":
            stats["ok"] += 1
            stats["consecutive_fail"] = 0
        else:
            stats["fail"] += 1
            stats["consecutive_fail"] += 1
            stats["max_consecutive_fail"] = max(
                stats["max_consecutive_fail"], stats["consecutive_fail"]
            )
        stats["min_ms"] = min(stats["min_ms"], latency)
        stats["max_ms"] = max(stats["max_ms"], latency)

        write_log(result)

        # Wait up to `interval` seconds, returning early on a stop request.
        try:
            await asyncio.wait_for(stop_requested.wait(), timeout=interval)
        except asyncio.TimeoutError:
            continue  # interval elapsed without a stop request: probe again
        except asyncio.CancelledError:
            break

    print_summary()
if __name__ == "__main__":
    # Command-line entry point: parse the target and probe interval, then run
    # the monitor loop until it is stopped.
    cli = argparse.ArgumentParser(description="Gateway 存活监控")
    option_table = (
        ("--host", {"default": "127.0.0.1", "help": "Gateway host"}),
        ("--port", {"type": int, "default": 18789, "help": "Gateway port"}),
        ("--interval", {"type": int, "default": 10, "help": "探测间隔(秒)"}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    opts = cli.parse_args()

    try:
        asyncio.run(main(opts.host, opts.port, opts.interval))
    except KeyboardInterrupt:
        # Interrupted before or outside the loop's own signal handling:
        # still report whatever was collected.
        print_summary()