9ec601d747
§15 Runaway Guard — per-task dispatch_count 上限,防止无限循环 dispatch 问题:mail/toolchain task 走 handler auto-working(跳过 claim),不受 claim_timeout 3 次重试兜底保护。如果反复 spawn 但永远到不了 done/failed, 会无限循环消耗资源(实际案例:2026-06-15 mention 重复投递事件)。 设计: - tasks 表新增 dispatch_count 字段 - 每次 ticker 成功 dispatch 时递增 - dispatch_count >= 10 时自动标 failed(reason=runaway_guard) - 覆盖所有非终态(pending/working/claimed) - 参考 Hermes v0.13 §3 Per-Task 重试上限 改动文件: - src/blackboard/db.py: _safe_add_column dispatch_count - src/blackboard/models.py: Task dataclass 加 dispatch_count - src/daemon/ticker.py: dispatch 递增 + _check_timeouts runaway guard - docs/design/15-runaway-guard.md: 设计文档 - tests/integration/test_ticker_integration.py: E13 测试 3 个 测试:456 passed, 3 skipped
1957 lines
80 KiB
Python
1957 lines
80 KiB
Python
"""Daemon Ticker — 30s 状态扫描 + 依赖推进 + 事件驱动
|
||
|
||
Tick 循环是整个 Daemon 的核心驱动:
|
||
1. 遍历所有 active 项目
|
||
2. 每个 tick 扫描黑板状态
|
||
3. 推进依赖链(blocked → pending)
|
||
4. 写入 daemon_tick 事件
|
||
5. 调度 pending 任务(F9 实现)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Any, Callable, Coroutine, Dict, List, Optional
|
||
|
||
from dataclasses import dataclass, field as dc_field
|
||
|
||
from src.daemon.task_type_registry import TaskTypeRegistry
|
||
|
||
from src.blackboard.operations import Blackboard
|
||
from src.blackboard.db import get_connection
|
||
from src.daemon.spawner import AgentBusyError
|
||
from src.blackboard.queries import Queries
|
||
from src.blackboard.registry import ProjectRegistry
|
||
|
||
|
||
@dataclass
class BroadcastRound:
    """Broadcast bookkeeping for a single task."""

    # Task this record belongs to.
    task_id: str
    # Agents that have already been spawned for this task.
    notified_agents: set = dc_field(default_factory=set)
    # Agents that have already answered (NO_REPLY answers included).
    responded_agents: set = dc_field(default_factory=set)
    # Current round number (0 = not started yet, 1 = first round).
    round_number: int = 0
|
||
|
||
|
||
logger = logging.getLogger("moziplus-v2.ticker")
|
||
|
||
|
||
class Ticker:
|
||
"""Daemon ticker 主循环"""
|
||
|
||
def __init__(
|
||
self,
|
||
registry: ProjectRegistry,
|
||
tick_interval: float = 30.0,
|
||
max_ticks: Optional[int] = None,
|
||
on_tick_complete: Optional[Callable[[],
|
||
Coroutine[Any, Any, None]]] = None,
|
||
dispatcher: Optional[Any] = None,
|
||
spawner: Optional[Any] = None,
|
||
max_dispatch_per_tick: int = 3,
|
||
claim_timeout_minutes: float = 5.0,
|
||
default_task_timeout_minutes: float = 30.0,
|
||
health_checker: Optional[Any] = None,
|
||
experience_distiller: Optional[Any] = None,
|
||
inbox_watcher: Optional[Any] = None,
|
||
):
|
||
"""
|
||
Args:
|
||
registry: 项目注册表
|
||
tick_interval: tick 间隔秒数
|
||
max_ticks: 测试用,限制最大 tick 次数
|
||
on_tick_complete: 每次 tick 完成后的回调(用于通知等)
|
||
dispatcher: Dispatcher 实例(Agent 调度)
|
||
spawner: AgentSpawner 实例(Agent spawn)
|
||
max_dispatch_per_tick: 每个 tick 最多调度多少个 pending 任务
|
||
claim_timeout_minutes: claimed 超时重置为 pending 的分钟数
|
||
default_task_timeout_minutes: working 超时标记 failed 的分钟数
|
||
"""
|
||
self.registry = registry
|
||
self.tick_interval = tick_interval
|
||
self.max_ticks = max_ticks
|
||
self.on_tick_complete = on_tick_complete
|
||
self.dispatcher = dispatcher
|
||
self.spawner = spawner
|
||
self.max_dispatch_per_tick = max_dispatch_per_tick
|
||
self.claim_timeout_minutes = claim_timeout_minutes
|
||
self.default_task_timeout_minutes = default_task_timeout_minutes
|
||
self.health_checker = health_checker
|
||
self.experience_distiller = experience_distiller
|
||
self.inbox_watcher = inbox_watcher
|
||
|
||
self._tick_count: int = 0
|
||
self._running: bool = False
|
||
self._task: Optional[asyncio.Task] = None
|
||
|
||
# 已初始化的 db_path 集合,避免每次 tick 重复 init_db
|
||
self._initialized_dbs: set = set()
|
||
|
||
# 每个项目上次 tick 的 event count(用于僵尸检测 F8)
|
||
self._last_event_counts: Dict[str, int] = {}
|
||
|
||
# 广播轮次追踪(Phase 1 bug fix)
|
||
self._broadcast_tracker: Dict[str, BroadcastRound] = {}
|
||
|
||
# 当前项目 ID(_dispatch_pending 需要)
|
||
self._current_project_id: Optional[str] = None
|
||
|
||
# ------------------------------------------------------------------
|
||
# 生命周期
|
||
# ------------------------------------------------------------------
|
||
|
||
async def start(self) -> None:
|
||
"""启动 tick 循环"""
|
||
if self._running:
|
||
return
|
||
self._running = True
|
||
|
||
# 启动恢复(PM2 crash 后重建一致状态)
|
||
self._startup_recover()
|
||
|
||
self._task = asyncio.create_task(self._loop())
|
||
# 启动 InboxWatcher(即时事件监听)
|
||
if self.inbox_watcher:
|
||
try:
|
||
await self.inbox_watcher.start()
|
||
logger.info("InboxWatcher started")
|
||
except Exception as e:
|
||
logger.warning("InboxWatcher start failed: %s", e)
|
||
logger.info("Ticker started (interval=%.1fs)", self.tick_interval)
|
||
|
||
async def stop(self) -> None:
    """Stop the tick loop, the InboxWatcher and wait for the loop task to end."""
    self._running = False

    # Shut the InboxWatcher down first; a failure here must not block the stop.
    watcher = self.inbox_watcher
    if watcher:
        try:
            await watcher.stop()
            logger.info("InboxWatcher stopped")
        except Exception as e:
            logger.warning("InboxWatcher stop failed: %s", e)

    loop_task = self._task
    if loop_task:
        loop_task.cancel()
        try:
            await loop_task
        except asyncio.CancelledError:
            # Expected: we cancelled the task ourselves.
            pass

    logger.info("Ticker stopped (ticks=%d)", self._tick_count)
|
||
|
||
@property
|
||
def tick_count(self) -> int:
|
||
return self._tick_count
|
||
|
||
@property
|
||
def is_running(self) -> bool:
|
||
return self._running
|
||
|
||
# ------------------------------------------------------------------
|
||
# 主循环
|
||
# ------------------------------------------------------------------
|
||
|
||
async def _loop(self) -> None:
    """Main loop: run ``tick`` every ``tick_interval`` seconds until stopped.

    Errors raised by a tick are logged and the loop keeps going;
    cancellation is propagated while ticking and ends the loop quietly
    while sleeping. When ``max_ticks`` is set the loop stops itself once
    that many ticks have run.
    """
    while self._running:
        try:
            await self.tick()
        except asyncio.CancelledError:
            raise
        except Exception:
            # tick() bumps _tick_count as its very first statement, so the
            # tick that just failed is _tick_count itself, not _tick_count + 1.
            logger.exception("Tick %d error", self._tick_count)

        if self.max_ticks and self._tick_count >= self.max_ticks:
            logger.info("Max ticks reached (%d), stopping", self.max_ticks)
            self._running = False
            break

        try:
            await asyncio.sleep(self.tick_interval)
        except asyncio.CancelledError:
            return
|
||
|
||
# ------------------------------------------------------------------
|
||
# 单次 tick
|
||
# ------------------------------------------------------------------
|
||
|
||
async def tick(self) -> Dict[str, Any]:
    """Run one tick over every active project plus the virtual projects.

    Returns:
        ``{"tick": <tick number>, "projects": {project_id: result}}`` where
        each result is what ``_tick_project`` returned, or
        ``{"error": "<message>"}`` when that project raised.
    """
    self._tick_count += 1
    tick_num = self._tick_count
    project_results: Dict[str, Any] = {}
    results: Dict[str, Any] = {
        "tick": tick_num,
        "projects": project_results,
    }

    projects = self.registry.list_projects()
    active_projects = {
        pid: info for pid, info in projects.items()
        if info.get("status") == "active"
    }

    for project_id, project_info in active_projects.items():
        try:
            project_results[project_id] = await self._tick_project(
                project_id, project_info)
        except Exception as e:
            logger.exception(
                "Tick %d project %s error",
                tick_num,
                project_id)
            project_results[project_id] = {"error": str(e)}

    data_root = Path(self.registry.root)

    # Virtual project _general: not in the registry but still has to be scheduled.
    general_db = data_root / "_general" / "blackboard.db"
    if general_db.exists() and "_general" not in project_results:
        try:
            project_results["_general"] = await self._tick_project("_general", {
                "id": "_general", "name": "一般任务",
                "status": "active", "source": "virtual",
            })
        except Exception as e:
            logger.exception("Tick %d _general error", tick_num)
            project_results["_general"] = {"error": str(e)}

    # Remaining virtual projects are discovered from the task type registry.
    for vp in TaskTypeRegistry.virtual_projects():
        # Skip anything already handled in this tick (active project or the
        # hard-coded _general above) so no project is ticked twice.
        if vp in project_results:
            continue
        vp_db = data_root / vp / "blackboard.db"
        if not vp_db.exists():
            continue
        try:
            vp_handler = TaskTypeRegistry.get_by_project(vp)
            vp_name = vp_handler.display_name if vp_handler and vp_handler.display_name else vp
            project_results[vp] = await self._tick_project(vp, {
                "id": vp, "name": vp_name,
                "status": "active", "source": "virtual",
            })
        except Exception as e:
            logger.exception("Tick %d %s error", tick_num, vp)
            project_results[vp] = {"error": str(e)}

    # Count what was actually processed, virtual projects included.
    logger.debug(
        "Tick %d complete: %d projects",
        tick_num,
        len(project_results))

    if self.on_tick_complete:
        try:
            await self.on_tick_complete()
        except Exception:
            logger.exception("on_tick_complete callback error")

    return results
|
||
|
||
async def _tick_project(self, project_id: str,
                        project_info: Dict[str, Any]) -> Dict[str, Any]:
    """Run every tick step for a single project.

    Args:
        project_id: Directory name of the project under the registry root.
        project_info: Registry entry of the project (not read here).

    Returns:
        ``{"status": "no_db"}`` when the project has no blackboard.db,
        otherwise a dict with ``status == "ok"``, the task summaries before
        and after the tick, and the ids touched by each step.
    """
    project_dir = self.registry.root / project_id
    db_path = project_dir / "blackboard.db"

    if not db_path.exists():
        return {"status": "no_db"}

    # Run init_db only the first time this db_path is seen.
    db_key = str(db_path)
    if db_key not in self._initialized_dbs:
        from src.blackboard.db import init_db
        init_db(db_path)
        self._initialized_dbs.add(db_key)

    result: Dict[str, Any] = {
        "status": "ok",
        "summary_before": {},
        "summary_after": {},
        "advanced": [],
        "dispatched": [],
        "review_dispatched": [],
        "zombie_reclaimed": [],
        "parents_refreshed": [],
        "round_reviewed": [],
        "mentions_processed": [],
    }

    # Remember the project being ticked (read by _dispatch_pending and
    # _transition_status).
    self._current_project_id = project_id

    # 1. Snapshot of the current state
    queries = Queries(db_path)
    result["summary_before"] = queries.task_summary()

    # 2. Dependency advancement
    advanced = self._advance_dependencies(db_path)
    result["advanced"] = advanced

    # 3. Zombie / timeout handling (after dependencies, before dispatch)
    result["zombie_reclaimed"] = self._check_timeouts(db_path)

    if self.dispatcher and self.spawner:
        # 4. Dispatch pending tasks
        result["dispatched"] = await self._dispatch_pending(db_path, project_id)
        # 5. Dispatch review tasks
        result["review_dispatched"] = await self._dispatch_reviews(db_path, project_id)

    # 6. Aggregate parent task statuses (v2.7)
    parents_refreshed = self._refresh_parent_statuses(db_path)
    result["parents_refreshed"] = parents_refreshed

    # 7. Round-complete detection + Pangtong review spawn (v2.9 #01)
    result["round_reviewed"] = await self._check_round_complete(db_path, project_id)

    # 8. @mention notifications (v2.9 #01)
    result["mentions_processed"] = await self._process_mentions(db_path, project_id)

    # 9. Write the daemon_tick event
    conn = get_connection(db_path)
    try:
        conn.execute("BEGIN IMMEDIATE")
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
            (None, "daemon", "daemon_tick",
             json.dumps({"tick": self._tick_count, "advanced_count": len(advanced),
                         "parents_refreshed": len(parents_refreshed)})),
        )
        conn.commit()
    finally:
        conn.close()

    # 10. Health check (zombie detection); failures are only logged.
    if self.health_checker:
        try:
            self.health_checker.check(
                project_id, db_path, self._tick_count)
        except Exception as e:
            logger.warning("HealthChecker error for %s: %s", project_id, e)

    # 11. Experience distillation for tasks finished within the last 60 seconds.
    # NOTE(review): the 60s window is wider than the default 30s tick interval,
    # so the same task may be handed to the distiller on two consecutive ticks
    # — confirm distill_from_task is idempotent.
    if self.experience_distiller:
        try:
            conn2 = get_connection(db_path)
            try:
                done_tasks = conn2.execute(
                    "SELECT id FROM tasks WHERE status='done' AND updated_at > datetime('now', '-60 seconds')"
                ).fetchall()
            finally:
                conn2.close()
            if done_tasks:
                # One Blackboard for the whole batch instead of one per row.
                bb = Blackboard(db_path)
                for row in done_tasks:
                    t = bb.get_task(row[0])
                    if t:
                        self.experience_distiller.distill_from_task(
                            task_id=t.id, task_title=t.title, task_type=t.task_type
                        )
        except Exception as e:
            logger.warning(
                "ExperienceDistiller error for %s: %s", project_id, e)

    # 12. Snapshot of the state after the tick
    result["summary_after"] = queries.task_summary()

    return result
|
||
|
||
# ------------------------------------------------------------------
|
||
# 父 Task 状态聚合
|
||
# ------------------------------------------------------------------
|
||
|
||
def _refresh_parent_statuses(self, db_path: Path) -> List[str]:
    """Recompute the aggregated status of every task that has sub tasks.

    Every distinct ``parent_task`` id is passed to
    ``Queries.compute_parent_status``; a ``None`` answer leaves the parent
    untouched, any other value that differs from the stored status is
    written back. All updates share one connection and one commit.

    NOTE(review): presumably compute_parent_status returns None for manually
    managed states (cancelled, paused) and for ``reviewing`` — verify,
    otherwise a parent under review would be reset here on the next tick.

    Returns:
        Ids of the parent tasks whose status was changed.
    """
    status_source = Queries(db_path)
    changed: List[str] = []

    conn = get_connection(db_path)
    try:
        # All ids that are referenced as a parent by at least one task.
        rows = conn.execute(
            "SELECT DISTINCT parent_task FROM tasks WHERE parent_task IS NOT NULL"
        ).fetchall()

        for parent_id in [r["parent_task"] for r in rows]:
            target = status_source.compute_parent_status(parent_id)
            if target is None:
                continue

            current = conn.execute(
                "SELECT status FROM tasks WHERE id=?", (parent_id,)
            ).fetchone()
            if not current or current["status"] == target:
                continue

            conn.execute(
                "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
                (target, parent_id),
            )
            changed.append(parent_id)
            logger.info(
                "Parent %s status aggregated: → %s", parent_id, target)

        if changed:
            conn.commit()
    finally:
        conn.close()

    return changed
|
||
|
||
# ------------------------------------------------------------------
|
||
# 一轮结束检测 + 庞统 review (v2.9 #01)
|
||
# ------------------------------------------------------------------
|
||
|
||
MAX_ROUNDS = 5 # §4.5: cap on review rounds per parent task; once round_count reaches it no further review is spawned (guards against endless loops)
|
||
|
||
async def _check_round_complete(self, db_path: Path,
                                project_id: str) -> List[str]:
    """Spawn a Pangtong review for parents whose sub tasks are all terminal.

    Flow (§4.4):
    1. Scan every parent task
    2. Check that all of its sub tasks are in a terminal state
    3. Enforce the round_count limit
    4. Spawn the Pangtong review
    5. Increment round_count once the spawn succeeded

    Returns:
        Ids of the parent tasks for which a review was spawned.
    """
    if not (self.dispatcher and self.spawner):
        return []

    board = Blackboard(db_path)
    spawned_for: List[str] = []

    # Every task id that is referenced as a parent.
    conn = get_connection(db_path)
    try:
        parent_rows = conn.execute(
            "SELECT DISTINCT parent_task FROM tasks WHERE parent_task IS NOT NULL"
        ).fetchall()
    finally:
        conn.close()

    for row in parent_rows:
        parent_id = row["parent_task"]
        try:
            summary = board.get_subtasks_summary(parent_id)
            if not summary or not summary["all_terminal"]:
                continue

            # Only an aggregated parent status of done or failed triggers a
            # review (BUG-2 fix: failed parents are reviewed too, so that
            # retry / reassignment can be decided).
            if summary["parent_status"] not in ("done", "failed"):
                continue

            # Round limit.
            if summary["round_count"] >= self.MAX_ROUNDS:
                logger.warning(
                    "Parent %s reached max rounds (%d), skipping review",
                    parent_id, self.MAX_ROUNDS)
                continue

            # The prompt uses the upcoming round number; the stored counter
            # is only incremented after a successful spawn.
            upcoming_round = summary["round_count"] + 1
            outputs = board.get_aggregate_outputs(parent_id)
            comments = board.get_round_comments(parent_id)
            parent_task = board.get_task(parent_id)
            if not parent_task:
                continue

            review_prompt = self._build_review_prompt(
                parent_task, summary, outputs, comments, upcoming_round,
                project_id=project_id
            )
            ok = await self._spawn_pangtong_review(
                parent_task, review_prompt, project_id, new_round=upcoming_round
            )
            if not ok:
                continue

            # Spawn succeeded → persist the new round number.
            new_round = board.increment_round_count(parent_id)
            spawned_for.append(parent_id)
            logger.info(
                "Round %d review spawned for parent %s (subs: %s)",
                new_round, parent_id, summary
            )
        except Exception:
            logger.exception("Round check error for parent %s", parent_id)

    return spawned_for
|
||
|
||
def _build_review_prompt(self, parent_task, summary: dict,
|
||
outputs: list, comments: list,
|
||
round_num: int,
|
||
project_id: str = "") -> str:
|
||
"""构建庞统 review prompt(§4.2 三问框架)"""
|
||
goal = parent_task.description or parent_task.title
|
||
must_haves = parent_task.must_haves or "{}"
|
||
|
||
# 成果物摘要(限制 token)
|
||
output_lines = []
|
||
for o in outputs[:20]: # 最多 20 个
|
||
output_lines.append(
|
||
f"- [{o.get('task_title', '?')}] {o.get('title', '?')} "
|
||
f"({o.get('output_type', '?')}) by {o.get('agent', '?')}"
|
||
)
|
||
|
||
# 讨论摘要(限制 50 条)
|
||
comment_lines = []
|
||
for c in comments[:50]:
|
||
comment_lines.append(
|
||
f"[{c.get('created_at', '?')[:16]}] {c.get('author', '?')}: {c.get('body', '?')[:200]}"
|
||
)
|
||
|
||
return f"""## 庞统 Review(第 {round_num} 轮)
|
||
|
||
### Goal
|
||
{goal}
|
||
|
||
### 验收标准
|
||
{must_haves}
|
||
|
||
### 本轮 Sub Task 状态
|
||
- 完成: {summary['done']}
|
||
- 失败: {summary['failed']}
|
||
- 取消: {summary['cancelled']}
|
||
- 总计: {summary['total']}
|
||
|
||
### 成果物
|
||
{chr(10).join(output_lines) if output_lines else '无'}
|
||
|
||
### 黑板讨论
|
||
{chr(10).join(comment_lines) if comment_lines else '无'}
|
||
|
||
### 三问
|
||
1. Goal 还清晰吗?(是否有 goal drift)
|
||
2. 成果物覆盖 goal 了吗?(逐条检查验收标准)
|
||
3. 下一轮需要做什么?(创建新 sub tasks / 标记完成 / 调整方向)
|
||
|
||
### 失败处理
|
||
{f'有 {summary["failed"]} 个 sub task failed,优先判断是应该重试、换人、还是调整方案。' if summary['failed'] > 0 else '无失败'}
|
||
|
||
### 你可以
|
||
- 创建新一轮 sub tasks: POST http://{self.spawner.api_host}:{self.spawner.api_port}/api/projects/{project_id}/tasks
|
||
body: {"title": "...", "description": "...", "parent_task": "{parent_task.id}", "assignee": "zhangfei-dev"}
|
||
- 调整 goal(更新 parent task description/must_haves)
|
||
- 标记完成(如果 goal 已达成,回复 GOAL_ACHIEVED)
|
||
|
||
Round 上限: {self.MAX_ROUNDS}(当前第 {round_num} 轮)
|
||
|
||
Project ID: {project_id}
|
||
Parent Task ID: {parent_task.id}
|
||
"""
|
||
|
||
async def _spawn_pangtong_review(self, parent_task,
|
||
review_prompt: str,
|
||
project_id: str,
|
||
new_round: int = 0) -> bool:
|
||
"""Spawn 庞统进行 review
|
||
|
||
流程:
|
||
1. spawn 庞统
|
||
2. 成功后把 parent 设为 reviewing(防重复触发)
|
||
3. 通过 on_complete 回调消费庞统结论
|
||
"""
|
||
try:
|
||
agent_id = "pangtong-fujunshi"
|
||
f"review-{parent_task.id}-r{new_round}"
|
||
|
||
# 构造 on_complete 回调:解析庞统结论,更新 parent 状态
|
||
async def _on_review_complete(aid: str, outcome: str):
|
||
try:
|
||
# 从 spawner session meta 读取庞统的回复文本
|
||
review_text = ""
|
||
if self.spawner:
|
||
# 找庞统最新完成的 session
|
||
latest_meta = None
|
||
latest_time = ""
|
||
for sid, sess in self.spawner._sessions.items():
|
||
if sess.get(
|
||
"agent_id") == agent_id and sess.get("meta"):
|
||
t = sess.get("completed_at", "")
|
||
if t > latest_time:
|
||
latest_time = t
|
||
latest_meta = sess["meta"]
|
||
if latest_meta and isinstance(latest_meta, dict):
|
||
payloads = latest_meta.get("payloads", [])
|
||
review_text = " ".join(
|
||
p.get("text", "") for p in payloads if isinstance(p, dict)
|
||
)
|
||
self._handle_review_conclusion(
|
||
parent_task.id, project_id, review_text, new_round)
|
||
except Exception:
|
||
logger.exception("Review conclusion handler failed for %s",
|
||
parent_task.id)
|
||
|
||
# 用 spawner.spawn_full_agent 直接 spawn
|
||
result = await self.spawner.spawn_full_agent(
|
||
agent_id=agent_id,
|
||
message=review_prompt,
|
||
task_id=parent_task.id,
|
||
use_main_session=True, # #02: 投递到 main session
|
||
task_db_path=None,
|
||
on_complete=_on_review_complete,
|
||
)
|
||
|
||
if result is not None:
|
||
# spawn 成功 → parent 进入 reviewing(防下个 tick 重复触发)
|
||
self._set_parent_reviewing(parent_task.id, project_id)
|
||
return True
|
||
return False
|
||
except Exception:
|
||
logger.exception(
|
||
"Failed to spawn pangtong review for %s",
|
||
parent_task.id)
|
||
return False
|
||
|
||
def _set_parent_reviewing(self, parent_id: str, project_id: str):
    """Move a done/failed parent task to ``reviewing`` (prevents re-triggering).

    Best effort: any error is logged and not propagated.
    """
    sql = (
        "UPDATE tasks SET status='reviewing', updated_at=datetime('now') "
        "WHERE id=? AND status IN ('done', 'failed')"
    )
    try:
        conn = get_connection(self._resolve_db_path(project_id))
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(sql, (parent_id,))
            conn.commit()
            logger.info("Parent %s → reviewing (round review in progress)",
                        parent_id)
        finally:
            conn.close()
    except Exception:
        logger.exception("Failed to set parent %s to reviewing", parent_id)
|
||
|
||
def _handle_review_conclusion(self, parent_id: str, project_id: str,
                              review_text: str, round_num: int):
    """Parse Pangtong's review conclusion and update the parent status.

    A reply containing ``GOAL_ACHIEVED`` (case-insensitive) moves the
    parent ``reviewing → done``; any other reply moves it
    ``reviewing → working``, since Pangtong may have created new sub tasks.
    If the update fails, the method still tries to move the parent back to
    ``working`` so that it does not stay in ``reviewing`` forever.

    Args:
        parent_id: Id of the reviewed parent task.
        project_id: Project the parent belongs to (resolves the DB path).
        review_text: Reply text (joined from the spawner session meta
            payloads); may be empty.
        round_num: Review round number, used for logging only.
    """
    update_sql = (
        "UPDATE tasks SET status=?, updated_at=datetime('now') "
        "WHERE id=? AND status='reviewing'"
    )
    db_path = self._resolve_db_path(project_id)
    conn = get_connection(db_path)
    try:
        is_achieved = bool(
            review_text and "GOAL_ACHIEVED" in review_text.upper())
        target_status = "done" if is_achieved else "working"

        conn.execute("BEGIN IMMEDIATE")
        conn.execute(update_sql, (target_status, parent_id))
        conn.commit()

        if is_achieved:
            logger.info(
                "Parent %s review conclusion: GOAL_ACHIEVED → done",
                parent_id)
        else:
            sub_count = conn.execute(
                "SELECT COUNT(*) FROM tasks WHERE parent_task=?",
                (parent_id,)
            ).fetchone()[0]
            logger.info(
                "Parent %s review conclusion: continue → working "
                "(round %d, subs=%d)",
                parent_id, round_num, sub_count)
    except Exception:
        logger.exception(
            "Failed to handle review conclusion for %s",
            parent_id)
        # Safety net: reviewing → working.
        try:
            # The failed attempt may still hold an open transaction; a second
            # BEGIN would then fail, so discard it first.
            conn.rollback()
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(update_sql, ("working", parent_id))
            conn.commit()
        except Exception:
            # Still best effort, but leave a trace instead of failing silently.
            logger.exception(
                "Recovery reviewing → working failed for %s", parent_id)
    finally:
        conn.close()
|
||
|
||
def _resolve_db_path(self, project_id: str) -> Path:
    """Return ``<data root>/<project_id>/blackboard.db``.

    NOTE(review): _tick_project derives the same file from
    ``self.registry.root`` — presumably both roots are identical; verify.
    """
    from src.utils import get_data_root

    data_root = get_data_root()
    return data_root / project_id / "blackboard.db"
|
||
|
||
# ------------------------------------------------------------------
|
||
# @mention 通知处理 (v2.9 #01)
|
||
# ------------------------------------------------------------------
|
||
|
||
# Retry budget for @mention delivery: passed as max_retries when pending
# mentions are fetched, and used to decide when a failing mention is marked
# failed instead of retried.
MENTION_MAX_RETRIES = 5
|
||
|
||
async def _process_mentions(self, db_path: Path,
                            project_id: str) -> List[str]:
    """Scan pending mentions and spawn the mentioned agents.

    Flow (§3.4):
    1. Fetch pending mention_queue records whose retry_count is below
       ``MENTION_MAX_RETRIES``
    2. Group them by mentioned agent; several mentions of one agent are
       merged into a single spawn
    3. Spawn: success → notified, ``None`` result → retry_count++,
       busy agent → left untouched for the next tick, any other error →
       retry_count++ or failed once the budget is used up

    Returns:
        Ids of the agents that were spawned successfully.
    """
    if not self.spawner:
        return []

    bb = Blackboard(db_path)
    mentions = bb.get_pending_mentions(
        max_retries=self.MENTION_MAX_RETRIES)
    if not mentions:
        return []

    # Group by mentioned agent.
    agent_mentions: Dict[str, List[Dict]] = {}
    for m in mentions:
        agent_mentions.setdefault(m["mentioned_agent"], []).append(m)

    processed: List[str] = []

    for agent_id, items in agent_mentions.items():
        try:
            # Mention digest; a NULL comment body is rendered as empty text.
            mention_lines = [
                f"- [{item.get('comment_author', '?')}] "
                f"{(item.get('comment_body') or '')[:200]}"
                for item in items
            ]

            # The first mention's task provides the spawn context.
            tid = items[0]["task_id"]
            task = bb.get_task(tid)
            if not task:
                # NOTE(review): these mentions stay pending and are picked
                # up again every tick — confirm that is intended.
                continue

            prompt = self._build_mention_prompt(
                agent_id, task, mention_lines, project_id)

            # #09: rebuttal scenario — task under review + a rebuttal comment.
            is_rebuttal_review = task.status == "review" and any(
                item.get("comment_type", "") == "rebuttal" for item in items
            )

            spawn_kwargs: Dict[str, Any] = {
                "agent_id": agent_id,
                "message": prompt,
                "task_id": tid,
                "use_main_session": True,  # #02: deliver to the main session
            }
            if is_rebuttal_review:
                # Re-review after a rebuttal: handle the new verdict on completion.
                spawn_kwargs["on_complete"] = self._make_rebuttal_callback(
                    project_id, tid)

            result = await self.spawner.spawn_full_agent(**spawn_kwargs)

            if result is not None:
                # Success → every mention of this agent becomes notified.
                for item in items:
                    bb.mark_mention_notified(item["id"])
                processed.append(agent_id)
                logger.info(
                    "Mention spawn success: %s (%d mentions)",
                    agent_id,
                    len(items))
            else:
                # spawn returned None (other reason) → bump retry_count.
                for item in items:
                    bb.mark_mention_retry(item["id"])
                logger.warning(
                    "Mention spawn failed: %s, retrying next tick", agent_id)

        except AgentBusyError:
            # Agent busy: do not consume a retry, the next tick tries again.
            logger.info(
                "Mention spawn skipped: %s busy, will retry next tick",
                agent_id)

        except Exception:
            logger.exception(
                "Mention processing error for agent %s", agent_id)
            for item in items:
                try:
                    if item.get("retry_count", 0) >= self.MENTION_MAX_RETRIES - 1:
                        bb.mark_mention_failed(item["id"])
                    else:
                        bb.mark_mention_retry(item["id"])
                except Exception:
                    # Bookkeeping is best effort; keep going with the others.
                    pass

    return processed

def _make_rebuttal_callback(self, project_id: str, task_id: str):
    """Build the on_complete callback for a rebuttal re-review of ``task_id``.

    The callback reads the latest review verdict: ``approved`` moves the
    task to ``done``; anything else posts a review comment that @mentions
    the assignee.
    """
    async def _rebuttal_on_complete(aid: str, outcome: str):
        try:
            if outcome not in ("completed", "session_revived"):
                return

            rdb_path = self._resolve_db_path(project_id)
            # A single connection for the whole callback, always closed.
            rconn = get_connection(rdb_path)
            try:
                new_review = rconn.execute(
                    "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
                    (task_id,)
                ).fetchone()

                if new_review and new_review["verdict"] == "approved":
                    self._transition_status(
                        rconn, task_id, "done",
                        agent="daemon",
                        detail={"reason": "rebuttal_approved"})
                    logger.info(
                        "Rebuttal: task %s approved after rebuttal", task_id)
                    return

                t_row = rconn.execute(
                    "SELECT assignee FROM tasks WHERE id=?", (task_id,)).fetchone()
            finally:
                rconn.close()

            # Still not approved → @mention the assignee.
            verdict_str = new_review["verdict"] if new_review else "未知"
            if t_row and t_row["assignee"]:
                Blackboard(rdb_path).add_comment(
                    task_id, "daemon",
                    f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
                    comment_type="review")
            logger.info(
                "Rebuttal: task %s still %s after rebuttal", task_id, verdict_str)
        except Exception:
            logger.exception(
                "Rebuttal on_complete failed for task %s", task_id)

    return _rebuttal_on_complete
|
||
|
||
def _build_mention_prompt(self, agent_id: str, task: Any,
|
||
mention_lines: List[str],
|
||
project_id: str) -> str:
|
||
"""#03: @mention prompt(身份注入)"""
|
||
api_host = getattr(
|
||
self.spawner,
|
||
'api_host',
|
||
'127.0.0.1') if self.spawner else '127.0.0.1'
|
||
api_port = getattr(
|
||
self.spawner,
|
||
'api_port',
|
||
8083) if self.spawner else 8083
|
||
api_base = f"http://{api_host}:{api_port}/api"
|
||
|
||
# 获取 Agent 专长
|
||
caps = "通用"
|
||
try:
|
||
if self.dispatcher and self.dispatcher.router:
|
||
profile = self.dispatcher.router.agent_profiles.get(agent_id)
|
||
if profile and profile.capabilities_zh:
|
||
caps = ", ".join(profile.capabilities_zh)
|
||
except Exception:
|
||
pass
|
||
|
||
mentions_text = "\n".join(mention_lines[:10])
|
||
|
||
return f"""你在黑板上被 @ 了。
|
||
|
||
## 你的身份
|
||
你是 {agent_id},专长: {caps}。
|
||
|
||
## 相关讨论
|
||
{mentions_text}
|
||
|
||
## 任务上下文
|
||
- 项目: {project_id}
|
||
- 任务: {task.title}
|
||
- 描述: {task.description or '无'}
|
||
|
||
## 你能做什么
|
||
- 读完整上下文: GET {api_base}/projects/{project_id}/tasks/{task.id}?expand=all
|
||
- 回应: POST {api_base}/projects/{project_id}/tasks/{task.id}/comments
|
||
- 如果讨论收敛到可执行任务,可以创建 sub task
|
||
|
||
## 约束
|
||
- 只回应与你专长相关的内容
|
||
- 禁止使用 sessions_send 直接发消息
|
||
- 委托他人做事用黑板 comment @agent-id,系统自动路由(无需手动传 mentions 数组)
|
||
"""
|
||
|
||
def _advance_dependencies(self, db_path: Path) -> List[str]:
    """Promote blocked tasks to pending once all their dependencies are done.

    Returns:
        Ids of the tasks that were promoted.
    """
    candidates = Queries(db_path).blocked_tasks_with_deps()
    promoted: List[str] = []
    if not candidates:
        return promoted

    conn = get_connection(db_path)
    try:
        for entry in candidates:
            if not entry["all_deps_done"]:
                continue
            task_id = entry["task_id"]
            moved = self._transition_status(
                conn, task_id, "pending",
                agent="daemon",
                detail={"reason": "all_dependencies_done"})
            if moved:
                promoted.append(task_id)
                logger.info("Advanced %s: blocked → pending", task_id)
    finally:
        conn.close()

    return promoted
