Files
sanguo_moziplus_v2/src/daemon/ticker.py
T
cfdaily 9ec601d747
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 29s
CI / frontend (pull_request) Successful in 12s
CI / notify-on-failure (pull_request) Successful in 1s
[moz] feat: Runaway Guard per-task dispatch 上限
§15 Runaway Guard — per-task dispatch_count 上限,防止无限循环 dispatch

问题:mail/toolchain task 走 handler auto-working(跳过 claim),不受
claim_timeout 3 次重试兜底保护。如果反复 spawn 但永远到不了 done/failed,
会无限循环消耗资源(实际案例:2026-06-15 mention 重复投递事件)。

设计:
- tasks 表新增 dispatch_count 字段
- 每次 ticker 成功 dispatch 时递增
- dispatch_count >= 10 时自动标 failed(reason=runaway_guard)
- 覆盖所有非终态(pending/working/claimed)
- 参考 Hermes v0.13 §3 Per-Task 重试上限

改动文件:
- src/blackboard/db.py: _safe_add_column dispatch_count
- src/blackboard/models.py: Task dataclass 加 dispatch_count
- src/daemon/ticker.py: dispatch 递增 + _check_timeouts runaway guard
- docs/design/15-runaway-guard.md: 设计文档
- tests/integration/test_ticker_integration.py: E13 测试 3 个

测试:456 passed, 3 skipped
2026-06-16 23:10:27 +00:00

1957 lines
80 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Daemon Ticker — 30s 状态扫描 + 依赖推进 + 事件驱动
Tick 循环是整个 Daemon 的核心驱动:
1. 遍历所有 active 项目
2. 每个 tick 扫描黑板状态
3. 推进依赖链(blocked → pending
4. 写入 daemon_tick 事件
5. 调度 pending 任务(F9 实现)
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional
from dataclasses import dataclass, field as dc_field
from src.daemon.task_type_registry import TaskTypeRegistry
from src.blackboard.operations import Blackboard
from src.blackboard.db import get_connection
from src.daemon.spawner import AgentBusyError
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
@dataclass
class BroadcastRound:
    """Per-task bookkeeping for the claim broadcast.

    Records which agents were spawned with the broadcast for a task, which
    of them already answered (a NO_REPLY counts as an answer) and which
    broadcast round the task is in.
    """

    # Id of the task this record tracks.
    task_id: str
    # Agents that have already been spawned with this task's broadcast.
    notified_agents: set[str] = dc_field(default_factory=set)
    # Agents that already returned feedback, NO_REPLY included.
    responded_agents: set[str] = dc_field(default_factory=set)
    # Broadcast round: 0 = not started yet, 1 = first round, and so on.
    round_number: int = 0
# Module logger shared by the ticker and its callbacks.
logger: logging.Logger = logging.getLogger("moziplus-v2.ticker")
class Ticker:
"""Daemon ticker 主循环"""
def __init__(
self,
registry: ProjectRegistry,
tick_interval: float = 30.0,
max_ticks: Optional[int] = None,
on_tick_complete: Optional[Callable[[],
Coroutine[Any, Any, None]]] = None,
dispatcher: Optional[Any] = None,
spawner: Optional[Any] = None,
max_dispatch_per_tick: int = 3,
claim_timeout_minutes: float = 5.0,
default_task_timeout_minutes: float = 30.0,
health_checker: Optional[Any] = None,
experience_distiller: Optional[Any] = None,
inbox_watcher: Optional[Any] = None,
):
"""
Args:
registry: 项目注册表
tick_interval: tick 间隔秒数
max_ticks: 测试用,限制最大 tick 次数
on_tick_complete: 每次 tick 完成后的回调(用于通知等)
dispatcher: Dispatcher 实例(Agent 调度)
spawner: AgentSpawner 实例(Agent spawn
max_dispatch_per_tick: 每个 tick 最多调度多少个 pending 任务
claim_timeout_minutes: claimed 超时重置为 pending 的分钟数
default_task_timeout_minutes: working 超时标记 failed 的分钟数
"""
self.registry = registry
self.tick_interval = tick_interval
self.max_ticks = max_ticks
self.on_tick_complete = on_tick_complete
self.dispatcher = dispatcher
self.spawner = spawner
self.max_dispatch_per_tick = max_dispatch_per_tick
self.claim_timeout_minutes = claim_timeout_minutes
self.default_task_timeout_minutes = default_task_timeout_minutes
self.health_checker = health_checker
self.experience_distiller = experience_distiller
self.inbox_watcher = inbox_watcher
self._tick_count: int = 0
self._running: bool = False
self._task: Optional[asyncio.Task] = None
# 已初始化的 db_path 集合,避免每次 tick 重复 init_db
self._initialized_dbs: set = set()
# 每个项目上次 tick 的 event count(用于僵尸检测 F8
self._last_event_counts: Dict[str, int] = {}
# 广播轮次追踪(Phase 1 bug fix
self._broadcast_tracker: Dict[str, BroadcastRound] = {}
# 当前项目 ID_dispatch_pending 需要)
self._current_project_id: Optional[str] = None
# ------------------------------------------------------------------
# 生命周期
# ------------------------------------------------------------------
async def start(self) -> None:
    """Start the tick loop and, when configured, the InboxWatcher.

    Calling it while already running is a no-op. Startup recovery runs
    before the loop task is created so the first tick sees a consistent
    state (e.g. after a PM2 crash/restart). An InboxWatcher start failure
    is logged and does not stop the ticker.
    """
    if self._running:
        return
    self._running = True
    self._startup_recover()
    self._task = asyncio.create_task(self._loop())

    watcher = self.inbox_watcher
    if watcher:
        try:
            await watcher.start()
        except Exception as exc:
            logger.warning("InboxWatcher start failed: %s", exc)
        else:
            logger.info("InboxWatcher started")

    logger.info("Ticker started (interval=%.1fs)", self.tick_interval)
async def stop(self) -> None:
    """Stop the tick loop and the InboxWatcher.

    Clears the running flag first so the loop will not start another tick,
    stops the watcher (failures are only logged), then cancels the loop
    task and waits for it to finish.
    """
    self._running = False

    watcher = self.inbox_watcher
    if watcher:
        try:
            await watcher.stop()
        except Exception as exc:
            logger.warning("InboxWatcher stop failed: %s", exc)
        else:
            logger.info("InboxWatcher stopped")

    loop_task = self._task
    if loop_task:
        loop_task.cancel()
        try:
            await loop_task
        except asyncio.CancelledError:
            pass  # expected: we just cancelled it

    logger.info("Ticker stopped (ticks=%d)", self._tick_count)
@property
def tick_count(self) -> int:
return self._tick_count
@property
def is_running(self) -> bool:
return self._running
# ------------------------------------------------------------------
# 主循环
# ------------------------------------------------------------------
async def _loop(self) -> None:
    """Main tick loop: tick, check the max_ticks limit, sleep, repeat.

    An exception from a tick is logged and the loop keeps going.
    Cancellation raised inside a tick propagates. Cancellation during the
    sleep ends the loop quietly. A falsy ``max_ticks`` means "no limit".
    """
    while self._running:
        try:
            await self.tick()
        except asyncio.CancelledError:
            raise
        except Exception:
            # tick() increments _tick_count before doing any work, so the
            # current value already identifies the failing tick (the old
            # "+ 1" reported the next tick's number).
            logger.exception("Tick %d error", self._tick_count)
        limit = self.max_ticks
        if limit and self._tick_count >= limit:
            logger.info("Max ticks reached (%d), stopping", limit)
            self._running = False
            break
        try:
            await asyncio.sleep(self.tick_interval)
        except asyncio.CancelledError:
            return
# ------------------------------------------------------------------
# 单次 tick
# ------------------------------------------------------------------
async def tick(self) -> Dict[str, Any]:
    """Run one tick over every active project and every virtual project.

    Order:
    1. Registered projects whose status is ``"active"``.
    2. The hard-coded virtual project ``_general``.
    3. The virtual projects reported by ``TaskTypeRegistry.virtual_projects()``.

    Virtual projects are ticked only when their ``blackboard.db`` exists
    under the registry root and they are not registered as active. A
    project is never ticked twice within the same tick. An error in one
    project is logged and stored as ``{"error": str(exc)}``; it does not
    abort the others.

    Returns:
        ``{"tick": <tick number>, "projects": {project_id: result}}``.
    """
    self._tick_count += 1
    tick_num = self._tick_count
    results: Dict[str, Any] = {
        "tick": tick_num,
        "projects": {},
    }
    projects = self.registry.list_projects()
    active_projects = {
        pid: info for pid, info in projects.items()
        if info.get("status") == "active"
    }
    for project_id, project_info in active_projects.items():
        try:
            pr = await self._tick_project(project_id, project_info)
            results["projects"][project_id] = pr
        except Exception as e:
            logger.exception(
                "Tick %d project %s error",
                tick_num,
                project_id)
            results["projects"][project_id] = {"error": str(e)}

    root = Path(self.registry.root)

    # Virtual project _general: not in the registry, but still needs scheduling.
    general_db = root / "_general" / "blackboard.db"
    if general_db.exists() and "_general" not in active_projects:
        try:
            pr = await self._tick_project("_general", {
                "id": "_general", "name": "一般任务",
                "status": "active", "source": "virtual",
            })
            results["projects"]["_general"] = pr
        except Exception as e:
            logger.exception("Tick %d _general error", tick_num)
            results["projects"]["_general"] = {"error": str(e)}

    # Virtual projects auto-discovered from the task-type registry.
    for vp in TaskTypeRegistry.virtual_projects():
        # Guard against double-ticking. If the registry also reports a
        # project already handled above (e.g. _general), a second pass would
        # dispatch, write daemon_tick events and bump dispatch_count twice
        # within one tick.
        if vp in results["projects"]:
            continue
        vp_db = root / vp / "blackboard.db"
        if vp_db.exists() and vp not in active_projects:
            try:
                vp_handler = TaskTypeRegistry.get_by_project(vp)
                vp_name = vp_handler.display_name if vp_handler and vp_handler.display_name else vp
                pr = await self._tick_project(vp, {
                    "id": vp, "name": vp_name,
                    "status": "active", "source": "virtual",
                })
                results["projects"][vp] = pr
            except Exception as e:
                logger.exception("Tick %d %s error", tick_num, vp)
                results["projects"][vp] = {"error": str(e)}

    logger.debug(
        "Tick %d complete: %d projects",
        tick_num,
        len(active_projects))
    if self.on_tick_complete:
        try:
            await self.on_tick_complete()
        except Exception:
            logger.exception("on_tick_complete callback error")
    return results
async def _tick_project(self, project_id: str,
                        project_info: Dict[str, Any]) -> Dict[str, Any]:
    """Run one tick for a single project's blackboard database.

    Steps:
    - dependency advancement;
    - timeout/zombie handling;
    - pending and review dispatch (only when both dispatcher and spawner
      are set);
    - parent status aggregation, round-complete review and @mention
      delivery;
    - a ``daemon_tick`` event;
    - the optional health check and experience distillation.

    Args:
        project_id: Directory name of the project under the registry root.
        project_info: Registry entry for the project (not read here).

    Returns:
        A per-step result dict, or ``{"status": "no_db"}`` when the project
        has no ``blackboard.db``.
    """
    project_dir = self.registry.root / project_id
    db_path = project_dir / "blackboard.db"
    if not db_path.exists():
        return {"status": "no_db"}
    # Run init_db only the first time this db_path is seen.
    db_key = str(db_path)
    if db_key not in self._initialized_dbs:
        from src.blackboard.db import init_db
        init_db(db_path)
        self._initialized_dbs.add(db_key)
    result: Dict[str, Any] = {
        "status": "ok",
        "summary_before": {},
        "summary_after": {},
        "advanced": [],
        "dispatched": [],
        "review_dispatched": [],
        "zombie_reclaimed": [],
        "parents_refreshed": [],
    }
    # Remember the current project (read by _dispatch_pending/_transition_status).
    self._current_project_id = project_id
    # 1. Snapshot the current state.
    queries = Queries(db_path)
    result["summary_before"] = queries.task_summary()
    # 2. Dependency advancement.
    advanced = self._advance_dependencies(db_path)
    result["advanced"] = advanced
    # 3. Zombie/timeout handling (after advancement, before dispatch).
    zombie_reclaimed = self._check_timeouts(db_path)
    result["zombie_reclaimed"] = zombie_reclaimed
    # 4. Dispatch pending tasks.
    if self.dispatcher and self.spawner:
        dispatched = await self._dispatch_pending(db_path, project_id)
        result["dispatched"] = dispatched
    # 5. Dispatch review tasks.
    if self.dispatcher and self.spawner:
        review_dispatched = await self._dispatch_reviews(db_path, project_id)
        result["review_dispatched"] = review_dispatched
    # 6. Aggregate parent task statuses (v2.7).
    parents_refreshed = self._refresh_parent_statuses(db_path)
    result["parents_refreshed"] = parents_refreshed
    # 7. Round-complete detection + Pangtong review spawn (v2.9 #01).
    round_reviewed = await self._check_round_complete(db_path, project_id)
    result["round_reviewed"] = round_reviewed
    # 8. @mention notifications (v2.9 #01).
    mentions_processed = await self._process_mentions(db_path, project_id)
    result["mentions_processed"] = mentions_processed
    # 9. Write the daemon_tick event.
    conn = get_connection(db_path)
    try:
        conn.execute("BEGIN IMMEDIATE")
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
            (None, "daemon", "daemon_tick",
             json.dumps({"tick": self._tick_count, "advanced_count": len(advanced),
                         "parents_refreshed": len(parents_refreshed)})),
        )
        conn.commit()
    finally:
        conn.close()
    # 10. Health check (zombie detection).
    if self.health_checker:
        try:
            self.health_checker.check(
                project_id, db_path, self._tick_count)
        except Exception as e:
            logger.warning("HealthChecker error for %s: %s", project_id, e)
    # 11. Experience distillation for tasks that finished recently.
    if self.experience_distiller:
        try:
            conn2 = get_connection(db_path)
            try:
                # Normalize updated_at through SQLite's datetime().
                # _transition_status stores isoformat() values ('T'
                # separator), while datetime('now') uses a space. Compared as
                # raw text, a 'T' value sorts after every same-day space
                # value, so a task finished earlier today would be
                # re-distilled on every tick.
                done_tasks = conn2.execute(
                    "SELECT id FROM tasks WHERE status='done' "
                    "AND datetime(updated_at) > datetime('now', '-60 seconds')"
                ).fetchall()
            finally:
                conn2.close()
            # NOTE(review): with the default 30s tick interval the 60s window
            # can span two consecutive ticks, so a task may be distilled
            # twice — confirm distill_from_task is idempotent.
            if done_tasks:
                bb = Blackboard(db_path)
                for row in done_tasks:
                    t = bb.get_task(row[0])
                    if t:
                        self.experience_distiller.distill_from_task(
                            task_id=t.id, task_title=t.title, task_type=t.task_type
                        )
        except Exception as e:
            logger.warning(
                "ExperienceDistiller error for %s: %s", project_id, e)
    # 12. Snapshot the state after this tick.
    result["summary_after"] = queries.task_summary()
    return result
