diff --git a/docs/design/15-runaway-guard.md b/docs/design/15-runaway-guard.md new file mode 100644 index 0000000..421b00f --- /dev/null +++ b/docs/design/15-runaway-guard.md @@ -0,0 +1,61 @@ +# §15 Runaway Guard — Per-Task Dispatch 上限 + +> 设计文档 v1.0 | 2026-06-16 + +## 问题 + +mail/toolchain task 走 handler auto-working(跳过 claim 阶段),不受 claim_timeout 的 3 次重试兜底保护。如果一个 auto-working task 反复 spawn 但永远到不了 done/failed,会无限循环消耗资源。 + +### 实际案例 + +2026-06-15 mention 重复投递事件:`spawn_full_agent` 在 `use_main_session=True` 时返回 `None`,ticker `_process_mentions` 误判为失败,每次 tick(30s)都重试。同一 mention 投递了 4 次,直到 retry_count 达到 mention_queue 的 5 次上限才停止。 + +直接根因已由 PR #80 修复,但如果类似 bug 再次出现,当前没有任何机制阻止 task 层面的无限循环。 + +## 设计 + +### 机制 + +tasks 表新增 `dispatch_count` 字段,每次 ticker 成功 dispatch 一个 task 时递增。当 `dispatch_count >= 10`(全局默认)时,自动标 failed。 + +### 默认值选择 + +全局默认 10 次。参考 Hermes v0.13 Best Practices §3 "Per-Task 重试上限": + +- 简单任务重试 1 次 +- 复杂任务重试 3 次 +- crash recovery(3 次)+ api_retry(3 次)余量 = ~10 次 + +### 适用范围 + +所有 task 类型(task/mail/toolchain),所有非终态(pending/working/claimed)。 + +### 检查时机 + +在 `_check_timeouts` 方法开头,先于现有的 claimed/working 超时检查执行。 + +### 与现有机制的关系 + +| 机制 | 覆盖场景 | 触发动作 | +|------|---------|---------| +| claim_timeout retry_count >= 3 | 广播任务无人认领 | 升级庞统 | +| crash_limit 3/30min | working 状态 crash | 标 failed | +| api_retry_count | API 连续失败 | 标 failed | +| 续杯 max_retries 3 | 续杯耗尽 | 标 failed | +| working timeout | working 超时 | 标 failed 或 done | +| **runaway_guard 10 次** | **任何状态的无限循环** | **标 failed** | + +runaway_guard 是最后一道防线,覆盖所有其他机制遗漏的循环场景。 + +## 改动文件 + +| 文件 | 改动 | +|------|------| +| `src/blackboard/db.py` | `_safe_add_column(conn, "tasks", "dispatch_count", "INTEGER DEFAULT 0")` | +| `src/blackboard/models.py` | Task dataclass 加 `dispatch_count: int = 0` | +| `src/daemon/ticker.py` | `_dispatch_pending` / `_dispatch_reviews` 递增 dispatch_count;`_check_timeouts` 加 runaway guard 检查 | + +## 参考 + +- Hermes v0.13 Kanban Best Practices §3 "Per-Task 重试上限" +- 实际案例:2026-06-15 mention 重复投递事件(PR #80 修复了直接根因,runaway guard 作为兜底) diff --git a/src/blackboard/db.py b/src/blackboard/db.py index f2dc012..7563f4e 100644 --- a/src/blackboard/db.py +++ b/src/blackboard/db.py @@ -117,6 +117,7 @@ def _migrate_v28(conn: sqlite3.Connection) -> None: _safe_add_column(conn, "tasks", "round_count", "INTEGER DEFAULT 0") _safe_add_column(conn, "tasks", "resumed_from", "TEXT") + _safe_add_column(conn, "tasks", "dispatch_count", "INTEGER DEFAULT 0") # 3. checkpoints 表(M3) conn.execute("""CREATE TABLE IF NOT EXISTS checkpoints ( diff --git a/src/blackboard/models.py b/src/blackboard/models.py index b6a2dbc..23b8e9a 100644 --- a/src/blackboard/models.py +++ b/src/blackboard/models.py @@ -41,6 +41,8 @@ class Task: resumed_from: Optional[str] = None # 暂停前状态,恢复时回到原状态 # v2.9 四相循环 round_count: int = 0 # 庞统 review 轮次计数 + # §15 Runaway Guard + dispatch_count: int = 0 # 被 ticker dispatch 的总次数 # v2.8 归档 archived: bool = False archived_at: Optional[str] = None diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index 2bbda1a..dec6b06 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -1084,6 +1084,19 @@ Parent Task ID: {parent_task.id} broadcast_ids = await self._broadcast_claim(broadcast_tasks, db_path, project_id) dispatched.extend(broadcast_ids) + # §15 Runaway Guard: 统一递增 dispatch_count + if dispatched: + conn = get_connection(db_path) + try: + for tid in dispatched: + conn.execute( + "UPDATE tasks SET dispatch_count = COALESCE(dispatch_count, 0) + 1 WHERE id=?", + (tid,), + ) + conn.commit() + finally: + conn.close() + return dispatched async def _broadcast_claim(self, tasks: list, db_path: Path, @@ -1376,6 +1389,19 @@ Parent Task ID: {parent_task.id} except Exception: logger.exception("Review dispatch failed for %s", task.id) + # §15 Runaway Guard: 统一递增 dispatch_count (review) + if dispatched: + conn = get_connection(db_path) + try: + for tid in dispatched: + conn.execute( + "UPDATE tasks SET dispatch_count = COALESCE(dispatch_count, 0) + 1 WHERE id=?", + (tid,), + ) + conn.commit() + finally: + conn.close() + return dispatched # ------------------------------------------------------------------ @@ -1388,6 +1414,31 @@ Parent Task ID: {parent_task.id} reclaimed: List[str] = [] now = datetime.utcnow() # UTC,与 SQLite datetime('now') 一致 + # §15 Runaway Guard: per-task dispatch_count 上限检查 + # 覆盖所有状态,防止无限循环 dispatch + MAX_DISPATCH_COUNT = 10 + for status_to_check in ("pending", "working", "claimed"): + tasks_to_check = queries.tasks_by_status(status_to_check) + for task in tasks_to_check: + dispatch_count = getattr(task, 'dispatch_count', 0) or 0 + if dispatch_count >= MAX_DISPATCH_COUNT: + conn = get_connection(db_path) + try: + ok = self._transition_status( + conn, task.id, "failed", + agent="daemon", + detail={"reason": "runaway_guard", + "dispatch_count": dispatch_count, + "message": f"dispatch {dispatch_count} 次仍未完成,自动标 failed"}, + ) + if ok: + reclaimed.append(task.id) + logger.error( + "Task %s: runaway guard triggered (dispatch_count=%d, status=%s), marking failed", + task.id, dispatch_count, status_to_check) + finally: + conn.close() + # claimed 超时 → 重置为 pending(如果 retry_count >= 3 则升级庞统) claimed = queries.tasks_by_status("claimed") for task in claimed: diff --git a/tests/integration/test_ticker_integration.py b/tests/integration/test_ticker_integration.py index d4822a9..72ba960 100644 --- a/tests/integration/test_ticker_integration.py +++ b/tests/integration/test_ticker_integration.py @@ -543,3 +543,94 @@ class TestCheckTimeoutsUnified: reclaimed = ticker._check_timeouts(db_path) assert "t-review-dead" not in reclaimed + + +# --------------------------------------------------------------------------- +# E13: §15 Runaway Guard — per-task dispatch_count 上限 +# --------------------------------------------------------------------------- + +class TestRunawayGuard: + """E13: dispatch_count >= 10 → 自动标 failed(覆盖所有非终态)""" + + @pytest.fixture + def guard_project(self, tmp_path): + """创建项目 + 任务""" + data_root = tmp_path / "projects" + registry = ProjectRegistry(data_root) + registry.create_project("guard-proj", "Guard Test", agents=["agent-a"]) + db_path = data_root / "guard-proj" / "blackboard.db" + bb = Blackboard(db_path) + return registry, db_path, bb + + def test_runaway_guard_triggers_working(self, guard_project): + """E13.1: working 状态 dispatch_count >= 10 → 标 failed""" + registry, db_path, bb = guard_project + + bb.create_task(Task( + id="t-runaway", title="Runaway Task", status="working", + assigned_by="daemon", current_agent="agent-a", + )) + + conn = bb._conn() + try: + conn.execute( + "UPDATE tasks SET dispatch_count = 10 WHERE id = ?", ("t-runaway",)) + conn.commit() + finally: + conn.close() + + ticker = Ticker(registry, tick_interval=30) + reclaimed = ticker._check_timeouts(db_path) + + assert "t-runaway" in reclaimed + task = Queries(db_path).task_by_id("t-runaway") + assert task.status == "failed" + + def test_runaway_guard_triggers_pending(self, guard_project): + """E13.2: pending 状态 dispatch_count >= 10 → 标 failed""" + registry, db_path, bb = guard_project + + bb.create_task(Task( + id="t-pending-runaway", title="Pending Runaway", status="pending", + assigned_by="daemon", + )) + + conn = bb._conn() + try: + conn.execute( + "UPDATE tasks SET dispatch_count = 10 WHERE id = ?", + ("t-pending-runaway",)) + conn.commit() + finally: + conn.close() + + ticker = Ticker(registry, tick_interval=30) + reclaimed = ticker._check_timeouts(db_path) + + assert "t-pending-runaway" in reclaimed + task = Queries(db_path).task_by_id("t-pending-runaway") + assert task.status == "failed" + + def test_runaway_guard_not_triggered(self, guard_project): + """E13.3: dispatch_count < 10 → 正常流程不受影响""" + registry, db_path, bb = guard_project + + bb.create_task(Task( + id="t-normal", title="Normal Task", status="working", + assigned_by="daemon", current_agent="agent-a", + )) + + conn = bb._conn() + try: + conn.execute( + "UPDATE tasks SET dispatch_count = 5 WHERE id = ?", ("t-normal",)) + conn.commit() + finally: + conn.close() + + ticker = Ticker(registry, tick_interval=30) + reclaimed = ticker._check_timeouts(db_path) + + assert "t-normal" not in reclaimed + task = Queries(db_path).task_by_id("t-normal") + assert task.status == "working"