diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py index 4353ade..a296926 100644 --- a/src/daemon/dispatcher.py +++ b/src/daemon/dispatcher.py @@ -229,6 +229,12 @@ class Dispatcher: _dispatcher = self _is_review = action_type == "review" + # v2.8.1 Fix-2: 明确需要回退 current_agent 的 outcome + ROLLBACK_CURRENT_AGENT_OUTCOMES = frozenset({ + "crashed", "compact_failed", "process_crash", + "session_stuck", "compact_hanging", + }) + def _task_on_complete(aid, outcome): try: if _is_review: @@ -239,7 +245,26 @@ class Dispatcher: _dispatcher._mark_task_status(_task_db, _task_id, "done") logger.info("Task %s: review complete (%s), marking done", _task_id, outcome) else: - logger.warning("Task %s: review agent %s, NOT marking done", _task_id, outcome) + logger.warning("Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome) + # v2.8.1 Fix-2: crash 后回退 current_agent,避免 exclude_current 卡死 + if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES: + try: + conn = get_connection(_task_db) + try: + conn.execute( + "UPDATE tasks SET current_agent = " + "(SELECT assignee FROM tasks WHERE id=?) " + "WHERE id=? 
@staticmethod
def _check_crash_limit(task_id: str, db_path: pathlib.Path, limit: int = 3,
                       window_minutes: int = 30) -> bool:
    """Report whether a task has crashed too often within a recent time window.

    v2.8.1 Fix-3c. Counts rows in the ``task_attempts`` table for ``task_id``
    whose ``outcome`` is ``'crashed'`` and whose ``started_at`` is later than
    SQLite's ``datetime('now', '-<window_minutes> minutes')``. Because the
    count comes from the database rather than process memory, it is not lost
    when the daemon restarts.

    Args:
        task_id: Identifier of the task to inspect.
        db_path: Database location handed to ``get_connection``.
        limit: Number of crashes at which the limit counts as reached.
        window_minutes: Size of the look-back window, in minutes.

    Returns:
        True when at least ``limit`` crashes were recorded in the window (the
        caller should escalate); False otherwise, including when the lookup
        itself fails.
    """
    # NOTE(review): the time filter is a textual comparison against SQLite's
    # 'YYYY-MM-DD HH:MM:SS' UTC format; this assumes started_at is stored in a
    # compatible format -- confirm against the task_attempts writer.
    try:
        conn = get_connection(db_path)
        try:
            row = conn.execute(
                "SELECT COUNT(*) as cnt FROM task_attempts "
                "WHERE task_id=? AND outcome='crashed' "
                "AND started_at > datetime('now', ?)",
                (task_id, f'-{window_minutes} minutes')
            ).fetchone()
        finally:
            conn.close()
    except Exception:
        # Fail open so a broken lookup never blocks dispatching, but leave a
        # trace: a silently failing check would disable the limiter unnoticed.
        logger.exception(
            "Task %s: crash limit check failed, treating as not exceeded", task_id)
        return False

    # Positional access works for sqlite3.Row as well as plain tuple rows.
    count = row[0] if row else 0
    if count >= limit:
        logger.warning("Task %s: crash limit reached (%d/%d in %dm)",
                       task_id, count, limit, window_minutes)
        return True
    return False
@staticmethod
def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 300) -> bool:
    """Return True if the tail of a session JSONL file has a recent compaction record.

    v2.8.1 Fix-1. Reads at most the last 51200 bytes of ``session_file`` and
    walks its lines from newest to oldest. A line counts as a hit when it is a
    JSON object with ``"type": "compaction"`` and an ISO-8601 ``"timestamp"``
    less than ``window_seconds`` old. The walk stops at the first record whose
    timestamp is ``window_seconds`` old or older.

    Lines that are not JSON objects, or whose timestamp is missing, not a
    string, unparseable or timezone-naive, are skipped without ending the scan.

    Args:
        session_file: Path of the session ``.jsonl`` file; may be empty.
        window_seconds: Maximum age, in seconds, for a record to count as recent.

    Returns:
        True when a recent compaction record is found; False otherwise,
        including when the file is missing or unreadable.
    """
    import json
    from datetime import datetime, timezone

    if not session_file:
        return False

    tail_bytes = 51200  # only the end of the file is inspected

    # EAFP: a missing or unreadable file simply means "no recent compaction".
    try:
        with open(session_file, "rb") as fh:
            fh.seek(0, 2)
            fh.seek(max(0, fh.tell() - tail_bytes))
            tail = fh.read().decode("utf-8", errors="replace")
    except (OSError, ValueError):
        return False

    now = datetime.now(timezone.utc)

    # Split on "\n" only. str.splitlines() also breaks on U+2028, U+2029,
    # U+0085 and similar, which may appear unescaped inside JSON strings and
    # would cut a record into unparseable fragments.
    for raw in reversed(tail.split("\n")):
        if not raw.strip():
            continue
        try:
            record = json.loads(raw)
        except (ValueError, RecursionError):
            # Typically the truncated first line of the tail window.
            continue
        if not isinstance(record, dict):
            continue

        stamp = record.get("timestamp")
        if not stamp or not isinstance(stamp, str):
            continue
        try:
            recorded_at = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
            age = (now - recorded_at).total_seconds()
        except (ValueError, TypeError):
            # Unparseable, or timezone-naive (cannot be compared with `now`).
            continue

        if age >= window_seconds:
            # Everything earlier in the file is assumed to be older still.
            break
        if record.get("type") == "compaction":
            return True
    return False