|
||
|
||
def _transition_status(self, conn, task_id: str, new_status: str,
                       agent: str = "daemon",
                       detail: Optional[Dict] = None) -> bool:
    """Lightweight status transition (bypasses the Blackboard class and init_db).

    Runs in its own ``BEGIN IMMEDIATE`` transaction on ``conn``. The
    transaction is committed on success and rolled back on every other
    exit, so the same connection can be reused for further calls.

    Args:
        conn: Open DB connection; it is not closed here.
        task_id: Task to move.
        new_status: Target status.
        agent: Agent name recorded on the event row.
        detail: Extra keys merged into the event detail JSON.

    Returns:
        True when the task was moved or already had ``new_status``; False
        when the task does not exist, is in another terminal state, or the
        transition is not listed in ``VALID_TRANSITIONS``.
    """
    from src.blackboard.db import VALID_TRANSITIONS, TERMINAL_STATUSES, EVENT_TYPES

    conn.execute("BEGIN IMMEDIATE")
    committed = False
    try:
        row = conn.execute(
            "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
        if not row:
            return False
        old_status = row["status"]
        if old_status in TERMINAL_STATUSES or old_status == new_status:
            return old_status == new_status
        if new_status not in VALID_TRANSITIONS.get(old_status, set()):
            return False

        now = datetime.utcnow().isoformat()
        if new_status == "pending":
            # Going back to pending clears the assignee (a leftover would route
            # the task to the same agent again) — except for handler virtual
            # projects (_mail etc.), whose assignee is the recipient.
            # NOTE(review): this reads self._current_project_id, which may not
            # match the project of ``conn`` when called from an async callback.
            handler = TaskTypeRegistry.get_by_project(self._current_project_id)
            if handler:
                conn.execute(
                    "UPDATE tasks SET status=?, updated_at=? WHERE id=?",
                    (new_status, now, task_id),
                )
            else:
                conn.execute(
                    "UPDATE tasks SET status=?, assignee=NULL, resumed_from=NULL, updated_at=? WHERE id=?",
                    (new_status, now, task_id),
                )
        elif new_status == "paused":
            # Remember the status before the pause so a resume can return to it.
            conn.execute(
                "UPDATE tasks SET status=?, resumed_from=?, updated_at=? WHERE id=?",
                (new_status, old_status, now, task_id),
            )
        else:
            conn.execute(
                "UPDATE tasks SET status=?, updated_at=? WHERE id=?",
                (new_status, now, task_id),
            )

        # Statuses without a dedicated event type are logged as daemon_tick.
        event_type = f"task_{new_status}"
        if event_type not in EVENT_TYPES:
            event_type = "daemon_tick"
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
            (task_id, agent, event_type, json.dumps(
                {"from": old_status, "to": new_status, **(detail or {})})),
        )
        conn.commit()
        committed = True
        return True
    finally:
        if not committed:
            # Early return or error: close the transaction opened above, or the
            # next BEGIN on this connection would fail.
            conn.rollback()
