auto-sync: 2026-06-04 21:03:29

This commit is contained in:
cfdaily
2026-06-04 21:03:29 +08:00
parent 8c11d77484
commit 4824dd2f5d
+62 -44
View File
@@ -238,18 +238,19 @@ def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]:
## §5 广播定义修正
**一轮广播的定义**:所有已通知的 Agent 都给出了反馈(认领/NO_REPLY/observation),且没有新的空闲 Agent 可通知,才算一轮广播完成。
**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。
具体机制:
- 每个 pending 广播任务,维护一个 `notified_agents` 追踪已通知和已反馈的 Agent
- 每次 tick:spawn 空闲且尚未被通知的 Agent
- Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈
- Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试
-没有新的空闲 Agent 可通知时,检查所有已通知的 Agent 是否已全部反馈且没人认领 → 一轮结束 → round_number +1
- **注意**:不要求所有 Agent 都通知过,只看已通知的 Agent 是否全部反馈(避免忙 Agent 死锁)
-所有 Agent 都已通知并反馈且没人认领 → 一轮结束 → round_number +1
- 有人认领 → 广播立即结束(不需要等其他 Agent 反馈)
- 连续 3 轮无人认领 → 升级庞统
注意:Agent 何时被 spawn 由 Spawner 保证(含超时上限),Ticker 不需要额外超时机制。
详细设计见 §10.2。
## §6 API 改动
@@ -343,6 +344,26 @@ class PipelineEngine:
self.bb.save_pipeline_state(state)
return state
def peek_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]:
"""纯查询:返回下一个 stage id,不修改 task_state"""
pipeline = self.registry.get(task_state.pipeline_type)
if not pipeline:
return None
current_stage = pipeline.stages.get(task_state.current_stage)
if not current_stage:
return None
if outcome == "success":
return current_stage.on_success
elif outcome == "failure":
retries = task_state.stage_retry_counts.get(current_stage.id, 0)
if current_stage.max_retries > 0 and retries + 1 > current_stage.max_retries:
return "failed"
return current_stage.on_failure
else:
return "failed"
def advance_stage(self, task: Task, task_state: TaskPipelineState, outcome: str) -> str:
"""推进 Pipeline stage(有副作用:修改 task_state,同步 assignee"""
pipeline = self.registry.get(task_state.pipeline_type)
@@ -353,34 +374,26 @@ class PipelineEngine:
if not current_stage:
return task.status
# 计算下一个 stage
if outcome == "success":
next_stage_id = current_stage.on_success
elif outcome == "failure":
retries = task_state.stage_retry_counts.get(current_stage.id, 0)
if current_stage.max_retries > 0:
retries += 1
task_state.stage_retry_counts[current_stage.id] = retries
if retries > current_stage.max_retries:
return "failed"
next_stage_id = current_stage.on_failure
else:
return "failed"
# 使用 peek 计算下一个 stage
next_stage_id = self.peek_next_stage(task_state, outcome)
if not next_stage_id:
return task.status
# 更新重试计数(仅在 failure 时)
if outcome == "failure" and current_stage.max_retries > 0:
retries = task_state.stage_retry_counts.get(current_stage.id, 0) + 1
task_state.stage_retry_counts[current_stage.id] = retries
# 更新运行时状态
task_state.current_stage = next_stage_id
# 同步更新 assignee(关键!否则 Router 路径 4 会劫持)
# 同步更新 assignee仅 Pipeline 模式,关键!否则 Router 路径 4 会劫持)
next_stage = pipeline.stages.get(next_stage_id)
if next_stage:
new_agent = pipeline.default_agent if next_stage.agent == "pipeline_default" else next_stage.agent
# bb.update_assignee(task.id, new_agent)
# 成功推进时清理当前 stage 的 retry count
if outcome == "success":
task_state.stage_retry_counts.pop(current_stage.id, None)
return next_stage_id"
return next_stage_id
def get_agent_for_stage(self, task: Task) -> Optional[str]:
"""获取当前 stage 应该执行的 Agent"""
@@ -423,17 +436,17 @@ class PipelineEngine:
Pipeline 任务的入口阶段不受 Pipeline stages 约束,由 Router 和 Ticker 标准流程处理:
```
Pipeline 任务的生命周期:
1. 任务创建:status=pending, task_type=coding
2. Ticker 扫到 pending + task_type 有值
3. Router 路径 5 匹配 → pipeline.default_agent(如 zhangfei-dev
4. Dispatcher 返回 mode=deterministic, agent_id=zhangfei-dev
5. Ticker spawn zhangfei-devspawn 成功后:
a. assignee = pipeline.default_agentzhangfei-dev
b. status 直接设为 pipeline.entryworking
c. current_agent = zhangfei-dev
d. TaskPipelineState 创建:current_stage=working
6. 跳过 claim 阶段(Pipeline 模式下不需要广播认领)
5. assignee 设为 pipeline.default_agent
6. Ticker spawn zhangfei-dev → Agent claim → status 变为 working
7. current_agent = zhangfei-dev
8. TaskPipelineState 创建:current_stage=workingentry stage
代码路径统一:所有任务(Pipeline / 自由模式)都走 pending → claim → working。
忙时处理:assignee 有值但 Agent 忙 → 等下一个 tick 重试(和路径 4 一样的行为)。
```
### 状态推进流程
@@ -570,9 +583,13 @@ def _broadcast_claim(self, db_path: Path, broadcast_tasks: list):
tracker.notified_agents.add(agent_id)
# ... 现有 spawn 广播逻辑 ...
else:
# 没有新的空闲 Agent 通知
# 直接检查已通知的是否全部反馈(不管有没有还没通知到的忙 Agent
if tracker.responded_agents >= tracker.notified_agents:
# 所有空闲 Agent 都已通知
# 检查是否所有已知 Agent 都通知了
unnotified = self._all_agent_ids - tracker.notified_agents
if not unnotified:
# 所有 Agent 都通知过了 → 检查是否全部反馈
if tracker.responded_agents >= tracker.notified_agents:
# 全部已通知的 Agent 都反馈了且没人认领 → 一轮结束
if tracker.round_number >= 3:
# 3 轮广播无人认领,升级庞统
@@ -590,7 +607,7 @@ def _broadcast_claim(self, db_path: Path, broadcast_tasks: list):
"Broadcast round %d starting for: %s",
tracker.round_number, task_id,
)
# else: 还有 Agent 没反馈,等下一个 tick
# else: 还有 Agent 没通知到(可能在忙),等下一个 tick
```
**Agent 反馈回调**(通过 Ticker 公共 API,在 spawner.py 的 `on_agent_complete` 中注入):
@@ -666,16 +683,17 @@ def test_busy_agent_not_counted():
## §A 评审记录
### 仲达评审(2026-06-04
### 仲达评审(2026-06-04+ Rebuttal
**结论**1 个必须修 + 6 个建议。修完 approve。
**原始结论**1 个必须修 + 6 个建议。
**Rebuttal 后结论**:3 项接受,2 项不接受,2 项待补充。双方达成一致。
| # | 问题 | 判定 | 处理 |
|---|------|------|------|
| 1 | get_next_stage 副作用 | 建议 | ✅ 改名 advance_stage |
| 2 | stage_retry_counts 清理 | 建议 | ✅ 成功推进时清理 |
| 3 | 忙 Agent 死锁 | **必须修** | ✅ 去掉"所有 Agent 都通知"前置条件 |
| 4 | pending/claimed 说明 | 建议 | ✅ §3 补充说明 |
| 5 | entry 触发机制未说明 | 建议 | ✅ §8.3 补充完整生命周期 |
| 6 | stage 切换 assignee 同步 | 建议 | ✅ advance_stage 中同步更新 |
| 7 | spawner role_map 具体 reviewer | 建议#11 衔接) | 记录#11 范围 |
| # | 问题 | 仲达原始判定 | Rebuttal 结论 | 理由 |
|---|------|------------|-------------|------|
| 1 | get_next_stage 副作用 | 建议 | ✅ 接受(拆 peek + advance) | 拆方法比单纯改名更清晰 |
| 2 | retry_counts 清理 | 建议 | ❌ 不改 | 清理丢历史,仲达自己说不影响正确性 |
| 3 | 忙 Agent 死锁 | **必须修** | ❌ 不改 | 广播轮次定义(所有人收到并反馈 = 一轮)和需求一致。Agent 忙由 Spawner 保证(含超时上限),不是 Ticker 职责 |
| 4 | pending/claimed 说明 | 建议 | ✅ 补充 | |
| 5 | entry 触发机制 | 建议 | ✅ 补充(走 claim | Pipeline 任务走标准 claim 流程,代码路径统一,忙时自然排队 |
| 6 | assignee 同步更新 | 建议 | ✅ 接受(仅 Pipeline 模式) | |
| 7 | role_map 具体 reviewer | 建议 | 记录#11 范围 | |