# ------------------------------------------------------------------
# 父 Task 状态聚合
# ------------------------------------------------------------------
def _refresh_parent_statuses(self, db_path: Path) -> List[str]:
    """Re-aggregate the status of every task that has sub tasks.

    For each parent, ``Queries.compute_parent_status`` supplies the target
    status. A ``None`` result means "leave alone" — presumably that is how
    manually set states (cancelled, paused) are skipped; verify in Queries.
    All updates go through one connection and are committed once at the end.

    Returns:
        Ids of the parent tasks whose status changed.
    """
    queries = Queries(db_path)
    changed: List[str] = []
    db = get_connection(db_path)
    try:
        parent_rows = db.execute(
            "SELECT DISTINCT parent_task FROM tasks WHERE parent_task IS NOT NULL"
        ).fetchall()
        for parent_id in [r["parent_task"] for r in parent_rows]:
            target = queries.compute_parent_status(parent_id)
            if target is None:
                continue
            current = db.execute(
                "SELECT status FROM tasks WHERE id=?", (parent_id,)
            ).fetchone()
            if not current or current["status"] == target:
                continue
            db.execute(
                "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
                (target, parent_id),
            )
            changed.append(parent_id)
            logger.info("Parent %s status aggregated: → %s", parent_id, target)
        if changed:
            db.commit()
    finally:
        db.close()
    return changed
# ------------------------------------------------------------------
# 一轮结束检测 + 庞统 review (v2.9 #01)
# ------------------------------------------------------------------
MAX_ROUNDS = 5  # §4.5: hard cap on review rounds, prevents an endless loop

async def _check_round_complete(self, db_path: Path,
                                project_id: str) -> List[str]:
    """Spawn a Pangtong review for parents whose sub tasks are all terminal.

    Flow (§4.4), per parent task:
    1. Require every sub task to be terminal.
    2. Require the parent itself to be ``done`` or ``failed``; a failed
       round is reviewed too, so retry/reassignment can be decided
       (BUG-2 fix).
    3. Skip once ``round_count`` reached ``MAX_ROUNDS``.
    4. Spawn the review first, and only on success increment ``round_count``.

    Returns:
        Ids of the parent tasks for which a review was spawned.
    """
    if not (self.dispatcher and self.spawner):
        return []
    bb = Blackboard(db_path)
    spawned_for: List[str] = []

    db = get_connection(db_path)
    try:
        parent_rows = db.execute(
            "SELECT DISTINCT parent_task FROM tasks WHERE parent_task IS NOT NULL"
        ).fetchall()
    finally:
        db.close()

    for row in parent_rows:
        parent_id = row["parent_task"]
        try:
            summary = bb.get_subtasks_summary(parent_id)
            if not (summary and summary["all_terminal"]):
                continue
            if summary["parent_status"] not in ("done", "failed"):
                continue
            finished_rounds = summary["round_count"]
            if finished_rounds >= self.MAX_ROUNDS:
                logger.warning(
                    "Parent %s reached max rounds (%d), skipping review",
                    parent_id, self.MAX_ROUNDS)
                continue
            # Build the prompt with the round number this review will
            # become; round_count itself is only bumped after the spawn.
            upcoming_round = finished_rounds + 1
            outputs = bb.get_aggregate_outputs(parent_id)
            comments = bb.get_round_comments(parent_id)
            parent_task = bb.get_task(parent_id)
            if not parent_task:
                continue
            prompt = self._build_review_prompt(
                parent_task, summary, outputs, comments, upcoming_round,
                project_id=project_id
            )
            ok = await self._spawn_pangtong_review(
                parent_task, prompt, project_id, new_round=upcoming_round
            )
            if not ok:
                continue
            new_round = bb.increment_round_count(parent_id)
            spawned_for.append(parent_id)
            logger.info(
                "Round %d review spawned for parent %s (subs: %s)",
                new_round, parent_id, summary
            )
        except Exception:
            logger.exception("Round check error for parent %s", parent_id)
    return spawned_for
