"""Health check: zombie detection and alerting.

When a project goes N consecutive ticks without a change (no new rows in the
``events`` table), a warning observation is written. Once the project shows
activity again, the alert is resolved automatically.
"""

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any, Dict

from src.blackboard.db import get_connection
from src.blackboard.queries import Queries

logger = logging.getLogger("moziplus-v2.health")


class HealthChecker:
    """Per-project zombie detection.

    All bookkeeping lives in memory on the instance, keyed by project id, so
    a fresh instance starts with no history for any project.
    """

    def __init__(self, zombie_threshold: int = 20):
        """Create a checker.

        Args:
            zombie_threshold: Number of consecutive ticks without a real
                change after which a project is reported as a zombie.
        """
        self.zombie_threshold = zombie_threshold
        # project_id -> number of consecutive ticks without a real change
        self._stale_ticks: Dict[str, int] = {}
        # project_id -> whether a zombie alert has already been written
        self._alerted: Dict[str, bool] = {}
        # project_id -> real-event count observed at the previous check
        self._last_event_count: Dict[str, int] = {}

    def check(self, project_id: str, db_path: Path, tick_num: int) -> Dict[str, Any]:
        """Check the health of a single project.

        A "real change" means the number of events whose type is neither
        ``daemon_tick`` nor ``agent_zombie_detected`` grew since the previous
        check of this project. The baseline for a project that has not been
        checked before is 0, so any pre-existing real event counts as a
        change on the first check.

        Args:
            project_id: Key under which the per-project state is tracked.
            db_path: Path of the project's database. If the file does not
                exist the project is reported healthy and no state changes.
            tick_num: Current tick number; only recorded in the payload of
                alert / resolution observations.

        Returns:
            A dict with the keys ``healthy``, ``zombie``, ``alert_written``
            and ``resolved`` (bool) and ``stale_ticks`` (int).
        """
        result: Dict[str, Any] = {
            "healthy": True,
            "zombie": False,
            "stale_ticks": self._stale_ticks.get(project_id, 0),
            "alert_written": False,
            "resolved": False,
        }

        if not db_path.exists():
            return result

        queries = Queries(db_path)

        # Only the filtered count is needed: the daemon's own tick events and
        # the zombie events written by _write_alert must not look like
        # project activity.
        conn = queries._conn()
        try:
            non_tick_events = conn.execute(
                "SELECT COUNT(*) FROM events WHERE event_type != 'daemon_tick' "
                "AND event_type != 'agent_zombie_detected'"
            ).fetchone()[0]
        finally:
            conn.close()

        last_count = self._last_event_count.get(project_id, 0)
        self._last_event_count[project_id] = non_tick_events

        has_real_change = non_tick_events > last_count

        if has_real_change:
            # Real change: reset the counters for this project.
            old_stale = self._stale_ticks.pop(project_id, 0)
            was_alerted = self._alerted.pop(project_id, False)
            if was_alerted and old_stale >= self.zombie_threshold:
                # The project had been flagged; record that it recovered.
                self._write_resolution(db_path, project_id, tick_num)
                result["resolved"] = True
            result["stale_ticks"] = 0
        else:
            # No real change: one more stale tick.
            stale = self._stale_ticks.get(project_id, 0) + 1
            self._stale_ticks[project_id] = stale
            result["stale_ticks"] = stale

            if stale >= self.zombie_threshold and not self._alerted.get(project_id):
                # Threshold reached for the first time: write the alert once.
                self._write_alert(db_path, project_id, tick_num, stale)
                self._alerted[project_id] = True
                result["zombie"] = True
                result["healthy"] = False
                result["alert_written"] = True
            elif self._alerted.get(project_id):
                # Already alerted: still a zombie, but do not write again.
                result["zombie"] = True
                result["healthy"] = False

        return result

    def _write_alert(self, db_path: Path, project_id: str,
                     tick_num: int, stale_ticks: int) -> None:
        """Write a zombie alert.

        Inserts a ``warning`` observation and an ``agent_zombie_detected``
        event in one transaction, then logs a warning.
        """
        conn = get_connection(db_path)
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "INSERT INTO observations (task_id, observer, severity, body) "
                "VALUES (?,?,?,?)",
                (None, "daemon", "warning", json.dumps({
                    "type": "zombie_detected",
                    "project_id": project_id,
                    "stale_ticks": stale_ticks,
                    "threshold": self.zombie_threshold,
                    "tick": tick_num,
                    "message": f"项目 {project_id} 连续 {stale_ticks} tick 无真实变更",
                })),
            )
            conn.execute(
                "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                (None, "daemon", "agent_zombie_detected",
                 json.dumps({"project_id": project_id, "stale_ticks": stale_ticks})),
            )
            conn.commit()
        finally:
            conn.close()

        logger.warning(
            "Zombie detected: %s (stale=%d)", project_id, stale_ticks)

    def _write_resolution(self, db_path: Path, project_id: str, tick_num: int) -> None:
        """Record that a zombie alert is resolved.

        Inserts an ``info`` observation of type ``zombie_resolved`` and logs
        the recovery.
        """
        conn = get_connection(db_path)
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "INSERT INTO observations (task_id, observer, severity, body) "
                "VALUES (?,?,?,?)",
                (None, "daemon", "info", json.dumps({
                    "type": "zombie_resolved",
                    "project_id": project_id,
                    "tick": tick_num,
                    "message": f"项目 {project_id} 恢复活动",
                })),
            )
            conn.commit()
        finally:
            conn.close()

        logger.info("Zombie resolved: %s", project_id)

    def get_status(self, project_id: str) -> Dict[str, Any]:
        """Return the in-memory health state currently held for a project.

        Returns:
            A dict with ``stale_ticks`` (0 if the project is unknown) and
            ``is_zombie`` (whether an alert is currently outstanding).
        """
        return {
            "stale_ticks": self._stale_ticks.get(project_id, 0),
            "is_zombie": self._alerted.get(project_id, False),
        }