|
||
|
||
# ------------------------------------------------------------------
|
||
# Agent 调度
|
||
# ------------------------------------------------------------------
|
||
|
||
async def _dispatch_pending(self, db_path: Path,
                            project_id: str) -> List[str]:
    """Dispatch this tick's share of the project's pending tasks.

    At most ``self.max_dispatch_per_tick`` dispatchable tasks are looked at.
    ``self.dispatcher.decide`` sorts each one into one of two paths (v3.0):

    - mode ``deterministic`` / ``agent_handoff``: spawned directly through
      ``self.dispatcher.dispatch``;
    - anything else: batched and handed to ``_broadcast_claim`` once.

    Every id collected as dispatched then gets its ``dispatch_count``
    incremented (§15 runaway guard).

    Args:
        db_path: Blackboard database of the project.
        project_id: Project whose pending tasks are scanned.

    Returns:
        Ids recorded as dispatched: the directly dispatched task ids followed
        by whatever ``_broadcast_claim`` returned.
    """
    dispatched: List[str] = []
    candidates = Queries(db_path).pending_dispatchable()
    if not candidates:
        return dispatched

    # Sort the tick's budget into direct routes and broadcast candidates.
    direct = []
    to_broadcast = []
    for task in candidates[:self.max_dispatch_per_tick]:
        decision = self.dispatcher.decide(task)
        if decision.get("mode") not in ("deterministic", "agent_handoff"):
            to_broadcast.append(task)
            continue
        direct.append((task, decision))

    # Path 1: deterministic routing, spawn straight away.
    for task, _decision in direct:
        try:
            result = await self.dispatcher.dispatch(
                task,
                project_config={
                    "project_id": project_id,
                    "db_path": db_path},
            )
            status = result["status"]
            if status == "dispatched" and result["level"] in (
                    "full", "escalate"):
                conn = get_connection(db_path)
                try:
                    handler = TaskTypeRegistry.get_by_project(project_id)
                    agent_id = result["agent_id"]
                    session_id = result.get("session_id")
                    if handler:
                        # [Step 5] handler projects were already marked
                        # working by the dispatcher, so there is no
                        # "claimed" step to record.
                        accepted = True
                        log_format = ("Dispatched %s to %s "
                                      "(session=%s, handler auto-working)")
                    else:
                        accepted = self._transition_status(
                            conn, task.id, "claimed",
                            agent="daemon",
                            detail={"dispatched_to": agent_id,
                                    "session_id": session_id},
                        )
                        log_format = "Dispatched %s to %s (session=%s)"
                    if accepted:
                        conn.execute(
                            "UPDATE tasks SET current_agent=? WHERE id=?",
                            (agent_id, task.id),
                        )
                        conn.commit()
                        dispatched.append(task.id)
                        logger.info(log_format,
                                    task.id, agent_id, session_id)
                finally:
                    conn.close()
            elif status == "blocked":
                # Stopped by a guardrail: park the task as blocked.
                conn = get_connection(db_path)
                try:
                    self._transition_status(
                        conn, task.id, "blocked",
                        agent="daemon",
                        detail={"reason": result.get("reason", "guardrail"),
                                "violations": result.get("violations", [])},
                    )
                    logger.info("Task %s blocked by guardrail: %s",
                                task.id, result.get("reason", "unknown"))
                finally:
                    conn.close()
        except Exception:
            logger.exception("Dispatch failed for %s", task.id)

    # Path 2: broadcast everything without a deterministic route.
    if to_broadcast:
        dispatched.extend(
            await self._broadcast_claim(to_broadcast, db_path, project_id))

    # §15 Runaway Guard: one counter bump per id dispatched in this tick.
    if dispatched:
        conn = get_connection(db_path)
        try:
            for dispatched_id in dispatched:
                conn.execute(
                    "UPDATE tasks SET dispatch_count = COALESCE(dispatch_count, 0) + 1 WHERE id=?",
                    (dispatched_id,),
                )
            conn.commit()
        finally:
            conn.close()

    return dispatched
