# Pipeline Design

> Version: v2.1
> Date: 2026-06-04
> Author: Pang Tong (Deputy Strategist)
> Status: **Under review**

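## §1 Background

Task scheduling in moziplus v2 currently has only two paths: deterministic routing (the Router's four fast paths) and broadcast claiming (spawn every idle Agent). The Router ignores task_type, so a coding task can fall into the broadcast path and, if no Agent claims it, loop forever.

Design goals:
1. Let users specify the execution flow of a task (a Pipeline)
2. Fall back to broadcast claiming (free mode) when no Pipeline is specified
3. Put a reasonable limit on the number of broadcast rounds
4. Support flexible Pipeline transitions: sequential, branching, send-back, and retry loops
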
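## §2 Core Decisions

**D1: Pipeline mode or free mode, one or the other**
- Pipeline specified → hard-constraint mode: the Agent must follow the flow the Pipeline defines and may not deviate
- No Pipeline specified (None/general) → free mode: broadcast claiming, the Agent decides how to do the work, and only the basic completion criteria apply (deliverable + handoff)

**D2: A Pipeline is a strict workflow, but supports flexible transitions**
- For a task with a Pipeline, both the state machine and the execution steps are hard constraints
- The Agent must go through every step the Pipeline defines and may not skip any
- The Pipeline itself can still define flexible transition rules: conditional branches, send-back for rework, and retry loops
- There is no "may deviate" middle ground: any deviation must be a path predefined by the Pipeline
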
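**D3: In Pipeline mode, @mention only notifies and does not change the executor**
- Pipeline mode (strict workflow): the Pipeline decides the executor (coding → zhangfei-dev); an @mention only notifies that Agent through a blackboard comment
- Free mode: @mention decides the executor (same as the existing logic)
- Rationale: a strict workflow means both the flow and the roles are fixed; sending a data specialist through the coding flow makes no sense

**D4: task_type defaults to None**
- At the API layer, use `task_type=body.get("task_type", None)` instead of defaulting to "coding"
- Not specified = free mode, broadcast claiming
- Specified = Pipeline mode

**D5: Two entry points**
1. **Frontend**: a dropdown for selecting the Pipeline when creating a task, defaulting to none
2. **Chat**: the user states a requirement in natural language and Pang Tong decides whether a Pipeline should be specified, asking the user when unsure
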
## §3 Pipeline Definition (Flexible Transitions)

Each Pipeline is a **directed state machine** that supports sequential flow, conditional branching, send-back, and retry loops.

### 3.1 Data Structures

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class PipelineStage:
    id: str                    # stage identifier (e.g. "working", "review")
    agent: str                 # executor: a fixed agent_id or "pipeline_default"
    on_success: str            # stage to move to on success (or "done"/"failed")
    on_failure: str            # stage to move to on failure (an earlier stage = send-back)
    max_retries: int = 0       # max send-back retries (0 = unlimited; over the limit → failed)
    is_terminal: bool = False  # whether this is a terminal stage
    # Note: retry_count does not live here. PipelineStage is a registry template
    # shared by all tasks of the same type.


@dataclass
class TaskPipelineState:
    """Per-task Pipeline runtime state (stored in the DB or in memory)"""
    task_id: str
    pipeline_type: str
    current_stage: str
    stage_retry_counts: Dict[str, int] = field(default_factory=dict)  # stage_id → retries so far


@dataclass
class Pipeline:
    task_type: str                    # type identifier
    default_agent: str                # default executor
    entry: str                        # entry stage id
    stages: Dict[str, PipelineStage]  # stage id → PipelineStage
```

### 3.2 Transition Rules

| Transition | How it is implemented | Example |
|---------|---------|------|
| **Sequential** | `on_success` points to the next stage | working → review → done |
| **Conditional branch** | `on_success` / `on_failure` point to different stages | review passes → done, review rejects → working |
| **Send-back** | `on_failure` points to an earlier stage | a failed review sends the task back to working |
| **Loop / retry** | controlled by `max_retries` + `TaskPipelineState.stage_retry_counts` | review can send back at most 3 times, then the task moves to failed |

Exception states (failed/paused/escalated/blocked/cancelled) are not constrained by the Pipeline: any stage may transition into one.

**The claim phase is not constrained by the Pipeline**: the Pipeline constrains the work phase (working/review/verify, etc.). The claim phase (pending → claimed → entry stage) is outside the Pipeline stages and is handled by the standard Router and Ticker flow.

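Because every transition is just a stage id stored in `on_success` / `on_failure`, a definition can be checked mechanically before it is registered. The following is a minimal sketch, not part of the design itself: the helper `dangling_targets` and the `TERMINAL_TARGETS` set are hypothetical names, and the dataclasses are trimmed copies of the ones in §3.1.

```python
from dataclasses import dataclass
from typing import Dict, List

# Assumption: "done" and "failed" are the only non-stage targets, as in §3.1.
TERMINAL_TARGETS = {"done", "failed"}


@dataclass
class PipelineStage:
    id: str
    agent: str
    on_success: str
    on_failure: str
    max_retries: int = 0


@dataclass
class Pipeline:
    task_type: str
    default_agent: str
    entry: str
    stages: Dict[str, PipelineStage]


def dangling_targets(pipeline: Pipeline) -> List[str]:
    """Return transitions whose target is neither a defined stage nor a terminal marker."""
    problems = []
    if pipeline.entry not in pipeline.stages:
        problems.append(f"entry -> {pipeline.entry}")
    for stage in pipeline.stages.values():
        for label, target in (("on_success", stage.on_success),
                              ("on_failure", stage.on_failure)):
            if target not in pipeline.stages and target not in TERMINAL_TARGETS:
                problems.append(f"{stage.id}.{label} -> {target}")
    return problems


# Sequential flow plus a send-back edge (review → working), as in the table above.
coding = Pipeline(
    task_type="coding",
    default_agent="zhangfei-dev",
    entry="working",
    stages={
        "working": PipelineStage("working", "pipeline_default", "review", "failed"),
        "review": PipelineStage("review", "simayi-challenger", "done", "working", max_retries=3),
    },
)

# A deliberately broken definition: "verify" is referenced but never defined.
broken = Pipeline(
    task_type="broken",
    default_agent="zhangfei-dev",
    entry="working",
    stages={"working": PipelineStage("working", "pipeline_default", "verify", "failed")},
)
```

Running such a check when the registry loads would surface a mistyped stage id at startup instead of at the first transition.
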
### 3.3 Pipeline Examples

**coding pipeline (multi_step, with send-back retries)**:

```yaml
task_type: coding
default_agent: zhangfei-dev
entry: working

stages:
  working:
    agent: pipeline_default
    on_success: review
    on_failure: failed

  review:
    agent: simayi-challenger
    on_success: done
    on_failure: working   # a failed review sends the task back for rework
    max_retries: 3        # at most 3 send-backs, then the task moves to failed
```

Flow diagram:
```
pending → claimed → working ──→ review ──→ done
                       ↑           │
                       └───────────┘  (on_failure, at most 3 times)
                                   │ more than 3 times
                                   └──→ failed
```

**data pipeline (single_step, simple sequence)**:

```yaml
task_type: data
default_agent: zhaoyun-data
entry: working

stages:
  working:
    agent: pipeline_default
    on_success: done
    on_failure: failed
```

**review pipeline (single step, fixed agent)**:

```yaml
task_type: review
default_agent: simayi-challenger
entry: working

stages:
  working:
    agent: simayi-challenger
    on_success: done
    on_failure: failed
```

**deploy pipeline (with a verification loop)**:

```yaml
task_type: deploy
default_agent: jiangwei-infra
entry: working

stages:
  working:
    agent: pipeline_default
    on_success: verify
    on_failure: failed

  verify:
    agent: pipeline_default   # the deployer verifies its own work
    on_success: done
    on_failure: working       # a failed verification sends the task back for redeployment
    max_retries: 2            # at most 2 retries
```

**risk_check pipeline (single step)**:

```yaml
task_type: risk_check
default_agent: guanyu-dev
entry: working

stages:
  working:
    agent: pipeline_default
    on_success: done
    on_failure: failed
```

**Free mode (None / not specified)**:

| Item | Value |
|------|-----|
| default_agent | broadcast claiming |
| stages | pending → claimed → working → review → done (basic state machine, no Pipeline constraints) |

### 3.4 Initial Pipeline Registry

| task_type | Kind | default_agent | stages | Send-back |
|-----------|------|---------------|--------|------|
| `coding` | multi_step | `zhangfei-dev` | working → review → done | review→working (max 3) |
| `review` | single_step | `simayi-challenger` | working → done | none |
| `data` | single_step | `zhaoyun-data` | working → done | none |
| `deploy` | multi_step | `jiangwei-infra` | working → verify → done | verify→working (max 2) |
| `risk_check` | single_step | `guanyu-dev` | working → done | none |
| None | free mode | broadcast claiming | basic state machine | none |

## §4 Routing Logic (Router Update)

The Router gains a new path 5: match a Pipeline by task_type.

```
Path 1: local operation (action_type ∈ LOCAL_ACTIONS)
Path 2: retry → original executor (action_type == "retry")
Mode B: declarative Agent handoff (next_capability is set)
Path 3: lifecycle transition (status ∈ LIFECYCLE_CAPABILITY)
Path 4: has assignee → route directly to it
Path 5 (new): task_type is set → look up the Pipeline registry → use the Pipeline's default_agent
  - @mention does not override the executor (in Pipeline mode @mention only notifies)
  - mode = "deterministic"
Fuzzy case: delegate to Pang Tong → broadcast
```

Path 5 has lower priority than path 4 (assignee) and Mode B (declarative handoff), and higher priority than the fuzzy case.

Path 5 in detail:
```python
# added to router.py
def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]:
    task_type = task_info.get("task_type")
    if not task_type:
        return None  # free mode: no Pipeline to match

    pipeline = self.pipeline_registry.get(task_type)
    if not pipeline:
        return None  # task_type is not in the registry: no match

    return RouteDecision(
        agent_id=pipeline.default_agent,
        mode="deterministic",
        confidence=0.85,
        reason=f"Pipeline match: {task_type} → {pipeline.default_agent}",
    )
```

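The priority rule (path 4 before path 5, path 5 before the fallback) can be illustrated with a reduced router. This is only a sketch under stated assumptions: paths 1 to 3 and Mode B are omitted, `PIPELINE_DEFAULT_AGENTS` is a hypothetical stand-in for the registry, the fuzzy case is collapsed into a plain broadcast, and the `"broadcast"` mode string is assumed rather than taken from the existing Router.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class RouteDecision:
    agent_id: Optional[str]
    mode: str
    reason: str


# Hypothetical stand-in for the Pipeline registry: task_type -> default_agent.
PIPELINE_DEFAULT_AGENTS: Dict[str, str] = {
    "coding": "zhangfei-dev",
    "data": "zhaoyun-data",
}


def route(task_info: dict) -> RouteDecision:
    """Try path 4, then path 5, then fall back to broadcast."""
    assignee = task_info.get("assignee")
    if assignee:  # path 4: an explicit assignee wins over the Pipeline default
        return RouteDecision(assignee, "deterministic", "assignee")

    task_type = task_info.get("task_type")
    default_agent = PIPELINE_DEFAULT_AGENTS.get(task_type) if task_type else None
    if default_agent:  # path 5: task_type matches a registered Pipeline
        return RouteDecision(default_agent, "deterministic", f"pipeline:{task_type}")

    # No task_type, or one that is not registered: treat as free mode.
    return RouteDecision(None, "broadcast", "no pipeline")
```

Because path 4 is checked first, keeping `assignee` in sync with the current stage matters: a stale assignee would win over the Pipeline's choice, which is the hijacking risk noted in `advance_stage` in §8.2.
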
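## §5 Corrected Definition of a Broadcast Round

**Definition of one broadcast round**: a round is complete only when every Agent has received the task and responded (claim / NO_REPLY / observation).

Mechanism:
- For each pending broadcast task, maintain a tracker recording which Agents have been notified (`notified_agents`) and which have responded (`responded_agents`, see §10.2)
- On each tick: spawn Agents that are idle and not yet notified
- Agent returns (claim / NO_REPLY / observation) → record it as responded
- Agent is busy (blocked by the counter) → it does not count as notified; try again on the next tick
- All Agents notified and responded, with no claim → the round ends → round_number +1
- Someone claims → the broadcast ends immediately (no need to wait for the other Agents)
- 3 consecutive rounds with no claim → escalate to Pang Tong

Note: the Spawner guarantees when an Agent gets spawned (including a timeout cap), so the Ticker needs no additional timeout mechanism.

See §10.2 for the detailed design.
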
## §6 API Changes

### §6.1 Changes to Existing Endpoints

`POST /api/projects/{pid}/tasks`:
- `task_type` now defaults to `None` (no longer "coding")
- When the frontend sends task_type the task runs in Pipeline mode; when it is omitted the task runs in free mode

### §6.2 New Endpoints

`GET /api/pipelines`:
- Returns the list of available Pipelines (loaded from the registry)
- Used by the frontend dropdown
- Response format:
```json
{
  "pipelines": [
    {"task_type": "coding", "label": "Coding", "description": "Multi-step: develop → review → done"},
    {"task_type": "review", "label": "Code review", "description": "Single-step: review → done"},
    {"task_type": "data", "label": "Data retrieval", "description": "Single-step: execute → done"},
    {"task_type": "deploy", "label": "Deploy", "description": "Multi-step: deploy → verify → done"},
    {"task_type": "risk_check", "label": "Risk check", "description": "Single-step: check → done"}
  ]
}
```

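## §7 Frontend Changes

1. TaskModal, when creating a task:
   - Add a Pipeline dropdown (default "free mode")
   - Load the options dynamically from `/api/pipelines`
   - Show a short flow description once a Pipeline is selected

2. Chat entry point:
   - During requirement exploration, Pang Tong decides whether a Pipeline should be specified
   - When unsure, ask the user through a checkpoint
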
## §8 Pipeline Engine

### §8.1 Registry

```python
from typing import Dict, List, Optional


class PipelineRegistry:
    """Pipeline registry"""

    def __init__(self):
        self._pipelines: Dict[str, Pipeline] = {}
        self._load_defaults()

    def _load_defaults(self):
        """Load the built-in Pipelines"""
        for cfg in DEFAULT_PIPELINES:
            p = self._parse_pipeline(cfg)
            self._pipelines[p.task_type] = p

    def get(self, task_type: str) -> Optional[Pipeline]:
        return self._pipelines.get(task_type)

    def list_all(self) -> List[Pipeline]:
        return list(self._pipelines.values())
```

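The registry above relies on `DEFAULT_PIPELINES` and `_parse_pipeline`, neither of which is spelled out in this document. One possible shape, assuming the defaults are plain dicts that mirror the YAML in §3.3, is sketched below. The free function `parse_pipeline` and the dict layout are assumptions for illustration; the final storage format is still an open question in §11.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class PipelineStage:
    id: str
    agent: str
    on_success: str
    on_failure: str
    max_retries: int = 0
    is_terminal: bool = False


@dataclass
class Pipeline:
    task_type: str
    default_agent: str
    entry: str
    stages: Dict[str, PipelineStage]


# Assumed shape: one dict per Pipeline, same keys as the YAML examples in §3.3.
DEFAULT_PIPELINES: List[Dict[str, Any]] = [
    {
        "task_type": "deploy",
        "default_agent": "jiangwei-infra",
        "entry": "working",
        "stages": {
            "working": {"agent": "pipeline_default", "on_success": "verify",
                        "on_failure": "failed"},
            "verify": {"agent": "pipeline_default", "on_success": "done",
                       "on_failure": "working", "max_retries": 2},
        },
    },
]


def parse_pipeline(cfg: Dict[str, Any]) -> Pipeline:
    """Build a Pipeline from one config dict; each stage id comes from its mapping key."""
    stages = {
        stage_id: PipelineStage(id=stage_id, **stage_cfg)
        for stage_id, stage_cfg in cfg["stages"].items()
    }
    if cfg["entry"] not in stages:
        raise ValueError(f"entry stage {cfg['entry']!r} is not defined")
    return Pipeline(
        task_type=cfg["task_type"],
        default_agent=cfg["default_agent"],
        entry=cfg["entry"],
        stages=stages,
    )


deploy = parse_pipeline(DEFAULT_PIPELINES[0])
```

Taking the stage id from the mapping key keeps the config free of a redundant `id` field, which matches how the YAML examples are written.
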
### §8.2 Transition Validation

```python
from typing import Optional


class PipelineEngine:
    """Pipeline execution engine"""

    def __init__(self, registry: PipelineRegistry, bb: Blackboard):
        self.registry = registry
        self.bb = bb

    def get_or_create_state(self, task: Task) -> Optional[TaskPipelineState]:
        """Get or create the task's Pipeline runtime state"""
        pipeline_type = task.task_type
        if not pipeline_type:
            return None  # free mode
        pipeline = self.registry.get(pipeline_type)
        if not pipeline:
            return None
        # Load from the DB or memory; initialize on first use
        state = self.bb.get_pipeline_state(task.id)
        if not state:
            state = TaskPipelineState(
                task_id=task.id,
                pipeline_type=pipeline_type,
                current_stage=pipeline.entry,
            )
            self.bb.save_pipeline_state(state)
        return state

    def peek_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]:
        """Pure query: return the next stage id without modifying task_state"""
        pipeline = self.registry.get(task_state.pipeline_type)
        if not pipeline:
            return None

        current_stage = pipeline.stages.get(task_state.current_stage)
        if not current_stage:
            return None

        if outcome == "success":
            return current_stage.on_success
        elif outcome == "failure":
            retries = task_state.stage_retry_counts.get(current_stage.id, 0)
            # max_retries == 0 means unlimited; otherwise this failure must stay within the limit
            if current_stage.max_retries > 0 and retries + 1 > current_stage.max_retries:
                return "failed"
            return current_stage.on_failure
        else:
            return "failed"

    def advance_stage(self, task: Task, task_state: TaskPipelineState, outcome: str) -> str:
        """Advance the Pipeline stage (side effects: mutates task_state, syncs assignee)"""
        pipeline = self.registry.get(task_state.pipeline_type)
        if not pipeline:
            return task.status

        current_stage = pipeline.stages.get(task_state.current_stage)
        if not current_stage:
            return task.status

        # Use peek to compute the next stage
        next_stage_id = self.peek_next_stage(task_state, outcome)
        if not next_stage_id:
            return task.status

        # Update the retry count (on failure only)
        if outcome == "failure" and current_stage.max_retries > 0:
            retries = task_state.stage_retry_counts.get(current_stage.id, 0) + 1
            task_state.stage_retry_counts[current_stage.id] = retries

        # Update the runtime state
        task_state.current_stage = next_stage_id

        # Sync the assignee (Pipeline mode only). This is critical:
        # otherwise Router path 4 would hijack the task.
        next_stage = pipeline.stages.get(next_stage_id)
        if next_stage:
            new_agent = pipeline.default_agent if next_stage.agent == "pipeline_default" else next_stage.agent
            # bb.update_assignee(task.id, new_agent)

        return next_stage_id

    def get_agent_for_stage(self, task: Task) -> Optional[str]:
        """Return the Agent that should execute the current stage"""
        pipeline = self.registry.get(task.task_type)
        if not pipeline:
            return None

        current_stage = pipeline.stages.get(task.status)
        if not current_stage:
            return None

        if current_stage.agent == "pipeline_default":
            return pipeline.default_agent
        return current_stage.agent

    def validate_transition(self, task: Task, new_status: str) -> bool:
        """Check whether a state transition is legal (hard constraint)"""
        pipeline = self.registry.get(task.task_type)
        if not pipeline:
            return True  # free mode is unrestricted

        # Exception states are always allowed
        if new_status in ("failed", "paused", "escalated", "blocked", "cancelled"):
            return True

        current_stage = pipeline.stages.get(task.status)
        if not current_stage:
            return True  # e.g. pending/claimed: outside the Pipeline stages

        allowed = {current_stage.on_success, current_stage.on_failure}
        return new_status in allowed
```

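### §8.3 Ticker Integration

**Storing TaskPipelineState**: the suggested approach is a new `pipeline_state_json` column (TEXT) on the tasks table holding the serialized `TaskPipelineState`. The Blackboard's `get_pipeline_state()` / `save_pipeline_state()` handle serialization and deserialization. A separate table would also work, but since the state is simple and 1:1 with the task, a column on tasks is more convenient.
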
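Since `TaskPipelineState` holds only strings and a flat dict, the JSON round trip for the `pipeline_state_json` column can be very small. The sketch below shows one way it might look; `dump_state` and `load_state` are hypothetical helper names, not the Blackboard API, and the dataclass is copied from §3.1.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict


@dataclass
class TaskPipelineState:
    task_id: str
    pipeline_type: str
    current_stage: str
    stage_retry_counts: Dict[str, int] = field(default_factory=dict)


def dump_state(state: TaskPipelineState) -> str:
    """Serialize the state to the text that would be stored in pipeline_state_json."""
    return json.dumps(asdict(state), sort_keys=True)


def load_state(raw: str) -> TaskPipelineState:
    """Rebuild the state from the stored JSON text."""
    return TaskPipelineState(**json.loads(raw))


state = TaskPipelineState("task-1", "coding", "review", {"review": 2})
restored = load_state(dump_state(state))
```

If fields are added to `TaskPipelineState` later, `load_state` would need to tolerate rows written by the older version, for example by giving new fields defaults.
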
#### Entry Trigger Mechanism

The entry phase of a Pipeline task is not constrained by the Pipeline stages; it is handled by the standard Router and Ticker flow:

```
1. Task created: status=pending, task_type=coding
2. Ticker picks up the task: pending + task_type set
3. Router path 5 matches → pipeline.default_agent (e.g. zhangfei-dev)
4. Dispatcher returns mode=deterministic, agent_id=zhangfei-dev
5. assignee is set to pipeline.default_agent
6. Ticker spawns zhangfei-dev → Agent claims → status becomes working
7. current_agent = zhangfei-dev
8. TaskPipelineState is created: current_stage=working (entry stage)

Unified code path: every task (Pipeline or free mode) goes pending → claim → working.
Busy Agent: assignee is set but the Agent is busy → retry on the next tick (same behavior as path 4).
```

#### State Advancement Flow

The Ticker calls the PipelineEngine whenever a task changes state:
1. `PipelineEngine.get_or_create_state(task)` fetches the runtime state
2. Task finishes (spawn returns) → `classify_outcome()` decides success/failure
3. `PipelineEngine.advance_stage(task, task_state, outcome)` advances to the next stage
4. Persist the updated `TaskPipelineState` (`bb.save_pipeline_state(task_state)`)
5. If the next stage is not terminal → update the task status (assignee was already synced by advance_stage) → spawn
6. If it is terminal → mark done/failed

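To make the retry limit concrete, the steps above can be traced on the coding pipeline with a stripped-down model. This is a sketch only: stages are reduced to tuples, `advance` and `run` are hypothetical helpers, and any outcome other than `"success"` is treated as a failure, whereas `peek_next_stage` in §8.2 sends unknown outcomes straight to `failed`.

```python
from typing import Dict, List, Tuple

# stage id -> (on_success, on_failure, max_retries); mirrors the coding pipeline in §3.3
CODING_STAGES: Dict[str, Tuple[str, str, int]] = {
    "working": ("review", "failed", 0),
    "review": ("done", "working", 3),
}


def advance(stage: str, outcome: str, retry_counts: Dict[str, int]) -> str:
    """Return the next stage, counting failures the same way as advance_stage in §8.2."""
    on_success, on_failure, max_retries = CODING_STAGES[stage]
    if outcome == "success":
        return on_success
    if max_retries > 0:
        retry_counts[stage] = retry_counts.get(stage, 0) + 1
        if retry_counts[stage] > max_retries:
            return "failed"  # limit exceeded
    return on_failure


def run(outcomes: List[str]) -> List[str]:
    """Feed a sequence of outcomes through the pipeline and record every stage visited."""
    stage = "working"
    counts: Dict[str, int] = {}
    visited = [stage]
    for outcome in outcomes:
        stage = advance(stage, outcome, counts)
        visited.append(stage)
        if stage in ("done", "failed"):
            break
    return visited
```

With `max_retries: 3`, the first three review failures each send the task back to working, and the fourth review failure moves it to failed, which matches the "more than 3 times" edge in the flow diagram.
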
## §9 Implementation Roadmap

### Phase 1: Bug fix (implemented independently)

| Step | Content | Detailed design |
|------|------|---------|
| 1.1 | Change the task_type default to None | §10.1 |
| 1.2 | Fix the broadcast counter | §10.2 |

### Phase 2: Full Pipeline implementation (done in one go after Phase 1)

| Step | Content |
|------|------|
| 2.1 | Pipeline registry + PipelineEngine (§8.1 + §8.2) |
| 2.2 | Router path 5 (§4) |
| 2.3 | Ticker integration with PipelineEngine (§8.3) |
| 2.4 | Inject the broadcast tracker response callback |
| 2.5 | API endpoints (§6) |
| 2.6 | Frontend Pipeline dropdown (§7) |
| 2.7 | E2E test coverage |

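## §10 Phase 1 Detailed Design

### §10.1 Change the task_type Default to None

**File**: `src/api/blackboard_routes.py`

**Change**: 1 line

```python
# Current (line 133):
task_type=body.get("task_type", "coding"),

# Change to:
task_type=body.get("task_type", None),
```

**Impact analysis**:

| Scenario | Before | After |
|------|------|------|
| Frontend omits task_type | task_type="coding" → Router ignores it → broadcast | task_type=None → Router ignores it → broadcast |
| Frontend sends task_type="coding" | coding | coding (unchanged) |
| E2E test omits task_type | coding | None |
| Existing DB data | unaffected (already written) | unaffected |

**Risk**: low. With the current Router the behavior is identical before and after (the Router ignores task_type, so everything is broadcast). The difference appears once Phase 2 lands: tasks with task_type=None keep using broadcast, while tasks that explicitly pass a task_type go through a Pipeline.

**E2E test impact**: still to be verified. `grep` the existing tests for hard assertions on `task_type` and confirm that switching to `None` does not break them. Any test that asserts `task_type == "coding"` for a task created without task_type must be changed to assert `task_type is None`, or the assertion removed.

**Tests**: existing E2E cases that omit task_type keep the same routing behavior (all broadcast), so apart from the value assertions mentioned above no test changes are needed.
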
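The effect of the one-line change on the stored value can be pinned down with a few assertions. The snippet below is a minimal sketch: `extract_task_type` and `is_free_mode` are hypothetical helpers written only for this illustration, and the truthiness check copies `if not task_type` from `_route_by_pipeline` in §4.

```python
from typing import Optional


def extract_task_type(body: dict) -> Optional[str]:
    """Mirror of the changed line: a missing key now yields None instead of "coding"."""
    return body.get("task_type", None)


def is_free_mode(task_type: Optional[str]) -> bool:
    """Same check as `if not task_type` in §4: None and "" both mean free mode."""
    return not task_type
```

Note that an explicit JSON `null` and an empty string behave like an omitted field here, so clients cannot accidentally select a Pipeline by sending a blank value.
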
### §10.2 Broadcast Counter Fix

**File**: `src/daemon/ticker.py`

#### Current Problem

- `_broadcast_claim` performs the broadcast at line ~1020
- line ~1046 checks `retry_count >= 3`, but retry_count is never incremented on this path
- retry_count is only incremented on the claimed-timeout path of `_check_timeouts` (line ~1317)
- Result: an unclaimed broadcast task is re-broadcast on every tick while retry_count stays at 0, so it loops forever

#### Fix

**New data structure** (top of ticker.py):

```python
from dataclasses import dataclass, field


@dataclass
class BroadcastRound:
    """Tracks the broadcast state of a single task"""
    task_id: str
    notified_agents: set = field(default_factory=set)   # Agents already spawned
    responded_agents: set = field(default_factory=set)  # Agents that have responded
    round_number: int = 1                                # current round number
```

**New Ticker attributes**:

```python
class Ticker:
    def __init__(self, ...):
        ...
        self._broadcast_tracker: Dict[str, BroadcastRound] = {}
        self._all_agent_ids: Set[str] = set()  # loaded from config
```

**`_broadcast_claim` rewrite** (replaces the region around line ~1020):

```python
def _broadcast_claim(self, db_path: Path, broadcast_tasks: list):
    """Broadcast claiming, tracked per round: a round needs a response from every Agent"""

    for task_info in broadcast_tasks:
        task_id = task_info["id"]

        # Get or create the tracker
        tracker = self._broadcast_tracker.get(task_id)
        if not tracker:
            tracker = BroadcastRound(task_id=task_id)
            self._broadcast_tracker[task_id] = tracker

        # Check whether the task was already claimed (possibly on the previous tick)
        bb = Blackboard(db_path)
        task = bb.get_task(task_id)
        if task.status != "pending":
            # Claimed, or the status changed otherwise: drop the tracker
            self._broadcast_tracker.pop(task_id, None)
            continue

        # Get the idle Agents
        idle_agents = self._get_idle_agents()

        # Filter out Agents already notified; only spawn the rest
        pending_agents = [a for a in idle_agents if a not in tracker.notified_agents]

        if pending_agents:
            # Spawn the Agents that have not been notified yet
            for agent_id in pending_agents:
                tracker.notified_agents.add(agent_id)
                # ... existing spawn broadcast logic ...
        else:
            # Every idle Agent has been notified.
            # Check whether every known Agent has been notified.
            unnotified = self._all_agent_ids - tracker.notified_agents

            if not unnotified:
                # Every Agent has been notified → check whether all have responded
                if tracker.responded_agents >= tracker.notified_agents:
                    # All notified Agents responded and nobody claimed → the round is over
                    if tracker.round_number >= 3:
                        # 3 broadcast rounds with no claim: escalate to Pang Tong
                        self._escalate_task(db_path, task_id)
                        del self._broadcast_tracker[task_id]
                        logger.warning(
                            "Broadcast 3 rounds exhausted, escalating: %s", task_id
                        )
                    else:
                        # Start a new round: reset notified/responded, round +1
                        tracker.round_number += 1
                        tracker.notified_agents.clear()
                        tracker.responded_agents.clear()
                        logger.info(
                            "Broadcast round %d starting for: %s",
                            tracker.round_number, task_id,
                        )
            # else: some Agents have not been notified yet (possibly busy); wait for the next tick
```

**Agent response callback** (exposed as a public Ticker API and injected in `on_agent_complete` in spawner.py):

```python
# ticker.py
def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
    """Record an Agent's response to a broadcast task (public API)"""
    tracker = self._broadcast_tracker.get(task_id)
    if not tracker:
        return
    if outcome == "claimed":
        # A claim ends the broadcast immediately
        self._broadcast_tracker.pop(task_id, None)
    else:
        tracker.responded_agents.add(agent_id)

# In spawner.py, after classify_outcome
# Call as: self._ticker.record_broadcast_response(task_id, agent_id, outcome)
```

Injection point: in spawner.py, after `classify_outcome()` returns and before `update_status()` is called.

**Ticker restart recovery**:

Simple approach: clear `_broadcast_tracker` on restart, so every task that was mid-broadcast starts again from round one. In the worst case this costs one extra broadcast round, which is acceptable.

```python
# ticker.py, at startup
self._broadcast_tracker.clear()
logger.info("Broadcast tracker cleared (ticker restart)")
```

**Tests**:

```python
# tests/test_broadcast_tracker.py
def test_round_completes_when_all_respond():
    """A round ends once every Agent has responded"""
    ...

def test_round_resets_for_next_round():
    """After a round ends, a new round starts"""
    ...

def test_escalate_after_3_rounds():
    """3 rounds with no claim escalate to Pang Tong"""
    ...

def test_claim_clears_tracker():
    """A claim clears the tracker"""
    ...

def test_busy_agent_not_counted():
    """A busy Agent does not count as notified; retry on the next tick"""
    ...
```

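Two of these cases can be exercised without the real Ticker by isolating the round accounting in a pure function. The sketch below is an illustration under assumptions, not the planned test file: `tick` is a hypothetical reduction of `_broadcast_claim` that takes the idle set as an argument, returns a status string instead of spawning or escalating, and leaves out the Blackboard lookup.

```python
from dataclasses import dataclass, field
from typing import Set

MAX_ROUNDS = 3  # same limit as the round_number check in _broadcast_claim


@dataclass
class BroadcastRound:
    task_id: str
    notified_agents: Set[str] = field(default_factory=set)
    responded_agents: Set[str] = field(default_factory=set)
    round_number: int = 1


def tick(tracker: BroadcastRound, all_agents: Set[str], idle_agents: Set[str]) -> str:
    """One tick of round accounting: "spawned", "waiting", "next_round" or "escalate"."""
    pending = idle_agents - tracker.notified_agents
    if pending:
        tracker.notified_agents |= pending  # the real code spawns these Agents here
        return "spawned"
    if all_agents - tracker.notified_agents:
        return "waiting"  # some Agent is still busy and has not been notified
    if not tracker.responded_agents >= tracker.notified_agents:
        return "waiting"  # notified Agents have not all responded yet
    if tracker.round_number >= MAX_ROUNDS:
        return "escalate"
    tracker.round_number += 1
    tracker.notified_agents.clear()
    tracker.responded_agents.clear()
    return "next_round"


def test_busy_agent_not_counted():
    agents = {"a", "b"}
    tracker = BroadcastRound(task_id="t1")
    assert tick(tracker, agents, idle_agents={"a"}) == "spawned"
    tracker.responded_agents.add("a")
    # "b" is busy: the round cannot finish until it has been notified
    assert tick(tracker, agents, idle_agents={"a"}) == "waiting"
    assert tick(tracker, agents, idle_agents={"a", "b"}) == "spawned"
    tracker.responded_agents.add("b")
    assert tick(tracker, agents, idle_agents={"a", "b"}) == "next_round"
    assert tracker.round_number == 2


def test_escalate_after_3_rounds():
    agents = {"a"}
    tracker = BroadcastRound(task_id="t2")
    results = []
    for _ in range(3):
        tick(tracker, agents, idle_agents=agents)  # notify the only Agent
        tracker.responded_agents.add("a")          # it answers NO_REPLY
        results.append(tick(tracker, agents, idle_agents=agents))
    assert results == ["next_round", "next_round", "escalate"]
```

The first test also shows the behavior debated in the review record: while an Agent stays busy, the task keeps returning "waiting" and never advances a round, which is why the design leans on the Spawner's timeout cap.
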
#### Changed Files

| File | Change | Estimated lines |
|------|------|---------|
| `src/daemon/ticker.py` | BroadcastRound dataclass + tracker attributes + _broadcast_claim rewrite + _escalate_task | ~60 lines added/changed |
| `src/daemon/spawner.py` | response callback injection (call to `record_broadcast_response`) | ~15 lines |
| `tests/test_broadcast_tracker.py` | new test file | ~80 lines |

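## §11 Open Questions

1. How should the Pipeline registry be stored: in-memory dict (current proposal), DB table (persistent), or YAML file (declarative config)?
2. What format should flexible Pipeline stage definitions use: declarative YAML or a hard-coded Python dict? Does the frontend need to render a flow diagram?
3. Do the coding pipeline stages need finer granularity, e.g. plan → implement → review → done?
4. In Phase 2, should declarative YAML config be implemented at the same time, or should the definitions be hard-coded first?
5. Broadcast tracker response callback: inject it after the spawner's `classify_outcome`, or use a separate event callback?
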
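## §A Review Record

### Zhongda Review (2026-06-04) + Rebuttal

**Original conclusion**: 1 must-fix + 6 suggestions.
**Conclusion after rebuttal**: 3 accepted, 2 rejected, 2 to be supplemented. Both sides agree.

| # | Issue | Zhongda's original verdict | Rebuttal outcome | Reason |
|---|------|------------|-------------|------|
| 1 | get_next_stage side effects | Suggestion | ✅ Accepted (split into peek + advance) | Splitting the method is clearer than merely renaming it |
| 2 | retry_counts cleanup | Suggestion | ❌ No change | Cleanup would lose history, and Zhongda himself said it does not affect correctness |
| 3 | Busy-Agent deadlock | **Must fix** | ❌ No change | The broadcast round definition (everyone received and responded = one round) matches the requirement. A busy Agent is handled by the Spawner (including the timeout cap), which is not the Ticker's responsibility |
| 4 | pending/claimed explanation | Suggestion | ✅ Added | |
| 5 | entry trigger mechanism | Suggestion | ✅ Added (via claim) | Pipeline tasks use the standard claim flow: one unified code path, and a busy Agent simply queues |
| 6 | assignee sync update | Suggestion | ✅ Accepted (Pipeline mode only) | |
| 7 | concrete reviewer in role_map | Suggestion | Recorded (scope of #11) | |
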
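## §B Postscript: Strict Workflows and AI Native (2026-06-04)

### Design Intent

The Pipeline strict-workflow mode is **fundamentally not AI native**:

- **Free mode (AI native)**: the Agent judges, claims, and chooses how to execute on its own. The system only supplies tasks and constraints; the Agent is the decision maker
- **Pipeline strict workflow**: the system controls the flow, assigns the executor, and fixes the order of steps. The Agent executes but does not decide

The difference between the two modes is not just "claim or no claim"; it also affects:
- Engine prompts (under a strict workflow the prompt must tightly bound the Agent's behavior)
- Agent autonomy (under a strict workflow the Agent cannot decide on its own to skip or deviate)
- Error handling (under a strict workflow an Agent failure is a flow rollback, not an autonomous retry)

### Implementation Strategy

**There is no rush to implement the Pipeline strict workflow.** Think the design through first, in particular:
1. How prompt templates constrain the Agent under a strict workflow so that it produces no unexpected branches
2. How free mode and strict-workflow mode coexist (same system, different tasks)
3. How the two modes differ in what they need from the Spawner/Ticker/Router

Current priorities:
- **Phase 1 (bug fix)**: change the task_type default to None + fix the broadcast counter
- **Pipeline strict workflow**: keep refining the design and implement it once the design is mature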