auto-sync: 2026-06-03 00:50:12

This commit is contained in:
cfdaily
2026-06-03 00:50:12 +08:00
parent 26308c2341
commit e17c33ffc9
+239
View File
@@ -1422,6 +1422,245 @@ Parent Task ID: {parent_task.id}
except Exception:
return False
# ------------------------------------------------------------------
# PM2 Crash 启动恢复 (#06)
# ------------------------------------------------------------------
def _startup_recover(self) -> Dict[str, Any]:
"""PM2 crash 后启动恢复:扫描所有非终态任务,从黑板线索重建一致状态"""
NON_TERMINAL = {"claimed", "working", "review", "reviewing"}
projects = self.registry.list_projects()
recovery_report = {"projects": {}, "total_recovered": 0, "total_noop": 0}
# 收集所有需要扫描的项目(registry + 虚拟项目)
project_dirs = {}
for project_id, project_info in projects.items():
if project_info.get("status") == "active":
project_dirs[project_id] = self.registry.root / project_id / "blackboard.db"
# 虚拟项目
for virtual_id in ("_general", "_mail"):
virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db"
if virtual_db.exists() and virtual_id not in project_dirs:
project_dirs[virtual_id] = virtual_db
for project_id, db_path in project_dirs.items():
if not db_path.exists():
continue
# init_db 如果还没初始化
db_key = str(db_path)
if db_key not in self._initialized_dbs:
from src.blackboard.db import init_db
init_db(db_path)
self._initialized_dbs.add(db_key)
recovered, noop_count = self._recover_project(db_path, NON_TERMINAL)
if recovered:
recovery_report["projects"][project_id] = recovered
recovery_report["total_recovered"] += len(recovered)
recovery_report["total_noop"] += noop_count
if recovery_report["total_recovered"] > 0:
logger.info("Startup recovery: %d tasks recovered across %d projects",
recovery_report["total_recovered"],
len(recovery_report["projects"]))
elif recovery_report["total_noop"] > 0:
logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)",
recovery_report["total_noop"])
else:
logger.info("Startup recovery: no non-terminal tasks found, clean start")
return recovery_report
def _recover_project(self, db_path: Path, non_terminal: set) -> tuple:
"""恢复单个项目中的非终态任务
Returns:
(recovered_list, noop_count)
"""
conn = get_connection(db_path)
recovered = []
noop_count = 0
try:
for status in non_terminal:
rows = conn.execute(
"SELECT id, assignee, current_agent, updated_at FROM tasks WHERE status=?",
(status,)
).fetchall()
for task in rows:
action = self._determine_recovery_action(conn, task, status, db_path)
if action:
self._execute_recovery(conn, task["id"], action, db_path)
recovered.append({"task_id": task["id"], "from": status, "action": action})
else:
# 审计:保持原状的任务也记录事件
noop_count += 1
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
(task["id"], "daemon", "startup_recovery_noop",
json.dumps({"status": status, "reason": "no_action_needed"}))
)
conn.commit()
finally:
conn.close()
return recovered, noop_count
def _determine_recovery_action(self, conn, task, status: str,
db_path: Path) -> Optional[str]:
"""根据黑板线索决定恢复动作,返回 None 表示不需要干预"""
task_id = task["id"]
if status == "claimed":
# claimed 统一推 pending
return "push_to_pending"
if status == "working":
return self._recover_working_task(conn, task_id)
if status == "review":
return self._recover_review_task(conn, task_id)
if status == "reviewing":
# reviewing → 查 events 找前置状态(done 或 failed),精确恢复
prev_status = self._find_pre_reviewing_status(conn, task_id)
if prev_status == "failed":
return "push_to_failed"
return "push_to_done" # 兜底 done
return None
def _recover_working_task(self, conn, task_id: str) -> Optional[str]:
"""working 状态恢复:看黑板线索判断 agent 是否实际完成了工作"""
# 最后一次 attempt
last_attempt = conn.execute(
"SELECT outcome, agent FROM task_attempts WHERE task_id=? ORDER BY id DESC LIMIT 1",
(task_id,)
).fetchone()
if not last_attempt or last_attempt["outcome"] != "completed":
return "push_to_pending_keep_agent" # 保留 current_agent
# agent 正常退出,看是否有产出
has_output = conn.execute(
"SELECT 1 FROM outputs WHERE task_id=? LIMIT 1", (task_id,)
).fetchone() is not None
if not has_output:
return "push_to_pending_keep_agent"
# 看是否有 handoff
has_handoff = conn.execute(
"SELECT 1 FROM comments WHERE task_id=? AND comment_type='handoff' LIMIT 1",
(task_id,)
).fetchone() is not None
if has_handoff:
return "push_to_review" # agent 已完成工作并交接
return "push_to_pending_keep_agent"
def _recover_review_task(self, conn, task_id: str) -> Optional[str]:
"""review 状态恢复:看是否有审查结论"""
# 检查 reviews 表
review = conn.execute(
"SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
(task_id,)
).fetchone()
if review:
if review["verdict"] == "approved":
return "push_to_done"
else:
# needs_revision / rejected → 推回 pending 重新执行
return "push_to_pending"
# 无审查结论 → 保持 reviewticker 自然会 dispatch reviewer
return None
def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path):
"""执行恢复动作"""
# 获取原始状态(用于审计)
orig_row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
orig_status = orig_row["status"] if orig_row else "unknown"
if action == "push_to_pending":
self._transition_status(
conn, task_id, "pending",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
)
# 清空 current_agent(常规推 pending,无特定 agent 接手)
conn.execute("UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,))
conn.commit()
elif action == "push_to_pending_keep_agent":
self._transition_status(
conn, task_id, "pending",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
)
# 保留 current_agent,让同一 agent 重新接手
conn.commit()
elif action == "push_to_review":
self._transition_status(
conn, task_id, "review",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": "working"},
)
conn.commit()
elif action == "push_to_done":
self._transition_status(
conn, task_id, "done",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
)
conn.commit()
elif action == "push_to_failed":
self._transition_status(
conn, task_id, "failed",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
)
conn.commit()
# 记录恢复审计事件
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
(task_id, "daemon", "startup_recovery", json.dumps({"action": action}))
)
conn.commit()
logger.info("Recovery: task %s%s (action=%s)", task_id, action, action)
def _find_pre_reviewing_status(self, conn, task_id: str) -> str:
"""查 events 表找到 reviewing 之前的状态(done 或 failed"""
rows = conn.execute(
"""SELECT detail FROM events
WHERE task_id=? AND event_type='task_status_changed'
ORDER BY id DESC LIMIT 10""",
(task_id,)
).fetchall()
for event in rows:
try:
detail = json.loads(event["detail"])
if detail.get("new_status") == "reviewing":
return detail.get("old_status", "done") # 兜底 done
except (json.JSONDecodeError, KeyError):
continue
return "done" # 找不到则兜底 donereviewing 只从 done/failed 转入)
# ------------------------------------------------------------------
# 手动 tickAPI 端点触发)
# ------------------------------------------------------------------