|
||
|
||
async def _broadcast_claim(self, tasks: list, db_path: Path,
                           project_id: str) -> List[str]:
    """Offer a batch of unrouted pending tasks to every idle agent at once.

    The whole batch goes out in a single broadcast per tick rather than one
    broadcast per task. Nothing is sent when there is no spawner or when the
    global concurrency counter reports it is near its limit.

    Tasks whose tracker already completed 3 broadcast rounds are not
    broadcast again; they are moved to ``escalated`` and their tracker is
    dropped.

    Args:
        tasks: Pending tasks that have no deterministic route.
        db_path: Blackboard database of the project.
        project_id: Project id embedded in the claim prompt.

    Returns:
        Ids of the tasks that were broadcast, provided at least one agent
        spawn succeeded; an empty list otherwise. The caller uses these as
        task ids (``UPDATE tasks ... WHERE id=?``), so session ids must not
        be returned here.
    """
    if not self.spawner:
        return []

    # Global concurrency check: skip the broadcast when close to the cap.
    if self.counter.is_near_limit():
        logger.info("Skipping broadcast: global concurrent near limit (%d/%d)",
                    self.counter.global_active, self.counter.max_global)
        return []

    # Separate tasks that ran out of broadcast rounds from the rest.
    broadcastable = []
    escalated = []
    for t in tasks:
        tracker = self._broadcast_tracker.get(t.id)
        if tracker and tracker.round_number >= 3:
            escalated.append(t)
        else:
            broadcastable.append(t)

    # Escalate the tasks nobody took.
    for t in escalated:
        conn = get_connection(db_path)
        try:
            tracker = self._broadcast_tracker.get(t.id)
            self._transition_status(
                conn, t.id, "escalated",
                agent="daemon",
                detail={"reason": "no_taker_after_3_broadcasts",
                        "round_number": tracker.round_number if tracker else 0},
            )
            logger.warning(
                "Escalated %s: no taker after 3 broadcast rounds", t.id)
            self._broadcast_tracker.pop(t.id, None)
        finally:
            conn.close()

    if not broadcastable:
        return []

    idle_agents = self._get_idle_agents()
    if not idle_agents:
        logger.warning(
            "No idle agents for broadcast, skipping (capacity issue)")
        return []

    task_ids = [t.id for t in broadcastable]
    logger.info("Broadcasting %d tasks to %d idle agents: %s",
                len(broadcastable), len(idle_agents), task_ids)

    # Audit events are best-effort: a failed write must not stop the
    # broadcast, but it should leave a trace in the log.
    try:
        conn = get_connection(db_path)
        try:
            for task in broadcastable:
                conn.execute(
                    "INSERT INTO events (task_id, agent, event_type, detail) "
                    "VALUES (?,?,?,?)",
                    (task.id, "daemon", "broadcast_claim",
                     json.dumps({"agents": idle_agents, "task_title": task.title})),
                )
            conn.commit()
        finally:
            conn.close()
    except Exception:
        logger.warning("Broadcast audit event write failed for %s",
                       task_ids, exc_info=True)

    # Make sure every broadcast task has a round tracker.
    for t in broadcastable:
        if t.id not in self._broadcast_tracker:
            self._broadcast_tracker[t.id] = BroadcastRound(task_id=t.id)

    spawn_count = 0
    for agent_id in idle_agents:
        prompt = self._build_claim_prompt(
            agent_id, broadcastable, project_id)
        try:
            session_id = await self.spawner.spawn_full_agent(
                agent_id=agent_id,
                message=prompt,
                task_id="broadcast",
                task_db_path=db_path,
                use_main_session=True,
                broadcast_task_ids=list(task_ids),
            )
            if session_id is not None:
                spawn_count += 1
                # Remember which agents were told about each task.
                for t in broadcastable:
                    self._broadcast_tracker[t.id].notified_agents.add(
                        agent_id)
        except AgentBusyError:
            logger.debug("Broadcast skip %s: busy", agent_id)
        except Exception:
            logger.exception("Broadcast spawn failed for %s", agent_id)

    # Report task ids (not session ids) so the caller's per-task
    # bookkeeping actually matches rows in the tasks table.
    return task_ids if spawn_count else []
|
||
|
||
def _build_claim_prompt(self, agent_id: str, tasks: list,
                        project_id: str) -> str:
    """#03: Build the broadcast-claim prompt for one agent.

    The prompt names the agent and its specialties, lists the offered tasks
    (id, title, type, priority) and spells out the HTTP API calls for
    reading, claiming, updating and commenting on a task.

    Args:
        agent_id: Agent the prompt is addressed to.
        tasks: Tasks on offer; each must expose ``id``, ``title`` and
            ``priority`` (``task_type`` is optional).
        project_id: Project id inserted into the API URLs.

    Returns:
        The prompt text. ``{{TASK_ID}}`` is left in the URLs as a literal
        placeholder.
    """
    # API endpoint comes from the spawner when it has one, otherwise the
    # local defaults below are used.
    api_host = getattr(
        self.spawner,
        'api_host',
        '127.0.0.1') if self.spawner else '127.0.0.1'
    api_port = getattr(
        self.spawner,
        'api_port',
        8083) if self.spawner else 8083
    api_base = f"http://{api_host}:{api_port}/api"

    # Look up the agent's specialties; any failure keeps the generic label.
    caps = "通用"
    try:
        if self.dispatcher and self.dispatcher.router:
            profile = self.dispatcher.router.agent_profiles.get(agent_id)
            if profile and profile.capabilities_zh:
                caps = ", ".join(profile.capabilities_zh)
    except Exception:
        pass

    # One bullet line per offered task.
    task_list = "\n".join([
        f"- ID: {t.id}, 标题: {t.title}, 类型: {getattr(t, 'task_type', '') or 'general'}, "
        f"优先级: {t.priority}"
        for t in tasks
    ])

    return f"""你是 {agent_id}({caps})。你们是一个协作团队,共享一块黑板。

## 待处理任务
{task_list}

## 你的角色
你收到团队广播。按你的专业判断回应:

1. 有属于你专业的任务 → 认领并执行
2. 不是你的活,但你的专业领域和这个任务有实际交叉 → 写 observation comment
- 从你的专业视角:你看到什么风险?什么约束?什么跨领域建议?
- 例:关羽看到编码任务 → "这个函数涉及风控计算,建议用 decimal 而非 float"
- 例:赵云看到编码任务 → "这个策略需要分钟线数据,NAS路径 /Volumes/stock/min"
- observation 限 200 字
3. 你的领域和任务无交叉 → NO_REPLY

## API
- 读任务详情: GET {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}?expand=all
- 认领: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/claim
body: {{"agent": "{agent_id}"}}
- 认领后标 working: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/status
body: {{"status": "working", "agent": "{agent_id}"}}
- 写评论: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/comments
body: {{"author": "{agent_id}", "comment_type": "observation", "body": "专业判断(≤200字)"}}

## 约束
- 一个 tick 只认领一个任务
- observation 只在领域有实际交叉时写,纯粹不匹配则 NO_REPLY
- 认领后必须写产出物再转 review
- claim 失败说明已被别人认领,NO_REPLY 退出
- 禁止使用 sessions_send 直接发消息
"""
|
||
|
||
@property
def counter(self):
    """Concurrency counter owned by the dispatcher.

    Returns:
        ``self.dispatcher.counter``, or ``None`` when there is no dispatcher
        or the dispatcher has no ``counter`` attribute.
    """
    dispatcher = self.dispatcher
    if not dispatcher:
        return None
    return getattr(dispatcher, 'counter', None)
|
||
|
||
@staticmethod
def _is_pid_alive(pid: int) -> bool:
    """Report whether a process with the given pid currently exists.

    Signal 0 is sent with ``os.kill``: nothing is delivered, only the
    existence and permission checks are performed.

    Args:
        pid: Process id to probe.

    Returns:
        ``True`` when the process exists (including when it belongs to
        another user), ``False`` when no such process exists.
    """
    import os
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process is there, we are just not allowed to signal it.
        # Treating this as "dead" would release a live agent's slot.
        return True
    return True
|
||
|
||
def record_broadcast_response(
        self, task_id: str, agent_id: str, outcome: str):
    """Record one agent's answer to a broadcast task (called by the Spawner).

    Args:
        task_id: Broadcast task the answer refers to.
        agent_id: Agent that answered.
        outcome: ``"claimed"`` drops the task's tracker; any other value is
            counted as a response without a claim.

    A round ends once every notified agent has responded: the round number
    is incremented and both agent sets are cleared for the next round.
    Unknown task ids are ignored.
    """
    tracker = self._broadcast_tracker.get(task_id)
    if not tracker:
        return

    if outcome == "claimed":
        # Somebody took the task, so there is nothing left to track.
        self._broadcast_tracker.pop(task_id, None)
        return

    tracker.responded_agents.add(agent_id)

    # Asynchronous end-of-round detection: everyone notified has answered.
    everyone_answered = bool(tracker.notified_agents) and (
        tracker.notified_agents <= tracker.responded_agents)
    if not everyone_answered:
        return

    tracker.round_number += 1
    tracker.notified_agents.clear()
    tracker.responded_agents.clear()
    logger.info("Broadcast round %d completed for task %s (async callback, no taker)",
                tracker.round_number, task_id)
|
||
|
||
def _get_all_agent_ids(self) -> List[str]:
    """List the ids of every agent known to the dispatcher's router.

    Returns:
        The keys of ``router.agent_profiles``; an empty list when there is
        no dispatcher or no router.
    """
    dispatcher = self.dispatcher
    if not dispatcher or not hasattr(dispatcher, 'router'):
        return []
    router = dispatcher.router
    if not router:
        return []
    return list(router.agent_profiles.keys())
|
||
|
||
def _get_idle_agents(self) -> List[str]:
    """List the configured agents that have no active session right now.

    The candidate list comes from the router's ``agent_profiles`` rather
    than from the counter, so agents that never ran are included too.

    Returns:
        Agent ids whose active count in ``counter.active_agents`` is 0 or
        absent, in ``agent_profiles`` order. Empty when there is no counter,
        no dispatcher or no router.
    """
    counter = self.counter
    if not counter:
        return []

    # Same guard as _get_all_agent_ids: a dispatcher without a router has
    # no agents to offer, which must not raise in the middle of a tick.
    dispatcher = self.dispatcher
    router = getattr(dispatcher, 'router', None) if dispatcher else None
    if not router:
        return []

    active = counter.active_agents
    return [aid for aid in router.agent_profiles if active.get(aid, 0) == 0]
|
||
|
||
async def _dispatch_reviews(self, db_path: Path,
                            project_id: str) -> List[str]:
    """Send tasks waiting in ``review`` to a reviewer agent.

    Handler projects (those ``TaskTypeRegistry.get_by_project`` resolves)
    have no review flow and are skipped entirely. For every other project,
    each task in ``review`` is:

    - left alone when it already has a review record or a review dispatch
      was logged in the last few minutes;
    - marked ``failed`` (with an observation) when it has no output;
    - otherwise dispatched with ``action_type="review"``.

    Args:
        db_path: Blackboard database of the project.
        project_id: Project being scanned.

    Returns:
        Ids of the tasks whose review was dispatched; each of them also had
        its ``dispatch_count`` incremented.
    """
    if TaskTypeRegistry.get_by_project(project_id):
        return []

    queries = Queries(db_path)
    bb = Blackboard(db_path)
    dispatched: List[str] = []

    for task in queries.tasks_by_status("review"):
        # A review record already exists: nothing to do.
        if bb.get_reviews(task.id):
            continue

        # A review dispatch is still recent: do not send a second one.
        if self._check_recent_routing(db_path, task.id, "review"):
            continue

        # #07.2: crash_limit is checked centrally in _check_timeouts.

        # No output to review: fail the task outright.
        if not bb.get_outputs(task.id):
            conn = get_connection(db_path)
            try:
                self._transition_status(
                    conn, task.id, "failed",
                    agent="daemon",
                    detail={"reason": "no_output_for_review"},
                )
                bb.add_observation(
                    task.id, "daemon",
                    "任务进入 review 状态但没有产出物,自动标记为 failed",
                )
                logger.warning("Task %s in review but no output, marking failed",
                               task.id)
            finally:
                conn.close()
            continue

        # Hand the task to the reviewer agent.
        try:
            result = await self.dispatcher.dispatch(
                task,
                action_type="review",
                project_config={
                    "project_id": project_id,
                    "db_path": db_path},
            )
            if result["status"] != "dispatched":
                continue
            dispatched.append(task.id)
            # The reviewer becomes the task's current agent.
            conn = get_connection(db_path)
            try:
                conn.execute(
                    "UPDATE tasks SET current_agent=? WHERE id=?",
                    (result["agent_id"], task.id),
                )
                conn.commit()
            finally:
                conn.close()
            logger.info("Dispatched review for %s to %s",
                        task.id, result["agent_id"])
        except Exception:
            logger.exception("Review dispatch failed for %s", task.id)

    # §15 Runaway Guard: review dispatches count towards the limit too.
    if dispatched:
        conn = get_connection(db_path)
        try:
            for reviewed_id in dispatched:
                conn.execute(
                    "UPDATE tasks SET dispatch_count = COALESCE(dispatch_count, 0) + 1 WHERE id=?",
                    (reviewed_id,),
                )
            conn.commit()
        finally:
            conn.close()

    return dispatched
|
||
|
||
# ------------------------------------------------------------------
|
||
# 僵尸/超时处理
|
||
# ------------------------------------------------------------------
|
||
|
||
def _check_timeouts(self, db_path: Path) -> List[str]:
    """Sweep for runaway, timed-out and orphaned tasks.

    Four passes run in order:

    1. Runaway guard (§15): a pending/working/claimed task whose
       ``dispatch_count`` has reached 10 is marked ``failed``.
    2. Claim timeout: a ``claimed`` task older than
       ``self.claim_timeout_minutes`` goes back to ``pending`` with
       ``retry_count`` incremented, or to ``escalated`` once
       ``retry_count >= 3``.
    3. Working timeout: a ``working`` task is marked ``failed`` when the
       dispatcher reports its crash limit (3 in 30 minutes) or when it has
       run past its timeout. The timeout is ``deadline - start`` when a
       deadline is set and that span is at least one minute, otherwise
       ``self.default_task_timeout_minutes``. In a handler project a timed
       out task whose ``check_completion`` passes is marked ``done``.
    4. Liveness: for each agent the counter tracks, a dead session pid
       releases the agent's counter slot and pushes its task back to
       ``pending`` unless the task is in ``review``.

    Stored timestamps are compared as naive UTC; timezone-aware values are
    converted first so that an offset in ``deadline`` cannot silently
    disable the timeout.

    Args:
        db_path: Blackboard database of the project.

    Returns:
        Ids of the tasks handled by passes 1-3.
    """
    from datetime import timezone

    def _parse_utc(raw: str) -> datetime:
        """Parse an ISO timestamp into a naive UTC datetime."""
        parsed = datetime.fromisoformat(raw)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    queries = Queries(db_path)
    reclaimed: List[str] = []
    now = datetime.utcnow()  # naive UTC, consistent with SQLite datetime('now')

    # §15 Runaway Guard: per-task dispatch_count ceiling over every
    # non-terminal status, so nothing can be dispatched forever.
    MAX_DISPATCH_COUNT = 10
    for status_to_check in ("pending", "working", "claimed"):
        for task in queries.tasks_by_status(status_to_check):
            dispatch_count = getattr(task, 'dispatch_count', 0) or 0
            if dispatch_count < MAX_DISPATCH_COUNT:
                continue
            conn = get_connection(db_path)
            try:
                ok = self._transition_status(
                    conn, task.id, "failed",
                    agent="daemon",
                    detail={"reason": "runaway_guard",
                            "dispatch_count": dispatch_count,
                            "message": f"dispatch {dispatch_count} 次仍未完成,自动标 failed"},
                )
                if ok:
                    reclaimed.append(task.id)
                    logger.error(
                        "Task %s: runaway guard triggered (dispatch_count=%d, status=%s), marking failed",
                        task.id, dispatch_count, status_to_check)
            finally:
                conn.close()

    # Claim timeout: back to pending, or escalate after 3 retries.
    for task in queries.tasks_by_status("claimed"):
        if not task.claimed_at:
            continue
        try:
            claimed_time = _parse_utc(task.claimed_at)
            elapsed = (now - claimed_time).total_seconds() / 60.0
            if elapsed > self.claim_timeout_minutes:
                retry_count = getattr(task, 'retry_count', 0) or 0
                # retry_count doubles as the broadcast round counter.
                if retry_count >= 3:
                    conn = get_connection(db_path)
                    try:
                        self._transition_status(
                            conn, task.id, "escalated",
                            agent="daemon",
                            detail={"reason": "claim_timeout_no_taker",
                                    "retry_count": retry_count},
                        )
                        reclaimed.append(task.id)
                        logger.warning("Escalated %s: no taker after %d broadcasts",
                                       task.id, retry_count)
                    finally:
                        conn.close()
                else:
                    conn = get_connection(db_path)
                    try:
                        ok = self._transition_status(
                            conn, task.id, "pending",
                            agent="daemon",
                            detail={"reason": "claim_timeout",
                                    "elapsed_minutes": round(elapsed, 1)},
                        )
                        if ok:
                            conn.execute(
                                "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?",
                                (task.id,),
                            )
                            conn.commit()
                            reclaimed.append(task.id)
                            logger.info("Reclaimed %s: claimed → pending (timeout %.1fm, retry=%d)",
                                        task.id, elapsed, retry_count + 1)
                    finally:
                        conn.close()
        except (ValueError, TypeError):
            # Still best-effort, but a task that can never time out is
            # worth a line in the log.
            logger.warning("Task %s: claim timeout check skipped",
                           task.id, exc_info=True)

    # Working timeout: mark failed.
    for task in queries.tasks_by_status("working"):
        # #07.2: the crash limit is a stronger signal than a timeout, so
        # it is checked first.
        if self.dispatcher and hasattr(
                self.dispatcher, '_check_crash_limit'):
            if self.dispatcher._check_crash_limit(
                    task.id, db_path, limit=3, window_minutes=30):
                conn = get_connection(db_path)
                try:
                    self._transition_status(
                        conn, task.id, "failed",
                        agent="daemon",
                        detail={"reason": "crash_limit",
                                "message": "30 分钟内 crash 3 次,自动标 failed"},
                    )
                finally:
                    conn.close()
                reclaimed.append(task.id)
                logger.error(
                    "Task %s: executor crash limit (3/30m), marking failed", task.id)
                continue

        # #07.3 ACT-1: updated_at is the fallback for auto-working mail
        # tasks, which have neither started_at nor claimed_at.
        start_time_str = task.started_at or task.claimed_at or task.updated_at
        if not start_time_str:
            continue
        try:
            start_time = _parse_utc(start_time_str)
            # Per-task timeout: the deadline wins over the default.
            if task.deadline:
                deadline_time = _parse_utc(task.deadline)
                timeout_minutes = (
                    deadline_time - start_time).total_seconds() / 60.0
                if timeout_minutes < 1:
                    timeout_minutes = self.default_task_timeout_minutes
            else:
                timeout_minutes = self.default_task_timeout_minutes

            elapsed = (now - start_time).total_seconds() / 60.0
            if elapsed > timeout_minutes:
                # [Step 5] handler safety net: if the completion check
                # passes, the work was done and only the status is stale.
                handler = TaskTypeRegistry.get_by_project(self._current_project_id)
                if handler and handler.check_completion(task.id, db_path):
                    conn = get_connection(db_path)
                    try:
                        ok = self._transition_status(
                            conn, task.id, "done",
                            agent="daemon",
                            detail={"reason": "mail_auto_done_recheck",
                                    "elapsed_minutes": round(elapsed, 1)},
                        )
                        if ok:
                            reclaimed.append(task.id)
                            logger.info("Mail %s: ticker recheck found reply, marked done (%.1fm)",
                                        task.id, elapsed)
                    finally:
                        conn.close()
                    continue

                conn = get_connection(db_path)
                try:
                    ok = self._transition_status(
                        conn, task.id, "failed",
                        agent="daemon",
                        detail={"reason": "task_timeout",
                                "elapsed_minutes": round(elapsed, 1),
                                "timeout_minutes": round(timeout_minutes, 1)},
                    )
                    if ok:
                        reclaimed.append(task.id)
                        logger.warning("Task %s timed out (working %.1fm > %.1fm)",
                                       task.id, elapsed, timeout_minutes)
                finally:
                    conn.close()
        except (ValueError, TypeError):
            logger.warning("Task %s: working timeout check skipped",
                           task.id, exc_info=True)

    # v2.7.2: liveness check, for counter slots held by dead processes.
    counter = self.counter
    if self.spawner and counter and hasattr(counter, "active_agents"):
        for agent_id in list(counter.active_agents.keys()):
            session_info = self.spawner.get_session_by_agent(agent_id)
            if not session_info:
                continue
            pid = session_info.get("pid")
            task_id_check = session_info.get("task_id")
            if pid and not self._is_pid_alive(pid):
                logger.warning("Agent %s process dead (pid=%d), releasing counter",
                               agent_id, pid)
                counter.release(agent_id)
                # #07.2: a task in review stays in review for
                # _dispatch_reviews; anything else is pushed back to
                # pending so the ticker dispatches it again.
                if task_id_check and db_path:
                    try:
                        conn = get_connection(db_path)
                        try:
                            current_row = conn.execute(
                                "SELECT status FROM tasks WHERE id=?", (
                                    task_id_check,)
                            ).fetchone()
                            if current_row and current_row["status"] == "review":
                                logger.info(
                                    "Task %s in review, keeping status (process dead)", task_id_check)
                            else:
                                self._transition_status(
                                    conn, task_id_check, "pending",
                                    agent="daemon",
                                    detail={
                                        "reason": "process_dead", "pid": pid},
                                )
                        finally:
                            conn.close()
                    except Exception:
                        logger.exception(
                            "Failed to handle process dead for task %s", task_id_check)

    # #07.2: Fix-3b was removed; review timeouts and crashes are handled by
    # the liveness pass above together with the rest of this sweep.

    return reclaimed
|
||
|
||
def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool:
    """Mail hallucination gate: check whether a reply mail exists.

    A reply is any other task whose ``must_haves`` text contains the
    original task's id.

    Args:
        original_task_id: Id of the mail task that should have been answered.
        db_path: Blackboard database to search.

    Returns:
        ``True`` when such a task exists, and also when the lookup itself
        fails (the conservative assumption is that a reply exists).
    """
    try:
        conn = get_connection(db_path)
        try:
            reply = conn.execute(
                "SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ? LIMIT 1",
                (original_task_id, f'%{original_task_id}%'),
            ).fetchone()
        finally:
            conn.close()
        return reply is not None
    except Exception as e:
        logger.error(
            "Mail %s: ticker reply check error: %s",
            original_task_id,
            e)
        return True  # conservative: a failed lookup counts as "replied"
|
||
|
||
def _check_recent_routing(self, db_path: Path, task_id: str,
                          action_type: str) -> bool:
    """Tell whether the task was already dispatched in the last 5 minutes.

    Used to avoid dispatching the same task twice in quick succession.

    Args:
        db_path: Blackboard database holding ``routing_decisions``.
        task_id: Task to look up.
        action_type: ``"review"`` restricts the lookup to decisions made
            from the ``review`` status; any other value matches every
            dispatched decision.

    Returns:
        ``True`` when a matching decision exists; ``False`` when none does
        or when the lookup fails.
    """
    sql = ("SELECT COUNT(*) as cnt FROM routing_decisions "
           "WHERE task_id=? AND outcome='dispatched' ")
    if action_type == "review":
        # Only review dispatches block another review dispatch.
        sql += "AND from_status='review' "
    sql += "AND created_at > datetime('now', '-5 minutes')"

    try:
        conn = get_connection(db_path)
        try:
            row = conn.execute(sql, (task_id,)).fetchone()
            if row:
                return row["cnt"] > 0
            return False
        finally:
            conn.close()
    except Exception:
        return False
|
||
|
||
# ------------------------------------------------------------------
|
||
# PM2 Crash 启动恢复 (#06)
|
||
# ------------------------------------------------------------------
|
||
|
||
def _startup_recover(self) -> Dict[str, Any]:
    """Startup recovery after a PM2 crash (#06).

    Walks every active registered project plus the virtual projects
    (``_general`` and those reported by ``TaskTypeRegistry``) whose database
    file exists, and lets ``_recover_project`` bring their claimed, working,
    review and reviewing tasks back to a consistent state.

    Returns:
        A report dict with ``projects`` (project id -> list of recovered
        task entries), ``total_recovered`` and ``total_noop``.
    """
    non_terminal = {"claimed", "working", "review", "reviewing"}

    registered = self.registry.list_projects()
    report = {
        "projects": {},
        "total_recovered": 0,
        "total_noop": 0}

    # Databases to scan: active registered projects first ...
    targets = {
        pid: self.registry.root / pid / "blackboard.db"
        for pid, info in registered.items()
        if info.get("status") == "active"
    }

    # ... then the virtual projects that actually have a database on disk.
    for virtual_id in ["_general"] + TaskTypeRegistry.virtual_projects():
        virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db"
        if virtual_db.exists() and virtual_id not in targets:
            targets[virtual_id] = virtual_db

    for project_id, db_path in targets.items():
        if not db_path.exists():
            continue

        # Initialise the database once per process.
        db_key = str(db_path)
        if db_key not in self._initialized_dbs:
            from src.blackboard.db import init_db
            init_db(db_path)
            self._initialized_dbs.add(db_key)

        # #06: _transition_status looks at _current_project_id to decide
        # whether a reset to pending may clear the assignee, so point it at
        # the project being recovered and restore it afterwards.
        previous_project = self._current_project_id
        self._current_project_id = project_id
        try:
            recovered, noop_count = self._recover_project(
                db_path, non_terminal)
            if recovered:
                report["projects"][project_id] = recovered
                report["total_recovered"] += len(recovered)
            report["total_noop"] += noop_count
        except Exception:
            logger.exception(
                "Startup recovery failed for project %s", project_id)
        finally:
            self._current_project_id = previous_project

    if report["total_recovered"] > 0:
        logger.info("Startup recovery: %d tasks recovered across %d projects",
                    report["total_recovered"],
                    len(report["projects"]))
    elif report["total_noop"] > 0:
        logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)",
                    report["total_noop"])
    else:
        logger.info(
            "Startup recovery: no non-terminal tasks found, clean start")

    return report