def _build_review_prompt(self, parent_task, summary: dict,
outputs: list, comments: list,
round_num: int,
project_id: str = "") -> str:
"""构建庞统 review prompt(§4.2 三问框架)"""
goal = parent_task.description or parent_task.title
must_haves = parent_task.must_haves or "{}"
# 成果物摘要(限制 token
output_lines = []
for o in outputs[:20]: # 最多 20 个
output_lines.append(
f"- [{o.get('task_title', '?')}] {o.get('title', '?')} "
f"({o.get('output_type', '?')}) by {o.get('agent', '?')}"
)
# 讨论摘要(限制 50 条)
comment_lines = []
for c in comments[:50]:
comment_lines.append(
f"[{c.get('created_at', '?')[:16]}] {c.get('author', '?')}: {c.get('body', '?')[:200]}"
)
return f"""## 庞统 Review(第 {round_num} 轮)
### Goal
{goal}
### 验收标准
{must_haves}
### 本轮 Sub Task 状态
- 完成: {summary['done']}
- 失败: {summary['failed']}
- 取消: {summary['cancelled']}
- 总计: {summary['total']}
### 成果物
{chr(10).join(output_lines) if output_lines else ''}
### 黑板讨论
{chr(10).join(comment_lines) if comment_lines else ''}
### 三问
1. Goal 还清晰吗?(是否有 goal drift)
2. 成果物覆盖 goal 了吗?(逐条检查验收标准)
3. 下一轮需要做什么?(创建新 sub tasks / 标记完成 / 调整方向)
### 失败处理
{f'{summary["failed"]} 个 sub task failed,优先判断是应该重试、换人、还是调整方案。' if summary['failed'] > 0 else '无失败'}
### 你可以
- 创建新一轮 sub tasks: POST http://{self.spawner.api_host}:{self.spawner.api_port}/api/projects/{project_id}/tasks
body: {"title": "...", "description": "...", "parent_task": "{parent_task.id}", "assignee": "zhangfei-dev"}
- 调整 goal(更新 parent task description/must_haves
- 标记完成(如果 goal 已达成,回复 GOAL_ACHIEVED
Round 上限: {self.MAX_ROUNDS}(当前第 {round_num} 轮)
Project ID: {project_id}
Parent Task ID: {parent_task.id}
"""
async def _spawn_pangtong_review(self, parent_task,
                                 review_prompt: str,
                                 project_id: str,
                                 new_round: int = 0) -> bool:
    """Spawn Pangtong (``pangtong-fujunshi``) to review a finished round.

    Flow:
    1. Spawn the agent on its main session with *review_prompt*.
    2. On success, move the parent to ``reviewing`` so the next tick does
       not trigger the same review again.
    3. The on_complete callback reads Pangtong's reply text from the
       spawner's session metadata and hands it to
       ``_handle_review_conclusion``.

    Returns:
        True if ``spawn_full_agent`` returned a non-None result. False if it
        returned None or anything raised (the exception is logged).
    """
    try:
        agent_id = "pangtong-fujunshi"

        async def _on_review_complete(aid: str, outcome: str):
            """Parse Pangtong's reply and update the parent's status."""
            try:
                review_text = ""
                if self.spawner:
                    # Use the most recently completed session of this agent
                    # that carries metadata.
                    # NOTE(review): if several Pangtong reviews run
                    # concurrently this may pick another review's session —
                    # confirm against the spawner's session model.
                    latest_meta = None
                    latest_time = ""
                    for sess in self.spawner._sessions.values():
                        if sess.get("agent_id") == agent_id and sess.get("meta"):
                            # completed_at may be missing/None for sessions still
                            # in flight. Comparing None with a str raised here
                            # and dropped the conclusion, leaving the parent
                            # stuck in 'reviewing'.
                            t = sess.get("completed_at") or ""
                            if t > latest_time:
                                latest_time = t
                                latest_meta = sess["meta"]
                    if latest_meta and isinstance(latest_meta, dict):
                        payloads = latest_meta.get("payloads", [])
                        review_text = " ".join(
                            p.get("text", "") for p in payloads if isinstance(p, dict)
                        )
                self._handle_review_conclusion(
                    parent_task.id, project_id, review_text, new_round)
            except Exception:
                logger.exception("Review conclusion handler failed for %s",
                                 parent_task.id)

        result = await self.spawner.spawn_full_agent(
            agent_id=agent_id,
            message=review_prompt,
            task_id=parent_task.id,
            use_main_session=True,  # #02: deliver to the main session
            task_db_path=None,
            on_complete=_on_review_complete,
        )
        if result is not None:
            # Spawned: parent goes to reviewing so the next tick skips it.
            self._set_parent_reviewing(parent_task.id, project_id)
            return True
        return False
    except Exception:
        logger.exception(
            "Failed to spawn pangtong review for %s",
            parent_task.id)
        return False
def _set_parent_reviewing(self, parent_id: str, project_id: str):
    """Move a ``done``/``failed`` parent task to ``reviewing``.

    Called right after a review spawn succeeds so the next tick does not
    trigger the same review again. The update only applies while the
    parent is still done/failed. Errors are logged, never raised.
    """
    update_sql = (
        "UPDATE tasks SET status='reviewing', updated_at=datetime('now') "
        "WHERE id=? AND status IN ('done', 'failed')"
    )
    try:
        target_db = self._resolve_db_path(project_id)
        db = get_connection(target_db)
        try:
            db.execute("BEGIN IMMEDIATE")
            db.execute(update_sql, (parent_id,))
            db.commit()
            logger.info("Parent %s → reviewing (round review in progress)",
                        parent_id)
        finally:
            db.close()
    except Exception:
        logger.exception("Failed to set parent %s to reviewing", parent_id)
def _handle_review_conclusion(self, parent_id: str, project_id: str,
                              review_text: str, round_num: int):
    """Apply Pangtong's review conclusion to the parent task.

    *review_text* is Pangtong's reply, joined from the spawner session's
    meta payloads.
    - If it contains ``GOAL_ACHIEVED`` (case-insensitive), the parent moves
      reviewing -> done.
    - Otherwise it moves reviewing -> working so the next round can proceed.

    Both updates only apply while the parent is still ``reviewing``. On any
    error the parent is moved back to working as a safety net.
    """
    db_path = self._resolve_db_path(project_id)
    conn = get_connection(db_path)
    try:
        is_achieved = bool(
            review_text and "GOAL_ACHIEVED" in review_text.upper())
        if is_achieved:
            # Goal reached -> the parent is finally done.
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "UPDATE tasks SET status='done', updated_at=datetime('now') "
                "WHERE id=? AND status='reviewing'",
                (parent_id,))
            conn.commit()
            logger.info(
                "Parent %s review conclusion: GOAL_ACHIEVED → done",
                parent_id)
        else:
            # Pangtong may have created new sub tasks or wants to continue.
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "UPDATE tasks SET status='working', updated_at=datetime('now') "
                "WHERE id=? AND status='reviewing'",
                (parent_id,))
            conn.commit()
            sub_count = conn.execute(
                "SELECT COUNT(*) FROM tasks WHERE parent_task=?",
                (parent_id,)
            ).fetchone()[0]
            logger.info(
                "Parent %s review conclusion: continue → working "
                "(round %d, subs=%d)",
                parent_id, round_num, sub_count)
    except Exception:
        logger.exception(
            "Failed to handle review conclusion for %s",
            parent_id)
        # Safety net: reviewing -> working. Roll back first. If the failure
        # happened inside the transaction above, BEGIN IMMEDIATE would raise
        # "cannot start a transaction within a transaction" and the parent
        # would stay stuck in 'reviewing'.
        try:
            conn.rollback()
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "UPDATE tasks SET status='working', updated_at=datetime('now') "
                "WHERE id=? AND status='reviewing'",
                (parent_id,))
            conn.commit()
        except Exception:
            logger.exception(
                "Failed to restore parent %s from reviewing", parent_id)
    finally:
        conn.close()
def _resolve_db_path(self, project_id: str) -> Path:
"""解析项目 DB 路径"""
from src.utils import get_data_root
return get_data_root() / project_id / "blackboard.db"
# ------------------------------------------------------------------
# @mention 通知处理 (v2.9 #01)
# ------------------------------------------------------------------
MENTION_MAX_RETRIES = 5

