class HealthChecker:
    """Per-project zombie detection with alerting.

    A project whose events table shows no real change for
    ``zombie_threshold`` consecutive ticks is flagged as a zombie: a warning
    observation is written once. When real activity shows up again the alert
    is resolved with an info observation.

    All counters live in memory on this instance (two dicts keyed by
    project id), so they start from zero for every new ``HealthChecker``.
    """

    # Event types the daemon writes itself. They must never count as project
    # activity; otherwise the "agent_zombie_detected" event inserted by
    # _write_alert could make the checker resolve its own alert.
    _DAEMON_EVENT_TYPES = frozenset({"daemon_tick", "agent_zombie_detected"})

    def __init__(self, zombie_threshold: int = 20):
        """Create a checker.

        Args:
            zombie_threshold: Number of consecutive ticks without a real
                change after which a project is considered a zombie.
        """
        self.zombie_threshold = zombie_threshold
        # project_id -> number of consecutive ticks without a real change
        self._stale_ticks: Dict[str, int] = {}
        # project_id -> True once a zombie alert has been written
        self._alerted: Dict[str, bool] = {}

    def check(self, project_id: str, db_path: Path,
              tick_num: int) -> Dict[str, Any]:
        """Check the health of a single project for the current tick.

        Args:
            project_id: Key used for the per-project counters and embedded in
                the alert payloads.
            db_path: Path of the project's database file. If the file does
                not exist the project is reported healthy and no state is
                touched.
            tick_num: Current tick number; only recorded in alert payloads.

        Returns:
            A dict with the keys:
              * ``healthy`` / ``zombie``: current state; ``zombie`` is True on
                every tick where the stale count is at or above the
                threshold, not only on the tick that wrote the alert.
              * ``stale_ticks``: consecutive ticks without a real change.
              * ``alert_written``: True only on the tick that wrote the alert.
              * ``resolved``: True only on the tick that resolved an alert.
        """
        result: Dict[str, Any] = {
            "healthy": True,
            "zombie": False,
            "stale_ticks": self._stale_ticks.get(project_id, 0),
            "alert_written": False,
            "resolved": False,
        }

        if not db_path.exists():
            return result

        if self._has_real_change(self._recent_events(db_path)):
            # Write the resolution BEFORE clearing the in-memory state: if
            # the write raises, the alert flag survives and the resolution
            # can be attempted again on a later tick.
            if self._alerted.get(project_id, False):
                self._write_resolution(db_path, project_id, tick_num)
                result["resolved"] = True
            self._stale_ticks.pop(project_id, None)
            self._alerted.pop(project_id, None)
            result["stale_ticks"] = 0
            return result

        # No real change on this tick: bump the stale counter.
        stale = self._stale_ticks.get(project_id, 0) + 1
        self._stale_ticks[project_id] = stale
        result["stale_ticks"] = stale

        if stale >= self.zombie_threshold:
            # Report the zombie state on every stale tick past the threshold.
            result["zombie"] = True
            result["healthy"] = False
            if not self._alerted.get(project_id):
                # The alert itself is written only once per zombie episode.
                # The flag is set after the write so a failed write is
                # retried on the next tick.
                self._write_alert(db_path, project_id, tick_num, stale)
                self._alerted[project_id] = True
                result["alert_written"] = True

        return result

    def _recent_events(self, db_path: Path) -> Any:
        """Fetch the events inspected by ``check`` for one project."""
        # Only one event is requested (limit=1); presumably this is the
        # newest row -- confirm against Queries.recent_events.
        return Queries(db_path).recent_events(limit=1)

    def _has_real_change(self, events: Any) -> bool:
        """Return True if any event is not one of the daemon's own types."""
        return any(
            event.get("event_type") not in self._DAEMON_EVENT_TYPES
            for event in events
        )

    def _write_alert(self, db_path: Path, project_id: str,
                     tick_num: int, stale_ticks: int) -> None:
        """Write the zombie alert for a project.

        Inserts a ``warning`` observation and an ``agent_zombie_detected``
        event inside one ``BEGIN IMMEDIATE`` transaction, then logs a warning.
        The connection is always closed; errors propagate to the caller.
        """
        conn = get_connection(db_path)
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "INSERT INTO observations (task_id, observer, severity, body) "
                "VALUES (?,?,?,?)",
                (None, "daemon", "warning",
                 json.dumps({
                     "type": "zombie_detected",
                     "project_id": project_id,
                     "stale_ticks": stale_ticks,
                     "threshold": self.zombie_threshold,
                     "tick": tick_num,
                     "message": f"项目 {project_id} 连续 {stale_ticks} tick 无真实变更",
                 })),
            )
            conn.execute(
                "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                (None, "daemon", "agent_zombie_detected",
                 json.dumps({"project_id": project_id, "stale_ticks": stale_ticks})),
            )
            conn.commit()
        finally:
            conn.close()
        # Reached only when the transaction committed without raising.
        logger.warning("Zombie detected: %s (stale=%d)", project_id, stale_ticks)

    def _write_resolution(self, db_path: Path, project_id: str,
                          tick_num: int) -> None:
        """Write the observation that resolves a zombie alert.

        Inserts an ``info`` observation inside a ``BEGIN IMMEDIATE``
        transaction, then logs at info level. The connection is always
        closed; errors propagate to the caller.
        """
        conn = get_connection(db_path)
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "INSERT INTO observations (task_id, observer, severity, body) "
                "VALUES (?,?,?,?)",
                (None, "daemon", "info",
                 json.dumps({
                     "type": "zombie_resolved",
                     "project_id": project_id,
                     "tick": tick_num,
                     "message": f"项目 {project_id} 恢复活动",
                 })),
            )
            conn.commit()
        finally:
            conn.close()
        # Reached only when the transaction committed without raising.
        logger.info("Zombie resolved: %s", project_id)

    def get_status(self, project_id: str) -> Dict[str, Any]:
        """Return the in-memory health state of one project.

        Returns:
            ``{"stale_ticks": int, "is_zombie": bool}`` where ``is_zombie``
            is True while a zombie alert is outstanding for the project.
        """
        return {
            "stale_ticks": self._stale_ticks.get(project_id, 0),
            "is_zombie": self._alerted.get(project_id, False),
        }