From 6a3808b5cf6e4aba7a95ad80a2795585c033704c Mon Sep 17 00:00:00 2001 From: cfdaily Date: Thu, 21 May 2026 11:52:57 +0800 Subject: [PATCH] auto-sync: 2026-05-21 11:52:57 --- src/daemon/ticker.py | 114 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 111 insertions(+), 3 deletions(-) diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index 634ae07..5576488 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -415,7 +415,12 @@ class Ticker: async def _dispatch_pending(self, db_path: Path, project_id: str) -> List[str]: - """扫描 pending 任务并调度""" + """扫描 pending 任务并调度 + + v3.0: 两条路径 + - 确定性路径:retry/handoff/assignee/生命周期 → 直接 spawn + - 广播认领:无确定性路径 → spawn 所有空闲 Agent,自主 claim + """ queries = Queries(db_path) pending = queries.pending_dispatchable() dispatched: List[str] = [] @@ -423,14 +428,25 @@ class Ticker: if not pending: return dispatched + # 分两批:确定性 vs 广播 + deterministic_tasks = [] + broadcast_tasks = [] + for task in pending[:self.max_dispatch_per_tick]: + decision = self.dispatcher.decide(task) + if decision.get("mode") in ("deterministic", "agent_handoff"): + deterministic_tasks.append((task, decision)) + else: + broadcast_tasks.append(task) + + # 路径1:确定性路由 → 直接 spawn + for task, decision in deterministic_tasks: try: result = await self.dispatcher.dispatch( task, project_config={"project_id": project_id, "db_path": db_path}, ) if result["status"] == "dispatched" and result["level"] in ("full", "escalate"): - # 标记为 claimed + 更新 current_agent conn = get_connection(db_path) try: ok = self._transition_status( @@ -440,7 +456,6 @@ class Ticker: "session_id": result.get("session_id")}, ) if ok: - # 更新 current_agent(Router 审查时用) conn.execute( "UPDATE tasks SET current_agent=? WHERE id=?", (result["agent_id"], task.id), @@ -455,8 +470,101 @@ class Ticker: except Exception: logger.exception("Dispatch failed for %s", task.id) + # 路径2:广播认领 + if broadcast_tasks: + broadcast_ids = await self._broadcast_claim(broadcast_tasks, db_path, project_id) + dispatched.extend(broadcast_ids) + return dispatched + async def _broadcast_claim(self, tasks: list, db_path: Path, + project_id: str) -> List[str]: + """广播一批待认领任务给所有空闲 Agent(每轮最多广播一次) + + 司马懿建议:攒一批任务,一次广播,而非每个任务触发一次广播。 + 广播前检查全局并发,接近上限时跳过。 + """ + if not self.counter or not self.spawner: + return [] + + # 全局并发检查(司马懿建议 1) + if self.counter.global_active >= self.counter._max_global - 1: + logger.info("Skipping broadcast: global concurrent near limit (%d/%d)", + self.counter.global_active, self.counter._max_global) + return [] + + # 获取空闲 Agent + idle_agents = self._get_idle_agents() + if not idle_agents: + return [] + + task_ids = [t.id for t in tasks] + logger.info("Broadcasting %d tasks to %d idle agents: %s", + len(tasks), len(idle_agents), task_ids) + + spawned = [] + for agent_id in idle_agents: + if not await self.counter.can_acquire(agent_id): + continue + prompt = self._build_claim_prompt(agent_id, tasks, project_id) + try: + await self.counter.acquire(agent_id) + session_id = await self.spawner.spawn_full_agent( + agent_id=agent_id, + message=prompt, + on_complete=lambda aid, _: self.counter.release(aid), + ) + spawned.append(agent_id) + logger.info("Broadcast spawned %s (session=%s)", agent_id, session_id) + except Exception: + self.counter.release(agent_id) + logger.exception("Broadcast spawn failed for %s", agent_id) + + return task_ids if spawned else [] + + def _build_claim_prompt(self, agent_id: str, tasks: list, + project_id: str) -> str: + """构建广播认领的 prompt""" + api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1' + api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083 + + task_list = "\n".join([ + f"- ID: {t.id}, 标题: {t.title}, 类型: {getattr(t, 'task_type', '') or 'general'}, " + f"优先级: {t.priority}" + for t in tasks + ]) + + return f"""你是 {agent_id}。黑板上有 {len(tasks)} 个待认领任务。 + +## 待认领任务 +{task_list} + +## 操作 +1. 读黑板查看详情: + curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks?status=pending + +2. 选择适合你的任务并认领: + curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/TASK_ID/claim \\ + -H 'Content-Type: application/json' -d '{{"agent": "{agent_id}"}}' + +3. 认领成功后开始执行: + curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/TASK_ID/status \\ + -H 'Content-Type: application/json' -d '{{"status": "working", "agent": "{agent_id}"}}' + +4. 没有适合你的任务则退出 +""" + + def _get_idle_agents(self) -> List[str]: + """获取当前空闲的 Agent 列表""" + if not self.counter: + return [] + idle = [] + for agent_id in self.counter._per_agent: + active = self.counter.active_agents.get(agent_id, 0) + if active == 0: + idle.append(agent_id) + return idle + async def _dispatch_reviews(self, db_path: Path, project_id: str) -> List[str]: """扫描 review 状态任务,检查是否有产出,调度审查 Agent"""