diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py index d35f659..6b29b4f 100644 --- a/src/daemon/dispatcher.py +++ b/src/daemon/dispatcher.py @@ -229,7 +229,7 @@ class Dispatcher: _dispatcher = self _is_review = action_type == "review" - # v2.8.1 Fix-2: 明确需要回退 current_agent 的 outcome + # #07.2: executor/review 统一 crash 回退 ROLLBACK_CURRENT_AGENT_OUTCOMES = frozenset({ "crashed", "compact_failed", "process_crash", "session_stuck", "compact_hanging", @@ -237,35 +237,19 @@ class Dispatcher: def _task_on_complete(aid, outcome): try: + # #07.2: 统一 crash 回退——executor 和 review 都回退 current_agent + if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES and _task_db: + _dispatcher._rollback_current_agent(_task_db, _task_id, aid) + if _is_review: - # 审查 Agent 完成 > 检查 outcome - # 只有正常完成才标 done,crash/error 保持 review - if _task_db: - if outcome in ("completed", "session_revived"): - _dispatcher._mark_task_status(_task_db, _task_id, "done") - logger.info("Task %s: review complete (%s), marking done", _task_id, outcome) - else: - logger.warning("Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome) - # v2.8.1 Fix-2: crash 后回退 current_agent,避免 exclude_current 卡死 - if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES: - try: - conn = get_connection(_task_db) - try: - conn.execute( - "UPDATE tasks SET current_agent = " - "(SELECT assignee FROM tasks WHERE id=?) " - "WHERE id=? AND current_agent=?", - (_task_id, _task_id, aid) - ) - conn.commit() - finally: - conn.close() - logger.info("Task %s: rolled back current_agent from %s to assignee", - _task_id, aid) - except Exception as e: - logger.warning("Task %s: failed to rollback current_agent: %s", - _task_id, e) + # review: 正常完成标 done,crash/error 保持 review 等 ticker 处理 + if _task_db and outcome in ("completed", "session_revived"): + _dispatcher._mark_task_status(_task_db, _task_id, "done") + logger.info("Task %s: review complete (%s), marking done", _task_id, outcome) + else: + logger.warning("Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome) else: + # executor: 三信号验证 → 标 review _dispatcher._task_auto_complete(_task_id, _task_db) except Exception as e: logger.error("Task %s: on_complete error: %s", _task_id, e) @@ -772,6 +756,24 @@ class Dispatcher: logger.error("Task %s: verify error: %s", task_id, e) return True + def _rollback_current_agent(self, db_path: Path, task_id: str, agent_id: str) -> None: + """#07.2: crash 后回退 current_agent 到 assignee,避免 exclude_current 卡死""" + try: + conn = get_connection(db_path) + try: + conn.execute( + "UPDATE tasks SET current_agent = " + "(SELECT assignee FROM tasks WHERE id=?) " + "WHERE id=? AND current_agent=?", + (task_id, task_id, agent_id) + ) + conn.commit() + finally: + conn.close() + logger.info("Task %s: rolled back current_agent from %s to assignee", task_id, agent_id) + except Exception as e: + logger.warning("Task %s: failed to rollback current_agent: %s", task_id, e) + def _mark_task_status(self, db_path: Path, task_id: str, status: str) -> None: """更新任务状态 + 写审计事件""" try: diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index 4ec4bb7..ebc21e4 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -1154,24 +1154,7 @@ Parent Task ID: {parent_task.id} if existing: continue # 已有活跃 review dispatch - # v2.8.1 Fix-3c: crash 计数上限检查 - if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30): - conn = get_connection(db_path) - try: - self._transition_status( - conn, task.id, "failed", - agent="daemon", - detail={"reason": "review_crash_limit", - "message": "30 分钟内 crash 3 次,自动标 failed"}, - ) - finally: - conn.close() - bb.add_observation( - task.id, "daemon", - "审查任务连续 crash 3 次(30 分钟内),自动标记为 failed,请人工介入", - ) - logger.error("Task %s: 3 crashes in 30min, marking failed", task.id) - continue + # #07.2: crash_limit 已移到 _check_timeouts 统一检查 # 检查是否有产出(司马懿建议:无产出直接标 failed) outputs = bb.get_outputs(task.id) @@ -1281,6 +1264,23 @@ Parent Task ID: {parent_task.id} # working 超时 → 标记为 failed working = queries.tasks_by_status("working") for task in working: + # #07.2: crash_limit 统一检查(比超时更严重的信号) + if self.dispatcher and hasattr(self.dispatcher, '_check_crash_limit'): + if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30): + conn = get_connection(db_path) + try: + self._transition_status( + conn, task.id, "failed", + agent="daemon", + detail={"reason": "crash_limit", + "message": "30 分钟内 crash 3 次,自动标 failed"}, + ) + finally: + conn.close() + reclaimed.append(task.id) + logger.error("Task %s: executor crash limit (3/30m), marking failed", task.id) + continue + start_time_str = task.started_at or task.claimed_at if not start_time_str: continue @@ -1344,71 +1344,32 @@ Parent Task ID: {parent_task.id} pid = session_info.get("pid") task_id_check = session_info.get("task_id") if pid and not self._is_pid_alive(pid): - logger.warning("Agent %s process dead (pid=%d), releasing counter + push back pending", + logger.warning("Agent %s process dead (pid=%d), releasing counter", agent_id, pid) self.counter.release(agent_id) - # 推回 pending 让 ticker 重新 dispatch + # #07.2: review 状态不推 pending,保持 review 等 _dispatch_reviews 处理 + # working 状态推回 pending 让 ticker 重新 dispatch if task_id_check and db_path: try: conn = get_connection(db_path) try: - self._transition_status( - conn, task_id_check, "pending", - agent="daemon", - detail={"reason": "process_dead", "pid": pid}, - ) + current_row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id_check,) + ).fetchone() + if current_row and current_row["status"] == "review": + logger.info("Task %s in review, keeping status (process dead)", task_id_check) + else: + self._transition_status( + conn, task_id_check, "pending", + agent="daemon", + detail={"reason": "process_dead", "pid": pid}, + ) finally: conn.close() except Exception: - logger.exception("Failed to push back task %s", task_id_check) + logger.exception("Failed to handle process dead for task %s", task_id_check) - # v2.8.1 Fix-3b: review 超时回收 - # review 状态无超时回收是设计空白:review agent crash 后任务永远卡在 review - review_timeout_minutes = getattr(self, 'review_timeout_minutes', 15) - review_tasks = queries.tasks_by_status("review") - for task in review_tasks: - current = getattr(task, 'current_agent', None) - # 检查是否有活跃的 review agent 进程 - if current and self.spawner: - session_info = self.spawner.get_session_by_agent(current) - if session_info: - pid = session_info.get("pid") - if pid and self._is_pid_alive(pid): - continue # review agent 还在跑,不回收 - # 无活跃进程 → 检查超时 - updated = task.updated_at - if not updated: - continue - try: - elapsed = (now - datetime.fromisoformat(updated)).total_seconds() / 60.0 - if elapsed > review_timeout_minutes: - try: - conn = get_connection(db_path) - try: - current_row = conn.execute( - "SELECT status FROM tasks WHERE id=?", (task.id,) - ).fetchone() - if current_row and current_row["status"] != "review": - continue - self._transition_status( - conn, task.id, "pending", - agent="daemon", - detail={"reason": "review_timeout", - "elapsed_minutes": round(elapsed, 1)}, - ) - conn.execute( - "UPDATE tasks SET current_agent=NULL WHERE id=?", (task.id,) - ) - conn.commit() - finally: - conn.close() - reclaimed.append(task.id) - logger.warning("Review timeout: %s (%.1fm > %dm), pushed back to pending", - task.id, elapsed, review_timeout_minutes) - except Exception: - logger.exception("Failed to reclaim review task %s", task.id) - except (ValueError, TypeError): - pass + # #07.2: Fix-3b 已删除。review 超时/crash 统一由 process_dead + _check_timeouts 处理 return reclaimed