"""Daemon Ticker — 30s 状态扫描 + 依赖推进 + 事件驱动 Tick 循环是整个 Daemon 的核心驱动: 1. 遍历所有 active 项目 2. 每个 tick 扫描黑板状态 3. 推进依赖链(blocked → pending) 4. 写入 daemon_tick 事件 5. 调度 pending 任务(F9 实现) """ from __future__ import annotations import asyncio import json import logging from datetime import datetime from pathlib import Path from typing import Any, Callable, Coroutine, Dict, List, Optional from dataclasses import dataclass, field as dc_field from src.blackboard.operations import Blackboard from src.blackboard.db import get_connection from src.daemon.spawner import AgentBusyError from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry @dataclass class BroadcastRound: """追踪单个任务的广播状态""" task_id: str notified_agents: set = dc_field(default_factory=set) # 已 spawn 过的 Agent responded_agents: set = dc_field( default_factory=set) # 已返回反馈的 Agent(含 NO_REPLY) round_number: int = 0 # 当前第几轮(0=未开始,1=第1轮) logger = logging.getLogger("moziplus-v2.ticker") class Ticker: """Daemon ticker 主循环""" def __init__( self, registry: ProjectRegistry, tick_interval: float = 30.0, max_ticks: Optional[int] = None, on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None, dispatcher: Optional[Any] = None, spawner: Optional[Any] = None, max_dispatch_per_tick: int = 3, claim_timeout_minutes: float = 5.0, default_task_timeout_minutes: float = 30.0, health_checker: Optional[Any] = None, experience_distiller: Optional[Any] = None, inbox_watcher: Optional[Any] = None, ): """ Args: registry: 项目注册表 tick_interval: tick 间隔秒数 max_ticks: 测试用,限制最大 tick 次数 on_tick_complete: 每次 tick 完成后的回调(用于通知等) dispatcher: Dispatcher 实例(Agent 调度) spawner: AgentSpawner 实例(Agent spawn) max_dispatch_per_tick: 每个 tick 最多调度多少个 pending 任务 claim_timeout_minutes: claimed 超时重置为 pending 的分钟数 default_task_timeout_minutes: working 超时标记 failed 的分钟数 """ self.registry = registry self.tick_interval = tick_interval self.max_ticks = max_ticks self.on_tick_complete = on_tick_complete self.dispatcher = 
dispatcher self.spawner = spawner self.max_dispatch_per_tick = max_dispatch_per_tick self.claim_timeout_minutes = claim_timeout_minutes self.default_task_timeout_minutes = default_task_timeout_minutes self.health_checker = health_checker self.experience_distiller = experience_distiller self.inbox_watcher = inbox_watcher self._tick_count: int = 0 self._running: bool = False self._task: Optional[asyncio.Task] = None # 已初始化的 db_path 集合,避免每次 tick 重复 init_db self._initialized_dbs: set = set() # 每个项目上次 tick 的 event count(用于僵尸检测 F8) self._last_event_counts: Dict[str, int] = {} # 广播轮次追踪(Phase 1 bug fix) self._broadcast_tracker: Dict[str, BroadcastRound] = {} # 当前项目 ID(_dispatch_pending 需要) self._current_project_id: Optional[str] = None # ------------------------------------------------------------------ # 生命周期 # ------------------------------------------------------------------ async def start(self) -> None: """启动 tick 循环""" if self._running: return self._running = True # 启动恢复(PM2 crash 后重建一致状态) self._startup_recover() self._task = asyncio.create_task(self._loop()) # 启动 InboxWatcher(即时事件监听) if self.inbox_watcher: try: await self.inbox_watcher.start() logger.info("InboxWatcher started") except Exception as e: logger.warning("InboxWatcher start failed: %s", e) logger.info("Ticker started (interval=%.1fs)", self.tick_interval) async def stop(self) -> None: """停止 tick 循环""" self._running = False # 停止 InboxWatcher if self.inbox_watcher: try: await self.inbox_watcher.stop() logger.info("InboxWatcher stopped") except Exception as e: logger.warning("InboxWatcher stop failed: %s", e) if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass logger.info("Ticker stopped (ticks=%d)", self._tick_count) @property def tick_count(self) -> int: return self._tick_count @property def is_running(self) -> bool: return self._running # ------------------------------------------------------------------ # 主循环 # 
------------------------------------------------------------------ async def _loop(self) -> None: """Tick 主循环""" while self._running: try: await self.tick() except asyncio.CancelledError: raise except Exception: logger.exception("Tick %d error", self._tick_count + 1) if self.max_ticks and self._tick_count >= self.max_ticks: logger.info("Max ticks reached (%d), stopping", self.max_ticks) self._running = False break try: await asyncio.sleep(self.tick_interval) except asyncio.CancelledError: return # ------------------------------------------------------------------ # 单次 tick # ------------------------------------------------------------------ async def tick(self) -> Dict[str, Any]: """执行一次 tick,遍历所有 active 项目""" self._tick_count += 1 tick_num = self._tick_count results: Dict[str, Any] = { "tick": tick_num, "projects": {}, } projects = self.registry.list_projects() active_projects = { pid: info for pid, info in projects.items() if info.get("status") == "active" } for project_id, project_info in active_projects.items(): try: pr = await self._tick_project(project_id, project_info) results["projects"][project_id] = pr except Exception as e: logger.exception( "Tick %d project %s error", tick_num, project_id) results["projects"][project_id] = {"error": str(e)} # 虚拟项目 _general:不在 registry 但需要调度 general_db = Path(self.registry.root) / "_general" / "blackboard.db" if general_db.exists() and "_general" not in active_projects: try: pr = await self._tick_project("_general", { "id": "_general", "name": "一般任务", "status": "active", "source": "virtual", }) results["projects"]["_general"] = pr except Exception as e: logger.exception("Tick %d _general error", tick_num) results["projects"]["_general"] = {"error": str(e)} # 虚拟项目 _mail:飞鸽传书 mail_db = Path(self.registry.root) / "_mail" / "blackboard.db" if mail_db.exists() and "_mail" not in active_projects: try: pr = await self._tick_project("_mail", { "id": "_mail", "name": "飞鸽传书", "status": "active", "source": "virtual", }) 
results["projects"]["_mail"] = pr except Exception as e: logger.exception("Tick %d _mail error", tick_num) results["projects"]["_mail"] = {"error": str(e)} logger.debug( "Tick %d complete: %d projects", tick_num, len(active_projects)) if self.on_tick_complete: try: await self.on_tick_complete() except Exception: logger.exception("on_tick_complete callback error") return results async def _tick_project(self, project_id: str, project_info: Dict[str, Any]) -> Dict[str, Any]: """单项目的 tick 处理""" project_dir = self.registry.root / project_id db_path = project_dir / "blackboard.db" if not db_path.exists(): return {"status": "no_db"} # 只在首次遇到该 db_path 时执行 init_db db_key = str(db_path) if db_key not in self._initialized_dbs: from src.blackboard.db import init_db init_db(db_path) self._initialized_dbs.add(db_key) result: Dict[str, Any] = { "status": "ok", "summary_before": {}, "summary_after": {}, "advanced": [], "dispatched": [], "review_dispatched": [], "zombie_reclaimed": [], "parents_refreshed": [], } # 保存当前 project_id(供 _dispatch_pending 使用) self._current_project_id = project_id # 1. 扫描当前状态 queries = Queries(db_path) result["summary_before"] = queries.task_summary() # 2. 依赖推进 advanced = self._advance_dependencies(db_path) result["advanced"] = advanced # 3. 僵尸/超时处理(在依赖推进后、调度前) zombie_reclaimed = self._check_timeouts(db_path) result["zombie_reclaimed"] = zombie_reclaimed # 4. 调度 pending 任务 if self.dispatcher and self.spawner: dispatched = await self._dispatch_pending(db_path, project_id) result["dispatched"] = dispatched # 5. 调度审查任务 if self.dispatcher and self.spawner: review_dispatched = await self._dispatch_reviews(db_path, project_id) result["review_dispatched"] = review_dispatched # 6. 聚合父 Task 状态(v2.7) parents_refreshed = self._refresh_parent_statuses(db_path) result["parents_refreshed"] = parents_refreshed # 7. 一轮结束检测 + 庞统 review spawn(v2.9 #01) round_reviewed = await self._check_round_complete(db_path, project_id) result["round_reviewed"] = round_reviewed # 8. 
@mention 通知(v2.9 #01) mentions_processed = await self._process_mentions(db_path, project_id) result["mentions_processed"] = mentions_processed # 9. 写 daemon_tick 事件 conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (None, "daemon", "daemon_tick", json.dumps({"tick": self._tick_count, "advanced_count": len(advanced), "parents_refreshed": len(parents_refreshed)})), ) conn.commit() finally: conn.close() # 8. 健康检查(僵尸检测) if self.health_checker: try: self.health_checker.check( project_id, db_path, self._tick_count) except Exception as e: logger.warning("HealthChecker error for %s: %s", project_id, e) # 9. 经验蒸馏(完成的 task 自动触发) if self.experience_distiller: try: conn2 = get_connection(db_path) try: done_tasks = conn2.execute( "SELECT id FROM tasks WHERE status='done' AND updated_at > datetime('now', '-60 seconds')" ).fetchall() finally: conn2.close() for row in done_tasks: t = Blackboard(db_path).get_task(row[0]) if t: self.experience_distiller.distill_from_task( task_id=t.id, task_title=t.title, task_type=t.task_type ) except Exception as e: logger.warning( "ExperienceDistiller error for %s: %s", project_id, e) # 10. 
扫描后状态 result["summary_after"] = queries.task_summary() return result # ------------------------------------------------------------------ # 父 Task 状态聚合 # ------------------------------------------------------------------ def _refresh_parent_statuses(self, db_path: Path) -> List[str]: """全量扫描有子 Task 的父 Task,刷新聚合状态 跳过手动状态(cancelled, paused)的父 Task。使用单连接批量处理。 """ queries = Queries(db_path) refreshed: List[str] = [] conn = get_connection(db_path) try: # 找出所有有子 Task 的父 Task ID parent_rows = conn.execute( "SELECT DISTINCT parent_task FROM tasks WHERE parent_task IS NOT NULL" ).fetchall() parent_ids = [r["parent_task"] for r in parent_rows] for pid in parent_ids: computed = queries.compute_parent_status(pid) if computed is None: continue parent = conn.execute( "SELECT status FROM tasks WHERE id=?", (pid,) ).fetchone() if parent and parent["status"] != computed: conn.execute( "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?", (computed, pid), ) refreshed.append(pid) logger.info( "Parent %s status aggregated: → %s", pid, computed) if refreshed: conn.commit() finally: conn.close() return refreshed # ------------------------------------------------------------------ # 一轮结束检测 + 庞统 review (v2.9 #01) # ------------------------------------------------------------------ MAX_ROUNDS = 5 # §4.5 防无限循环 async def _check_round_complete(self, db_path: Path, project_id: str) -> List[str]: """检测 parent task 下所有 sub task 终态 → spawn 庞统 review 流程(§4.4): 1. 扫描所有 parent task 2. 对每个 parent 检查 sub task 是否全部终态 3. 检查 round_count 上限 4. increment round_count 5. 
spawn 庞统 review """ if not self.dispatcher or not self.spawner: return [] bb = Blackboard(db_path) reviewed: List[str] = [] # 找所有 parent task(有子 task 的) conn = get_connection(db_path) try: parent_rows = conn.execute( "SELECT DISTINCT parent_task FROM tasks WHERE parent_task IS NOT NULL" ).fetchall() finally: conn.close() for row in parent_rows: parent_id = row["parent_task"] try: summary = bb.get_subtasks_summary(parent_id) if not summary or not summary["all_terminal"]: continue # 检查 parent 自身状态:只有 done 状态的 parent 才触发 # (聚合后 parent 状态为 done 说明所有 sub 都完了) # BUG-2 fix: done 和 failed 都触发 review(failed 时优先判断重试/换人) if summary["parent_status"] not in ("done", "failed"): continue # 检查 round_count 上限 if summary["round_count"] >= self.MAX_ROUNDS: logger.warning( "Parent %s reached max rounds (%d), skipping review", parent_id, self.MAX_ROUNDS) continue # 构建 prompt(用即将成为的 round_num,不提前 increment) next_round = summary["round_count"] + 1 outputs = bb.get_aggregate_outputs(parent_id) comments = bb.get_round_comments(parent_id) parent_task = bb.get_task(parent_id) if not parent_task: continue # spawn 庞统 review(先 spawn,成功后再 increment round_count) review_prompt = self._build_review_prompt( parent_task, summary, outputs, comments, next_round, project_id=project_id ) spawned = await self._spawn_pangtong_review( parent_task, review_prompt, project_id, new_round=next_round ) if spawned: # spawn 成功 → increment round_count + 标记 reviewing new_round = bb.increment_round_count(parent_id) reviewed.append(parent_id) logger.info( "Round %d review spawned for parent %s (subs: %s)", new_round, parent_id, summary ) except Exception: logger.exception("Round check error for parent %s", parent_id) return reviewed def _build_review_prompt(self, parent_task, summary: dict, outputs: list, comments: list, round_num: int, project_id: str = "") -> str: """构建庞统 review prompt(§4.2 三问框架)""" goal = parent_task.description or parent_task.title must_haves = parent_task.must_haves or "{}" # 成果物摘要(限制 token) 
output_lines = [] for o in outputs[:20]: # 最多 20 个 output_lines.append( f"- [{o.get('task_title', '?')}] {o.get('title', '?')} " f"({o.get('output_type', '?')}) by {o.get('agent', '?')}" ) # 讨论摘要(限制 50 条) comment_lines = [] for c in comments[:50]: comment_lines.append( f"[{c.get('created_at', '?')[:16]}] {c.get('author', '?')}: {c.get('body', '?')[:200]}" ) return f"""## 庞统 Review(第 {round_num} 轮) ### Goal {goal} ### 验收标准 {must_haves} ### 本轮 Sub Task 状态 - 完成: {summary['done']} - 失败: {summary['failed']} - 取消: {summary['cancelled']} - 总计: {summary['total']} ### 成果物 {chr(10).join(output_lines) if output_lines else '无'} ### 黑板讨论 {chr(10).join(comment_lines) if comment_lines else '无'} ### 三问 1. Goal 还清晰吗?(是否有 goal drift) 2. 成果物覆盖 goal 了吗?(逐条检查验收标准) 3. 下一轮需要做什么?(创建新 sub tasks / 标记完成 / 调整方向) ### 失败处理 {f'有 {summary["failed"]} 个 sub task failed,优先判断是应该重试、换人、还是调整方案。' if summary['failed'] > 0 else '无失败'} ### 你可以 - 创建新一轮 sub tasks: POST http://{self.spawner.api_host}:{self.spawner.api_port}/api/projects/{project_id}/tasks body: {"title": "...", "description": "...", "parent_task": "{parent_task.id}", "assignee": "zhangfei-dev"} - 调整 goal(更新 parent task description/must_haves) - 标记完成(如果 goal 已达成,回复 GOAL_ACHIEVED) Round 上限: {self.MAX_ROUNDS}(当前第 {round_num} 轮) Project ID: {project_id} Parent Task ID: {parent_task.id} """ async def _spawn_pangtong_review(self, parent_task, review_prompt: str, project_id: str, new_round: int = 0) -> bool: """Spawn 庞统进行 review 流程: 1. spawn 庞统 2. 成功后把 parent 设为 reviewing(防重复触发) 3. 
通过 on_complete 回调消费庞统结论 """ try: agent_id = "pangtong-fujunshi" f"review-{parent_task.id}-r{new_round}" # 构造 on_complete 回调:解析庞统结论,更新 parent 状态 async def _on_review_complete(aid: str, outcome: str): try: # 从 spawner session meta 读取庞统的回复文本 review_text = "" if self.spawner: # 找庞统最新完成的 session latest_meta = None latest_time = "" for sid, sess in self.spawner._sessions.items(): if sess.get( "agent_id") == agent_id and sess.get("meta"): t = sess.get("completed_at", "") if t > latest_time: latest_time = t latest_meta = sess["meta"] if latest_meta and isinstance(latest_meta, dict): payloads = latest_meta.get("payloads", []) review_text = " ".join( p.get("text", "") for p in payloads if isinstance(p, dict) ) self._handle_review_conclusion( parent_task.id, project_id, review_text, new_round) except Exception: logger.exception("Review conclusion handler failed for %s", parent_task.id) # 用 spawner.spawn_full_agent 直接 spawn result = await self.spawner.spawn_full_agent( agent_id=agent_id, message=review_prompt, task_id=parent_task.id, use_main_session=True, # #02: 投递到 main session task_db_path=None, on_complete=_on_review_complete, ) if result is not None: # spawn 成功 → parent 进入 reviewing(防下个 tick 重复触发) self._set_parent_reviewing(parent_task.id, project_id) return True return False except Exception: logger.exception( "Failed to spawn pangtong review for %s", parent_task.id) return False def _set_parent_reviewing(self, parent_id: str, project_id: str): """将 parent task 状态设为 reviewing(防重复触发)""" try: db_path = self._resolve_db_path(project_id) conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET status='reviewing', updated_at=datetime('now') " "WHERE id=? 
AND status IN ('done', 'failed')", (parent_id,)) conn.commit() logger.info("Parent %s → reviewing (round review in progress)", parent_id) finally: conn.close() except Exception: logger.exception("Failed to set parent %s to reviewing", parent_id) def _handle_review_conclusion(self, parent_id: str, project_id: str, review_text: str, round_num: int): """解析庞统 review 结论,更新 parent 状态 review_text 是庞统回复的文本(从 spawner session meta payloads 拼接)。 """ db_path = self._resolve_db_path(project_id) conn = get_connection(db_path) try: # 解析 GOAL_ACHIEVED is_achieved = bool( review_text and "GOAL_ACHIEVED" in review_text.upper()) if is_achieved: # Goal 达成 → parent 最终完成 conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET status='done', updated_at=datetime('now') " "WHERE id=? AND status='reviewing'", (parent_id,)) conn.commit() logger.info( "Parent %s review conclusion: GOAL_ACHIEVED → done", parent_id) else: # 庞统可能创建了新 sub task 或需要继续 → 恢复 working conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET status='working', updated_at=datetime('now') " "WHERE id=? AND status='reviewing'", (parent_id,)) conn.commit() sub_count = conn.execute( "SELECT COUNT(*) FROM tasks WHERE parent_task=?", (parent_id,) ).fetchone()[0] logger.info( "Parent %s review conclusion: continue → working " "(round %d, subs=%d)", parent_id, round_num, sub_count) except Exception: logger.exception( "Failed to handle review conclusion for %s", parent_id) # 安全恢复:reviewing → working try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET status='working', updated_at=datetime('now') " "WHERE id=? 
AND status='reviewing'", (parent_id,)) conn.commit() except Exception: pass finally: conn.close() def _resolve_db_path(self, project_id: str) -> Path: """解析项目 DB 路径""" from src.utils import get_data_root return get_data_root() / project_id / "blackboard.db" # ------------------------------------------------------------------ # @mention 通知处理 (v2.9 #01) # ------------------------------------------------------------------ MENTION_MAX_RETRIES = 5 async def _process_mentions(self, db_path: Path, project_id: str) -> List[str]: """扫描 pending mentions → spawn 被 @ 的 Agent 流程(§3.4): 1. 扫描 mention_queue 中 pending 且 retry_count < 5 的记录 2. 按 mentioned_agent 分组,同一 agent 多条 mention 合并为一次 spawn 3. 尝试 spawn,成功 → notified,失败 → retry_count++ """ if not self.spawner: return [] bb = Blackboard(db_path) mentions = bb.get_pending_mentions( max_retries=self.MENTION_MAX_RETRIES) if not mentions: return [] # 按 mentioned_agent 分组 agent_mentions: Dict[str, List[Dict]] = {} for m in mentions: aid = m["mentioned_agent"] agent_mentions.setdefault(aid, []).append(m) processed: List[str] = [] for agent_id, items in agent_mentions.items(): try: # 构建 mention 摘要 mention_lines = [] task_ids = set() for item in items: mention_lines.append( f"- [{item.get('comment_author', '?')}] {item.get('comment_body', '')[:200]}" ) task_ids.add(item["task_id"]) # 取第一个 task 作为 spawn 上下文 tid = items[0]["task_id"] task = bb.get_task(tid) if not task: continue # 构建 mention prompt prompt = self._build_mention_prompt( agent_id, task, mention_lines, project_id) # #09: 检测 rebuttal 场景 — review 状态的任务 + rebuttal comment is_rebuttal_review = False if task and task.status == "review": for item in items: ct = item.get("comment_type", "") if ct == "rebuttal": is_rebuttal_review = True break if is_rebuttal_review: # rebuttal 重审:带 on_complete 回调处理新 verdict _ticker = self _pid = project_id _t_id = tid async def _rebuttal_on_complete(aid: str, outcome: str): try: if outcome in ("completed", "session_revived"): # 读新 verdict rdb_path = 
_ticker._resolve_db_path(_pid) rconn = get_connection(rdb_path) try: new_review = rconn.execute( "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1", (_t_id,) ).fetchone() finally: rconn.close() if new_review and new_review["verdict"] == "approved": _ticker._transition_status( get_connection( rdb_path), _t_id, "done", agent="daemon", detail={"reason": "rebuttal_approved"}) logger.info( "Rebuttal: task %s approved after rebuttal", _t_id) else: # 仍非 approved → @mention assignee verdict_str = new_review["verdict"] if new_review else "未知" rconn2 = get_connection(rdb_path) try: t_row = rconn2.execute( "SELECT assignee FROM tasks WHERE id=?", (_t_id,)).fetchone() finally: rconn2.close() if t_row and t_row["assignee"]: from src.blackboard.blackboard import Blackboard bb2 = Blackboard(rdb_path) bb2.add_comment(_t_id, "daemon", f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", comment_type="review") logger.info( "Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str) except Exception: logger.exception( "Rebuttal on_complete failed for task %s", _t_id) result = await self.spawner.spawn_full_agent( agent_id=agent_id, message=prompt, task_id=tid, use_main_session=True, on_complete=_rebuttal_on_complete, ) else: # 普通 mention:不带回调 result = await self.spawner.spawn_full_agent( agent_id=agent_id, message=prompt, task_id=tid, use_main_session=True, # #02: 投递到 main session ) if result is not None: # 成功 → 标记所有该 agent 的 mentions 为 notified for item in items: bb.mark_mention_notified(item["id"]) processed.append(agent_id) logger.info( "Mention spawn success: %s (%d mentions)", agent_id, len(items)) else: # spawn 返回 None(其他原因)→ 递增 retry_count for item in items: bb.mark_mention_retry(item["id"]) logger.warning( "Mention spawn failed: %s, retrying next tick", agent_id) except AgentBusyError: # Agent 忙,不递增 retry_count,等下次 tick 自然重试 logger.info( "Mention spawn skipped: %s busy, will retry next tick", agent_id) except Exception: logger.exception( 
"Mention processing error for agent %s", agent_id) for item in items: try: if item.get("retry_count", 0) >= self.MENTION_MAX_RETRIES - 1: bb.mark_mention_failed(item["id"]) else: bb.mark_mention_retry(item["id"]) except Exception: pass return processed def _build_mention_prompt(self, agent_id: str, task: Any, mention_lines: List[str], project_id: str) -> str: """#03: @mention prompt(身份注入)""" api_host = getattr( self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1' api_port = getattr( self.spawner, 'api_port', 8083) if self.spawner else 8083 api_base = f"http://{api_host}:{api_port}/api" # 获取 Agent 专长 caps = "通用" try: if self.dispatcher and self.dispatcher.router: profile = self.dispatcher.router.agent_profiles.get(agent_id) if profile and profile.capabilities_zh: caps = ", ".join(profile.capabilities_zh) except Exception: pass mentions_text = "\n".join(mention_lines[:10]) return f"""你在黑板上被 @ 了。 ## 你的身份 你是 {agent_id},专长: {caps}。 ## 相关讨论 {mentions_text} ## 任务上下文 - 项目: {project_id} - 任务: {task.title} - 描述: {task.description or '无'} ## 你能做什么 - 读完整上下文: GET {api_base}/projects/{project_id}/tasks/{task.id}?expand=all - 回应: POST {api_base}/projects/{project_id}/tasks/{task.id}/comments - 如果讨论收敛到可执行任务,可以创建 sub task ## 约束 - 只回应与你专长相关的内容 - 禁止使用 sessions_send 直接发消息 - 委托他人做事用黑板 comment @agent-id,系统自动路由(无需手动传 mentions 数组) """ def _advance_dependencies(self, db_path: Path) -> List[str]: """检查 blocked 任务,若所有依赖已完成则推进为 pending Returns: 被推进的 task_id 列表 """ queries = Queries(db_path) blocked = queries.blocked_tasks_with_deps() advanced: List[str] = [] if not blocked: return advanced conn = get_connection(db_path) try: for item in blocked: if item["all_deps_done"]: task_id = item["task_id"] ok = self._transition_status(conn, task_id, "pending", agent="daemon", detail={"reason": "all_dependencies_done"}) if ok: advanced.append(task_id) logger.info("Advanced %s: blocked → pending", task_id) finally: conn.close() return advanced def _transition_status(self, conn, 
task_id: str, new_status: str, agent: str = "daemon", detail: Optional[Dict] = None) -> bool: """轻量状态转换(不走 Blackboard 类,避免 init_db)""" from src.blackboard.db import VALID_TRANSITIONS, TERMINAL_STATUSES, EVENT_TYPES from datetime import datetime conn.execute("BEGIN IMMEDIATE") row = conn.execute( "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: return False old_status = row["status"] if old_status in TERMINAL_STATUSES or old_status == new_status: return old_status == new_status if new_status not in VALID_TRANSITIONS.get(old_status, set()): return False now = datetime.utcnow().isoformat() # 重置到 pending 时清空 assignee(避免残留导致重复路由到同一 Agent) # 但 Mail 的 assignee 是收件人,永不清空 if new_status == "pending": if self._current_project_id == "_mail": conn.execute( "UPDATE tasks SET status=?, updated_at=? WHERE id=?", (new_status, now, task_id), ) else: conn.execute( "UPDATE tasks SET status=?, assignee=NULL, resumed_from=NULL, updated_at=? WHERE id=?", (new_status, now, task_id), ) elif new_status == "paused": # 记录暂停前状态,恢复时回到原状态 conn.execute( "UPDATE tasks SET status=?, resumed_from=?, updated_at=? WHERE id=?", (new_status, old_status, now, task_id), ) else: conn.execute( "UPDATE tasks SET status=?, updated_at=? 
WHERE id=?", (new_status, now, task_id), ) event_type = f"task_{new_status}" if event_type not in EVENT_TYPES: event_type = "daemon_tick" conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, agent, event_type, json.dumps( {"from": old_status, "to": new_status, **(detail or {})})), ) conn.commit() return True # ------------------------------------------------------------------ # Agent 调度 # ------------------------------------------------------------------ async def _dispatch_pending(self, db_path: Path, project_id: str) -> List[str]: """扫描 pending 任务并调度 v3.0: 两条路径 - 确定性路径:retry/handoff/assignee/生命周期 → 直接 spawn - 广播认领:无确定性路径 → spawn 所有空闲 Agent,自主 claim """ queries = Queries(db_path) pending = queries.pending_dispatchable() dispatched: List[str] = [] if not pending: return dispatched # 分两批:确定性 vs 广播 deterministic_tasks = [] broadcast_tasks = [] for task in pending[:self.max_dispatch_per_tick]: decision = self.dispatcher.decide(task) if decision.get("mode") in ("deterministic", "agent_handoff"): deterministic_tasks.append((task, decision)) else: broadcast_tasks.append(task) # 路径1:确定性路由 → 直接 spawn for task, decision in deterministic_tasks: try: result = await self.dispatcher.dispatch( task, project_config={ "project_id": project_id, "db_path": db_path}, ) if result["status"] == "dispatched" and result["level"] in ( "full", "escalate"): conn = get_connection(db_path) try: # [v2.7.1] Mail 已在 dispatcher 中标 working,跳过 claimed if project_id == "_mail": conn.execute( "UPDATE tasks SET current_agent=? WHERE id=?", (result["agent_id"], task.id), ) conn.commit() dispatched.append(task.id) logger.info("Dispatched %s to %s (session=%s, mail auto-working)", task.id, result["agent_id"], result.get("session_id")) else: ok = self._transition_status( conn, task.id, "claimed", agent="daemon", detail={"dispatched_to": result["agent_id"], "session_id": result.get("session_id")}, ) if ok: conn.execute( "UPDATE tasks SET current_agent=? 
WHERE id=?", (result["agent_id"], task.id), ) conn.commit() dispatched.append(task.id) logger.info("Dispatched %s to %s (session=%s)", task.id, result["agent_id"], result.get("session_id")) finally: conn.close() elif result["status"] == "blocked": # Guardrail 拦截:任务标记为 blocked conn = get_connection(db_path) try: self._transition_status( conn, task.id, "blocked", agent="daemon", detail={"reason": result.get("reason", "guardrail"), "violations": result.get("violations", [])}, ) logger.info("Task %s blocked by guardrail: %s", task.id, result.get("reason", "unknown")) finally: conn.close() except Exception: logger.exception("Dispatch failed for %s", task.id) # 路径2:广播认领 if broadcast_tasks: broadcast_ids = await self._broadcast_claim(broadcast_tasks, db_path, project_id) dispatched.extend(broadcast_ids) return dispatched async def _broadcast_claim(self, tasks: list, db_path: Path, project_id: str) -> List[str]: """广播一批待认领任务给所有空闲 Agent(每轮最多广播一次) 司马懿建议:攒一批任务,一次广播,而非每个任务触发一次广播。 广播前检查全局并发,接近上限时跳过。 """ if not self.spawner: return [] # 全局并发检查(司马懿建议 1) if self.counter.is_near_limit(): logger.info("Skipping broadcast: global concurrent near limit (%d/%d)", self.counter.global_active, self.counter.max_global) return [] # 分离:需要升级的 vs 可广播的 broadcastable = [] escalated = [] for t in tasks: tracker = self._broadcast_tracker.get(t.id) if tracker and tracker.round_number >= 3: escalated.append(t) else: broadcastable.append(t) # 升级庞统 for t in escalated: conn = get_connection(db_path) try: self._transition_status( conn, t.id, "escalated", agent="daemon", detail={"reason": "no_taker_after_3_broadcasts", "round_number": self._broadcast_tracker.get(t.id).round_number if self._broadcast_tracker.get(t.id) else 0}, ) logger.warning( "Escalated %s: no taker after 3 broadcast rounds", t.id) self._broadcast_tracker.pop(t.id, None) finally: conn.close() if not broadcastable: return [] idle_agents = self._get_idle_agents() if not idle_agents: logger.warning( "No idle agents for broadcast, skipping 
(capacity issue)") return [] task_ids = [t.id for t in broadcastable] logger.info("Broadcasting %d tasks to %d idle agents: %s", len(broadcastable), len(idle_agents), task_ids) # 审计事件 try: conn = get_connection(db_path) try: for task in broadcastable: conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) " "VALUES (?,?,?,?)", (task.id, "daemon", "broadcast_claim", json.dumps({"agents": idle_agents, "task_title": task.title})), ) conn.commit() finally: conn.close() except Exception: pass # 为每个任务获取/创建 tracker for t in broadcastable: if t.id not in self._broadcast_tracker: self._broadcast_tracker[t.id] = BroadcastRound(task_id=t.id) spawned = [] for agent_id in idle_agents: prompt = self._build_claim_prompt( agent_id, broadcastable, project_id) try: session_id = await self.spawner.spawn_full_agent( agent_id=agent_id, message=prompt, task_id="broadcast", task_db_path=db_path, use_main_session=True, broadcast_task_ids=[t.id for t in broadcastable], ) if session_id is not None: spawned.append(session_id) # 记录已通知的 Agent for t in broadcastable: self._broadcast_tracker[t.id].notified_agents.add( agent_id) except AgentBusyError: logger.debug("Broadcast skip %s: busy", agent_id) except Exception: logger.exception("Broadcast spawn failed for %s", agent_id) return spawned def _build_claim_prompt(self, agent_id: str, tasks: list, project_id: str) -> str: """#03: 广播认领 prompt(身份+专长注入)""" api_host = getattr( self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1' api_port = getattr( self.spawner, 'api_port', 8083) if self.spawner else 8083 api_base = f"http://{api_host}:{api_port}/api" # 获取 Agent 专长 caps = "通用" try: if self.dispatcher and self.dispatcher.router: profile = self.dispatcher.router.agent_profiles.get(agent_id) if profile and profile.capabilities_zh: caps = ", ".join(profile.capabilities_zh) except Exception: pass task_list = "\n".join([ f"- ID: {t.id}, 标题: {t.title}, 类型: {getattr(t, 'task_type', '') or 'general'}, " f"优先级: {t.priority}" 
for t in tasks ]) return f"""你是 {agent_id}({caps})。你们是一个协作团队,共享一块黑板。 ## 待处理任务 {task_list} ## 你的角色 你收到团队广播。按你的专业判断回应: 1. 有属于你专业的任务 → 认领并执行 2. 不是你的活,但你的专业领域和这个任务有实际交叉 → 写 observation comment - 从你的专业视角:你看到什么风险?什么约束?什么跨领域建议? - 例:关羽看到编码任务 → "这个函数涉及风控计算,建议用 decimal 而非 float" - 例:赵云看到编码任务 → "这个策略需要分钟线数据,NAS路径 /Volumes/stock/min" - observation 限 200 字 3. 你的领域和任务无交叉 → NO_REPLY ## API - 读任务详情: GET {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}?expand=all - 认领: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/claim body: {{"agent": "{agent_id}"}} - 认领后标 working: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/status body: {{"status": "working", "agent": "{agent_id}"}} - 写评论: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/comments body: {{"author": "{agent_id}", "comment_type": "observation", "body": "专业判断(≤200字)"}} ## 约束 - 一个 tick 只认领一个任务 - observation 只在领域有实际交叉时写,纯粹不匹配则 NO_REPLY - 认领后必须写产出物再转 review - claim 失败说明已被别人认领,NO_REPLY 退出 - 禁止使用 sessions_send 直接发消息 """ @property def counter(self): """从 Dispatcher 获取 counter""" return getattr(self.dispatcher, 'counter', None) if self.dispatcher else None @staticmethod def _is_pid_alive(pid: int) -> bool: """检查进程是否存活""" import os try: os.kill(pid, 0) return True except (ProcessLookupError, PermissionError): return False def record_broadcast_response( self, task_id: str, agent_id: str, outcome: str): """记录 Agent 对广播任务的反馈(Spawner 调用的公共 API)""" tracker = self._broadcast_tracker.get(task_id) if not tracker: return if outcome == "claimed": # Agent 认领了,清理 tracker self._broadcast_tracker.pop(task_id, None) else: tracker.responded_agents.add(agent_id) # 异步轮次结束检测:所有已通知的 Agent 都已反馈 if tracker.notified_agents and tracker.responded_agents >= tracker.notified_agents: # 一轮结束 tracker.round_number += 1 tracker.notified_agents.clear() tracker.responded_agents.clear() logger.info("Broadcast round %d completed for task %s (async callback, no taker)", tracker.round_number, task_id) def _get_all_agent_ids(self) 
-> List[str]: """获取所有配置的 Agent ID""" if self.dispatcher and hasattr( self.dispatcher, 'router') and self.dispatcher.router: return list(self.dispatcher.router.agent_profiles.keys()) return [] def _get_idle_agents(self) -> List[str]: """获取当前空闲的 Agent 列表(从 config agents 取,而非 counter._per_agent)""" if not self.counter: return [] # agent_profiles 在 Router 初始化时从 config 填充,是完整 Agent 列表 all_agents = list( self.dispatcher.router.agent_profiles.keys()) if self.dispatcher else [] active = self.counter.active_agents return [aid for aid in all_agents if active.get(aid, 0) == 0] async def _dispatch_reviews(self, db_path: Path, project_id: str) -> List[str]: """扫描 review 状态任务,检查是否有产出,调度审查 Agent""" # mail 任务不走 review 流程,直接跳过 if project_id == "_mail": return [] queries = Queries(db_path) bb = Blackboard(db_path) review_tasks = queries.tasks_by_status("review") dispatched: List[str] = [] for task in review_tasks: # 检查是否已有 review 记录 reviews = bb.get_reviews(task.id) if reviews: continue # 已有审查,跳过 # 检查是否最近已 dispatch 过 review(防重复 dispatch) existing = self._check_recent_routing(db_path, task.id, "review") if existing: continue # 已有活跃 review dispatch # #07.2: crash_limit 已移到 _check_timeouts 统一检查 # 检查是否有产出(司马懿建议:无产出直接标 failed) outputs = bb.get_outputs(task.id) if not outputs: conn = get_connection(db_path) try: self._transition_status( conn, task.id, "failed", agent="daemon", detail={"reason": "no_output_for_review"}, ) bb.add_observation( task.id, "daemon", "任务进入 review 状态但没有产出物,自动标记为 failed", ) logger.warning("Task %s in review but no output, marking failed", task.id) finally: conn.close() continue # 调度审查 Agent(司马懿) try: result = await self.dispatcher.dispatch( task, action_type="review", project_config={ "project_id": project_id, "db_path": db_path}, ) if result["status"] == "dispatched": dispatched.append(task.id) # 更新 current_agent 为审查者 conn = get_connection(db_path) try: conn.execute( "UPDATE tasks SET current_agent=? 
WHERE id=?", (result["agent_id"], task.id), ) conn.commit() finally: conn.close() logger.info("Dispatched review for %s to %s", task.id, result["agent_id"]) except Exception: logger.exception("Review dispatch failed for %s", task.id) return dispatched # ------------------------------------------------------------------ # 僵尸/超时处理 # ------------------------------------------------------------------ def _check_timeouts(self, db_path: Path) -> List[str]: """检查 claimed/working 超时的任务""" queries = Queries(db_path) reclaimed: List[str] = [] now = datetime.utcnow() # UTC,与 SQLite datetime('now') 一致 # claimed 超时 → 重置为 pending(如果 retry_count >= 3 则升级庞统) claimed = queries.tasks_by_status("claimed") for task in claimed: if not task.claimed_at: continue try: claimed_time = datetime.fromisoformat(task.claimed_at) elapsed = (now - claimed_time).total_seconds() / 60.0 if elapsed > self.claim_timeout_minutes: retry_count = getattr(task, 'retry_count', 0) or 0 # 司马懿建议:复用 retry_count,>=3 则升级庞统 if retry_count >= 3: conn = get_connection(db_path) try: self._transition_status( conn, task.id, "escalated", agent="daemon", detail={"reason": "claim_timeout_no_taker", "retry_count": retry_count}, ) reclaimed.append(task.id) logger.warning("Escalated %s: no taker after %d broadcasts", task.id, retry_count) finally: conn.close() else: conn = get_connection(db_path) try: ok = self._transition_status( conn, task.id, "pending", agent="daemon", detail={"reason": "claim_timeout", "elapsed_minutes": round(elapsed, 1)}, ) if ok: # 递增 retry_count(复用为广播轮次计数) conn.execute( "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?", (task.id,), ) conn.commit() reclaimed.append(task.id) logger.info("Reclaimed %s: claimed → pending (timeout %.1fm, retry=%d)", task.id, elapsed, retry_count + 1) finally: conn.close() except (ValueError, TypeError): pass # working 超时 → 标记为 failed working = queries.tasks_by_status("working") for task in working: # #07.2: crash_limit 统一检查(比超时更严重的信号) if 
self.dispatcher and hasattr( self.dispatcher, '_check_crash_limit'): if self.dispatcher._check_crash_limit( task.id, db_path, limit=3, window_minutes=30): conn = get_connection(db_path) try: self._transition_status( conn, task.id, "failed", agent="daemon", detail={"reason": "crash_limit", "message": "30 分钟内 crash 3 次,自动标 failed"}, ) finally: conn.close() reclaimed.append(task.id) logger.error( "Task %s: executor crash limit (3/30m), marking failed", task.id) continue # #07.3 ACT-1: updated_at fallback 覆盖 mail auto-working(无 started_at/claimed_at) start_time_str = task.started_at or task.claimed_at or task.updated_at if not start_time_str: continue try: start_time = datetime.fromisoformat(start_time_str) # per-task timeout: deadline 优先,否则用默认值 if task.deadline: deadline_time = datetime.fromisoformat(task.deadline) timeout_minutes = ( deadline_time - start_time).total_seconds() / 60.0 if timeout_minutes < 1: timeout_minutes = self.default_task_timeout_minutes else: timeout_minutes = self.default_task_timeout_minutes elapsed = (now - start_time).total_seconds() / 60.0 if elapsed > timeout_minutes: # [v2.7.1] Mail 幻觉门控兜底:有回复 + working → done if self._current_project_id == "_mail": has_reply = self._mail_check_reply(task.id, db_path) if has_reply: conn = get_connection(db_path) try: ok = self._transition_status( conn, task.id, "done", agent="daemon", detail={"reason": "mail_auto_done_recheck", "elapsed_minutes": round(elapsed, 1)}, ) if ok: reclaimed.append(task.id) logger.info("Mail %s: ticker recheck found reply, marked done (%.1fm)", task.id, elapsed) finally: conn.close() continue conn = get_connection(db_path) try: ok = self._transition_status( conn, task.id, "failed", agent="daemon", detail={"reason": "task_timeout", "elapsed_minutes": round(elapsed, 1), "timeout_minutes": round(timeout_minutes, 1)}, ) if ok: reclaimed.append(task.id) logger.warning("Task %s timed out (working %.1fm > %.1fm)", task.id, elapsed, timeout_minutes) finally: conn.close() except 
(ValueError, TypeError): pass # v2.7.2: 进程存活性检查 — counter 占用但进程已死的兜底 if self.spawner and self.counter and hasattr( self.counter, "active_agents"): for agent_id in list(self.counter.active_agents.keys()) if hasattr( self.counter, "active_agents") else []: session_info = self.spawner.get_session_by_agent(agent_id) if not session_info: continue pid = session_info.get("pid") task_id_check = session_info.get("task_id") if pid and not self._is_pid_alive(pid): logger.warning("Agent %s process dead (pid=%d), releasing counter", agent_id, pid) self.counter.release(agent_id) # #07.2: review 状态不推 pending,保持 review 等 _dispatch_reviews 处理 # working 状态推回 pending 让 ticker 重新 dispatch if task_id_check and db_path: try: conn = get_connection(db_path) try: current_row = conn.execute( "SELECT status FROM tasks WHERE id=?", ( task_id_check,) ).fetchone() if current_row and current_row["status"] == "review": logger.info( "Task %s in review, keeping status (process dead)", task_id_check) else: self._transition_status( conn, task_id_check, "pending", agent="daemon", detail={ "reason": "process_dead", "pid": pid}, ) finally: conn.close() except Exception: logger.exception( "Failed to handle process dead for task %s", task_id_check) # #07.2: Fix-3b 已删除。review 超时/crash 统一由 process_dead + _check_timeouts 处理 return reclaimed def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool: """Mail 幻觉门控:检查是否有回复邮件""" try: conn = get_connection(db_path) try: row = conn.execute( "SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ? 
def _check_recent_routing(self, db_path: Path, task_id: str,
                          action_type: str) -> bool:
    """Report whether the task was dispatched within the last 5 minutes.

    Args:
        db_path: Blackboard database to query.
        task_id: Task to look up in ``routing_decisions``.
        action_type: ``"review"`` restricts the check to decisions whose
            ``from_status`` is ``review``; any other value matches every
            dispatched decision.

    Returns:
        True when a matching decision exists; False otherwise, including
        when the query fails.
    """
    sql = ("SELECT COUNT(*) as cnt FROM routing_decisions "
           "WHERE task_id=? AND outcome='dispatched' ")
    if action_type == "review":
        # Only review dispatches count as duplicates of a review dispatch.
        sql += "AND from_status='review' "
    sql += "AND created_at > datetime('now', '-5 minutes')"

    try:
        conn = get_connection(db_path)
        try:
            row = conn.execute(sql, (task_id,)).fetchone()
            return row["cnt"] > 0 if row else False
        finally:
            conn.close()
    except Exception:
        logger.debug("Recent routing check failed for %s", task_id,
                     exc_info=True)
        return False


# ------------------------------------------------------------------
# Startup recovery after a PM2 crash (#06)
# ------------------------------------------------------------------

def _startup_recover(self) -> Dict[str, Any]:
    """Rebuild a consistent task state after a daemon restart.

    Scans every active registered project plus the virtual ``_general``
    and ``_mail`` projects and runs ``_recover_project`` on each existing
    database. A failure in one project is logged and does not stop the
    others.

    Returns:
        Report dict with ``projects`` (project id → recovered entries),
        ``total_recovered`` and ``total_noop``.
    """
    NON_TERMINAL = {"claimed", "working", "review", "reviewing"}
    recovery_report = {
        "projects": {}, "total_recovered": 0, "total_noop": 0}

    # Normalise once so a str root and a Path root behave the same.
    root = Path(self.registry.root)

    project_dirs = {}
    for project_id, project_info in self.registry.list_projects().items():
        if project_info.get("status") == "active":
            project_dirs[project_id] = root / project_id / "blackboard.db"
    # Virtual projects are not listed in the registry.
    for virtual_id in ("_general", "_mail"):
        virtual_db = root / virtual_id / "blackboard.db"
        if virtual_db.exists() and virtual_id not in project_dirs:
            project_dirs[virtual_id] = virtual_db

    for project_id, db_path in project_dirs.items():
        if not db_path.exists():
            continue

        # #06: _transition_status reads _current_project_id (it treats the
        # _mail project specially), so set it for the duration of the scan.
        old_pid = self._current_project_id
        self._current_project_id = project_id
        try:
            db_key = str(db_path)
            if db_key not in self._initialized_dbs:
                from src.blackboard.db import init_db
                init_db(db_path)
                self._initialized_dbs.add(db_key)

            recovered, noop_count = self._recover_project(
                db_path, NON_TERMINAL)
            if recovered:
                recovery_report["projects"][project_id] = recovered
                recovery_report["total_recovered"] += len(recovered)
            recovery_report["total_noop"] += noop_count
        except Exception:
            logger.exception(
                "Startup recovery failed for project %s", project_id)
        finally:
            self._current_project_id = old_pid

    if recovery_report["total_recovered"] > 0:
        logger.info("Startup recovery: %d tasks recovered across %d projects",
                    recovery_report["total_recovered"],
                    len(recovery_report["projects"]))
    elif recovery_report["total_noop"] > 0:
        logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)",
                    recovery_report["total_noop"])
    else:
        logger.info(
            "Startup recovery: no non-terminal tasks found, clean start")
    return recovery_report


def _recover_project(self, db_path: Path, non_terminal: set) -> tuple:
    """Recover the non-terminal tasks of a single project.

    The task list is snapshotted up front and processed in a fixed order,
    so every task is handled exactly once with the status it had at
    startup, even when recovery moves it into another non-terminal status.

    Args:
        db_path: Blackboard database of the project.
        non_terminal: Status names to scan.

    Returns:
        ``(recovered_list, noop_count)`` where each recovered entry is a
        dict with ``task_id``, ``from`` and ``action``.
    """
    recovered = []
    noop_count = 0
    statuses = sorted(non_terminal)
    if not statuses:
        return recovered, noop_count

    conn = get_connection(db_path)
    try:
        placeholders = ",".join("?" * len(statuses))
        rows = conn.execute(
            "SELECT id, status, assignee, current_agent, updated_at FROM tasks "
            f"WHERE status IN ({placeholders}) ORDER BY status, id",
            statuses,
        ).fetchall()

        for task in rows:
            status = task["status"]
            try:
                action = self._determine_recovery_action(
                    conn, task, status, db_path)
                if action:
                    self._execute_recovery(
                        conn, task["id"], action, db_path)
                    recovered.append(
                        {"task_id": task["id"], "from": status, "action": action})
                else:
                    # Audit trail for tasks that are left untouched.
                    noop_count += 1
                    conn.execute(
                        "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
                        (task["id"], "daemon", "startup_recovery_noop",
                         json.dumps({"status": status, "reason": "no_action_needed"}))
                    )
                    conn.commit()
            except Exception:
                logger.exception(
                    "Startup recovery failed for task %s", task["id"])
                # Drop half-written changes so the next task's commit does
                # not persist them.
                try:
                    conn.rollback()
                except Exception:
                    logger.debug("Rollback after failed recovery failed",
                                 exc_info=True)
    finally:
        conn.close()
    return recovered, noop_count
failed for task %s", task["id"]) finally: conn.close() return recovered, noop_count def _determine_recovery_action(self, conn, task, status: str, db_path: Path) -> Optional[str]: """根据黑板线索决定恢复动作,返回 None 表示不需要干预""" task_id = task["id"] if status == "claimed": # claimed 统一推 pending return "push_to_pending" if status == "working": return self._recover_working_task(conn, task_id) if status == "review": return self._recover_review_task(conn, task_id) if status == "reviewing": # reviewing → 查 events 找前置状态(done 或 failed),精确恢复 prev_status = self._find_pre_reviewing_status(conn, task_id) if prev_status == "failed": return "push_to_failed" return "push_to_done" # 兜底 done return None def _recover_working_task(self, conn, task_id: str) -> Optional[str]: """working 状态恢复:看黑板线索判断 agent 是否实际完成了工作""" # 最后一次 attempt last_attempt = conn.execute( "SELECT outcome, agent FROM task_attempts WHERE task_id=? ORDER BY id DESC LIMIT 1", (task_id,) ).fetchone() if not last_attempt or last_attempt["outcome"] != "completed": return "push_to_pending_keep_agent" # 保留 current_agent # agent 正常退出,看是否有产出 has_output = conn.execute( "SELECT 1 FROM outputs WHERE task_id=? LIMIT 1", (task_id,) ).fetchone() is not None if not has_output: return "push_to_pending_keep_agent" # 看是否有 handoff has_handoff = conn.execute( "SELECT 1 FROM comments WHERE task_id=? AND comment_type='handoff' LIMIT 1", (task_id,) ).fetchone() is not None if has_handoff: return "push_to_review" # agent 已完成工作并交接 return "push_to_pending_keep_agent" def _recover_review_task(self, conn, task_id: str) -> Optional[str]: """review 状态恢复:看是否有审查结论""" # 检查 reviews 表 review = conn.execute( "SELECT verdict FROM reviews WHERE task_id=? 
ORDER BY created_at DESC LIMIT 1", (task_id,) ).fetchone() if review: if review["verdict"] == "approved": return "push_to_done" else: # needs_revision / rejected → 推回 pending 重新执行 return "push_to_pending" # 无审查结论 → 保持 review,ticker 自然会 dispatch reviewer return None def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path): """执行恢复动作""" # 获取原始状态(用于审计) orig_row = conn.execute( "SELECT status FROM tasks WHERE id=?", (task_id,) ).fetchone() orig_status = orig_row["status"] if orig_row else "unknown" if action == "push_to_pending": self._transition_status( conn, task_id, "pending", agent="daemon", detail={ "reason": "startup_recovery", "original_status": orig_status}, ) # 清空 current_agent(常规推 pending,无特定 agent 接手) conn.execute( "UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,)) conn.commit() elif action == "push_to_pending_keep_agent": self._transition_status( conn, task_id, "pending", agent="daemon", detail={ "reason": "startup_recovery", "original_status": orig_status}, ) # 保留 current_agent,让同一 agent 重新接手 conn.commit() elif action == "push_to_review": self._transition_status( conn, task_id, "review", agent="daemon", detail={ "reason": "startup_recovery", "original_status": "working"}, ) conn.commit() elif action == "push_to_done": self._transition_status( conn, task_id, "done", agent="daemon", detail={ "reason": "startup_recovery", "original_status": orig_status}, ) conn.commit() elif action == "push_to_failed": self._transition_status( conn, task_id, "failed", agent="daemon", detail={ "reason": "startup_recovery", "original_status": orig_status}, ) conn.commit() # 记录恢复审计事件 conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)", (task_id, "daemon", "startup_recovery", json.dumps({"action": action})) ) conn.commit() logger.info( "Recovery: task %s → %s (action=%s)", task_id, action, action) def _find_pre_reviewing_status(self, conn, task_id: str) -> str: """查 events 表找到 reviewing 之前的状态(done 或 failed)""" 
# _transition_status 写入 event_type=f"task_{new_status}",detail 用 # from/to rows = conn.execute( """SELECT detail FROM events WHERE task_id=? AND event_type='task_reviewing' ORDER BY id DESC LIMIT 1""", (task_id,) ).fetchall() for event in rows: try: detail = json.loads(event["detail"]) # _transition_status detail 格式: {"from": old_status, "to": # new_status, ...} prev = detail.get("from") or detail.get("old_status") if prev in ("done", "failed"): return prev except (json.JSONDecodeError, KeyError): continue return "done" # 找不到则兜底 done(reviewing 只从 done/failed 转入) # ------------------------------------------------------------------ # 手动 tick(API 端点触发) # ------------------------------------------------------------------ async def manual_tick(self) -> Dict[str, Any]: """手动触发一次 tick""" logger.info("Manual tick triggered") result = await self.tick() result["manual"] = True return result