auto-sync: 2026-05-21 11:52:57

This commit is contained in:
cfdaily
2026-05-21 11:52:57 +08:00
parent b21d496608
commit 6a3808b5cf
+111 -3
View File
@@ -415,7 +415,12 @@ class Ticker:
async def _dispatch_pending(self, db_path: Path,
project_id: str) -> List[str]:
"""扫描 pending 任务并调度"""
"""扫描 pending 任务并调度
v3.0: 两条路径
- 确定性路径:retry/handoff/assignee/生命周期 → 直接 spawn
- 广播认领:无确定性路径 → spawn 所有空闲 Agent,自主 claim
"""
queries = Queries(db_path)
pending = queries.pending_dispatchable()
dispatched: List[str] = []
@@ -423,14 +428,25 @@ class Ticker:
if not pending:
return dispatched
# 分两批:确定性 vs 广播
deterministic_tasks = []
broadcast_tasks = []
for task in pending[:self.max_dispatch_per_tick]:
decision = self.dispatcher.decide(task)
if decision.get("mode") in ("deterministic", "agent_handoff"):
deterministic_tasks.append((task, decision))
else:
broadcast_tasks.append(task)
# 路径1:确定性路由 → 直接 spawn
for task, decision in deterministic_tasks:
try:
result = await self.dispatcher.dispatch(
task,
project_config={"project_id": project_id, "db_path": db_path},
)
if result["status"] == "dispatched" and result["level"] in ("full", "escalate"):
# 标记为 claimed + 更新 current_agent
conn = get_connection(db_path)
try:
ok = self._transition_status(
@@ -440,7 +456,6 @@ class Ticker:
"session_id": result.get("session_id")},
)
if ok:
# 更新 current_agentRouter 审查时用)
conn.execute(
"UPDATE tasks SET current_agent=? WHERE id=?",
(result["agent_id"], task.id),
@@ -455,8 +470,101 @@ class Ticker:
except Exception:
logger.exception("Dispatch failed for %s", task.id)
# 路径2:广播认领
if broadcast_tasks:
broadcast_ids = await self._broadcast_claim(broadcast_tasks, db_path, project_id)
dispatched.extend(broadcast_ids)
return dispatched
async def _broadcast_claim(self, tasks: list, db_path: Path,
project_id: str) -> List[str]:
"""广播一批待认领任务给所有空闲 Agent(每轮最多广播一次)
司马懿建议:攒一批任务,一次广播,而非每个任务触发一次广播。
广播前检查全局并发,接近上限时跳过。
"""
if not self.counter or not self.spawner:
return []
# 全局并发检查(司马懿建议 1
if self.counter.global_active >= self.counter._max_global - 1:
logger.info("Skipping broadcast: global concurrent near limit (%d/%d)",
self.counter.global_active, self.counter._max_global)
return []
# 获取空闲 Agent
idle_agents = self._get_idle_agents()
if not idle_agents:
return []
task_ids = [t.id for t in tasks]
logger.info("Broadcasting %d tasks to %d idle agents: %s",
len(tasks), len(idle_agents), task_ids)
spawned = []
for agent_id in idle_agents:
if not await self.counter.can_acquire(agent_id):
continue
prompt = self._build_claim_prompt(agent_id, tasks, project_id)
try:
await self.counter.acquire(agent_id)
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=prompt,
on_complete=lambda aid, _: self.counter.release(aid),
)
spawned.append(agent_id)
logger.info("Broadcast spawned %s (session=%s)", agent_id, session_id)
except Exception:
self.counter.release(agent_id)
logger.exception("Broadcast spawn failed for %s", agent_id)
return task_ids if spawned else []
def _build_claim_prompt(self, agent_id: str, tasks: list,
project_id: str) -> str:
"""构建广播认领的 prompt"""
api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
task_list = "\n".join([
f"- ID: {t.id}, 标题: {t.title}, 类型: {getattr(t, 'task_type', '') or 'general'}, "
f"优先级: {t.priority}"
for t in tasks
])
return f"""你是 {agent_id}。黑板上有 {len(tasks)} 个待认领任务。
## 待认领任务
{task_list}
## 操作
1. 读黑板查看详情:
curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks?status=pending
2. 选择适合你的任务并认领:
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/TASK_ID/claim \\
-H 'Content-Type: application/json' -d '{{"agent": "{agent_id}"}}'
3. 认领成功后开始执行:
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/TASK_ID/status \\
-H 'Content-Type: application/json' -d '{{"status": "working", "agent": "{agent_id}"}}'
4. 没有适合你的任务则退出
"""
def _get_idle_agents(self) -> List[str]:
"""获取当前空闲的 Agent 列表"""
if not self.counter:
return []
idle = []
for agent_id in self.counter._per_agent:
active = self.counter.active_agents.get(agent_id, 0)
if active == 0:
idle.append(agent_id)
return idle
async def _dispatch_reviews(self, db_path: Path,
project_id: str) -> List[str]:
"""扫描 review 状态任务,检查是否有产出,调度审查 Agent"""