diff --git a/docs/design/06-pm2-crash-recovery.md b/docs/design/06-pm2-crash-recovery.md index 16345ea..3f47787 100644 --- a/docs/design/06-pm2-crash-recovery.md +++ b/docs/design/06-pm2-crash-recovery.md @@ -299,8 +299,11 @@ def _determine_recovery_action(self, conn, task, status: str, db_path: Path) -> return self._recover_review_task(conn, task_id) if status == "reviewing": - # reviewing → done,让 _check_round_complete 重新触发庞统 review - return "push_to_done" + # reviewing → 查 events 找前置状态(done 或 failed),精确恢复 + prev_status = self._find_pre_reviewing_status(conn, task_id) + if prev_status == "failed": + return "push_to_failed" + return "push_to_done" # 兜底 done return None @@ -314,15 +317,15 @@ def _recover_working_task(self, conn, task_id: str) -> Optional[str]: ).fetchone() if not last_attempt or last_attempt["outcome"] != "completed": - return "push_to_pending" + return "push_to_pending_keep_agent" # 保留 current_agent,让同一 agent 接手 # agent 正常退出,看是否有产出 has_output = conn.execute( - "SELECT 1 FROM outputs WHERE task_id=? LIMIT 1", (task_id,) + "SELECT 1 FROM outputs WHERE task_id=? LIMIT 1" (task_id,) ).fetchone() is not None if not has_output: - return "push_to_pending" + return "push_to_pending_keep_agent" # 看是否有 handoff has_handoff = conn.execute( @@ -333,7 +336,7 @@ def _recover_working_task(self, conn, task_id: str) -> Optional[str]: if has_handoff: return "push_to_review" # agent 已完成工作并交接 - return "push_to_pending" + return "push_to_pending_keep_agent" def _recover_review_task(self, conn, task_id: str) -> Optional[str]: @@ -367,10 +370,20 @@ def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path): detail={"reason": "startup_recovery", "original_status": conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()["status"]}, ) - # 清空 current_agent + # 清空 current_agent(常规推 pending,无特定 agent 接手) conn.execute("UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,)) conn.commit() + elif action == "push_to_pending_keep_agent": + self._transition_status( + conn, task_id, "pending", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": + conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()["status"]}, + ) + # 保留 current_agent,让同一 agent 重新接手 + conn.commit() + elif action == "push_to_review": # working → review,恢复 assignee 保持不变 self._transition_status( @@ -384,7 +397,17 @@ def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path): self._transition_status( conn, task_id, "done", agent="daemon", - detail={"reason": "startup_recovery", "original_status": "review"}, + detail={"reason": "startup_recovery", "original_status": + conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()["status"]}, + ) + conn.commit() + + elif action == "push_to_failed": + self._transition_status( + conn, task_id, "failed", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": + conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()["status"]}, ) conn.commit() @@ -396,6 +419,27 @@ def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path): conn.commit() logger.info("Recovery: task %s → %s (action=%s)", task_id, action, action) + +# ── 辅助方法 ── + +def _find_pre_reviewing_status(self, conn, task_id: str) -> str: + """查 events 表找到 reviewing 之前的状态(done 或 failed)""" + row = conn.execute( + """SELECT detail FROM events + WHERE task_id=? AND event_type='task_status_changed' + ORDER BY id DESC LIMIT 10""", + (task_id,) + ).fetchall() + + for event in row: + try: + detail = json.loads(event["detail"]) + if detail.get("new_status") == "reviewing": + return detail.get("old_status", "done") # 兜底 done + except (json.JSONDecodeError, KeyError): + continue + + return "done" # 找不到则兜底 done(reviewing 只从 done/failed 转入) ``` ### 5.2 调用时机