".openclaw" / "agents" / agent_id / "sessions" / "sessions.json" if not sessions_path.exists(): @@ -1125,13 +1180,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ except Exception: pass - # 最近 5 分钟的 compact - import time - now_ms = time.time() * 1000 - for cp in main_session.get("compactionCheckpoints", []): - if (now_ms - cp.get("createdAt", 0)) < 300_000: - result["recent_compact"] = True - break + # v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描 + # 只在 agent 非空闲时才扫描(减少不必要 I/O) + if result["status"] not in ("done", "idle", "unknown", None) and sf: + result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf) except Exception: pass return result diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index f4703e4..4ec4bb7 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -1154,6 +1154,25 @@ Parent Task ID: {parent_task.id} if existing: continue # 已有活跃 review dispatch + # v2.8.1 Fix-3c: crash 计数上限检查 + if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30): + conn = get_connection(db_path) + try: + self._transition_status( + conn, task.id, "failed", + agent="daemon", + detail={"reason": "review_crash_limit", + "message": "30 分钟内 crash 3 次,自动标 failed"}, + ) + finally: + conn.close() + bb.add_observation( + task.id, "daemon", + "审查任务连续 crash 3 次(30 分钟内),自动标记为 failed,请人工介入", + ) + logger.error("Task %s: 3 crashes in 30min, marking failed", task.id) + continue + # 检查是否有产出(司马懿建议:无产出直接标 failed) outputs = bb.get_outputs(task.id) if not outputs: @@ -1343,6 +1362,54 @@ Parent Task ID: {parent_task.id} except Exception: logger.exception("Failed to push back task %s", task_id_check) + # v2.8.1 Fix-3b: review 超时回收 + # review 状态无超时回收是设计空白:review agent crash 后任务永远卡在 review + review_timeout_minutes = getattr(self, 'review_timeout_minutes', 15) + review_tasks = queries.tasks_by_status("review") + for task in review_tasks: + current = getattr(task, 'current_agent', None) + # 检查是否有活跃的 
review agent 进程 + if current and self.spawner: + session_info = self.spawner.get_session_by_agent(current) + if session_info: + pid = session_info.get("pid") + if pid and self._is_pid_alive(pid): + continue # review agent 还在跑,不回收 + # 无活跃进程 → 检查超时 + updated = task.updated_at + if not updated: + continue + try: + elapsed = (now - datetime.fromisoformat(updated)).total_seconds() / 60.0 + if elapsed > review_timeout_minutes: + try: + conn = get_connection(db_path) + try: + current_row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task.id,) + ).fetchone() + if current_row and current_row["status"] != "review": + continue + self._transition_status( + conn, task.id, "pending", + agent="daemon", + detail={"reason": "review_timeout", + "elapsed_minutes": round(elapsed, 1)}, + ) + conn.execute( + "UPDATE tasks SET current_agent=NULL WHERE id=?", (task.id,) + ) + conn.commit() + finally: + conn.close() + reclaimed.append(task.id) + logger.warning("Review timeout: %s (%.1fm > %dm), pushed back to pending", + task.id, elapsed, review_timeout_minutes) + except Exception: + logger.exception("Failed to reclaim review task %s", task.id) + except (ValueError, TypeError): + pass + return reclaimed def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool: