Files
sanguo_moziplus_v2/src/daemon/ticker.py
T
2026-05-30 13:44:33 +08:00

1396 lines
57 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Daemon Ticker — 30s 状态扫描 + 依赖推进 + 事件驱动
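The tick loop is the core driver of the whole daemon:
1. Iterate over all active projects
2. Scan the blackboard state on every tick
3. Advance dependency chains (blocked → pending)
4. Write a daemon_tick event
5. Dispatch pending tasks (implemented in F9)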
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional
from src.blackboard.operations import Blackboard
from src.blackboard.db import get_connection
from src.blackboard.models import Task
from src.daemon.spawner import AgentBusyError
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
logger = logging.getLogger("moziplus-v2.ticker")
class Ticker:
"""Daemon ticker 主循环"""
def __init__(
self,
registry: ProjectRegistry,
tick_interval: float = 30.0,
max_ticks: Optional[int] = None,
on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None,
dispatcher: Optional[Any] = None,
spawner: Optional[Any] = None,
max_dispatch_per_tick: int = 3,
claim_timeout_minutes: float = 5.0,
default_task_timeout_minutes: float = 30.0,
health_checker: Optional[Any] = None,
experience_distiller: Optional[Any] = None,
inbox_watcher: Optional[Any] = None,
):
"""
Args:
registry: 项目注册表
tick_interval: tick 间隔秒数
max_ticks: 测试用,限制最大 tick 次数
on_tick_complete: 每次 tick 完成后的回调(用于通知等)
dispatcher: Dispatcher 实例(Agent 调度)
spawner: AgentSpawner 实例(Agent spawn
max_dispatch_per_tick: 每个 tick 最多调度多少个 pending 任务
claim_timeout_minutes: claimed 超时重置为 pending 的分钟数
default_task_timeout_minutes: working 超时标记 failed 的分钟数
"""
self.registry = registry
self.tick_interval = tick_interval
self.max_ticks = max_ticks
self.on_tick_complete = on_tick_complete
self.dispatcher = dispatcher
self.spawner = spawner
self.max_dispatch_per_tick = max_dispatch_per_tick
self.claim_timeout_minutes = claim_timeout_minutes
self.default_task_timeout_minutes = default_task_timeout_minutes
self.health_checker = health_checker
self.experience_distiller = experience_distiller
self.inbox_watcher = inbox_watcher
self._tick_count: int = 0
self._running: bool = False
self._task: Optional[asyncio.Task] = None
# 已初始化的 db_path 集合,避免每次 tick 重复 init_db
self._initialized_dbs: set = set()
# 每个项目上次 tick 的 event count(用于僵尸检测 F8
self._last_event_counts: Dict[str, int] = {}
# 当前项目 ID_dispatch_pending 需要)
self._current_project_id: Optional[str] = None
# ------------------------------------------------------------------
# 生命周期
# ------------------------------------------------------------------
async def start(self) -> None:
"""启动 tick 循环"""
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._loop())
# 启动 InboxWatcher(即时事件监听)
if self.inbox_watcher:
try:
await self.inbox_watcher.start()
logger.info("InboxWatcher started")
except Exception as e:
logger.warning("InboxWatcher start failed: %s", e)
logger.info("Ticker started (interval=%.1fs)", self.tick_interval)
async def stop(self) -> None:
    """Stop the tick loop and the inbox watcher.

    The running flag is cleared first, then the inbox watcher is stopped
    (errors are logged, not raised), and finally the loop task is cancelled
    and awaited so it has finished by the time this coroutine returns.
    """
    self._running = False

    watcher = self.inbox_watcher
    if watcher:
        try:
            await watcher.stop()
        except Exception as exc:
            logger.warning("InboxWatcher stop failed: %s", exc)
        else:
            logger.info("InboxWatcher stopped")

    loop_task = self._task
    if loop_task:
        loop_task.cancel()
        try:
            await loop_task
        except asyncio.CancelledError:
            # Expected: we just cancelled it ourselves.
            pass

    logger.info("Ticker stopped (ticks=%d)", self._tick_count)
@property
def tick_count(self) -> int:
return self._tick_count
@property
def is_running(self) -> bool:
return self._running
# ------------------------------------------------------------------
# 主循环
# ------------------------------------------------------------------
async def _loop(self) -> None:
"""Tick 主循环"""
while self._running:
try:
await self.tick()
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Tick %d error", self._tick_count + 1)
if self.max_ticks and self._tick_count >= self.max_ticks:
logger.info("Max ticks reached (%d), stopping", self.max_ticks)
self._running = False
break
try:
await asyncio.sleep(self.tick_interval)
except asyncio.CancelledError:
return
# ------------------------------------------------------------------
# 单次 tick
# ------------------------------------------------------------------
async def tick(self) -> Dict[str, Any]:
    """Run one tick over every active project plus the virtual projects.

    Returns:
        ``{"tick": <n>, "projects": {project_id: result-or-error}}`` where
        a failing project is reported as ``{"error": "<message>"}`` instead
        of aborting the whole tick.
    """
    self._tick_count += 1
    tick_num = self._tick_count
    per_project: Dict[str, Any] = {}
    results: Dict[str, Any] = {"tick": tick_num, "projects": per_project}

    active_projects = {
        pid: info
        for pid, info in self.registry.list_projects().items()
        if info.get("status") == "active"
    }
    for project_id, project_info in active_projects.items():
        try:
            per_project[project_id] = await self._tick_project(project_id, project_info)
        except Exception as exc:
            logger.exception("Tick %d project %s error", tick_num, project_id)
            per_project[project_id] = {"error": str(exc)}

    # Virtual projects are not listed in the registry but still need
    # scheduling whenever their blackboard database exists:
    # "_general" (general tasks) and "_mail" (mail delivery).
    virtual_projects = (
        ("_general", "一般任务", "Tick %d _general error"),
        ("_mail", "飞鸽传书", "Tick %d _mail error"),
    )
    for virtual_id, display_name, error_template in virtual_projects:
        virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db"
        if not virtual_db.exists() or virtual_id in active_projects:
            continue
        try:
            per_project[virtual_id] = await self._tick_project(virtual_id, {
                "id": virtual_id, "name": display_name,
                "status": "active", "source": "virtual",
            })
        except Exception as exc:
            logger.exception(error_template, tick_num)
            per_project[virtual_id] = {"error": str(exc)}

    logger.debug("Tick %d complete: %d projects", tick_num, len(active_projects))

    callback = self.on_tick_complete
    if callback:
        try:
            await callback()
        except Exception:
            logger.exception("on_tick_complete callback error")
    return results
async def _tick_project(self, project_id: str,
project_info: Dict[str, Any]) -> Dict[str, Any]:
"""单项目的 tick 处理"""
project_dir = self.registry.root / project_id
db_path = project_dir / "blackboard.db"
if not db_path.exists():
return {"status": "no_db"}
# 只在首次遇到该 db_path 时执行 init_db
db_key = str(db_path)
if db_key not in self._initialized_dbs:
from src.blackboard.db import init_db
init_db(db_path)
self._initialized_dbs.add(db_key)
result: Dict[str, Any] = {
"status": "ok",
"summary_before": {},
"summary_after": {},
"advanced": [],
"dispatched": [],
"review_dispatched": [],
"zombie_reclaimed": [],
"parents_refreshed": [],
}
# 保存当前 project_id(供 _dispatch_pending 使用)
self._current_project_id = project_id
# 1. 扫描当前状态
queries = Queries(db_path)
result["summary_before"] = queries.task_summary()
# 2. 依赖推进
advanced = self._advance_dependencies(db_path)
result["advanced"] = advanced
# 3. 僵尸/超时处理(在依赖推进后、调度前)
zombie_reclaimed = self._check_timeouts(db_path)
result["zombie_reclaimed"] = zombie_reclaimed
# 4. 调度 pending 任务
if self.dispatcher and self.spawner:
dispatched = await self._dispatch_pending(db_path, project_id)
result["dispatched"] = dispatched
# 5. 调度审查任务
if self.dispatcher and self.spawner:
review_dispatched = await self._dispatch_reviews(db_path, project_id)
result["review_dispatched"] = review_dispatched
# 6. 聚合父 Task 状态(v2.7
parents_refreshed = self._refresh_parent_statuses(db_path)
result["parents_refreshed"] = parents_refreshed
# 7. 一轮结束检测 + 庞统 review spawnv2.9 #01
round_reviewed = await self._check_round_complete(db_path, project_id)
result["round_reviewed"] = round_reviewed
# 8. @mention 通知(v2.9 #01
mentions_processed = await self._process_mentions(db_path, project_id)
result["mentions_processed"] = mentions_processed
# 9. 写 daemon_tick 事件
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(None, "daemon", "daemon_tick",
json.dumps({"tick": self._tick_count, "advanced_count": len(advanced),
"parents_refreshed": len(parents_refreshed)})),
)
conn.commit()
finally:
conn.close()
# 8. 健康检查(僵尸检测)
if self.health_checker:
try:
self.health_checker.check(project_id, db_path, self._tick_count)
except Exception as e:
logger.warning("HealthChecker error for %s: %s", project_id, e)
# 9. 经验蒸馏(完成的 task 自动触发)
if self.experience_distiller:
try:
conn2 = get_connection(db_path)
try:
done_tasks = conn2.execute(
"SELECT id FROM tasks WHERE status='done' AND updated_at > datetime('now', '-60 seconds')"
).fetchall()
finally:
conn2.close()
for row in done_tasks:
t = Blackboard(db_path).get_task(row[0])
if t:
self.experience_distiller.distill_from_task(
task_id=t.id, task_title=t.title, task_type=t.task_type
)
except Exception as e:
logger.warning("ExperienceDistiller error for %s: %s", project_id, e)
# 10. 扫描后状态
result["summary_after"] = queries.task_summary()
return result
# ------------------------------------------------------------------
# 父 Task 状态聚合
# ------------------------------------------------------------------
def _refresh_parent_statuses(self, db_path: Path) -> List[str]:
    """Recompute the aggregated status of every task that has sub tasks.

    For each distinct ``parent_task`` id the status is computed through
    ``Queries.compute_parent_status``; a ``None`` result means "leave this
    parent alone". All updates are written on one connection and committed
    together.

    NOTE(review): skipping of manually set statuses (cancelled, paused) is
    presumably done inside ``compute_parent_status`` — nothing in this
    method checks for them; confirm.

    Returns:
        Ids of the parent tasks whose status was changed.
    """
    queries = Queries(db_path)
    refreshed: List[str] = []
    conn = get_connection(db_path)
    try:
        rows = conn.execute(
            "SELECT DISTINCT parent_task FROM tasks WHERE parent_task IS NOT NULL"
        ).fetchall()
        for parent_id in [row["parent_task"] for row in rows]:
            computed = queries.compute_parent_status(parent_id)
            if computed is None:
                continue
            current = conn.execute(
                "SELECT status FROM tasks WHERE id=?", (parent_id,)
            ).fetchone()
            if not current or current["status"] == computed:
                continue
            conn.execute(
                "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
                (computed, parent_id),
            )
            refreshed.append(parent_id)
            logger.info("Parent %s status aggregated: → %s", parent_id, computed)
        if refreshed:
            conn.commit()
    finally:
        conn.close()
    return refreshed
# ------------------------------------------------------------------
# 一轮结束检测 + 庞统 review (v2.9 #01)
# ------------------------------------------------------------------
MAX_ROUNDS = 5 # §4.5 防无限循环
async def _check_round_complete(self, db_path: Path,
project_id: str) -> List[str]:
"""检测 parent task 下所有 sub task 终态 → spawn 庞统 review
流程(§4.4):
1. 扫描所有 parent task
2. 对每个 parent 检查 sub task 是否全部终态
3. 检查 round_count 上限
4. increment round_count
5. spawn 庞统 review
"""
if not self.dispatcher or not self.spawner:
return []
bb = Blackboard(db_path)
reviewed: List[str] = []
# 找所有 parent task(有子 task 的)
conn = get_connection(db_path)
try:
parent_rows = conn.execute(
"SELECT DISTINCT parent_task FROM tasks WHERE parent_task IS NOT NULL"
).fetchall()
finally:
conn.close()
for row in parent_rows:
parent_id = row["parent_task"]
try:
summary = bb.get_subtasks_summary(parent_id)
if not summary or not summary["all_terminal"]:
continue
# 检查 parent 自身状态:只有 done 状态的 parent 才触发
# (聚合后 parent 状态为 done 说明所有 sub 都完了)
# BUG-2 fix: done 和 failed 都触发 reviewfailed 时优先判断重试/换人)
if summary["parent_status"] not in ("done", "failed"):
continue
# 检查 round_count 上限
if summary["round_count"] >= self.MAX_ROUNDS:
logger.warning(
"Parent %s reached max rounds (%d), skipping review",
parent_id, self.MAX_ROUNDS)
continue
# 构建 prompt(用即将成为的 round_num,不提前 increment
next_round = summary["round_count"] + 1
outputs = bb.get_aggregate_outputs(parent_id)
comments = bb.get_round_comments(parent_id)
parent_task = bb.get_task(parent_id)
if not parent_task:
continue
# spawn 庞统 review(先 spawn,成功后再 increment round_count
review_prompt = self._build_review_prompt(
parent_task, summary, outputs, comments, next_round,
project_id=project_id
)
spawned = await self._spawn_pangtong_review(
parent_task, review_prompt, project_id, new_round=next_round
)
if spawned:
# spawn 成功 → increment round_count + 标记 reviewing
new_round = bb.increment_round_count(parent_id)
reviewed.append(parent_id)
logger.info(
"Round %d review spawned for parent %s (subs: %s)",
new_round, parent_id, summary
)
except Exception as e:
logger.exception("Round check error for parent %s", parent_id)
return reviewed
def _build_review_prompt(self, parent_task, summary: dict,
outputs: list, comments: list,
round_num: int,
project_id: str = "") -> str:
"""构建庞统 review prompt(§4.2 三问框架)"""
goal = parent_task.description or parent_task.title
must_haves = parent_task.must_haves or "{}"
# 成果物摘要(限制 token
output_lines = []
for o in outputs[:20]: # 最多 20 个
output_lines.append(
f"- [{o.get('task_title', '?')}] {o.get('title', '?')} "
f"({o.get('output_type', '?')}) by {o.get('agent', '?')}"
)
# 讨论摘要(限制 50 条)
comment_lines = []
for c in comments[:50]:
comment_lines.append(
f"[{c.get('created_at', '?')[:16]}] {c.get('author', '?')}: {c.get('body', '?')[:200]}"
)
return f"""## 庞统 Review(第 {round_num} 轮)
### Goal
{goal}
### 验收标准
{must_haves}
### 本轮 Sub Task 状态
- 完成: {summary['done']}
- 失败: {summary['failed']}
- 取消: {summary['cancelled']}
- 总计: {summary['total']}
### 成果物
{chr(10).join(output_lines) if output_lines else ''}
### 黑板讨论
{chr(10).join(comment_lines) if comment_lines else ''}
### 三问
1. Goal 还清晰吗?(是否有 goal drift)
2. 成果物覆盖 goal 了吗?(逐条检查验收标准)
3. 下一轮需要做什么?(创建新 sub tasks / 标记完成 / 调整方向)
### 失败处理
{f'{summary["failed"]} 个 sub task failed,优先判断是应该重试、换人、还是调整方案。' if summary['failed'] > 0 else '无失败'}
### 你可以
- 创建新一轮 sub tasks: POST http://localhost:8083/api/projects/{project_id}/tasks
body: {"title": "...", "description": "...", "parent_task": "{parent_task.id}", "assignee": "zhangfei-dev"}
- 调整 goal(更新 parent task description/must_haves
- 标记完成(如果 goal 已达成,回复 GOAL_ACHIEVED
Round 上限: {self.MAX_ROUNDS}(当前第 {round_num} 轮)
Project ID: {project_id}
Parent Task ID: {parent_task.id}
"""
async def _spawn_pangtong_review(self, parent_task,
review_prompt: str,
project_id: str,
new_round: int = 0) -> bool:
"""Spawn 庞统进行 review
流程:
1. spawn 庞统
2. 成功后把 parent 设为 reviewing(防重复触发)
3. 通过 on_complete 回调消费庞统结论
"""
try:
agent_id = "pangtong-fujunshi"
session_id = f"review-{parent_task.id}-r{new_round}"
# 构造 on_complete 回调:解析庞统结论,更新 parent 状态
async def _on_review_complete(aid: str, outcome: str):
try:
# 从 spawner session meta 读取庞统的回复文本
review_text = ""
if self.spawner:
# 找庞统最新完成的 session
latest_meta = None
latest_time = ""
for sid, sess in self.spawner._sessions.items():
if sess.get("agent_id") == agent_id and sess.get("meta"):
t = sess.get("completed_at", "")
if t > latest_time:
latest_time = t
latest_meta = sess["meta"]
if latest_meta and isinstance(latest_meta, dict):
payloads = latest_meta.get("payloads", [])
review_text = " ".join(
p.get("text", "") for p in payloads if isinstance(p, dict)
)
self._handle_review_conclusion(
parent_task.id, project_id, review_text, new_round)
except Exception:
logger.exception("Review conclusion handler failed for %s",
parent_task.id)
# 用 spawner.spawn_full_agent 直接 spawn
result = await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=review_prompt,
task_id=parent_task.id,
use_main_session=True, # #02: 投递到 main session
task_db_path=None,
on_complete=_on_review_complete,
)
if result is not None:
# spawn 成功 → parent 进入 reviewing(防下个 tick 重复触发)
self._set_parent_reviewing(parent_task.id, project_id)
return True
return False
except Exception as e:
logger.exception("Failed to spawn pangtong review for %s", parent_task.id)
return False
def _set_parent_reviewing(self, parent_id: str, project_id: str):
    """Move a ``done``/``failed`` parent task to ``reviewing``.

    The update only applies while the parent is still ``done`` or
    ``failed``. Any error is logged and swallowed; nothing is returned.
    """
    try:
        conn = get_connection(self._resolve_db_path(project_id))
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "UPDATE tasks SET status='reviewing', updated_at=datetime('now') "
                "WHERE id=? AND status IN ('done', 'failed')",
                (parent_id,))
            conn.commit()
            logger.info("Parent %s → reviewing (round review in progress)",
                        parent_id)
        finally:
            conn.close()
    except Exception:
        logger.exception("Failed to set parent %s to reviewing", parent_id)
def _handle_review_conclusion(self, parent_id: str, project_id: str,
                              review_text: str, round_num: int):
    """Turn the reviewer's reply into the parent task's next status.

    ``review_text`` is the reviewer's reply (joined from the spawner
    session payloads). If it contains ``GOAL_ACHIEVED`` (case-insensitive)
    the parent goes ``reviewing → done``; otherwise it goes
    ``reviewing → working`` so that new sub tasks can run. Both updates
    only apply while the parent is still ``reviewing``.

    On any failure the open transaction is rolled back and one more
    attempt is made to put the parent back to ``working`` so it does not
    stay in ``reviewing`` forever.
    """
    update_sql = (
        "UPDATE tasks SET status=?, updated_at=datetime('now') "
        "WHERE id=? AND status='reviewing'"
    )
    db_path = self._resolve_db_path(project_id)
    conn = get_connection(db_path)
    try:
        is_achieved = bool(review_text and "GOAL_ACHIEVED" in review_text.upper())
        target_status = "done" if is_achieved else "working"

        conn.execute("BEGIN IMMEDIATE")
        conn.execute(update_sql, (target_status, parent_id))
        conn.commit()

        if is_achieved:
            logger.info(
                "Parent %s review conclusion: GOAL_ACHIEVED → done",
                parent_id)
        else:
            sub_count = conn.execute(
                "SELECT COUNT(*) FROM tasks WHERE parent_task=?",
                (parent_id,)
            ).fetchone()[0]
            logger.info(
                "Parent %s review conclusion: continue → working "
                "(round %d, subs=%d)",
                parent_id, round_num, sub_count)
    except Exception:
        logger.exception("Failed to handle review conclusion for %s", parent_id)
        # Safe recovery: reviewing → working. The failed transaction may
        # still be open, and a second BEGIN inside it would itself fail,
        # so roll back first (a no-op when nothing is open).
        try:
            conn.rollback()
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(update_sql, ("working", parent_id))
            conn.commit()
        except Exception:
            logger.exception(
                "Failed to restore parent %s from reviewing to working",
                parent_id)
    finally:
        conn.close()
def _resolve_db_path(self, project_id: str) -> Path:
    """Return ``<data root>/<project_id>/blackboard.db``.

    The data root comes from ``src.utils.get_data_root``.
    """
    from src.utils import get_data_root

    project_dir = get_data_root() / project_id
    return project_dir / "blackboard.db"
# ------------------------------------------------------------------
# @mention 通知处理 (v2.9 #01)
# ------------------------------------------------------------------
MENTION_MAX_RETRIES = 5
async def _process_mentions(self, db_path: Path,
project_id: str) -> List[str]:
"""扫描 pending mentions → spawn 被 @ 的 Agent
流程(§3.4):
1. 扫描 mention_queue 中 pending 且 retry_count < 5 的记录
2. 按 mentioned_agent 分组,同一 agent 多条 mention 合并为一次 spawn
3. 尝试 spawn,成功 → notified,失败 → retry_count++
"""
if not self.spawner:
return []
bb = Blackboard(db_path)
mentions = bb.get_pending_mentions(max_retries=self.MENTION_MAX_RETRIES)
if not mentions:
return []
# 按 mentioned_agent 分组
agent_mentions: Dict[str, List[Dict]] = {}
for m in mentions:
aid = m["mentioned_agent"]
agent_mentions.setdefault(aid, []).append(m)
processed: List[str] = []
for agent_id, items in agent_mentions.items():
try:
# 构建 mention 摘要
mention_lines = []
task_ids = set()
for item in items:
mention_lines.append(
f"- [{item.get('comment_author', '?')}] {item.get('comment_body', '')[:200]}"
)
task_ids.add(item["task_id"])
# 取第一个 task 作为 spawn 上下文
tid = items[0]["task_id"]
task = bb.get_task(tid)
if not task:
continue
# 构建 mention prompt
prompt = self._build_mention_prompt(
agent_id, task, mention_lines, project_id)
# spawn
result = await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=prompt,
task_id=tid,
use_main_session=True, # #02: 投递到 main session
)
if result is not None:
# 成功 → 标记所有该 agent 的 mentions 为 notified
for item in items:
bb.mark_mention_notified(item["id"])
processed.append(agent_id)
logger.info("Mention spawn success: %s (%d mentions)", agent_id, len(items))
else:
# spawn 返回 None(其他原因)→ 递增 retry_count
for item in items:
bb.mark_mention_retry(item["id"])
logger.warning("Mention spawn failed: %s, retrying next tick", agent_id)
except AgentBusyError:
# Agent 忙,不递增 retry_count,等下次 tick 自然重试
logger.info("Mention spawn skipped: %s busy, will retry next tick", agent_id)
except Exception as e:
logger.exception("Mention processing error for agent %s", agent_id)
for item in items:
try:
if item.get("retry_count", 0) >= self.MENTION_MAX_RETRIES - 1:
bb.mark_mention_failed(item["id"])
else:
bb.mark_mention_retry(item["id"])
except Exception:
pass
return processed
def _build_mention_prompt(self, agent_id: str, task: Any,
mention_lines: List[str],
project_id: str) -> str:
"""#03: @mention prompt(身份注入)"""
api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
api_base = f"http://{api_host}:{api_port}/api"
# 获取 Agent 专长
caps = "通用"
try:
if self.dispatcher and self.dispatcher.router:
profile = self.dispatcher.router.agent_profiles.get(agent_id)
if profile and profile.capabilities_zh:
caps = ", ".join(profile.capabilities_zh)
except Exception:
pass
mentions_text = "\n".join(mention_lines[:10])
return f"""你在黑板上被 @ 了。
## 你的身份
你是 {agent_id},专长: {caps}
## 相关讨论
{mentions_text}
## 任务上下文
- 项目: {project_id}
- 任务: {task.title}
- 描述: {task.description or ''}
## 你能做什么
- 读完整上下文: GET {api_base}/projects/{project_id}/tasks/{task.id}?expand=all
- 回应: POST {api_base}/projects/{project_id}/tasks/{task.id}/comments
- 如果讨论收敛到可执行任务,可以创建 sub task
## 约束
- 只回应与你专长相关的内容
- 禁止使用 sessions_send 直接发消息
"""
def _advance_dependencies(self, db_path: Path) -> List[str]:
    """Move blocked tasks to ``pending`` once all their dependencies are done.

    Returns:
        Ids of the tasks that were advanced.
    """
    blocked = Queries(db_path).blocked_tasks_with_deps()
    advanced: List[str] = []
    if not blocked:
        return advanced

    conn = get_connection(db_path)
    try:
        for entry in blocked:
            if not entry["all_deps_done"]:
                continue
            task_id = entry["task_id"]
            moved = self._transition_status(
                conn, task_id, "pending",
                agent="daemon",
                detail={"reason": "all_dependencies_done"},
            )
            if moved:
                advanced.append(task_id)
                logger.info("Advanced %s: blocked → pending", task_id)
    finally:
        conn.close()
    return advanced
def _transition_status(self, conn, task_id: str, new_status: str,
                       agent: str = "daemon",
                       detail: Optional[Dict] = None) -> bool:
    """Lightweight status transition on an existing connection.

    Bypasses the ``Blackboard`` class (and thereby ``init_db``). The whole
    check-and-update runs inside one ``BEGIN IMMEDIATE`` transaction, which
    is committed on success and rolled back on every other exit, so the
    connection is always left without an open transaction and can be
    reused for the next call.

    Args:
        conn: Open database connection; must not be inside a transaction.
        task_id: Task to update.
        new_status: Requested status.
        agent: Name recorded on the emitted event.
        detail: Extra fields merged into the event's JSON detail.

    Returns:
        ``True`` when the task was updated, or when it already had
        ``new_status``. ``False`` when the task does not exist, is in a
        terminal status, or the transition is not listed in
        ``VALID_TRANSITIONS``.
    """
    from datetime import datetime, timezone
    from src.blackboard.db import VALID_TRANSITIONS, TERMINAL_STATUSES, EVENT_TYPES

    conn.execute("BEGIN IMMEDIATE")
    committed = False
    try:
        row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
        if not row:
            return False
        old_status = row["status"]
        if old_status in TERMINAL_STATUSES or old_status == new_status:
            return old_status == new_status
        if new_status not in VALID_TRANSITIONS.get(old_status, set()):
            return False

        # Naive UTC timestamp in ISO format (same text datetime.utcnow()
        # produced, without the deprecated call).
        now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        if new_status == "pending":
            if self._current_project_id == "_mail":
                # For mail the assignee is the recipient and is never cleared.
                conn.execute(
                    "UPDATE tasks SET status=?, updated_at=? WHERE id=?",
                    (new_status, now, task_id),
                )
            else:
                # Clear the assignee when going back to pending so a stale
                # value does not route the task to the same agent again.
                conn.execute(
                    "UPDATE tasks SET status=?, assignee=NULL, resumed_from=NULL, updated_at=? WHERE id=?",
                    (new_status, now, task_id),
                )
        elif new_status == "paused":
            # Remember the status before the pause so a resume can restore it.
            conn.execute(
                "UPDATE tasks SET status=?, resumed_from=?, updated_at=? WHERE id=?",
                (new_status, old_status, now, task_id),
            )
        else:
            conn.execute(
                "UPDATE tasks SET status=?, updated_at=? WHERE id=?",
                (new_status, now, task_id),
            )

        # Fall back to the generic event type when no task_<status> type
        # is registered.
        event_type = f"task_{new_status}"
        if event_type not in EVENT_TYPES:
            event_type = "daemon_tick"
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
            (task_id, agent, event_type,
             json.dumps({"from": old_status, "to": new_status, **(detail or {})})),
        )
        conn.commit()
        committed = True
        return True
    finally:
        if not committed:
            # Early return or error: release the write lock taken by
            # BEGIN IMMEDIATE instead of leaving the transaction open.
            conn.rollback()
# ------------------------------------------------------------------
# Agent 调度
# ------------------------------------------------------------------
async def _dispatch_pending(self, db_path: Path,
                            project_id: str) -> List[str]:
    """Scan pending tasks and dispatch them.

    v3.0 has two paths:
    - deterministic: the dispatcher's decision has mode ``deterministic``
      or ``agent_handoff`` → dispatched directly;
    - broadcast claim: everything else → offered to all idle agents, which
      claim on their own.

    Only the first ``max_dispatch_per_tick`` pending tasks are considered.

    Returns:
        Ids of the tasks dispatched or broadcast in this tick.
    """
    pending = Queries(db_path).pending_dispatchable()
    dispatched: List[str] = []
    if not pending:
        return dispatched

    # Split the batch into the two paths.
    direct_tasks = []
    broadcast_tasks = []
    for candidate in pending[:self.max_dispatch_per_tick]:
        decision = self.dispatcher.decide(candidate)
        if decision.get("mode") in ("deterministic", "agent_handoff"):
            direct_tasks.append((candidate, decision))
        else:
            broadcast_tasks.append(candidate)

    # Path 1: deterministic routing.
    for task, _decision in direct_tasks:
        try:
            result = await self.dispatcher.dispatch(
                task,
                project_config={"project_id": project_id, "db_path": db_path},
            )
            status = result["status"]
            if status == "dispatched" and result["level"] in ("full", "escalate"):
                conn = get_connection(db_path)
                try:
                    agent_id = result["agent_id"]
                    session_id = result.get("session_id")
                    if project_id == "_mail":
                        # [v2.7.1] Mail was already set to working by the
                        # dispatcher, so the claimed step is skipped.
                        conn.execute(
                            "UPDATE tasks SET current_agent=? WHERE id=?",
                            (agent_id, task.id),
                        )
                        conn.commit()
                        dispatched.append(task.id)
                        logger.info("Dispatched %s to %s (session=%s, mail auto-working)",
                                    task.id, agent_id, session_id)
                    else:
                        claimed = self._transition_status(
                            conn, task.id, "claimed",
                            agent="daemon",
                            detail={"dispatched_to": agent_id,
                                    "session_id": session_id},
                        )
                        if claimed:
                            conn.execute(
                                "UPDATE tasks SET current_agent=? WHERE id=?",
                                (agent_id, task.id),
                            )
                            conn.commit()
                            dispatched.append(task.id)
                            logger.info("Dispatched %s to %s (session=%s)",
                                        task.id, agent_id, session_id)
                finally:
                    conn.close()
            elif status == "blocked":
                # Guardrail veto: the task is marked blocked.
                conn = get_connection(db_path)
                try:
                    self._transition_status(
                        conn, task.id, "blocked",
                        agent="daemon",
                        detail={"reason": result.get("reason", "guardrail"),
                                "violations": result.get("violations", [])},
                    )
                    logger.info("Task %s blocked by guardrail: %s",
                                task.id, result.get("reason", "unknown"))
                finally:
                    conn.close()
        except Exception:
            logger.exception("Dispatch failed for %s", task.id)

    # Path 2: broadcast claim, at most once per tick for the whole batch.
    if broadcast_tasks:
        dispatched.extend(
            await self._broadcast_claim(broadcast_tasks, db_path, project_id)
        )
    return dispatched
async def _broadcast_claim(self, tasks: list, db_path: Path,
project_id: str) -> List[str]:
"""广播一批待认领任务给所有空闲 Agent(每轮最多广播一次)
司马懿建议:攒一批任务,一次广播,而非每个任务触发一次广播。
广播前检查全局并发,接近上限时跳过。
"""
if not self.spawner:
return []
# 全局并发检查(司马懿建议 1
if self.counter.is_near_limit():
logger.info("Skipping broadcast: global concurrent near limit (%d/%d)",
self.counter.global_active, self.counter.max_global)
return []
# 过滤掉已广播太多次的任务(retry_count >= 3 → 不广播,等庞统)
broadcastable = []
escalated = []
for t in tasks:
rc = getattr(t, 'retry_count', 0) or 0
if rc >= 3:
escalated.append(t)
else:
broadcastable.append(t)
# 升级庞统
for t in escalated:
conn = get_connection(db_path)
try:
self._transition_status(
conn, t.id, "escalated",
agent="daemon",
detail={"reason": "no_taker_after_3_broadcasts",
"retry_count": getattr(t, 'retry_count', 0)},
)
logger.warning("Escalated %s: no taker after %d broadcasts",
t.id, getattr(t, 'retry_count', 0))
finally:
conn.close()
if not broadcastable:
return []
# 获取空闲 Agent
idle_agents = self._get_idle_agents()
if not idle_agents:
# 无空闲 Agent → 系统容量问题,不递增 retry_count
logger.warning("No idle agents for broadcast, skipping (capacity issue)")
return []
task_ids = [t.id for t in broadcastable]
logger.info("Broadcasting %d tasks to %d idle agents: %s",
len(broadcastable), len(idle_agents), task_ids)
# 广播前为每个任务写审计事件
try:
conn = get_connection(db_path)
try:
for task in broadcastable:
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) "
"VALUES (?,?,?,?)",
(task.id, "daemon", "broadcast_claim",
json.dumps({"agents": idle_agents, "task_title": task.title})),
)
conn.commit()
finally:
conn.close()
except Exception:
pass # 审计记录失败不影响广播
spawned = []
for agent_id in idle_agents:
# v2.7.2: counter 检查在 spawn_full_agent 内部
prompt = self._build_claim_prompt(agent_id, broadcastable, project_id)
try:
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=prompt,
task_id=broadcastable[0].id if broadcastable else None,
task_db_path=db_path,
use_main_session=True, # #02: 投递到 main session
)
spawned.append(agent_id)
logger.info("Broadcast spawned %s (session=%s)", agent_id, session_id)
except AgentBusyError:
logger.debug("Broadcast skip %s: busy", agent_id)
except Exception:
logger.exception("Broadcast spawn failed for %s", agent_id)
return task_ids if spawned else []
def _build_claim_prompt(self, agent_id: str, tasks: list,
project_id: str) -> str:
"""#03: 广播认领 prompt(身份+专长注入)"""
api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
api_base = f"http://{api_host}:{api_port}/api"
# 获取 Agent 专长
caps = "通用"
try:
if self.dispatcher and self.dispatcher.router:
profile = self.dispatcher.router.agent_profiles.get(agent_id)
if profile and profile.capabilities_zh:
caps = ", ".join(profile.capabilities_zh)
except Exception:
pass
task_list = "\n".join([
f"- ID: {t.id}, 标题: {t.title}, 类型: {getattr(t, 'task_type', '') or 'general'}, "
f"优先级: {t.priority}"
for t in tasks
])
return f"""你是 {agent_id},专长: {caps}
## 待认领任务
{task_list}
## 规则
- 只认领符合你专长的任务。你的专长是 {caps},不适合的任务不要认领
- 不确定时不要认领,留给更合适的 Agent
- 认领后必须写产出物再转 review
- claim 失败说明已被别人认领,NO_REPLY 退出
## 你能做什么
- 读任务详情: GET {api_base}/projects/{project_id}/tasks?status=pending
- 认领: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/claim
body: {{"agent": "{agent_id}"}}
- 认领后标 working: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/status
body: {{"status": "working", "agent": "{agent_id}"}}
- 写产出: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/outputs
body: {{"agent": "{agent_id}", "content_type": "code", "title": "产出标题", "content_path": "/path/to/file", "summary": "简要说明"}}
- 完成后标 review: POST {api_base}/projects/{project_id}/tasks/{{{{TASK_ID}}}}/status
body: {{"status": "review", "agent": "{agent_id}"}}
- 没有适合你的任务则 NO_REPLY
## 约束
- 一个 tick 只认领一个任务
- 禁止使用 sessions_send 直接发消息
"""
@property
def counter(self):
"""从 Dispatcher 获取 counter"""
return getattr(self.dispatcher, 'counter', None) if self.dispatcher else None
@staticmethod
def _is_pid_alive(pid: int) -> bool:
"""检查进程是否存活"""
import os
try:
os.kill(pid, 0)
return True
except (ProcessLookupError, PermissionError):
return False
def _get_idle_agents(self) -> List[str]:
"""获取当前空闲的 Agent 列表(从 config agents 取,而非 counter._per_agent"""
if not self.counter:
return []
# agent_profiles 在 Router 初始化时从 config 填充,是完整 Agent 列表
all_agents = list(self.dispatcher.router.agent_profiles.keys()) if self.dispatcher else []
active = self.counter.active_agents
return [aid for aid in all_agents if active.get(aid, 0) == 0]
async def _dispatch_reviews(self, db_path: Path,
project_id: str) -> List[str]:
"""扫描 review 状态任务,检查是否有产出,调度审查 Agent"""
# mail 任务不走 review 流程,直接跳过
if project_id == "_mail":
return []
queries = Queries(db_path)
bb = Blackboard(db_path)
review_tasks = queries.tasks_by_status("review")
dispatched: List[str] = []
for task in review_tasks:
# 检查是否已有 review 记录
reviews = bb.get_reviews(task.id)
if reviews:
continue # 已有审查,跳过
# 检查是否最近已 dispatch 过 review(防重复 dispatch
existing = self._check_recent_routing(db_path, task.id, "review")
if existing:
continue # 已有活跃 review dispatch
# 检查是否有产出(司马懿建议:无产出直接标 failed)
outputs = bb.get_outputs(task.id)
if not outputs:
conn = get_connection(db_path)
try:
self._transition_status(
conn, task.id, "failed",
agent="daemon",
detail={"reason": "no_output_for_review"},
)
bb.add_observation(
task.id, "daemon",
"任务进入 review 状态但没有产出物,自动标记为 failed",
)
logger.warning("Task %s in review but no output, marking failed",
task.id)
finally:
conn.close()
continue
# 调度审查 Agent(司马懿)
try:
result = await self.dispatcher.dispatch(
task,
action_type="review",
project_config={"project_id": project_id, "db_path": db_path},
)
if result["status"] == "dispatched":
dispatched.append(task.id)
# 更新 current_agent 为审查者
conn = get_connection(db_path)
try:
conn.execute(
"UPDATE tasks SET current_agent=? WHERE id=?",
(result["agent_id"], task.id),
)
conn.commit()
finally:
conn.close()
logger.info("Dispatched review for %s to %s",
task.id, result["agent_id"])
except Exception:
logger.exception("Review dispatch failed for %s", task.id)
return dispatched
# ------------------------------------------------------------------
# 僵尸/超时处理
# ------------------------------------------------------------------
def _check_timeouts(self, db_path: Path) -> List[str]:
    """Reclaim tasks stuck in ``claimed``/``working`` and free dead agents.

    Three passes are made against the database at ``db_path``:

    1. ``claimed`` tasks older than ``self.claim_timeout_minutes`` are
       moved back to ``pending`` and their ``retry_count`` is incremented;
       once ``retry_count`` has reached 3 they are moved to ``escalated``
       instead.
    2. ``working`` tasks that exceeded their time budget are moved to
       ``failed``. The budget is ``deadline - start`` when the task has a
       deadline at least one minute after its start, otherwise
       ``self.default_task_timeout_minutes``. When the current project is
       ``_mail`` and ``_mail_check_reply`` reports a reply, the task is
       moved to ``done`` instead.
    3. For every agent listed in ``self.counter.active_agents`` whose
       spawner session has a pid that is no longer alive, the counter slot
       is released and the session's task is pushed back to ``pending``.

    Timestamps are compared as naive UTC; timezone-aware values are
    converted to UTC first so they do not raise on subtraction.

    Args:
        db_path: Path of the project's blackboard database.

    Returns:
        IDs of the tasks transitioned by passes 1 and 2. Tasks pushed back
        by pass 3 are not included.
    """
    # Function-scope import: the module only imports ``datetime`` itself.
    from datetime import timezone

    queries = Queries(db_path)
    reclaimed: List[str] = []
    # Naive UTC "now" (same value utcnow() produced, without the deprecated call).
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    def parse_utc(value):
        """Parse an ISO timestamp and return it as a naive UTC datetime."""
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    def transition(task_id, status, detail):
        """Run one daemon status transition on a short-lived connection."""
        conn = get_connection(db_path)
        try:
            return self._transition_status(
                conn, task_id, status,
                agent="daemon",
                detail=detail,
            )
        finally:
            conn.close()

    # Pass 1: claimed too long -> pending, or escalated after 3 retries.
    for task in queries.tasks_by_status("claimed"):
        if not task.claimed_at:
            continue
        try:
            elapsed = (now - parse_utc(task.claimed_at)).total_seconds() / 60.0
            if elapsed <= self.claim_timeout_minutes:
                continue
            retry_count = getattr(task, 'retry_count', 0) or 0
            if retry_count >= 3:
                # Only report the escalation when the transition succeeded,
                # consistent with the other branches.
                if transition(task.id, "escalated",
                              {"reason": "claim_timeout_no_taker",
                               "retry_count": retry_count}):
                    reclaimed.append(task.id)
                    logger.warning("Escalated %s: no taker after %d broadcasts",
                                   task.id, retry_count)
                continue
            # The retry_count update shares the transition's connection.
            conn = get_connection(db_path)
            try:
                ok = self._transition_status(
                    conn, task.id, "pending",
                    agent="daemon",
                    detail={"reason": "claim_timeout",
                            "elapsed_minutes": round(elapsed, 1)},
                )
                if ok:
                    # retry_count doubles as the broadcast-round counter.
                    conn.execute(
                        "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?",
                        (task.id,),
                    )
                    conn.commit()
                    reclaimed.append(task.id)
                    logger.info("Reclaimed %s: claimed → pending (timeout %.1fm, retry=%d)",
                                task.id, elapsed, retry_count + 1)
            finally:
                conn.close()
        except (ValueError, TypeError):
            # Still best-effort, but no longer silent: a task with a bad
            # timestamp would otherwise never time out without any trace.
            logger.warning("Task %s: claim-timeout check skipped (claimed_at=%r)",
                           task.id, task.claimed_at, exc_info=True)

    # Pass 2: working too long -> failed (or done for answered mail).
    for task in queries.tasks_by_status("working"):
        start_time_str = task.started_at or task.claimed_at
        if not start_time_str:
            continue
        try:
            start_time = parse_utc(start_time_str)
            # Per-task budget: the deadline wins when it leaves >= 1 minute.
            timeout_minutes = self.default_task_timeout_minutes
            if task.deadline:
                budget = (parse_utc(task.deadline) - start_time).total_seconds() / 60.0
                if budget >= 1:
                    timeout_minutes = budget
            elapsed = (now - start_time).total_seconds() / 60.0
            if elapsed <= timeout_minutes:
                continue

            # [v2.7.1] Mail fallback: a timed-out mail task that has a reply
            # is considered done rather than failed.
            # NOTE(review): the original indentation was lost; the skip is
            # assumed to apply only when a reply exists — confirm.
            if (self._current_project_id == "_mail"
                    and self._mail_check_reply(task.id, db_path)):
                if transition(task.id, "done",
                              {"reason": "mail_auto_done_recheck",
                               "elapsed_minutes": round(elapsed, 1)}):
                    reclaimed.append(task.id)
                    logger.info("Mail %s: ticker recheck found reply, marked done (%.1fm)",
                                task.id, elapsed)
                continue

            if transition(task.id, "failed",
                          {"reason": "task_timeout",
                           "elapsed_minutes": round(elapsed, 1),
                           "timeout_minutes": round(timeout_minutes, 1)}):
                reclaimed.append(task.id)
                logger.warning("Task %s timed out (working %.1fm > %.1fm)",
                               task.id, elapsed, timeout_minutes)
        except (ValueError, TypeError):
            logger.warning("Task %s: working-timeout check skipped (start=%r, deadline=%r)",
                           task.id, start_time_str, task.deadline, exc_info=True)

    # Pass 3 (v2.7.2): counter slot held by an agent whose process is gone.
    if self.spawner and self.counter and hasattr(self.counter, "active_agents"):
        # Snapshot the keys: release() may mutate the mapping.
        for agent_id in list(self.counter.active_agents.keys()):
            session_info = self.spawner.get_session_by_agent(agent_id)
            if not session_info:
                continue
            pid = session_info.get("pid")
            task_id_check = session_info.get("task_id")
            if not pid or self._is_pid_alive(pid):
                continue
            logger.warning("Agent %s process dead (pid=%d), releasing counter + push back pending",
                           agent_id, pid)
            self.counter.release(agent_id)
            # Push the task back to pending so a later tick re-dispatches it.
            if task_id_check and db_path:
                try:
                    transition(task_id_check, "pending",
                               {"reason": "process_dead", "pid": pid})
                except Exception:
                    logger.exception("Failed to push back task %s", task_id_check)

    return reclaimed
def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool:
    """Check whether another task references this mail task in ``must_haves``.

    A reply is assumed to exist when any task other than
    ``original_task_id`` has a ``must_haves`` value containing that id as a
    literal substring.

    Args:
        original_task_id: ID of the mail task being checked.
        db_path: Path of the database holding the ``tasks`` table.

    Returns:
        True when a referencing task exists, and also True when the lookup
        itself fails (conservative: assume a reply exists). False otherwise.
    """
    try:
        # Escape LIKE metacharacters so that ``_`` and ``%`` inside the id
        # match literally instead of acting as wildcards (an id such as
        # ``m_1`` would otherwise also match ``mX1``).
        escaped_id = (
            original_task_id.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        conn = get_connection(db_path)
        try:
            row = conn.execute(
                "SELECT id FROM tasks WHERE id != ? "
                "AND must_haves LIKE ? ESCAPE '\\' LIMIT 1",
                (original_task_id, f"%{escaped_id}%"),
            ).fetchone()
            return row is not None
        finally:
            conn.close()
    except Exception as e:
        logger.error("Mail %s: ticker reply check error: %s", original_task_id, e)
        return True  # conservative: a failed lookup counts as "has reply"
def _check_recent_routing(self, db_path: Path, task_id: str,
                          action_type: str) -> bool:
    """Report whether the task was dispatched within the last 5 minutes.

    Counts ``routing_decisions`` rows for ``task_id`` with
    ``outcome='dispatched'`` created in the last 5 minutes. For
    ``action_type == "review"`` only rows with ``from_status='review'``
    are counted; for any other action type every dispatched row counts.

    Args:
        db_path: Path of the database holding ``routing_decisions``.
        task_id: ID of the task to check.
        action_type: Kind of dispatch being guarded against.

    Returns:
        True when at least one matching row exists. False when none exists
        or when the lookup fails (the failure is logged).
    """
    # The two variants differ only in the from_status filter.
    sql = (
        "SELECT COUNT(*) as cnt FROM routing_decisions "
        "WHERE task_id=? AND outcome='dispatched' "
    )
    if action_type == "review":
        sql += "AND from_status='review' "
    sql += "AND created_at > datetime('now', '-5 minutes')"

    try:
        conn = get_connection(db_path)
        try:
            row = conn.execute(sql, (task_id,)).fetchone()
            return row["cnt"] > 0 if row else False
        finally:
            conn.close()
    except Exception:
        # Still best-effort, but leave a trace: returning False here lets a
        # duplicate dispatch through, which is hard to diagnose if silent.
        logger.warning(
            "Recent-routing check failed for task %s; assuming no recent dispatch",
            task_id, exc_info=True)
        return False
# ------------------------------------------------------------------
# Manual tick (triggered by the API endpoint)
# ------------------------------------------------------------------
async def manual_tick(self) -> Dict[str, Any]:
    """Run one tick on demand and flag its result as manually triggered.

    Returns:
        The mapping returned by ``self.tick()`` with ``"manual": True``
        set on it (the same object is updated in place).
    """
    logger.info("Manual tick triggered")
    tick_result = await self.tick()
    tick_result["manual"] = True
    return tick_result