async def _process_mentions(self, db_path: Path,
                            project_id: str) -> List[str]:
    """Notify @-mentioned agents by spawning them (v2.9 #01, §3.4).

    Flow:
    1. Read pending mentions whose retry_count is below MENTION_MAX_RETRIES.
    2. Group them by ``mentioned_agent``, so each agent is spawned once.
    3. Handle the spawn outcome:
       - non-None result: the agent's mentions are marked notified;
       - None result: their retry_count is incremented;
       - AgentBusyError: they are left untouched for the next tick;
       - any other error: retry_count is incremented, or on the last
         allowed attempt the mention is marked failed.

    #09 rebuttal re-review: when the context task is in ``review`` and a
    mention carries a ``rebuttal`` comment, the spawn gets an on_complete
    callback. It reads the newest review verdict and then:
    - ``approved``: moves the task to done;
    - otherwise: @-mentions the assignee.

    Returns:
        Ids of the agents that were spawned successfully.
    """
    if not self.spawner:
        return []
    bb = Blackboard(db_path)
    mentions = bb.get_pending_mentions(
        max_retries=self.MENTION_MAX_RETRIES)
    if not mentions:
        return []
    # Group by mentioned agent.
    agent_mentions: Dict[str, List[Dict]] = {}
    for m in mentions:
        agent_mentions.setdefault(m["mentioned_agent"], []).append(m)

    processed: List[str] = []
    for agent_id, items in agent_mentions.items():
        try:
            # Mention digest; a NULL comment body must not abort the batch.
            mention_lines = [
                f"- [{item.get('comment_author', '?')}] "
                f"{(item.get('comment_body') or '')[:200]}"
                for item in items
            ]
            # The first mention's task provides the spawn context.
            tid = items[0]["task_id"]
            task = bb.get_task(tid)
            if not task:
                continue
            prompt = self._build_mention_prompt(
                agent_id, task, mention_lines, project_id)
            is_rebuttal_review = task.status == "review" and any(
                item.get("comment_type", "") == "rebuttal" for item in items)
            if is_rebuttal_review:
                # Task/project ids are bound as default arguments. A plain
                # closure would read the loop variables when the spawn
                # completes, by which time a later iteration may have rebound
                # them to another agent's task.
                async def _rebuttal_on_complete(aid: str, outcome: str,
                                                _t_id: str = tid,
                                                _pid: str = project_id):
                    try:
                        if outcome not in ("completed", "session_revived"):
                            return
                        # Read the newest verdict.
                        rdb_path = self._resolve_db_path(_pid)
                        rconn = get_connection(rdb_path)
                        try:
                            new_review = rconn.execute(
                                "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
                                (_t_id,)
                            ).fetchone()
                        finally:
                            rconn.close()
                        if new_review and new_review["verdict"] == "approved":
                            # The connection is now closed after use (it leaked before).
                            tconn = get_connection(rdb_path)
                            try:
                                self._transition_status(
                                    tconn, _t_id, "done",
                                    agent="daemon",
                                    detail={"reason": "rebuttal_approved"})
                            finally:
                                tconn.close()
                            logger.info(
                                "Rebuttal: task %s approved after rebuttal", _t_id)
                            return
                        # Still not approved -> @mention the assignee.
                        verdict_str = new_review["verdict"] if new_review else "未知"
                        rconn2 = get_connection(rdb_path)
                        try:
                            t_row = rconn2.execute(
                                "SELECT assignee FROM tasks WHERE id=?", (_t_id,)).fetchone()
                        finally:
                            rconn2.close()
                        if t_row and t_row["assignee"]:
                            # NOTE(review): imports Blackboard from
                            # src.blackboard.blackboard, while the rest of this
                            # file uses src.blackboard.operations — confirm
                            # which module is intended.
                            from src.blackboard.blackboard import Blackboard
                            bb2 = Blackboard(rdb_path)
                            bb2.add_comment(_t_id, "daemon",
                                            f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
                                            comment_type="review")
                            logger.info(
                                "Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str)
                    except Exception:
                        logger.exception(
                            "Rebuttal on_complete failed for task %s", _t_id)

                result = await self.spawner.spawn_full_agent(
                    agent_id=agent_id,
                    message=prompt,
                    task_id=tid,
                    use_main_session=True,
                    on_complete=_rebuttal_on_complete,
                )
            else:
                # Plain mention: no completion callback.
                result = await self.spawner.spawn_full_agent(
                    agent_id=agent_id,
                    message=prompt,
                    task_id=tid,
                    use_main_session=True,  # #02: deliver to the main session
                )
            if result is not None:
                # Success -> mark all of this agent's mentions as notified.
                for item in items:
                    bb.mark_mention_notified(item["id"])
                processed.append(agent_id)
                logger.info(
                    "Mention spawn success: %s (%d mentions)",
                    agent_id,
                    len(items))
            else:
                # Spawn returned None for another reason -> bump retry_count.
                for item in items:
                    bb.mark_mention_retry(item["id"])
                logger.warning(
                    "Mention spawn failed: %s, retrying next tick", agent_id)
        except AgentBusyError:
            # Agent busy: do not bump retry_count; the next tick retries.
            logger.info(
                "Mention spawn skipped: %s busy, will retry next tick",
                agent_id)
        except Exception:
            logger.exception(
                "Mention processing error for agent %s", agent_id)
            for item in items:
                try:
                    if item.get("retry_count",
                                0) >= self.MENTION_MAX_RETRIES - 1:
                        bb.mark_mention_failed(item["id"])
                    else:
                        bb.mark_mention_retry(item["id"])
                except Exception:
                    logger.warning(
                        "Failed to update retry state of mention %s",
                        item.get("id"), exc_info=True)
    return processed
def _build_mention_prompt(self, agent_id: str, task: Any,
mention_lines: List[str],
project_id: str) -> str:
"""#03: @mention prompt(身份注入)"""
api_host = getattr(
self.spawner,
'api_host',
'127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(
self.spawner,
'api_port',
8083) if self.spawner else 8083
api_base = f"http://{api_host}:{api_port}/api"
# 获取 Agent 专长
caps = "通用"
try:
if self.dispatcher and self.dispatcher.router:
profile = self.dispatcher.router.agent_profiles.get(agent_id)
if profile and profile.capabilities_zh:
caps = ", ".join(profile.capabilities_zh)
except Exception:
pass
mentions_text = "\n".join(mention_lines[:10])
return f"""你在黑板上被 @ 了。
## 你的身份
你是 {agent_id},专长: {caps}
## 相关讨论
{mentions_text}
## 任务上下文
- 项目: {project_id}
- 任务: {task.title}
- 描述: {task.description or ''}
## 你能做什么
- 读完整上下文: GET {api_base}/projects/{project_id}/tasks/{task.id}?expand=all
- 回应: POST {api_base}/projects/{project_id}/tasks/{task.id}/comments
- 如果讨论收敛到可执行任务,可以创建 sub task
## 约束
- 只回应与你专长相关的内容
- 禁止使用 sessions_send 直接发消息
- 委托他人做事用黑板 comment @agent-id,系统自动路由(无需手动传 mentions 数组)
"""
def _advance_dependencies(self, db_path: Path) -> List[str]:
    """Move blocked tasks to pending once all their dependencies are done.

    All transitions share one connection.

    Returns:
        Ids of the tasks moved from blocked to pending.
    """
    queries = Queries(db_path)
    blocked = queries.blocked_tasks_with_deps()
    advanced: List[str] = []
    if not blocked:
        return advanced
    conn = get_connection(db_path)
    try:
        for item in blocked:
            if not item["all_deps_done"]:
                continue
            task_id = item["task_id"]
            try:
                ok = self._transition_status(conn, task_id, "pending",
                                             agent="daemon",
                                             detail={"reason": "all_dependencies_done"})
            finally:
                # _transition_status opens BEGIN IMMEDIATE and may return
                # without committing (missing task, terminal status,
                # disallowed transition). Because the connection is reused,
                # a dangling transaction would make the next BEGIN IMMEDIATE
                # fail ("cannot start a transaction within a transaction")
                # and abort the whole pass. Assuming get_connection returns a
                # sqlite3 connection, rollback() is a no-op after a commit.
                conn.rollback()
            if ok:
                advanced.append(task_id)
                logger.info("Advanced %s: blocked → pending", task_id)
    finally:
        conn.close()
    return advanced
def _transition_status(self, conn, task_id: str, new_status: str,
                       agent: str = "daemon",
                       detail: Optional[Dict] = None) -> bool:
    """Lightweight status transition that bypasses the Blackboard class (avoids init_db).

    Runs in its own ``BEGIN IMMEDIATE`` transaction on *conn*. On success
    it updates the task row and appends a ``task_<new_status>`` event, or a
    ``daemon_tick`` event when that type is not in EVENT_TYPES.
    - Resetting to ``pending`` clears assignee/resumed_from, so the task is
      not routed straight back to the same agent. Handler virtual projects
      (e.g. _mail) keep their assignee, because there it is the recipient.
    - Moving to ``paused`` records the previous status in ``resumed_from``
      so a resume can return to it.

    Returns:
        True if the task moved to *new_status* or already had it. False if
        the task is missing, in a terminal status, or the transition is not
        allowed. The transaction is always closed before returning:
        committed on success, rolled back otherwise. Callers that reuse
        *conn* therefore neither keep the write lock nor hit "cannot start a
        transaction within a transaction" on their next call.
    """
    from src.blackboard.db import VALID_TRANSITIONS, TERMINAL_STATUSES, EVENT_TYPES
    from datetime import datetime
    conn.execute("BEGIN IMMEDIATE")
    committed = False
    try:
        row = conn.execute(
            "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
        if not row:
            return False
        old_status = row["status"]
        if old_status in TERMINAL_STATUSES or old_status == new_status:
            return old_status == new_status
        if new_status not in VALID_TRANSITIONS.get(old_status, set()):
            return False
        # NOTE(review): naive UTC ISO-8601 with a 'T' separator, while other
        # writers use SQLite datetime('now') (space separator); compare
        # updated_at via datetime(updated_at) in SQL.
        now = datetime.utcnow().isoformat()
        if new_status == "pending":
            handler = TaskTypeRegistry.get_by_project(self._current_project_id)
            if handler:
                conn.execute(
                    "UPDATE tasks SET status=?, updated_at=? WHERE id=?",
                    (new_status, now, task_id),
                )
            else:
                conn.execute(
                    "UPDATE tasks SET status=?, assignee=NULL, resumed_from=NULL, updated_at=? WHERE id=?",
                    (new_status, now, task_id),
                )
        elif new_status == "paused":
            conn.execute(
                "UPDATE tasks SET status=?, resumed_from=?, updated_at=? WHERE id=?",
                (new_status, old_status, now, task_id),
            )
        else:
            conn.execute(
                "UPDATE tasks SET status=?, updated_at=? WHERE id=?",
                (new_status, now, task_id),
            )
        event_type = f"task_{new_status}"
        if event_type not in EVENT_TYPES:
            event_type = "daemon_tick"
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
            (task_id, agent, event_type, json.dumps(
                {"from": old_status, "to": new_status, **(detail or {})})),
        )
        conn.commit()
        committed = True
        return True
    finally:
        if not committed:
            # Early return or exception: release the write lock.
            conn.rollback()
# ------------------------------------------------------------------
# Agent 调度
# ------------------------------------------------------------------
async def _dispatch_pending(self, db_path: Path,
                            project_id: str) -> List[str]:
    """Dispatch up to ``max_dispatch_per_tick`` pending tasks.

    v3.0 has two paths:
    - Deterministic (retry/handoff/assignee/lifecycle): ``dispatcher.dispatch``
      spawns directly and the task becomes ``claimed``. For handler projects
      only ``current_agent`` is recorded, because the dispatcher already set
      ``working``.
    - Broadcast claim: there is no deterministic route, so all idle agents
      are spawned and may claim the task themselves.

    A guardrail-blocked dispatch moves the task to ``blocked``. If
    ``decide`` fails for one task, that task is skipped for this tick and
    the rest are still dispatched.

    §15 Runaway Guard: every task dispatched this tick, deterministic or
    broadcast, gets ``dispatch_count`` incremented.

    Returns:
        Task ids dispatched deterministically, followed by whatever
        ``_broadcast_claim`` returned (unchanged for existing callers).
    """
    queries = Queries(db_path)
    pending = queries.pending_dispatchable()
    dispatched: List[str] = []
    if not pending:
        return dispatched
    # Split into deterministic vs broadcast.
    deterministic_tasks = []
    broadcast_tasks = []
    for task in pending[:self.max_dispatch_per_tick]:
        try:
            decision = self.dispatcher.decide(task)
        except Exception:
            # A task whose routing decision fails would otherwise abort the
            # whole project tick every time it sits at the head of the queue.
            logger.exception("Dispatch decision failed for %s", task.id)
            continue
        if decision.get("mode") in ("deterministic", "agent_handoff"):
            deterministic_tasks.append((task, decision))
        else:
            broadcast_tasks.append(task)
    # Path 1: deterministic routing -> spawn directly.
    for task, _decision in deterministic_tasks:
        try:
            result = await self.dispatcher.dispatch(
                task,
                project_config={
                    "project_id": project_id,
                    "db_path": db_path},
            )
            if result["status"] == "dispatched" and result["level"] in (
                    "full", "escalate"):
                conn = get_connection(db_path)
                try:
                    # [Step 5] handler projects were already set to working
                    # by the dispatcher, so skip the claimed transition.
                    handler = TaskTypeRegistry.get_by_project(project_id)
                    if handler:
                        conn.execute(
                            "UPDATE tasks SET current_agent=? WHERE id=?",
                            (result["agent_id"], task.id),
                        )
                        conn.commit()
                        dispatched.append(task.id)
                        logger.info("Dispatched %s to %s (session=%s, handler auto-working)",
                                    task.id, result["agent_id"],
                                    result.get("session_id"))
                    else:
                        ok = self._transition_status(
                            conn, task.id, "claimed",
                            agent="daemon",
                            detail={"dispatched_to": result["agent_id"],
                                    "session_id": result.get("session_id")},
                        )
                        if ok:
                            conn.execute(
                                "UPDATE tasks SET current_agent=? WHERE id=?",
                                (result["agent_id"], task.id),
                            )
                            conn.commit()
                            dispatched.append(task.id)
                            logger.info("Dispatched %s to %s (session=%s)",
                                        task.id, result["agent_id"],
                                        result.get("session_id"))
                finally:
                    conn.close()
            elif result["status"] == "blocked":
                # Guardrail intercepted the task: mark it blocked.
                conn = get_connection(db_path)
                try:
                    self._transition_status(
                        conn, task.id, "blocked",
                        agent="daemon",
                        detail={"reason": result.get("reason", "guardrail"),
                                "violations": result.get("violations", [])},
                    )
                    logger.info("Task %s blocked by guardrail: %s",
                                task.id, result.get("reason", "unknown"))
                finally:
                    conn.close()
        except Exception:
            logger.exception("Dispatch failed for %s", task.id)
    # Ids to count for the Runaway Guard. So far `dispatched` holds task ids only.
    counted_ids: List[str] = list(dispatched)
    # Path 2: broadcast claim.
    if broadcast_tasks:
        broadcast_ids = await self._broadcast_claim(broadcast_tasks, db_path, project_id)
        dispatched.extend(broadcast_ids)
        if broadcast_ids:
            # _broadcast_claim returns spawned *session* ids, which never
            # match tasks.id, so counting them skipped every broadcast task.
            # Count the tasks themselves instead: the ones actually broadcast
            # keep a _broadcast_tracker entry, escalated ones are popped.
            counted_ids.extend(
                t.id for t in broadcast_tasks if t.id in self._broadcast_tracker)
    # §15 Runaway Guard: bump dispatch_count for every dispatched task.
    if counted_ids:
        conn = get_connection(db_path)
        try:
            for tid in counted_ids:
                conn.execute(
                    "UPDATE tasks SET dispatch_count = COALESCE(dispatch_count, 0) + 1 WHERE id=?",
                    (tid,),
                )
            conn.commit()
        finally:
            conn.close()
    return dispatched
async def _broadcast_claim(self, tasks: list, db_path: Path,
                           project_id: str) -> List[str]:
    """Broadcast a batch of claimable tasks to all idle agents (at most once per tick).

    Design note (Sima Yi's suggestion): collect a batch and broadcast it
    once, rather than once per task, and skip the broadcast when global
    concurrency is near its limit. Tasks whose tracker reached round 3 are
    escalated instead of being broadcast.

    Returns:
        Session ids returned by successful spawns (not task ids).
    """
    if not self.spawner:
        return []
    # Global concurrency check. The counter comes from the dispatcher and may
    # be absent (the property returns None), which used to raise
    # AttributeError here.
    counter = self.counter
    if counter is not None and counter.is_near_limit():
        logger.info("Skipping broadcast: global concurrent near limit (%d/%d)",
                    counter.global_active, counter.max_global)
        return []
    # Split into tasks to escalate vs tasks to broadcast.
    broadcastable = []
    escalated = []
    for t in tasks:
        tracker = self._broadcast_tracker.get(t.id)
        if tracker and tracker.round_number >= 3:
            escalated.append(t)
        else:
            broadcastable.append(t)
    # NOTE(review): round_number is never incremented in the code visible in
    # this file section; confirm something else advances it, otherwise this
    # escalation never fires.
    for t in escalated:
        tracker = self._broadcast_tracker.get(t.id)
        conn = get_connection(db_path)
        try:
            self._transition_status(
                conn, t.id, "escalated",
                agent="daemon",
                detail={"reason": "no_taker_after_3_broadcasts",
                        "round_number": tracker.round_number if tracker else 0},
            )
            logger.warning(
                "Escalated %s: no taker after 3 broadcast rounds", t.id)
            self._broadcast_tracker.pop(t.id, None)
        finally:
            conn.close()
    if not broadcastable:
        return []
    idle_agents = self._get_idle_agents()
    if not idle_agents:
        logger.warning(
            "No idle agents for broadcast, skipping (capacity issue)")
        return []
    task_ids = [t.id for t in broadcastable]
    logger.info("Broadcasting %d tasks to %d idle agents: %s",
                len(broadcastable), len(idle_agents), task_ids)
    # Audit events are best-effort, but a silent failure hides DB problems.
    try:
        conn = get_connection(db_path)
        try:
            for task in broadcastable:
                conn.execute(
                    "INSERT INTO events (task_id, agent, event_type, detail) "
                    "VALUES (?,?,?,?)",
                    (task.id, "daemon", "broadcast_claim",
                     json.dumps({"agents": idle_agents, "task_title": task.title})),
                )
            conn.commit()
        finally:
            conn.close()
    except Exception:
        logger.warning("Broadcast audit event write failed for %s",
                       task_ids, exc_info=True)
    # Get or create a tracker per broadcast task.
    for t in broadcastable:
        if t.id not in self._broadcast_tracker:
            self._broadcast_tracker[t.id] = BroadcastRound(task_id=t.id)
    spawned = []
    for agent_id in idle_agents:
        prompt = self._build_claim_prompt(
            agent_id, broadcastable, project_id)
        try:
            session_id = await self.spawner.spawn_full_agent(
                agent_id=agent_id,
                message=prompt,
                task_id="broadcast",
                task_db_path=db_path,
                use_main_session=True,
                broadcast_task_ids=list(task_ids),
            )
            if session_id is not None:
                spawned.append(session_id)
                # Record the agent as notified for every broadcast task.
                for t in broadcastable:
                    self._broadcast_tracker[t.id].notified_agents.add(
                        agent_id)
        except AgentBusyError:
            logger.debug("Broadcast skip %s: busy", agent_id)
        except Exception:
            logger.exception("Broadcast spawn failed for %s", agent_id)
    return spawned
def _build_claim_prompt(self, agent_id: str, tasks: list,
                        project_id: str) -> str:
    """#03: Build the broadcast-claim prompt for one agent (identity + expertise).

    Args:
        agent_id: the agent receiving the broadcast.
        tasks: broadcast tasks; each needs ``id``, ``title`` and ``priority``,
            and may carry ``task_type`` (shown as ``general`` when empty).
        project_id: project whose REST endpoints the agent is told to call.

    Returns:
        The prompt text. The API base URL comes from ``self.spawner``'s
        ``api_host``/``api_port`` when a spawner is set, else
        ``127.0.0.1:8083``. The agent's expertise comes from the router's
        ``agent_profiles[agent_id].capabilities_zh`` when available,
        else the generic label ``通用``.
    """
    host, port = '127.0.0.1', 8083
    if self.spawner:
        host = getattr(self.spawner, 'api_host', host)
        port = getattr(self.spawner, 'api_port', port)
    api_base = f"http://{host}:{port}/api"

    caps = "通用"
    try:
        router = self.dispatcher.router if self.dispatcher else None
        profile = router.agent_profiles.get(agent_id) if router else None
        if profile and profile.capabilities_zh:
            caps = ", ".join(profile.capabilities_zh)
    except Exception:
        pass  # best-effort: fall back to the generic label

    task_lines = []
    for t in tasks:
        kind = getattr(t, 'task_type', '') or 'general'
        task_lines.append(
            f"- ID: {t.id}, 标题: {t.title}, 类型: {kind}, 优先级: {t.priority}")
    task_list = "\n".join(task_lines)

    # {{{{TASK_ID}}}} renders as the literal placeholder {{TASK_ID}}.
    return f"""你是 {agent_id}({caps})。你们是一个协作团队,共享一块黑板。
## 待处理任务
{task_list}
## 你的角色
你收到团队广播。按你的专业判断回应:
1. 有属于你专业的任务 → 认领并执行
2. 不是你的活,但你的专业领域和这个任务有实际交叉 → 写 observation comment
- 从你的专业视角:你看到什么风险?什么约束?什么跨领域建议?
- 例:关羽看到编码任务 → "这个函数涉及风控计算,建议用 decimal 而非 float"
- 例:赵云看到编码任务 → "这个策略需要分钟线数据,NAS路径 /Volumes/stock/min"
- observation 限 200 字
3. 你的领域和任务无交叉 → NO_REPLY
## API
- 读任务详情: GET {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}?expand=all
- 认领: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/claim
body: {{"agent": "{agent_id}"}}
- 认领后标 working: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/status
body: {{"status": "working", "agent": "{agent_id}"}}
- 写评论: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/comments
body: {{"author": "{agent_id}", "comment_type": "observation", "body": "专业判断(≤200字)"}}
## 约束
- 一个 tick 只认领一个任务
- observation 只在领域有实际交叉时写,纯粹不匹配则 NO_REPLY
- 认领后必须写产出物再转 review
- claim 失败说明已被别人认领,NO_REPLY 退出
- 禁止使用 sessions_send 直接发消息
"""
@property
def counter(self):
    """The Dispatcher's ``counter`` object.

    Returns None when there is no dispatcher or it has no ``counter``.
    """
    dispatcher = self.dispatcher
    if not dispatcher:
        return None
    return getattr(dispatcher, 'counter', None)
@staticmethod
def _is_pid_alive(pid: int) -> bool:
    """Return True if a process with *pid* currently exists.

    Uses ``os.kill(pid, 0)``: signal 0 only performs the existence and
    permission checks without delivering anything. ``ProcessLookupError``
    (ESRCH) means there is no such process. ``PermissionError`` (EPERM)
    means the process exists but we may not signal it, so it is alive.
    Reporting it as dead would make callers release resources held by a
    running process.
    """
    import os
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # EPERM: the process exists, it just belongs to another user.
        return True
    return True
def record_broadcast_response(
        self, task_id: str, agent_id: str, outcome: str):
    """Record an agent's reply to a broadcast task (public API called by the Spawner).

    ``outcome == "claimed"`` drops the task's broadcast tracker. Any other
    outcome marks *agent_id* as having responded. Once every notified
    agent has responded, the round is closed: ``round_number`` is bumped
    and both agent sets are cleared for the next round. Unknown task ids
    are ignored.
    """
    tracker = self._broadcast_tracker.get(task_id)
    if not tracker:
        return
    if outcome == "claimed":
        # Someone took the task; stop tracking broadcast rounds for it.
        self._broadcast_tracker.pop(task_id, None)
        return
    tracker.responded_agents.add(agent_id)
    notified = tracker.notified_agents
    # Async round-end detection: wait until all notified agents answered.
    if not notified or not tracker.responded_agents >= notified:
        return
    tracker.round_number += 1
    notified.clear()
    tracker.responded_agents.clear()
    logger.info("Broadcast round %d completed for task %s (async callback, no taker)",
                tracker.round_number, task_id)
def _get_all_agent_ids(self) -> List[str]:
    """Return every configured agent id (the router's ``agent_profiles`` keys).

    Empty when there is no dispatcher or router.
    """
    dispatcher = self.dispatcher
    router = getattr(dispatcher, 'router', None) if dispatcher else None
    if not router:
        return []
    return list(router.agent_profiles.keys())
def _get_idle_agents(self) -> List[str]:
    """Return configured agents that currently hold no active slot.

    The candidate list comes from the router's ``agent_profiles`` (filled
    from config at Router init), not from the counter's per-agent map, so
    agents that never ran are included. An agent is idle when its
    ``counter.active_agents`` entry is missing or 0.

    Returns an empty list when there is no counter, dispatcher or router.
    """
    counter = self.counter
    if not counter:
        return []
    # Same None-guard as _get_all_agent_ids: a dispatcher without a
    # router must not raise AttributeError here.
    router = getattr(self.dispatcher, 'router', None) if self.dispatcher else None
    all_agents = list(router.agent_profiles.keys()) if router else []
    active = counter.active_agents
    return [aid for aid in all_agents if active.get(aid, 0) == 0]
async def _dispatch_reviews(self, db_path: Path,
                            project_id: str) -> List[str]:
    """Dispatch reviewer agents for ``review`` tasks that still need one.

    Per task in ``review``:
    - skipped if it already has a review record;
    - skipped if a review dispatch was routed in the last 5 minutes
      (``_check_recent_routing``), to avoid duplicate dispatches;
    - moved to ``failed`` (with an observation) if it has no outputs;
    - otherwise dispatched with ``action_type="review"``. On success the
      task's ``current_agent`` is set to the reviewer.

    Handler-backed projects (those resolved by ``TaskTypeRegistry``, e.g.
    _mail/_toolchain) have no review stage and return immediately.

    Args:
        db_path: path to the project's blackboard database.
        project_id: project being scanned.

    Returns:
        IDs of tasks whose review was dispatched. Each has its
        ``dispatch_count`` incremented (§15 Runaway Guard).
    """
    if TaskTypeRegistry.get_by_project(project_id):
        return []
    queries = Queries(db_path)
    bb = Blackboard(db_path)
    dispatched: List[str] = []
    for task in queries.tasks_by_status("review"):
        if bb.get_reviews(task.id):
            continue  # already reviewed
        if self._check_recent_routing(db_path, task.id, "review"):
            continue  # a recent review dispatch is still in flight
        # #07.2: crash_limit is checked centrally in _check_timeouts.
        if not bb.get_outputs(task.id):
            # Nothing to review: fail the task instead of dispatching.
            conn = get_connection(db_path)
            try:
                ok = self._transition_status(
                    conn, task.id, "failed",
                    agent="daemon",
                    detail={"reason": "no_output_for_review"},
                )
                # Only claim "auto-marked failed" if the transition was
                # actually accepted.
                if ok:
                    bb.add_observation(
                        task.id, "daemon",
                        "任务进入 review 状态但没有产出物,自动标记为 failed",
                    )
                    logger.warning("Task %s in review but no output, marking failed",
                                   task.id)
            finally:
                conn.close()
            continue
        try:
            result = await self.dispatcher.dispatch(
                task,
                action_type="review",
                project_config={
                    "project_id": project_id,
                    "db_path": db_path},
            )
            if result["status"] == "dispatched":
                dispatched.append(task.id)
                # The reviewer becomes the task's current agent.
                conn = get_connection(db_path)
                try:
                    conn.execute(
                        "UPDATE tasks SET current_agent=? WHERE id=?",
                        (result["agent_id"], task.id),
                    )
                    conn.commit()
                finally:
                    conn.close()
                logger.info("Dispatched review for %s to %s",
                            task.id, result["agent_id"])
        except Exception:
            logger.exception("Review dispatch failed for %s", task.id)
    # §15 Runaway Guard: count review dispatches too.
    # NOTE(review): the guard in _check_timeouts only inspects
    # pending/working/claimed, so a review re-dispatched every 5 minutes
    # without ever producing a review record is not capped by it.
    if dispatched:
        conn = get_connection(db_path)
        try:
            for tid in dispatched:
                conn.execute(
                    "UPDATE tasks SET dispatch_count = COALESCE(dispatch_count, 0) + 1 WHERE id=?",
                    (tid,),
                )
            conn.commit()
        finally:
            conn.close()
    return dispatched
# ------------------------------------------------------------------
# 僵尸/超时处理
# ------------------------------------------------------------------
def _check_timeouts(self, db_path: Path) -> List[str]:
    """Repair stuck tasks in one project's blackboard.

    Four sweeps, in order:

    1. §15 Runaway Guard: a pending/working/claimed task whose
       ``dispatch_count`` reached ``MAX_DISPATCH_COUNT`` is marked failed.
    2. Claim timeout: a claimed task older than ``claim_timeout_minutes``
       is escalated when ``retry_count >= 3``. Otherwise it goes back to
       pending with ``retry_count`` incremented.
    3. Working checks: a task over the dispatcher's crash limit (3 in 30
       minutes) is failed. A task running longer than its timeout is
       checked next. The timeout is ``deadline - start`` when that is at
       least one minute, else ``default_task_timeout_minutes``. Such a
       task is marked done if the project's handler reports completion,
       otherwise failed.
    4. Dead-process sweep: for each agent the counter holds whose session
       process is gone, the counter slot is released. The session's task
       is pushed back to pending if it exists in this DB and is still
       working or claimed. Review, terminal and other states are left as-is.

    Args:
        db_path: path to the project's blackboard database.

    Returns:
        IDs of tasks changed by sweeps 1-3. Sweep 4 resets are not listed.
    """
    from datetime import timezone

    queries = Queries(db_path)
    reclaimed: List[str] = []
    # Naive UTC, consistent with SQLite datetime('now') timestamps.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # 1. §15 Runaway Guard. Handler tasks (mail/toolchain) skip claim, so
    #    the claim-timeout retry cap in sweep 2 never protects them; cap
    #    the total number of dispatches instead. Only these three
    #    statuses are inspected (review is not).
    MAX_DISPATCH_COUNT = 10
    for status_to_check in ("pending", "working", "claimed"):
        for task in queries.tasks_by_status(status_to_check):
            dispatch_count = getattr(task, 'dispatch_count', 0) or 0
            if dispatch_count < MAX_DISPATCH_COUNT:
                continue
            conn = get_connection(db_path)
            try:
                ok = self._transition_status(
                    conn, task.id, "failed",
                    agent="daemon",
                    detail={"reason": "runaway_guard",
                            "dispatch_count": dispatch_count,
                            "message": f"dispatch {dispatch_count} 次仍未完成,自动标 failed"},
                )
                if ok:
                    reclaimed.append(task.id)
                    logger.error(
                        "Task %s: runaway guard triggered (dispatch_count=%d, status=%s), marking failed",
                        task.id, dispatch_count, status_to_check)
            finally:
                conn.close()

    # 2. Claim timeout → pending, or escalate after 3 retries.
    for task in queries.tasks_by_status("claimed"):
        if not task.claimed_at:
            continue
        try:
            claimed_time = datetime.fromisoformat(task.claimed_at)
            elapsed = (now - claimed_time).total_seconds() / 60.0
            if elapsed <= self.claim_timeout_minutes:
                continue
            # retry_count doubles as the broadcast-round counter.
            retry_count = getattr(task, 'retry_count', 0) or 0
            conn = get_connection(db_path)
            try:
                if retry_count >= 3:
                    ok = self._transition_status(
                        conn, task.id, "escalated",
                        agent="daemon",
                        detail={"reason": "claim_timeout_no_taker",
                                "retry_count": retry_count},
                    )
                    if ok:
                        reclaimed.append(task.id)
                        logger.warning("Escalated %s: no taker after %d broadcasts",
                                       task.id, retry_count)
                else:
                    ok = self._transition_status(
                        conn, task.id, "pending",
                        agent="daemon",
                        detail={"reason": "claim_timeout",
                                "elapsed_minutes": round(elapsed, 1)},
                    )
                    if ok:
                        conn.execute(
                            "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?",
                            (task.id,),
                        )
                        conn.commit()
                        reclaimed.append(task.id)
                        logger.info("Reclaimed %s: claimed → pending (timeout %.1fm, retry=%d)",
                                    task.id, elapsed, retry_count + 1)
            finally:
                conn.close()
        except (ValueError, TypeError):
            logger.debug("Task %s: claim-timeout check skipped", task.id,
                         exc_info=True)

    # 3. Working tasks: crash limit first (stronger signal), then timeout.
    check_crash_limit = (
        getattr(self.dispatcher, '_check_crash_limit', None)
        if self.dispatcher else None)
    for task in queries.tasks_by_status("working"):
        # #07.2: crash_limit is checked here for all working tasks.
        if check_crash_limit is not None and check_crash_limit(
                task.id, db_path, limit=3, window_minutes=30):
            conn = get_connection(db_path)
            try:
                ok = self._transition_status(
                    conn, task.id, "failed",
                    agent="daemon",
                    detail={"reason": "crash_limit",
                            "message": "30 分钟内 crash 3 次,自动标 failed"},
                )
            finally:
                conn.close()
            if ok:
                reclaimed.append(task.id)
                logger.error(
                    "Task %s: executor crash limit (3/30m), marking failed", task.id)
            continue
        # #07.3 ACT-1: fall back to updated_at for handler auto-working
        # tasks (mail) that never set started_at/claimed_at.
        start_time_str = task.started_at or task.claimed_at or task.updated_at
        if not start_time_str:
            continue
        try:
            start_time = datetime.fromisoformat(start_time_str)
            # Per-task timeout: the deadline wins unless it gives < 1 minute.
            timeout_minutes = self.default_task_timeout_minutes
            if task.deadline:
                deadline_minutes = (
                    datetime.fromisoformat(task.deadline) - start_time
                ).total_seconds() / 60.0
                if deadline_minutes >= 1:
                    timeout_minutes = deadline_minutes
            elapsed = (now - start_time).total_seconds() / 60.0
            if elapsed <= timeout_minutes:
                continue
            # [Step 5] Handler hallucination gate fallback: if the handler's
            # completion check passes, a timed-out working task is done.
            handler = TaskTypeRegistry.get_by_project(self._current_project_id)
            if handler and handler.check_completion(task.id, db_path):
                conn = get_connection(db_path)
                try:
                    ok = self._transition_status(
                        conn, task.id, "done",
                        agent="daemon",
                        detail={"reason": "mail_auto_done_recheck",
                                "elapsed_minutes": round(elapsed, 1)},
                    )
                    if ok:
                        reclaimed.append(task.id)
                        logger.info("Mail %s: ticker recheck found reply, marked done (%.1fm)",
                                    task.id, elapsed)
                finally:
                    conn.close()
                continue
            conn = get_connection(db_path)
            try:
                ok = self._transition_status(
                    conn, task.id, "failed",
                    agent="daemon",
                    detail={"reason": "task_timeout",
                            "elapsed_minutes": round(elapsed, 1),
                            "timeout_minutes": round(timeout_minutes, 1)},
                )
                if ok:
                    reclaimed.append(task.id)
                    logger.warning("Task %s timed out (working %.1fm > %.1fm)",
                                   task.id, elapsed, timeout_minutes)
            finally:
                conn.close()
        except (ValueError, TypeError):
            logger.debug("Task %s: working-timeout check skipped", task.id,
                         exc_info=True)

    # 4. v2.7.2: process liveness — counter slot held but process is dead.
    counter = self.counter
    if self.spawner and counter and hasattr(counter, "active_agents"):
        for agent_id in list(counter.active_agents.keys()):
            session_info = self.spawner.get_session_by_agent(agent_id)
            if not session_info:
                continue
            pid = session_info.get("pid")
            task_id_check = session_info.get("task_id")
            if not pid or self._is_pid_alive(pid):
                continue
            logger.warning("Agent %s process dead (pid=%d), releasing counter",
                           agent_id, pid)
            counter.release(agent_id)
            if not (task_id_check and db_path):
                continue
            try:
                conn = get_connection(db_path)
                try:
                    current_row = conn.execute(
                        "SELECT status FROM tasks WHERE id=?", (task_id_check,)
                    ).fetchone()
                    if current_row is None:
                        # Not a task of this project's DB. Presumably a
                        # broadcast session (spawned with task_id
                        # "broadcast") or another project's task. Never
                        # transition a row that does not exist.
                        logger.debug(
                            "Task %s not found in %s, skipping reset (process dead)",
                            task_id_check, db_path)
                    elif current_row["status"] in ("working", "claimed"):
                        # Push back to pending so the ticker re-dispatches.
                        self._transition_status(
                            conn, task_id_check, "pending",
                            agent="daemon",
                            detail={"reason": "process_dead", "pid": pid},
                        )
                    else:
                        # #07.2: review stays for _dispatch_reviews; terminal
                        # and other states must not be reopened.
                        logger.info(
                            "Task %s in %s, keeping status (process dead)",
                            task_id_check, current_row["status"])
                finally:
                    conn.close()
            except Exception:
                logger.exception(
                    "Failed to handle process dead for task %s", task_id_check)
    # #07.2: Fix-3b removed; review timeouts/crashes are handled by the
    # dead-process sweep and the checks above.
    return reclaimed
def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool:
    """Mail hallucination gate: check whether a reply mail task exists.

    A reply is any *other* task whose ``must_haves`` text contains
    *original_task_id* verbatim. ``instr()`` is used instead of ``LIKE``
    so ``_`` and ``%`` inside the id are not treated as wildcards. For
    example, ``m_1`` must not match ``mx1``.

    Args:
        original_task_id: id of the mail task being checked.
        db_path: path to the project's blackboard database.

    Returns:
        True if a referencing task exists. Also True if the lookup fails:
        this is conservative, so a query error is treated as "reply exists".
    """
    try:
        conn = get_connection(db_path)
        try:
            row = conn.execute(
                "SELECT id FROM tasks WHERE id != ? AND instr(must_haves, ?) > 0 LIMIT 1",
                (original_task_id, original_task_id),
            ).fetchone()
            return row is not None
        finally:
            conn.close()
    except Exception as e:
        logger.error(
            "Mail %s: ticker reply check error: %s",
            original_task_id,
            e)
        return True  # conservative: assume a reply exists on query failure
def _check_recent_routing(self, db_path: Path, task_id: str,
                          action_type: str) -> bool:
    """Return True if *task_id* was dispatched within the last 5 minutes.

    For ``action_type == "review"`` only dispatches recorded with
    ``from_status='review'`` count, so a review dispatch is not suppressed
    by an earlier execution dispatch. For any other action type every
    ``outcome='dispatched'`` routing decision counts.

    Any error (connection failure, missing table, ...) yields False.
    """
    conditions = ["task_id=?", "outcome='dispatched'"]
    if action_type == "review":
        conditions.append("from_status='review'")
    conditions.append("created_at > datetime('now', '-5 minutes')")
    sql = ("SELECT COUNT(*) as cnt FROM routing_decisions WHERE "
           + " AND ".join(conditions))
    try:
        conn = get_connection(db_path)
        try:
            row = conn.execute(sql, (task_id,)).fetchone()
            return bool(row) and row["cnt"] > 0
        finally:
            conn.close()
    except Exception:
        return False
# ------------------------------------------------------------------
# PM2 Crash 启动恢复 (#06)
# ------------------------------------------------------------------
def _startup_recover(self) -> Dict[str, Any]:
    """Rebuild a consistent state after a PM2 crash/restart (#06).

    Scans every active registry project, plus the virtual projects
    (``_general`` and ``TaskTypeRegistry.virtual_projects()``) whose
    blackboard DB exists. For each DB, tasks in a non-terminal status
    (claimed/working/review/reviewing) are handed to ``_recover_project``.
    A failure in one project is logged and does not stop the others.

    Returns:
        ``{"projects": {project_id: [recovered entries]},
        "total_recovered": int, "total_noop": int}``.
    """
    NON_TERMINAL = {"claimed", "working", "review", "reviewing"}
    projects = self.registry.list_projects()
    recovery_report = {
        "projects": {},
        "total_recovered": 0,
        "total_noop": 0}
    # Normalise once so both branches below work whether registry.root
    # is a str or a Path.
    root = Path(self.registry.root)
    # Collect every DB to scan: active registry projects + virtual ones.
    project_dirs = {}
    for project_id, project_info in projects.items():
        if project_info.get("status") == "active":
            project_dirs[project_id] = root / project_id / "blackboard.db"
    virtual_ids = ["_general"] + TaskTypeRegistry.virtual_projects()
    for virtual_id in virtual_ids:
        virtual_db = root / virtual_id / "blackboard.db"
        if virtual_db.exists() and virtual_id not in project_dirs:
            project_dirs[virtual_id] = virtual_db
    for project_id, db_path in project_dirs.items():
        if not db_path.exists():
            continue
        # Initialise the schema if this DB has not been seen yet.
        db_key = str(db_path)
        if db_key not in self._initialized_dbs:
            from src.blackboard.db import init_db
            init_db(db_path)
            self._initialized_dbs.add(db_key)
        # #06: temporarily set _current_project_id so _transition_status
        # does not clear assignee for the _mail project (its special
        # branch depends on this field). Restored in `finally`.
        old_pid = self._current_project_id
        self._current_project_id = project_id
        try:
            recovered, noop_count = self._recover_project(
                db_path, NON_TERMINAL)
            if recovered:
                recovery_report["projects"][project_id] = recovered
                recovery_report["total_recovered"] += len(recovered)
            recovery_report["total_noop"] += noop_count
        except Exception:
            logger.exception(
                "Startup recovery failed for project %s", project_id)
        finally:
            self._current_project_id = old_pid
    if recovery_report["total_recovered"] > 0:
        logger.info("Startup recovery: %d tasks recovered across %d projects",
                    recovery_report["total_recovered"],
                    len(recovery_report["projects"]))
    elif recovery_report["total_noop"] > 0:
        logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)",
                    recovery_report["total_noop"])
    else:
        logger.info(
            "Startup recovery: no non-terminal tasks found, clean start")
    return recovery_report