|
||
|
||
def _recover_project(self, db_path: Path, non_terminal: set) -> tuple:
    """Recover the non-terminal tasks of a single project.

    Every task in one of the given statuses is either recovered (an action
    is chosen and executed) or left as it is, in which case a
    ``startup_recovery_noop`` audit event is written. A failure on one task
    is logged and does not stop the others.

    Args:
        db_path: Blackboard database of the project.
        non_terminal: Statuses to scan.

    Returns:
        ``(recovered, noop_count)`` where ``recovered`` is a list of
        ``{"task_id", "from", "action"}`` dicts and ``noop_count`` is the
        number of tasks left untouched.
    """
    conn = get_connection(db_path)
    recovered = []
    noop_count = 0

    try:
        for status in non_terminal:
            rows = conn.execute(
                "SELECT id, assignee, current_agent, updated_at FROM tasks WHERE status=?",
                (status,)
            ).fetchall()

            for task in rows:
                try:
                    action = self._determine_recovery_action(
                        conn, task, status, db_path)
                    if action:
                        self._execute_recovery(
                            conn, task["id"], action, db_path)
                        recovered.append(
                            {"task_id": task["id"], "from": status, "action": action})
                    else:
                        # Audit: tasks that are kept as-is get an event too.
                        noop_count += 1
                        conn.execute(
                            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
                            (task["id"], "daemon", "startup_recovery_noop",
                             json.dumps({"status": status, "reason": "no_action_needed"}))
                        )
                        # Commit right away so a later rollback for another
                        # task cannot discard this audit row.
                        conn.commit()
                except Exception:
                    logger.exception(
                        "Startup recovery failed for task %s", task["id"])
                    # The connection is shared by every task in this
                    # project. A transaction left open by the failed task
                    # would make the next BEGIN fail, so close it here.
                    try:
                        conn.rollback()
                    except Exception:
                        logger.exception(
                            "Startup recovery rollback failed for task %s", task["id"])
        conn.commit()
    finally:
        conn.close()

    return recovered, noop_count
|
||
|
||
def _determine_recovery_action(self, conn, task, status: str,
                               db_path: Path) -> Optional[str]:
    """Pick the recovery action for one task from its blackboard traces.

    Args:
        conn: Open database connection.
        task: Task row; only ``task["id"]`` is read here.
        status: The task's current status.
        db_path: Blackboard database path (not used by this method).

    Returns:
        The action name, or ``None`` when the task needs no intervention:

        - ``claimed``   -> always ``push_to_pending``;
        - ``working``   -> decided by ``_recover_working_task``;
        - ``review``    -> decided by ``_recover_review_task``;
        - ``reviewing`` -> ``push_to_failed`` when the task entered
          reviewing from ``failed``, otherwise ``push_to_done``;
        - anything else -> ``None``.
    """
    task_id = task["id"]

    if status == "reviewing":
        # Restore the status the task had before it entered reviewing;
        # done is the fallback.
        came_from = self._find_pre_reviewing_status(conn, task_id)
        return "push_to_failed" if came_from == "failed" else "push_to_done"

    if status == "review":
        return self._recover_review_task(conn, task_id)

    if status == "working":
        return self._recover_working_task(conn, task_id)

    if status == "claimed":
        # A claim never survives a restart.
        return "push_to_pending"

    return None
|
||
|
||
def _recover_working_task(self, conn, task_id: str) -> Optional[str]:
    """Decide how to recover a task that was ``working`` at crash time.

    The task moves on to review only when all three traces are present: its
    latest attempt ended with outcome ``completed``, it has at least one
    output, and it has a ``handoff`` comment. In every other case it is sent
    back to pending with its current agent kept.

    Args:
        conn: Open database connection.
        task_id: Task to inspect.

    Returns:
        ``"push_to_review"`` or ``"push_to_pending_keep_agent"``.
    """
    retry_same_agent = "push_to_pending_keep_agent"

    # Latest attempt: anything but a clean completion means "run again".
    last_attempt = conn.execute(
        "SELECT outcome, agent FROM task_attempts WHERE task_id=? ORDER BY id DESC LIMIT 1",
        (task_id,)
    ).fetchone()
    if not last_attempt or last_attempt["outcome"] != "completed":
        return retry_same_agent

    # The agent exited normally; did it leave any output?
    output_row = conn.execute(
        "SELECT 1 FROM outputs WHERE task_id=? LIMIT 1", (task_id,)
    ).fetchone()
    if output_row is None:
        return retry_same_agent

    # A handoff comment means the agent finished and passed the work on.
    handoff_row = conn.execute(
        "SELECT 1 FROM comments WHERE task_id=? AND comment_type='handoff' LIMIT 1",
        (task_id,)
    ).fetchone()
    if handoff_row is None:
        return retry_same_agent

    return "push_to_review"
|
||
|
||
def _recover_review_task(self, conn, task_id: str) -> Optional[str]:
    """Decide how to recover a task that was in ``review`` at crash time.

    Args:
        conn: Open database connection.
        task_id: Task to inspect.

    Returns:
        ``"push_to_done"`` when the newest review verdict is ``approved``,
        ``"push_to_pending"`` for any other verdict (the work has to be
        redone), and ``None`` when there is no review yet, so the task stays
        in review and the ticker dispatches a reviewer as usual.
    """
    latest_review = conn.execute(
        "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
        (task_id,)
    ).fetchone()

    if not latest_review:
        return None

    if latest_review["verdict"] == "approved":
        return "push_to_done"
    return "push_to_pending"
|
||
|
||
def _execute_recovery(self, conn, task_id: str,
                      action: str, db_path: Path):
    """Apply one recovery action to a task and write the audit trail.

    Known actions and the status they move the task to:

    - ``push_to_pending``: pending, and ``current_agent`` is cleared;
    - ``push_to_pending_keep_agent``: pending, ``current_agent`` is kept so
      the same agent can pick the task up again;
    - ``push_to_review``: review;
    - ``push_to_done``: done;
    - ``push_to_failed``: failed.

    An unknown action changes nothing. In every case a
    ``startup_recovery`` event naming the action is recorded.

    Args:
        conn: Open database connection; it is committed here, not closed.
        task_id: Task to recover.
        action: One of the action names above.
        db_path: Blackboard database path (not used by this method).
    """
    # Status before recovery, kept for the audit detail.
    orig_row = conn.execute(
        "SELECT status FROM tasks WHERE id=?", (task_id,)
    ).fetchone()
    orig_status = orig_row["status"] if orig_row else "unknown"

    target_by_action = {
        "push_to_pending": "pending",
        "push_to_pending_keep_agent": "pending",
        "push_to_review": "review",
        "push_to_done": "done",
        "push_to_failed": "failed",
    }
    target = target_by_action.get(action)

    if target is not None:
        moved = self._transition_status(
            conn, task_id, target,
            agent="daemon",
            detail={
                "reason": "startup_recovery",
                # push_to_review records the literal "working".
                "original_status": "working" if action == "push_to_review" else orig_status},
        )
        if not moved:
            logger.warning(
                "Recovery: task %s transition %s → %s was refused (action=%s)",
                task_id, orig_status, target, action)
        if action == "push_to_pending":
            # Plain push to pending: no particular agent takes over.
            conn.execute(
                "UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,))
        conn.commit()

    # Recovery audit event.
    conn.execute(
        "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
        (task_id, "daemon", "startup_recovery",
         json.dumps({"action": action}))
    )
    conn.commit()

    # Log the target status; an unknown action leaves the task where it was.
    logger.info(
        "Recovery: task %s → %s (action=%s)",
        task_id,
        target or orig_status,
        action)
|
||
|
||
def _find_pre_reviewing_status(self, conn, task_id: str) -> str:
    """Find the status a task had right before it entered ``reviewing``.

    The newest ``task_reviewing`` event of the task is read; its JSON
    detail carries the previous status under ``from`` (``old_status`` is
    accepted as well).

    Args:
        conn: Open database connection.
        task_id: Task to look up.

    Returns:
        ``"done"`` or ``"failed"``. ``"done"`` is also the fallback when
        there is no such event or its detail is missing, malformed or names
        any other status.
    """
    rows = conn.execute(
        """SELECT detail FROM events
           WHERE task_id=? AND event_type='task_reviewing'
           ORDER BY id DESC LIMIT 1""",
        (task_id,)
    ).fetchall()

    for event in rows:
        try:
            detail = json.loads(event["detail"])
            prev = detail.get("from") or detail.get("old_status")
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError):
            # NULL detail (TypeError) or JSON that is not an object
            # (AttributeError) must lead to the fallback, not to a crash.
            continue
        if prev in ("done", "failed"):
            return prev

    # Nothing usable found: fall back to done.
    return "done"
|
||
|
||
# ------------------------------------------------------------------
|
||
# 手动 tick(API 端点触发)
|
||
# ------------------------------------------------------------------
|
||
|
||
async def manual_tick(self) -> Dict[str, Any]:
    """Run one tick on demand (triggered by the API endpoint).

    Returns:
        The dict produced by ``self.tick()`` with ``"manual": True`` added.
    """
    logger.info("Manual tick triggered")
    outcome = await self.tick()
    outcome["manual"] = True
    return outcome
|