Files
sanguo_moziplus_v2/docs/design/12-pipeline-design.md
T
2026-06-04 21:57:44 +08:00

725 lines
27 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Pipeline 设计专题
> 版本: v2.1
> 日期: 2026-06-04
> 作者: 庞统(副军师)
> 状态: **评审中**
## §1 背景
当前 moziplus v2 的任务调度只有两条路径:确定性路由(Router 四条快速路径)和广播认领(spawn 所有空闲 Agent)。Router 不看 task_type,导致 coding 任务可能走入广播路径且无人认领时无限循环。
设计目标:
1. 支持用户指定任务执行流程(Pipeline)
2. 不指定时走广播认领(自由模式)
3. 广播认领有合理的轮次限制
4. Pipeline 支持灵活流转:顺序、分支、退回、循环重试
## §2 核心决策
**D1: Pipeline 模式 vs 自由模式二选一**
- 指定了 Pipeline → 硬约束模式,Agent 必须按 Pipeline 定义的流程执行,不可偏离
- 不指定 PipelineNone/general)→ 自由模式,广播认领,Agent 自主决定怎么做,只有基本完成标准(产出物 + handoff)
**D2: Pipeline 是强工作流,但支持灵活流转**
- 指定了 Pipeline 的任务,状态机是硬约束,执行步骤也是硬约束
- Agent 必须走 Pipeline 定义的所有步骤,不能跳过
- 但 Pipeline 本身可以定义灵活的流转规则:条件分支、退回重做、循环重试
- 没有"可偏离"的中间地带——偏离必须是 Pipeline 预定义的路径
**D3: @mention 在 Pipeline 模式下只做通知,不改变执行者**
- Pipeline 模式(强工作流):执行者由 Pipeline 决定(coding → zhangfei-dev),@mention 只是在黑板 comment 通知该 Agent,不改变执行者
- 自由模式:@mention 决定执行者(和现有逻辑一致)
- 理由:强工作流意味着流程和角色都是确定的,让数据专家走 coding 流程不合理
**D4: task_type 默认值改为 None**
- API 层 `task_type=body.get("task_type", None)` 而非默认 "coding"
- 不指定 = 自由模式,走广播认领
- 指定了 = Pipeline 模式
**D5: 两个入口**
1. **前端入口**:创建任务时下拉菜单选择 Pipeline,默认 none
2. **Chat 入口**:用户通过自然语言提需求,庞统判断是否需要指定 Pipeline。拿不准时询问用户
## §3 Pipeline 定义(灵活流转)
每个 Pipeline 是一个**有向状态机**,支持顺序、条件分支、退回、循环重试。
### 3.1 数据结构
```python
@dataclass
class PipelineStage:
id: str # 步骤标识(如 "working", "review"
agent: str # 执行者:固定 agent_id 或 "pipeline_default"
on_success: str # 成功后转到哪个 stage(或 "done"/"failed"
on_failure: str # 失败后转到哪个 stage(可指向前序 stage = 退回)
max_retries: int = 0 # 最大退回重试次数(0=不限,超限转 failed)
is_terminal: bool = False # 是否终态
# 注意:retry_count 不在此处——PipelineStage 是注册表模板,所有同类型任务共享
@dataclass
class TaskPipelineState:
"""每个任务的 Pipeline 运行时状态(存在 DB 或内存)"""
task_id: str
pipeline_type: str
current_stage: str
stage_retry_counts: Dict[str, int] = field(default_factory=dict) # stage_id → 已重试次数
@dataclass
class Pipeline:
task_type: str # 类型标识
default_agent: str # 默认执行者
entry: str # 入口 stage id
stages: Dict[str, PipelineStage] # stage id → PipelineStage
```
### 3.2 流转规则
| 流转类型 | 实现方式 | 示例 |
|---------|---------|------|
| **顺序** | `on_success` 指向下一个 stage | working → review → done |
| **条件分支** | `on_success` / `on_failure` 指向不同 stage | review 通过→donereview 拒绝→working |
| **退回** | `on_failure` 指向前序 stage | review 失败退回 working |
| **循环/重试** | `max_retries` + `TaskPipelineState.stage_retry_counts` 控制 | review 最多退回 3 次,超过转 failed |
异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。
**认领阶段不受 Pipeline 约束**:Pipeline 约束的是工作阶段(working/review/verify 等)。认领阶段(pending → claimed → entry stage)不受 Pipeline stages 约束,由 Router 和 Ticker 的标准流程处理。
### 3.3 Pipeline 示例
**coding pipelinemulti_step,含退回重试)**
```yaml
task_type: coding
default_agent: zhangfei-dev
entry: working
stages:
working:
agent: pipeline_default
on_success: review
on_failure: failed
review:
agent: simayi-challenger
on_success: done
on_failure: working # review 失败退回重做
max_retries: 3 # 最多退回 3 次,超过转 failed
```
流转图:
```
pending → claimed → working ──→ review ──→ done
↑ │
└────────────┘ (on_failure, 最多3次)
│ >3次
└──→ failed
```
**data pipelinesingle_step,简单顺序)**
```yaml
task_type: data
default_agent: zhaoyun-data
entry: working
stages:
working:
agent: pipeline_default
on_success: done
on_failure: failed
```
**review pipeline(单步,固定 agent**
```yaml
task_type: review
default_agent: simayi-challenger
entry: working
stages:
working:
agent: simayi-challenger
on_success: done
on_failure: failed
```
**deploy pipeline(含验证循环)**
```yaml
task_type: deploy
default_agent: jiangwei-infra
entry: working
stages:
working:
agent: pipeline_default
on_success: verify
on_failure: failed
verify:
agent: pipeline_default # 部署者自验
on_success: done
on_failure: working # 验证失败退回重新部署
max_retries: 2 # 最多重试 2 次
```
**risk_check pipeline(单步)**
```yaml
task_type: risk_check
default_agent: guanyu-dev
entry: working
stages:
working:
agent: pipeline_default
on_success: done
on_failure: failed
```
**自由模式(None/不指定)**
| 项目 | 值 |
|------|-----|
| default_agent | 广播认领 |
| stages | pending → claimed → working → review → done(基本状态机,无 Pipeline 约束) |
### 3.4 初始 Pipeline 注册表
| task_type | 类型 | default_agent | stages | 退回 |
|-----------|------|---------------|--------|------|
| `coding` | multi_step | `zhangfei-dev` | working → review → done | review→working (max 3) |
| `review` | single_step | `simayi-challenger` | working → done | 无 |
| `data` | single_step | `zhaoyun-data` | working → done | 无 |
| `deploy` | multi_step | `jiangwei-infra` | working → verify → done | verify→working (max 2) |
| `risk_check` | single_step | `guanyu-dev` | working → done | 无 |
| None | 自由模式 | 广播认领 | 基本状态机 | 无 |
## §4 路由逻辑(更新 Router
Router 新增路径 5:按 task_type 匹配 Pipeline
```
路径1: 本地操作 (action_type ∈ LOCAL_ACTIONS)
路径2: retry → 原执行者 (action_type == "retry")
Mode B: Agent 声明式交接 (next_capability 有值)
路径3: 生命周期流转 (status ∈ LIFECYCLE_CAPABILITY)
路径4: 有 assignee → 直接给
路径5(新增): task_type 有值 → 查 Pipeline 注册表 → 用 Pipeline 的 default_agent
- @mention 不覆盖执行者(Pipeline 模式下 @mention 只做通知)
- mode = "deterministic"
模糊场景: delegate 庞统 → 广播
```
路径 5 的优先级低于路径 4(assignee)和 Mode B(声明式交接),高于模糊场景。
路径 5 的具体逻辑:
```python
# router.py 新增
def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]:
task_type = task_info.get("task_type")
if not task_type:
return None
pipeline = self.pipeline_registry.get(task_type)
if not pipeline:
return None
return RouteDecision(
agent_id=pipeline.default_agent,
mode="deterministic",
confidence=0.85,
reason=f"Pipeline match: {task_type}{pipeline.default_agent}",
)
```
## §5 广播定义修正
**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。
具体机制:
- 每个 pending 广播任务,维护一个 `notified_agents` 追踪已通知和已反馈的 Agent
- 每次 tick:spawn 空闲且尚未被通知的 Agent
- Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈
- Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试
- 当所有 Agent 都已通知并反馈且没人认领 → 一轮结束 → round_number +1
- 有人认领 → 广播立即结束(不需要等其他 Agent 反馈)
- 连续 3 轮无人认领 → 升级庞统
注意:Agent 何时被 spawn 由 Spawner 保证(含超时上限),Ticker 不需要额外超时机制。
详细设计见 §10.2。
## §6 API 改动
### §6.1 修改现有端点
`POST /api/projects/{pid}/tasks`
- `task_type` 默认改为 `None`(不再是 "coding"
- 前端传 task_type 时走 Pipeline,不传时走自由模式
### §6.2 新增端点
`GET /api/pipelines`
- 返回可用 Pipeline 列表(从注册表加载)
- 供前端下拉菜单使用
- 响应格式:
```json
{
"pipelines": [
{"task_type": "coding", "label": "编码开发", "description": "多步:开发→审查→完成"},
{"task_type": "review", "label": "代码审查", "description": "单步:审查→完成"},
{"task_type": "data", "label": "数据获取", "description": "单步:执行→完成"},
{"task_type": "deploy", "label": "部署", "description": "多步:部署→验证→完成"},
{"task_type": "risk_check", "label": "风控检查", "description": "单步:检查→完成"}
]
}
```
## §7 前端改动
1. TaskModal 创建任务时:
- 新增 Pipeline 下拉选择(默认"自由模式")
- 选项从 `/api/pipelines` 动态加载
- 选中 Pipeline 后显示简要流程说明
2. Chat 入口:
- 庞统在需求探索时,判断是否需要指定 Pipeline
- 拿不准时通过 checkpoint 询问用户
## §8 Pipeline 引擎
### §8.1 注册表
```python
class PipelineRegistry:
"""Pipeline 注册表"""
def __init__(self):
self._pipelines: Dict[str, Pipeline] = {}
self._load_defaults()
def _load_defaults(self):
"""加载内置 Pipeline"""
for cfg in DEFAULT_PIPELINES:
p = self._parse_pipeline(cfg)
self._pipelines[p.task_type] = p
def get(self, task_type: str) -> Optional[Pipeline]:
return self._pipelines.get(task_type)
def list_all(self) -> List[Pipeline]:
return list(self._pipelines.values())
```
### §8.2 状态流转校验
```python
class PipelineEngine:
"""Pipeline 执行引擎"""
def __init__(self, registry: PipelineRegistry, bb: Blackboard):
self.registry = registry
self.bb = bb
def get_or_create_state(self, task: Task) -> Optional[TaskPipelineState]:
"""获取或创建任务的 Pipeline 运行时状态"""
pipeline_type = task.task_type
if not pipeline_type:
return None # 自由模式
pipeline = self.registry.get(pipeline_type)
if not pipeline:
return None
# 从 DB 或内存加载;首次则初始化
state = self.bb.get_pipeline_state(task.id)
if not state:
state = TaskPipelineState(
task_id=task.id,
pipeline_type=pipeline_type,
current_stage=pipeline.entry,
)
self.bb.save_pipeline_state(state)
return state
def peek_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]:
"""纯查询:返回下一个 stage id,不修改 task_state"""
pipeline = self.registry.get(task_state.pipeline_type)
if not pipeline:
return None
current_stage = pipeline.stages.get(task_state.current_stage)
if not current_stage:
return None
if outcome == "success":
return current_stage.on_success
elif outcome == "failure":
retries = task_state.stage_retry_counts.get(current_stage.id, 0)
if current_stage.max_retries > 0 and retries + 1 > current_stage.max_retries:
return "failed"
return current_stage.on_failure
else:
return "failed"
def advance_stage(self, task: Task, task_state: TaskPipelineState, outcome: str) -> str:
"""推进 Pipeline stage(有副作用:修改 task_state,同步 assignee"""
pipeline = self.registry.get(task_state.pipeline_type)
if not pipeline:
return task.status
current_stage = pipeline.stages.get(task_state.current_stage)
if not current_stage:
return task.status
# 使用 peek 计算下一个 stage
next_stage_id = self.peek_next_stage(task_state, outcome)
if not next_stage_id:
return task.status
# 更新重试计数(仅在 failure 时)
if outcome == "failure" and current_stage.max_retries > 0:
retries = task_state.stage_retry_counts.get(current_stage.id, 0) + 1
task_state.stage_retry_counts[current_stage.id] = retries
# 更新运行时状态
task_state.current_stage = next_stage_id
# 同步更新 assignee(仅 Pipeline 模式,关键!否则 Router 路径 4 会劫持)
next_stage = pipeline.stages.get(next_stage_id)
if next_stage:
new_agent = pipeline.default_agent if next_stage.agent == "pipeline_default" else next_stage.agent
# bb.update_assignee(task.id, new_agent)
return next_stage_id
def get_agent_for_stage(self, task: Task) -> Optional[str]:
"""获取当前 stage 应该执行的 Agent"""
pipeline = self.registry.get(task.task_type)
if not pipeline:
return None
current_stage = pipeline.stages.get(task.status)
if not current_stage:
return None
if current_stage.agent == "pipeline_default":
return pipeline.default_agent
return current_stage.agent
def validate_transition(self, task: Task, new_status: str) -> bool:
"""校验状态流转是否合法(硬约束)"""
pipeline = self.registry.get(task.task_type)
if not pipeline:
return True # 自由模式不限制
# 异常状态始终允许
if new_status in ("failed", "paused", "escalated", "blocked", "cancelled"):
return True
current_stage = pipeline.stages.get(task.status)
if not current_stage:
return True
allowed = {current_stage.on_success, current_stage.on_failure}
return new_status in allowed
```
### §8.3 Ticker 集成
**TaskPipelineState 存储方案**:建议在 tasks 表新增 `pipeline_state_json` 列(TEXT),序列化存储 `TaskPipelineState`。Blackboard 的 `get_pipeline_state()` / `save_pipeline_state()` 负责序列化/反序列化。独立表方案亦可,但鉴于 state 结构简单且与任务 1:1,放 tasks 列更方便。
### Entry 触发机制
Pipeline 任务的入口阶段不受 Pipeline stages 约束,由 Router 和 Ticker 标准流程处理:
```
1. 任务创建:status=pending, task_type=coding
2. Ticker 扫到 pending + task_type 有值
3. Router 路径 5 匹配 → pipeline.default_agent(如 zhangfei-dev
4. Dispatcher 返回 mode=deterministic, agent_id=zhangfei-dev
5. assignee 设为 pipeline.default_agent
6. Ticker spawn zhangfei-dev → Agent claim → status 变为 working
7. current_agent = zhangfei-dev
8. TaskPipelineState 创建:current_stage=workingentry stage
代码路径统一:所有任务(Pipeline / 自由模式)都走 pending → claim → working。
忙时处理:assignee 有值但 Agent 忙 → 等下一个 tick 重试(和路径 4 一样的行为)。
```
### 状态推进流程
Ticker 在任务状态流转时调用 PipelineEngine
1. `PipelineEngine.get_or_create_state(task)` 获取运行时状态
2. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure
3. `PipelineEngine.advance_stage(task, task_state, outcome)` 推进到下一个 stage
4. 持久化更新 `TaskPipelineState``bb.save_pipeline_state(task_state)`
5. 如果下一个 stage 不是终态 → 更新任务状态 + assignee 已由 advance_stage 同步 → spawn
6. 如果是终态 → 标记 done/failed
## §9 实施路线
### Phase 1Bug fix(独立实施)
| Step | 内容 | 详细设计 |
|------|------|---------|
| 1.1 | task_type 默认值改 None | §10.1 |
| 1.2 | 广播计数器修正 | §10.2 |
### Phase 2Pipeline 全量实施(Phase 1 完成后一口气做)
| Step | 内容 |
|------|------|
| 2.1 | Pipeline 注册表 + PipelineEngine(§8.1 + §8.2 |
| 2.2 | Router 路径 5(§4 |
| 2.3 | Ticker 集成 PipelineEngine(§8.3 |
| 2.4 | 广播 tracker 反馈回调注入 |
| 2.5 | API 端点(§6 |
| 2.6 | 前端 Pipeline 下拉(§7 |
| 2.7 | E2E 测试覆盖 |
## §10 Phase 1 详细设计
### §10.1 task_type 默认值改 None
**文件**`src/api/blackboard_routes.py`
**改动**1 行
```python
# 当前(line 133):
task_type=body.get("task_type", "coding"),
# 改为:
task_type=body.get("task_type", None),
```
**影响分析**
| 场景 | 改前 | 改后 |
|------|------|------|
| 前端不传 task_type | task_type="coding" → Router 不看 → 走广播 | task_type=None → Router 不看 → 走广播 |
| 前端传 task_type="coding" | coding | coding(不变) |
| E2E 测试不传 task_type | coding | None |
| 现有 DB 数据 | 不受影响(已写入) | 不受影响 |
**风险**:低。改前改后对于当前 Router 行为一致(Router 不看 task_type,都是走广播)。区别在于 Phase 2 实施后,task_type=None 的任务会继续走广播,而显式传了 task_type 的会走 Pipeline。
**E2E 测试影响**:待验证 `grep` 现有测试中 `task_type` 相关的硬断言,确认改为 `None` 不会破坏测试。如果有断言 `task_type == "coding"` 的测试,需改为 `task_type == None` 或删除断言。
**测试**:现有 E2E 测试中不传 task_type 的用例行为不变(都走广播),无需改测试。
### §10.2 广播计数器修正
**文件**`src/daemon/ticker.py`
#### 当前问题
- `_broadcast_claim` 在 line ~1020 执行广播
- line ~1046 检查 `retry_count >= 3` 但从不递增 retry_count
- retry_count 只在 `_check_timeouts` 的 claimed 超时路径递增(line ~1317
- 结果:没人认领的广播任务每 tick 广播一次,retry_count 永远为 0,无限循环
#### 修复方案
**新增数据结构**ticker.py 顶部):
```python
from dataclasses import dataclass, field
@dataclass
class BroadcastRound:
"""追踪单个任务的广播状态"""
task_id: str
notified_agents: set = field(default_factory=set) # 已 spawn 过的 Agent
responded_agents: set = field(default_factory=set) # 已返回反馈的 Agent
round_number: int = 1 # 当前第几轮
```
**Ticker 类新增属性**
```python
class Ticker:
def __init__(self, ...):
...
self._broadcast_tracker: Dict[str, BroadcastRound] = {}
self._all_agent_ids: Set[str] = set() # 从 config 加载
```
**`_broadcast_claim` 改造**(替换 line ~1020 区域):
```python
def _broadcast_claim(self, db_path: Path, broadcast_tasks: list):
"""广播认领:按轮次追踪,所有 Agent 反馈才算一轮"""
for task_info in broadcast_tasks:
task_id = task_info["id"]
# 获取或创建 tracker
tracker = self._broadcast_tracker.get(task_id)
if not tracker:
tracker = BroadcastRound(task_id=task_id)
self._broadcast_tracker[task_id] = tracker
# 检查是否已认领(可能上一个 tick 被认领了)
bb = Blackboard(db_path)
task = bb.get_task(task_id)
if task.status != "pending":
# 已被认领或状态已变,清理 tracker
self._broadcast_tracker.pop(task_id, None)
continue
# 获取空闲 Agent
idle_agents = self._get_idle_agents()
# 过滤已通知过的,只 spawn 尚未通知的
pending_agents = [a for a in idle_agents if a not in tracker.notified_agents]
if pending_agents:
# spawn 尚未通知的 Agent
for agent_id in pending_agents:
tracker.notified_agents.add(agent_id)
# ... 现有 spawn 广播逻辑 ...
else:
# 所有空闲 Agent 都已通知过
# 检查是否所有已知 Agent 都通知了
unnotified = self._all_agent_ids - tracker.notified_agents
if not unnotified:
# 所有 Agent 都通知过了 → 检查是否全部反馈
if tracker.responded_agents >= tracker.notified_agents:
# 全部已通知的 Agent 都反馈了且没人认领 → 一轮结束
if tracker.round_number >= 3:
# 3 轮广播无人认领,升级庞统
self._escalate_task(db_path, task_id)
del self._broadcast_tracker[task_id]
logger.warning(
"Broadcast 3 rounds exhausted, escalating: %s", task_id
)
else:
# 开始新一轮:重置通知/反馈,轮次+1
tracker.round_number += 1
tracker.notified_agents.clear()
tracker.responded_agents.clear()
logger.info(
"Broadcast round %d starting for: %s",
tracker.round_number, task_id,
)
# else: 还有 Agent 没通知到(可能在忙),等下一个 tick
```
**Agent 反馈回调**(通过 Ticker 公共 API,在 spawner.py 的 `on_agent_complete` 中注入):
```python
# ticker.py
def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
"""记录 Agent 对广播任务的反馈(公共 API)"""
tracker = self._broadcast_tracker.get(task_id)
if not tracker:
return
if outcome == "claimed":
self._broadcast_tracker.pop(task_id, None)
else:
tracker.responded_agents.add(agent_id)
# spawner.py 中 classify_outcome 之后
# 调用方式:self._ticker.record_broadcast_response(task_id, agent_id, outcome)
```
注入位置:spawner.py 的 `classify_outcome()` 返回后、`update_status()` 调用前。
**Ticker 重启恢复**
简单方案:重启时清空 `_broadcast_tracker`,所有广播中的任务从第一轮重新开始。最坏情况多广播一轮,可接受。
```python
# ticker.py 启动时
self._broadcast_tracker.clear()
logger.info("Broadcast tracker cleared (ticker restart)")
```
**测试**
```python
# tests/test_broadcast_tracker.py
def test_round_completes_when_all_respond():
"""所有 Agent 反馈后,一轮结束"""
...
def test_round_resets_for_next_round():
"""一轮结束后,开始新一轮"""
...
def test_escalate_after_3_rounds():
"""3 轮无人认领,升级庞统"""
...
def test_claim_clears_tracker():
"""Agent 认领后,tracker 清理"""
...
def test_busy_agent_not_counted():
"""忙的 Agent 不算已通知,下 tick 继续尝试"""
...
```
#### 改动文件清单
| 文件 | 改动 | 行数估计 |
|------|------|---------|
| `src/daemon/ticker.py` | BroadcastRound 数据类 + tracker 属性 + _broadcast_claim 改造 + _escalate_task | ~60 行新增/修改 |
| `src/daemon/spawner.py` | _on_broadcast_response 回调注入 | ~15 行 |
| `tests/test_broadcast_tracker.py` | 新测试文件 | ~80 行 |
## §11 待确认
1. Pipeline 注册表存储方式:内存 dict(当前方案)vs DB table(持久化)vs YAML file(声明式配置)?
2. 灵活 Pipeline 的 stage 定义格式:用 YAML 声明式还是 Python dict 硬编码?前端需要渲染流程图吗?
3. coding pipeline 的 stages 是否需要更细粒度?如 plan → implement → review → done
4. Phase 2 实施时,是否一次性把 YAML 声明式配置也做了,还是先硬编码?
5. 广播 tracker 的反馈回调:在 spawner 的 `classify_outcome` 之后注入,还是用独立的事件回调?
## §A 评审记录
### 仲达评审(2026-06-04+ Rebuttal
**原始结论**1 个必须修 + 6 个建议。
**Rebuttal 后结论**:3 项接受,2 项不接受,2 项待补充。双方达成一致。
| # | 问题 | 仲达原始判定 | Rebuttal 结论 | 理由 |
|---|------|------------|-------------|------|
| 1 | get_next_stage 副作用 | 建议 | ✅ 接受(拆 peek + advance) | 拆方法比单纯改名更清晰 |
| 2 | retry_counts 清理 | 建议 | ❌ 不改 | 清理丢历史,仲达自己说不影响正确性 |
| 3 | 忙 Agent 死锁 | **必须修** | ❌ 不改 | 广播轮次定义(所有人收到并反馈 = 一轮)和需求一致。Agent 忙由 Spawner 保证(含超时上限),不是 Ticker 职责 |
| 4 | pending/claimed 说明 | 建议 | ✅ 补充 | |
| 5 | entry 触发机制 | 建议 | ✅ 补充(走 claim | Pipeline 任务走标准 claim 流程,代码路径统一,忙时自然排队 |
| 6 | assignee 同步更新 | 建议 | ✅ 接受(仅 Pipeline 模式) | |
| 7 | role_map 具体 reviewer | 建议 | 记录(#11 范围) | |
## §B 后记:强工作流与 AI native 的关系(2026-06-04
### 设计意图记录
Pipeline 强工作流模式**本质上不是 AI native**
- **自由模式(AI native)**:Agent 自主判断、自主认领、自主决定执行方式。系统只提供任务和约束,Agent 是决策主体
- **Pipeline 强工作流**:系统控制流程、指定执行者、规定步骤顺序。Agent 是执行主体,不是决策主体
这两个模式的差异不仅是"要不要 claim"的问题,而是影响到:
- 引擎提示词(强工作流下提示词要严格约束 Agent 行为边界)
- Agent 的自主性(强工作流下 Agent 不能自主决定跳过/偏离)
- 错误处理(强工作流下 Agent 失败是流程回退,不是自主重试)
### 实施策略
**不着急实现 Pipeline 强工作流**。先把设计方案考虑清楚,特别是:
1. 强工作流下提示词模板如何约束 Agent 不产生不可预料分支
2. 自由模式和强工作流模式如何共存(同一系统、不同任务)
3. 两种模式对 Spawner/Ticker/Router 的差异化需求
当前优先级:
- **Phase 1bug fix**task_type 默认值改 None + 广播计数器修正
- **Pipeline 强工作流**:设计继续深化,等方案成熟后再实施