def _recover_project(self, db_path: Path, non_terminal: set) -> tuple:
    """Recover the non-terminal tasks of one project.

    All candidate tasks are snapshotted *before* any recovery action
    runs, in sorted status order. Each task is therefore handled exactly
    once, based on its status at startup. Re-querying per status would
    let a task moved working → review be picked up again by the "review"
    pass, depending on set iteration order.

    Tasks that need no action get a ``startup_recovery_noop`` audit event.
    A failure on one task is logged and its uncommitted writes are rolled
    back, so they are not committed along with the next task.

    Args:
        db_path: path to the project's blackboard database.
        non_terminal: statuses whose tasks should be examined.

    Returns:
        ``(recovered_list, noop_count)``. Each entry of recovered_list is
        ``{"task_id", "from", "action"}``.
    """
    conn = get_connection(db_path)
    recovered = []
    noop_count = 0
    try:
        snapshot = []
        for status in sorted(non_terminal):
            rows = conn.execute(
                "SELECT id, assignee, current_agent, updated_at FROM tasks WHERE status=?",
                (status,)
            ).fetchall()
            snapshot.extend((status, row) for row in rows)
        for status, task in snapshot:
            try:
                action = self._determine_recovery_action(
                    conn, task, status, db_path)
                if action:
                    self._execute_recovery(
                        conn, task["id"], action, db_path)
                    recovered.append(
                        {"task_id": task["id"], "from": status, "action": action})
                else:
                    # Audit: tasks kept as-is are recorded too.
                    noop_count += 1
                    conn.execute(
                        "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
                        (task["id"], "daemon", "startup_recovery_noop",
                         json.dumps({"status": status, "reason": "no_action_needed"}))
                    )
                    conn.commit()
            except Exception:
                logger.exception(
                    "Startup recovery failed for task %s", task["id"])
                # Drop this task's partial writes instead of letting the
                # next task's commit persist them.
                conn.rollback()
    finally:
        conn.close()
    return recovered, noop_count
