27 KiB
Pipeline 设计专题
版本: v2.1 日期: 2026-06-04 作者: 庞统(副军师) 状态: 评审中
§1 背景
当前 moziplus v2 的任务调度只有两条路径:确定性路由(Router 四条快速路径)和广播认领(spawn 所有空闲 Agent)。Router 不看 task_type,导致 coding 任务可能走入广播路径且无人认领时无限循环。
设计目标:
- 支持用户指定任务执行流程(Pipeline)
- 不指定时走广播认领(自由模式)
- 广播认领有合理的轮次限制
- Pipeline 支持灵活流转:顺序、分支、退回、循环重试
§2 核心决策
D1: Pipeline 模式 vs 自由模式二选一
- 指定了 Pipeline → 硬约束模式,Agent 必须按 Pipeline 定义的流程执行,不可偏离
- 不指定 Pipeline(None/general)→ 自由模式,广播认领,Agent 自主决定怎么做,只有基本完成标准(产出物 + handoff)
D2: Pipeline 是强工作流,但支持灵活流转
- 指定了 Pipeline 的任务,状态机是硬约束,执行步骤也是硬约束
- Agent 必须走 Pipeline 定义的所有步骤,不能跳过
- 但 Pipeline 本身可以定义灵活的流转规则:条件分支、退回重做、循环重试
- 没有"可偏离"的中间地带——偏离必须是 Pipeline 预定义的路径
D3: @mention 在 Pipeline 模式下只做通知,不改变执行者
- Pipeline 模式(强工作流):执行者由 Pipeline 决定(coding → zhangfei-dev),@mention 只是在黑板 comment 通知该 Agent,不改变执行者
- 自由模式:@mention 决定执行者(和现有逻辑一致)
- 理由:强工作流意味着流程和角色都是确定的,让数据专家走 coding 流程不合理
D4: task_type 默认值改为 None
- API 层
task_type=body.get("task_type", None)而非默认 "coding" - 不指定 = 自由模式,走广播认领
- 指定了 = Pipeline 模式
D5: 两个入口
- 前端入口:创建任务时下拉菜单选择 Pipeline,默认 none
- Chat 入口:用户通过自然语言提需求,庞统判断是否需要指定 Pipeline。拿不准时询问用户
§3 Pipeline 定义(灵活流转)
每个 Pipeline 是一个有向状态机,支持顺序、条件分支、退回、循环重试。
3.1 数据结构
@dataclass
class PipelineStage:
id: str # 步骤标识(如 "working", "review")
agent: str # 执行者:固定 agent_id 或 "pipeline_default"
on_success: str # 成功后转到哪个 stage(或 "done"/"failed")
on_failure: str # 失败后转到哪个 stage(可指向前序 stage = 退回)
max_retries: int = 0 # 最大退回重试次数(0=不限,超限转 failed)
is_terminal: bool = False # 是否终态
# 注意:retry_count 不在此处——PipelineStage 是注册表模板,所有同类型任务共享
@dataclass
class TaskPipelineState:
"""每个任务的 Pipeline 运行时状态(存在 DB 或内存)"""
task_id: str
pipeline_type: str
current_stage: str
stage_retry_counts: Dict[str, int] = field(default_factory=dict) # stage_id → 已重试次数
@dataclass
class Pipeline:
task_type: str # 类型标识
default_agent: str # 默认执行者
entry: str # 入口 stage id
stages: Dict[str, PipelineStage] # stage id → PipelineStage
3.2 流转规则
| 流转类型 | 实现方式 | 示例 |
|---|---|---|
| 顺序 | on_success 指向下一个 stage |
working → review → done |
| 条件分支 | on_success / on_failure 指向不同 stage |
review 通过→done,review 拒绝→working |
| 退回 | on_failure 指向前序 stage |
review 失败退回 working |
| 循环/重试 | max_retries + TaskPipelineState.stage_retry_counts 控制 |
review 最多退回 3 次,超过转 failed |
异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。
认领阶段不受 Pipeline 约束:Pipeline 约束的是工作阶段(working/review/verify 等)。认领阶段(pending → claimed → entry stage)不受 Pipeline stages 约束,由 Router 和 Ticker 的标准流程处理。
3.3 Pipeline 示例
coding pipeline(multi_step,含退回重试):
task_type: coding
default_agent: zhangfei-dev
entry: working
stages:
working:
agent: pipeline_default
on_success: review
on_failure: failed
review:
agent: simayi-challenger
on_success: done
on_failure: working # review 失败退回重做
max_retries: 3 # 最多退回 3 次,超过转 failed
流转图:
pending → claimed → working ──→ review ──→ done
↑ │
└────────────┘ (on_failure, 最多3次)
│ >3次
└──→ failed
data pipeline(single_step,简单顺序):
task_type: data
default_agent: zhaoyun-data
entry: working
stages:
working:
agent: pipeline_default
on_success: done
on_failure: failed
review pipeline(单步,固定 agent):
task_type: review
default_agent: simayi-challenger
entry: working
stages:
working:
agent: simayi-challenger
on_success: done
on_failure: failed
deploy pipeline(含验证循环):
task_type: deploy
default_agent: jiangwei-infra
entry: working
stages:
working:
agent: pipeline_default
on_success: verify
on_failure: failed
verify:
agent: pipeline_default # 部署者自验
on_success: done
on_failure: working # 验证失败退回重新部署
max_retries: 2 # 最多重试 2 次
risk_check pipeline(单步):
task_type: risk_check
default_agent: guanyu-dev
entry: working
stages:
working:
agent: pipeline_default
on_success: done
on_failure: failed
自由模式(None/不指定):
| 项目 | 值 |
|---|---|
| default_agent | 广播认领 |
| stages | pending → claimed → working → review → done(基本状态机,无 Pipeline 约束) |
3.4 初始 Pipeline 注册表
| task_type | 类型 | default_agent | stages | 退回 |
|---|---|---|---|---|
coding |
multi_step | zhangfei-dev |
working → review → done | review→working (max 3) |
review |
single_step | simayi-challenger |
working → done | 无 |
data |
single_step | zhaoyun-data |
working → done | 无 |
deploy |
multi_step | jiangwei-infra |
working → verify → done | verify→working (max 2) |
risk_check |
single_step | guanyu-dev |
working → done | 无 |
| None | 自由模式 | 广播认领 | 基本状态机 | 无 |
§4 路由逻辑(更新 Router)
Router 新增路径 5:按 task_type 匹配 Pipeline
路径1: 本地操作 (action_type ∈ LOCAL_ACTIONS)
路径2: retry → 原执行者 (action_type == "retry")
Mode B: Agent 声明式交接 (next_capability 有值)
路径3: 生命周期流转 (status ∈ LIFECYCLE_CAPABILITY)
路径4: 有 assignee → 直接给
路径5(新增): task_type 有值 → 查 Pipeline 注册表 → 用 Pipeline 的 default_agent
- @mention 不覆盖执行者(Pipeline 模式下 @mention 只做通知)
- mode = "deterministic"
模糊场景: delegate 庞统 → 广播
路径 5 的优先级低于路径 4(assignee)和 Mode B(声明式交接),高于模糊场景。
路径 5 的具体逻辑:
# router.py 新增
def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]:
task_type = task_info.get("task_type")
if not task_type:
return None
pipeline = self.pipeline_registry.get(task_type)
if not pipeline:
return None
return RouteDecision(
agent_id=pipeline.default_agent,
mode="deterministic",
confidence=0.85,
reason=f"Pipeline match: {task_type} → {pipeline.default_agent}",
)
§5 广播定义修正
一轮广播的定义:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。
具体机制:
- 每个 pending 广播任务,维护一个
notified_agents追踪已通知和已反馈的 Agent - 每次 tick:spawn 空闲且尚未被通知的 Agent
- Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈
- Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试
- 当所有 Agent 都已通知并反馈且没人认领 → 一轮结束 → round_number +1
- 有人认领 → 广播立即结束(不需要等其他 Agent 反馈)
- 连续 3 轮无人认领 → 升级庞统
注意:Agent 何时被 spawn 由 Spawner 保证(含超时上限),Ticker 不需要额外超时机制。
详细设计见 §10.2。
§6 API 改动
§6.1 修改现有端点
POST /api/projects/{pid}/tasks:
task_type默认改为None(不再是 "coding")- 前端传 task_type 时走 Pipeline,不传时走自由模式
§6.2 新增端点
GET /api/pipelines:
- 返回可用 Pipeline 列表(从注册表加载)
- 供前端下拉菜单使用
- 响应格式:
{
"pipelines": [
{"task_type": "coding", "label": "编码开发", "description": "多步:开发→审查→完成"},
{"task_type": "review", "label": "代码审查", "description": "单步:审查→完成"},
{"task_type": "data", "label": "数据获取", "description": "单步:执行→完成"},
{"task_type": "deploy", "label": "部署", "description": "多步:部署→验证→完成"},
{"task_type": "risk_check", "label": "风控检查", "description": "单步:检查→完成"}
]
}
§7 前端改动
-
TaskModal 创建任务时:
- 新增 Pipeline 下拉选择(默认"自由模式")
- 选项从
/api/pipelines动态加载 - 选中 Pipeline 后显示简要流程说明
-
Chat 入口:
- 庞统在需求探索时,判断是否需要指定 Pipeline
- 拿不准时通过 checkpoint 询问用户
§8 Pipeline 引擎
§8.1 注册表
class PipelineRegistry:
"""Pipeline 注册表"""
def __init__(self):
self._pipelines: Dict[str, Pipeline] = {}
self._load_defaults()
def _load_defaults(self):
"""加载内置 Pipeline"""
for cfg in DEFAULT_PIPELINES:
p = self._parse_pipeline(cfg)
self._pipelines[p.task_type] = p
def get(self, task_type: str) -> Optional[Pipeline]:
return self._pipelines.get(task_type)
def list_all(self) -> List[Pipeline]:
return list(self._pipelines.values())
§8.2 状态流转校验
class PipelineEngine:
"""Pipeline 执行引擎"""
def __init__(self, registry: PipelineRegistry, bb: Blackboard):
self.registry = registry
self.bb = bb
def get_or_create_state(self, task: Task) -> Optional[TaskPipelineState]:
"""获取或创建任务的 Pipeline 运行时状态"""
pipeline_type = task.task_type
if not pipeline_type:
return None # 自由模式
pipeline = self.registry.get(pipeline_type)
if not pipeline:
return None
# 从 DB 或内存加载;首次则初始化
state = self.bb.get_pipeline_state(task.id)
if not state:
state = TaskPipelineState(
task_id=task.id,
pipeline_type=pipeline_type,
current_stage=pipeline.entry,
)
self.bb.save_pipeline_state(state)
return state
def peek_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]:
"""纯查询:返回下一个 stage id,不修改 task_state"""
pipeline = self.registry.get(task_state.pipeline_type)
if not pipeline:
return None
current_stage = pipeline.stages.get(task_state.current_stage)
if not current_stage:
return None
if outcome == "success":
return current_stage.on_success
elif outcome == "failure":
retries = task_state.stage_retry_counts.get(current_stage.id, 0)
if current_stage.max_retries > 0 and retries + 1 > current_stage.max_retries:
return "failed"
return current_stage.on_failure
else:
return "failed"
def advance_stage(self, task: Task, task_state: TaskPipelineState, outcome: str) -> str:
"""推进 Pipeline stage(有副作用:修改 task_state,同步 assignee)"""
pipeline = self.registry.get(task_state.pipeline_type)
if not pipeline:
return task.status
current_stage = pipeline.stages.get(task_state.current_stage)
if not current_stage:
return task.status
# 使用 peek 计算下一个 stage
next_stage_id = self.peek_next_stage(task_state, outcome)
if not next_stage_id:
return task.status
# 更新重试计数(仅在 failure 时)
if outcome == "failure" and current_stage.max_retries > 0:
retries = task_state.stage_retry_counts.get(current_stage.id, 0) + 1
task_state.stage_retry_counts[current_stage.id] = retries
# 更新运行时状态
task_state.current_stage = next_stage_id
# 同步更新 assignee(仅 Pipeline 模式,关键!否则 Router 路径 4 会劫持)
next_stage = pipeline.stages.get(next_stage_id)
if next_stage:
new_agent = pipeline.default_agent if next_stage.agent == "pipeline_default" else next_stage.agent
# bb.update_assignee(task.id, new_agent)
return next_stage_id
def get_agent_for_stage(self, task: Task) -> Optional[str]:
"""获取当前 stage 应该执行的 Agent"""
pipeline = self.registry.get(task.task_type)
if not pipeline:
return None
current_stage = pipeline.stages.get(task.status)
if not current_stage:
return None
if current_stage.agent == "pipeline_default":
return pipeline.default_agent
return current_stage.agent
def validate_transition(self, task: Task, new_status: str) -> bool:
"""校验状态流转是否合法(硬约束)"""
pipeline = self.registry.get(task.task_type)
if not pipeline:
return True # 自由模式不限制
# 异常状态始终允许
if new_status in ("failed", "paused", "escalated", "blocked", "cancelled"):
return True
current_stage = pipeline.stages.get(task.status)
if not current_stage:
return True
allowed = {current_stage.on_success, current_stage.on_failure}
return new_status in allowed
§8.3 Ticker 集成
TaskPipelineState 存储方案:建议在 tasks 表新增 pipeline_state_json 列(TEXT),序列化存储 TaskPipelineState。Blackboard 的 get_pipeline_state() / save_pipeline_state() 负责序列化/反序列化。独立表方案亦可,但鉴于 state 结构简单且与任务 1:1,放 tasks 列更方便。
Entry 触发机制
Pipeline 任务的入口阶段不受 Pipeline stages 约束,由 Router 和 Ticker 标准流程处理:
1. 任务创建:status=pending, task_type=coding
2. Ticker 扫到 pending + task_type 有值
3. Router 路径 5 匹配 → pipeline.default_agent(如 zhangfei-dev)
4. Dispatcher 返回 mode=deterministic, agent_id=zhangfei-dev
5. assignee 设为 pipeline.default_agent
6. Ticker spawn zhangfei-dev → Agent claim → status 变为 working
7. current_agent = zhangfei-dev
8. TaskPipelineState 创建:current_stage=working(entry stage)
代码路径统一:所有任务(Pipeline / 自由模式)都走 pending → claim → working。
忙时处理:assignee 有值但 Agent 忙 → 等下一个 tick 重试(和路径 4 一样的行为)。
状态推进流程
Ticker 在任务状态流转时调用 PipelineEngine:
PipelineEngine.get_or_create_state(task)获取运行时状态- 任务完成(spawn 返回)→
classify_outcome()判定 success/failure PipelineEngine.advance_stage(task, task_state, outcome)推进到下一个 stage- 持久化更新
TaskPipelineState(bb.save_pipeline_state(task_state)) - 如果下一个 stage 不是终态 → 更新任务状态 + assignee 已由 advance_stage 同步 → spawn
- 如果是终态 → 标记 done/failed
§9 实施路线
Phase 1:Bug fix(独立实施)
| Step | 内容 | 详细设计 |
|---|---|---|
| 1.1 | task_type 默认值改 None | §10.1 |
| 1.2 | 广播计数器修正 | §10.2 |
Phase 2:Pipeline 全量实施(Phase 1 完成后一口气做)
| Step | 内容 |
|---|---|
| 2.1 | Pipeline 注册表 + PipelineEngine(§8.1 + §8.2) |
| 2.2 | Router 路径 5(§4) |
| 2.3 | Ticker 集成 PipelineEngine(§8.3) |
| 2.4 | 广播 tracker 反馈回调注入 |
| 2.5 | API 端点(§6) |
| 2.6 | 前端 Pipeline 下拉(§7) |
| 2.7 | E2E 测试覆盖 |
§10 Phase 1 详细设计
§10.1 task_type 默认值改 None
文件:src/api/blackboard_routes.py
改动:1 行
# 当前(line 133):
task_type=body.get("task_type", "coding"),
# 改为:
task_type=body.get("task_type", None),
影响分析:
| 场景 | 改前 | 改后 |
|---|---|---|
| 前端不传 task_type | task_type="coding" → Router 不看 → 走广播 | task_type=None → Router 不看 → 走广播 |
| 前端传 task_type="coding" | coding | coding(不变) |
| E2E 测试不传 task_type | coding | None |
| 现有 DB 数据 | 不受影响(已写入) | 不受影响 |
风险:低。改前改后对于当前 Router 行为一致(Router 不看 task_type,都是走广播)。区别在于 Phase 2 实施后,task_type=None 的任务会继续走广播,而显式传了 task_type 的会走 Pipeline。
E2E 测试影响:待验证 grep 现有测试中 task_type 相关的硬断言,确认改为 None 不会破坏测试。如果有断言 task_type == "coding" 的测试,需改为 task_type == None 或删除断言。
测试:现有 E2E 测试中不传 task_type 的用例行为不变(都走广播),无需改测试。
§10.2 广播计数器修正
文件:src/daemon/ticker.py
当前问题
_broadcast_claim在 line ~1020 执行广播- line ~1046 检查
retry_count >= 3但从不递增 retry_count - retry_count 只在
_check_timeouts的 claimed 超时路径递增(line ~1317) - 结果:没人认领的广播任务每 tick 广播一次,retry_count 永远为 0,无限循环
修复方案
新增数据结构(ticker.py 顶部):
from dataclasses import dataclass, field
@dataclass
class BroadcastRound:
"""追踪单个任务的广播状态"""
task_id: str
notified_agents: set = field(default_factory=set) # 已 spawn 过的 Agent
responded_agents: set = field(default_factory=set) # 已返回反馈的 Agent
round_number: int = 1 # 当前第几轮
Ticker 类新增属性:
class Ticker:
def __init__(self, ...):
...
self._broadcast_tracker: Dict[str, BroadcastRound] = {}
self._all_agent_ids: Set[str] = set() # 从 config 加载
_broadcast_claim 改造(替换 line ~1020 区域):
def _broadcast_claim(self, db_path: Path, broadcast_tasks: list):
"""广播认领:按轮次追踪,所有 Agent 反馈才算一轮"""
for task_info in broadcast_tasks:
task_id = task_info["id"]
# 获取或创建 tracker
tracker = self._broadcast_tracker.get(task_id)
if not tracker:
tracker = BroadcastRound(task_id=task_id)
self._broadcast_tracker[task_id] = tracker
# 检查是否已认领(可能上一个 tick 被认领了)
bb = Blackboard(db_path)
task = bb.get_task(task_id)
if task.status != "pending":
# 已被认领或状态已变,清理 tracker
self._broadcast_tracker.pop(task_id, None)
continue
# 获取空闲 Agent
idle_agents = self._get_idle_agents()
# 过滤已通知过的,只 spawn 尚未通知的
pending_agents = [a for a in idle_agents if a not in tracker.notified_agents]
if pending_agents:
# spawn 尚未通知的 Agent
for agent_id in pending_agents:
tracker.notified_agents.add(agent_id)
# ... 现有 spawn 广播逻辑 ...
else:
# 所有空闲 Agent 都已通知过
# 检查是否所有已知 Agent 都通知了
unnotified = self._all_agent_ids - tracker.notified_agents
if not unnotified:
# 所有 Agent 都通知过了 → 检查是否全部反馈
if tracker.responded_agents >= tracker.notified_agents:
# 全部已通知的 Agent 都反馈了且没人认领 → 一轮结束
if tracker.round_number >= 3:
# 3 轮广播无人认领,升级庞统
self._escalate_task(db_path, task_id)
del self._broadcast_tracker[task_id]
logger.warning(
"Broadcast 3 rounds exhausted, escalating: %s", task_id
)
else:
# 开始新一轮:重置通知/反馈,轮次+1
tracker.round_number += 1
tracker.notified_agents.clear()
tracker.responded_agents.clear()
logger.info(
"Broadcast round %d starting for: %s",
tracker.round_number, task_id,
)
# else: 还有 Agent 没通知到(可能在忙),等下一个 tick
Agent 反馈回调(通过 Ticker 公共 API,在 spawner.py 的 on_agent_complete 中注入):
# ticker.py
def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
"""记录 Agent 对广播任务的反馈(公共 API)"""
tracker = self._broadcast_tracker.get(task_id)
if not tracker:
return
if outcome == "claimed":
self._broadcast_tracker.pop(task_id, None)
else:
tracker.responded_agents.add(agent_id)
# spawner.py 中 classify_outcome 之后
# 调用方式:self._ticker.record_broadcast_response(task_id, agent_id, outcome)
注入位置:spawner.py 的 classify_outcome() 返回后、update_status() 调用前。
Ticker 重启恢复:
简单方案:重启时清空 _broadcast_tracker,所有广播中的任务从第一轮重新开始。最坏情况多广播一轮,可接受。
# ticker.py 启动时
self._broadcast_tracker.clear()
logger.info("Broadcast tracker cleared (ticker restart)")
测试:
# tests/test_broadcast_tracker.py
def test_round_completes_when_all_respond():
"""所有 Agent 反馈后,一轮结束"""
...
def test_round_resets_for_next_round():
"""一轮结束后,开始新一轮"""
...
def test_escalate_after_3_rounds():
"""3 轮无人认领,升级庞统"""
...
def test_claim_clears_tracker():
"""Agent 认领后,tracker 清理"""
...
def test_busy_agent_not_counted():
"""忙的 Agent 不算已通知,下 tick 继续尝试"""
...
改动文件清单
| 文件 | 改动 | 行数估计 |
|---|---|---|
src/daemon/ticker.py |
BroadcastRound 数据类 + tracker 属性 + _broadcast_claim 改造 + _escalate_task | ~60 行新增/修改 |
src/daemon/spawner.py |
_on_broadcast_response 回调注入 | ~15 行 |
tests/test_broadcast_tracker.py |
新测试文件 | ~80 行 |
§11 待确认
- Pipeline 注册表存储方式:内存 dict(当前方案)vs DB table(持久化)vs YAML file(声明式配置)?
- 灵活 Pipeline 的 stage 定义格式:用 YAML 声明式还是 Python dict 硬编码?前端需要渲染流程图吗?
- coding pipeline 的 stages 是否需要更细粒度?如 plan → implement → review → done?
- Phase 2 实施时,是否一次性把 YAML 声明式配置也做了,还是先硬编码?
- 广播 tracker 的反馈回调:在 spawner 的
classify_outcome之后注入,还是用独立的事件回调?
§A 评审记录
仲达评审(2026-06-04)+ Rebuttal
原始结论:1 个必须修 + 6 个建议。 Rebuttal 后结论:3 项接受,2 项不接受,2 项待补充。双方达成一致。
| # | 问题 | 仲达原始判定 | Rebuttal 结论 | 理由 |
|---|---|---|---|---|
| 1 | get_next_stage 副作用 | 建议 | ✅ 接受(拆 peek + advance) | 拆方法比单纯改名更清晰 |
| 2 | retry_counts 清理 | 建议 | ❌ 不改 | 清理丢历史,仲达自己说不影响正确性 |
| 3 | 忙 Agent 死锁 | 必须修 | ❌ 不改 | 广播轮次定义(所有人收到并反馈 = 一轮)和需求一致。Agent 忙由 Spawner 保证(含超时上限),不是 Ticker 职责 |
| 4 | pending/claimed 说明 | 建议 | ✅ 补充 | |
| 5 | entry 触发机制 | 建议 | ✅ 补充(走 claim) | Pipeline 任务走标准 claim 流程,代码路径统一,忙时自然排队 |
| 6 | assignee 同步更新 | 建议 | ✅ 接受(仅 Pipeline 模式) | |
| 7 | role_map 具体 reviewer | 建议 | 记录(#11 范围) |
§B 后记:强工作流与 AI native 的关系(2026-06-04)
设计意图记录
Pipeline 强工作流模式本质上不是 AI native:
- 自由模式(AI native):Agent 自主判断、自主认领、自主决定执行方式。系统只提供任务和约束,Agent 是决策主体
- Pipeline 强工作流:系统控制流程、指定执行者、规定步骤顺序。Agent 是执行主体,不是决策主体
这两个模式的差异不仅是"要不要 claim"的问题,而是影响到:
- 引擎提示词(强工作流下提示词要严格约束 Agent 行为边界)
- Agent 的自主性(强工作流下 Agent 不能自主决定跳过/偏离)
- 错误处理(强工作流下 Agent 失败是流程回退,不是自主重试)
实施策略
不着急实现 Pipeline 强工作流。先把设计方案考虑清楚,特别是:
- 强工作流下提示词模板如何约束 Agent 不产生不可预料分支
- 自由模式和强工作流模式如何共存(同一系统、不同任务)
- 两种模式对 Spawner/Ticker/Router 的差异化需求
当前优先级:
- Phase 1(bug fix):task_type 默认值改 None + 广播计数器修正
- Pipeline 强工作流:设计继续深化,等方案成熟后再实施