def _determine_recovery_action(self, conn, task, status: str,
                               db_path: Path) -> Optional[str]:
    """Pick a recovery action for *task* from blackboard evidence.

    - claimed   → always ``push_to_pending``
    - working   → delegated to ``_recover_working_task``
    - review    → delegated to ``_recover_review_task``
    - reviewing → ``push_to_failed`` if the status before reviewing was
      failed, otherwise ``push_to_done``
    - anything else → None (no intervention). ``db_path`` is unused.
    """
    task_id = task["id"]
    if status == "reviewing":
        # Restore precisely to the pre-reviewing status; default to done.
        previous = self._find_pre_reviewing_status(conn, task_id)
        return "push_to_failed" if previous == "failed" else "push_to_done"
    if status == "review":
        return self._recover_review_task(conn, task_id)
    if status == "working":
        return self._recover_working_task(conn, task_id)
    return "push_to_pending" if status == "claimed" else None
def _recover_working_task(self, conn, task_id: str) -> Optional[str]:
    """Decide how to recover a ``working`` task from blackboard evidence.

    The task goes to review only when its latest attempt ended
    ``completed``, it has at least one output, and a ``handoff`` comment
    exists. Anything short of that sends it back to pending, keeping
    ``current_agent`` so the same agent resumes.
    """
    def _exists(sql: str) -> bool:
        return conn.execute(sql, (task_id,)).fetchone() is not None

    latest = conn.execute(
        "SELECT outcome, agent FROM task_attempts WHERE task_id=? ORDER BY id DESC LIMIT 1",
        (task_id,)
    ).fetchone()
    finished = bool(latest) and latest["outcome"] == "completed"
    # Short-circuits so later checks only run when the earlier ones pass.
    handed_off = (
        finished
        and _exists("SELECT 1 FROM outputs WHERE task_id=? LIMIT 1")
        and _exists(
            "SELECT 1 FROM comments WHERE task_id=? AND comment_type='handoff' LIMIT 1")
    )
    return "push_to_review" if handed_off else "push_to_pending_keep_agent"
def _recover_review_task(self, conn, task_id: str) -> Optional[str]:
    """Decide how to recover a ``review`` task from its latest review.

    - latest verdict ``approved`` → ``push_to_done``
    - any other verdict (needs_revision / rejected) → ``push_to_pending``
      so the work is redone
    - no review yet → None: keep review; the ticker dispatches a reviewer.
    """
    latest = conn.execute(
        "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
        (task_id,)
    ).fetchone()
    if not latest:
        return None
    return "push_to_done" if latest["verdict"] == "approved" else "push_to_pending"
def _execute_recovery(self, conn, task_id: str,
                      action: str, db_path: Path):
    """Apply one startup-recovery action to a task and audit it.

    Action → target status:
    - ``push_to_pending``            → pending (also clears current_agent)
    - ``push_to_pending_keep_agent`` → pending (current_agent kept so the
      same agent resumes)
    - ``push_to_review``             → review
    - ``push_to_done``               → done
    - ``push_to_failed``             → failed

    Every transition records ``{"reason": "startup_recovery",
    "original_status": <status before recovery>}``. A ``startup_recovery``
    audit event is written for every call. An unknown action changes no
    status and is logged as a warning.

    Args:
        conn: open blackboard connection (rows indexable by column name).
        task_id: task to recover.
        action: one of the actions above.
        db_path: unused; kept for interface compatibility.
    """
    orig_row = conn.execute(
        "SELECT status FROM tasks WHERE id=?", (task_id,)
    ).fetchone()
    orig_status = orig_row["status"] if orig_row else "unknown"
    target_status = {
        "push_to_pending": "pending",
        "push_to_pending_keep_agent": "pending",
        "push_to_review": "review",
        "push_to_done": "done",
        "push_to_failed": "failed",
    }.get(action)
    if target_status is not None:
        self._transition_status(
            conn, task_id, target_status,
            agent="daemon",
            detail={
                "reason": "startup_recovery",
                "original_status": orig_status},
        )
        if action == "push_to_pending":
            # Plain pending: no specific agent should pick it back up.
            conn.execute(
                "UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,))
        conn.commit()
    else:
        logger.warning("Recovery: unknown action %r for task %s", action, task_id)
    # Audit event for the recovery attempt.
    conn.execute(
        "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
        (task_id, "daemon", "startup_recovery",
         json.dumps({"action": action}))
    )
    conn.commit()
    logger.info(
        "Recovery: task %s %s → %s (action=%s)",
        task_id,
        orig_status,
        target_status or orig_status,
        action)
def _find_pre_reviewing_status(self, conn, task_id: str) -> str:
    """Return the status a task had before entering ``reviewing`` (done or failed).

    Reads the most recent ``task_reviewing`` event. _transition_status
    writes ``event_type=f"task_{new_status}"`` with a JSON detail carrying
    ``from``/``to`` (``old_status`` is also accepted). Falls back to
    ``"done"`` when there is no event, the JSON is invalid, or the
    recorded status is neither done nor failed. Reviewing is only entered
    from done/failed.
    """
    latest = conn.execute(
        """SELECT detail FROM events
WHERE task_id=? AND event_type='task_reviewing'
ORDER BY id DESC LIMIT 1""",
        (task_id,)
    ).fetchone()
    if latest is None:
        return "done"
    try:
        detail = json.loads(latest["detail"])
        previous = detail.get("from") or detail.get("old_status")
    except (json.JSONDecodeError, KeyError):
        return "done"
    return previous if previous in ("done", "failed") else "done"
# ------------------------------------------------------------------
# 手动 tick(API 端点触发)
# ------------------------------------------------------------------
async def manual_tick(self) -> Dict[str, Any]:
    """Run one tick on demand (API endpoint trigger).

    Returns the dict produced by ``self.tick()`` with ``"manual": True``
    added to it.
    """
    logger.info("Manual tick triggered")
    outcome = await self.tick()
    outcome.update(manual=True)
    return outcome