Files
2026-06-04 21:57:44 +08:00

27 KiB
Raw Permalink Blame History

Pipeline 设计专题

版本: v2.1 日期: 2026-06-04 作者: 庞统(副军师) 状态: 评审中

§1 背景

当前 moziplus v2 的任务调度只有两条路径:确定性路由(Router 四条快速路径)和广播认领(spawn 所有空闲 Agent)。Router 不看 task_type,导致 coding 任务可能走入广播路径且无人认领时无限循环。

设计目标:

  1. 支持用户指定任务执行流程(Pipeline)
  2. 不指定时走广播认领(自由模式)
  3. 广播认领有合理的轮次限制
  4. Pipeline 支持灵活流转:顺序、分支、退回、循环重试

§2 核心决策

D1: Pipeline 模式 vs 自由模式二选一

  • 指定了 Pipeline → 硬约束模式,Agent 必须按 Pipeline 定义的流程执行,不可偏离
  • 不指定 PipelineNone/general)→ 自由模式,广播认领,Agent 自主决定怎么做,只有基本完成标准(产出物 + handoff)

D2: Pipeline 是强工作流,但支持灵活流转

  • 指定了 Pipeline 的任务,状态机是硬约束,执行步骤也是硬约束
  • Agent 必须走 Pipeline 定义的所有步骤,不能跳过
  • 但 Pipeline 本身可以定义灵活的流转规则:条件分支、退回重做、循环重试
  • 没有"可偏离"的中间地带——偏离必须是 Pipeline 预定义的路径

D3: @mention 在 Pipeline 模式下只做通知,不改变执行者

  • Pipeline 模式(强工作流):执行者由 Pipeline 决定(coding → zhangfei-dev),@mention 只是在黑板 comment 通知该 Agent,不改变执行者
  • 自由模式:@mention 决定执行者(和现有逻辑一致)
  • 理由:强工作流意味着流程和角色都是确定的,让数据专家走 coding 流程不合理

D4: task_type 默认值改为 None

  • API 层 task_type=body.get("task_type", None) 而非默认 "coding"
  • 不指定 = 自由模式,走广播认领
  • 指定了 = Pipeline 模式

D5: 两个入口

  1. 前端入口:创建任务时下拉菜单选择 Pipeline,默认 none
  2. Chat 入口:用户通过自然语言提需求,庞统判断是否需要指定 Pipeline。拿不准时询问用户

§3 Pipeline 定义(灵活流转)

每个 Pipeline 是一个有向状态机,支持顺序、条件分支、退回、循环重试。

3.1 数据结构

@dataclass
class PipelineStage:
    id: str                          # 步骤标识(如 "working", "review"
    agent: str                       # 执行者:固定 agent_id 或 "pipeline_default"
    on_success: str                  # 成功后转到哪个 stage(或 "done"/"failed"
    on_failure: str                  # 失败后转到哪个 stage(可指向前序 stage = 退回)
    max_retries: int = 0             # 最大退回重试次数(0=不限,超限转 failed)
    is_terminal: bool = False        # 是否终态
    # 注意:retry_count 不在此处——PipelineStage 是注册表模板,所有同类型任务共享

@dataclass
class TaskPipelineState:
    """每个任务的 Pipeline 运行时状态(存在 DB 或内存)"""
    task_id: str
    pipeline_type: str
    current_stage: str
    stage_retry_counts: Dict[str, int] = field(default_factory=dict)  # stage_id → 已重试次数

@dataclass
class Pipeline:
    task_type: str                   # 类型标识
    default_agent: str               # 默认执行者
    entry: str                       # 入口 stage id
    stages: Dict[str, PipelineStage] # stage id → PipelineStage

3.2 流转规则

流转类型 实现方式 示例
顺序 on_success 指向下一个 stage working → review → done
条件分支 on_success / on_failure 指向不同 stage review 通过→donereview 拒绝→working
退回 on_failure 指向前序 stage review 失败退回 working
循环/重试 max_retries + TaskPipelineState.stage_retry_counts 控制 review 最多退回 3 次,超过转 failed

异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。

认领阶段不受 Pipeline 约束:Pipeline 约束的是工作阶段(working/review/verify 等)。认领阶段(pending → claimed → entry stage)不受 Pipeline stages 约束,由 Router 和 Ticker 的标准流程处理。

3.3 Pipeline 示例

coding pipelinemulti_step,含退回重试)

task_type: coding
default_agent: zhangfei-dev
entry: working

stages:
  working:
    agent: pipeline_default
    on_success: review
    on_failure: failed

  review:
    agent: simayi-challenger
    on_success: done
    on_failure: working      # review 失败退回重做
    max_retries: 3           # 最多退回 3 次,超过转 failed

流转图:

pending → claimed → working ──→ review ──→ done
                      ↑            │
                      └────────────┘ (on_failure, 最多3次)
                                   │ >3次
                                   └──→ failed

data pipelinesingle_step,简单顺序)

task_type: data
default_agent: zhaoyun-data
entry: working

stages:
  working:
    agent: pipeline_default
    on_success: done
    on_failure: failed

review pipeline(单步,固定 agent

task_type: review
default_agent: simayi-challenger
entry: working

stages:
  working:
    agent: simayi-challenger
    on_success: done
    on_failure: failed

deploy pipeline(含验证循环)

task_type: deploy
default_agent: jiangwei-infra
entry: working

stages:
  working:
    agent: pipeline_default
    on_success: verify
    on_failure: failed

  verify:
    agent: pipeline_default    # 部署者自验
    on_success: done
    on_failure: working        # 验证失败退回重新部署
    max_retries: 2             # 最多重试 2 次

risk_check pipeline(单步)

task_type: risk_check
default_agent: guanyu-dev
entry: working

stages:
  working:
    agent: pipeline_default
    on_success: done
    on_failure: failed

自由模式(None/不指定)

项目
default_agent 广播认领
stages pending → claimed → working → review → done(基本状态机,无 Pipeline 约束)

3.4 初始 Pipeline 注册表

task_type 类型 default_agent stages 退回
coding multi_step zhangfei-dev working → review → done review→working (max 3)
review single_step simayi-challenger working → done
data single_step zhaoyun-data working → done
deploy multi_step jiangwei-infra working → verify → done verify→working (max 2)
risk_check single_step guanyu-dev working → done
None 自由模式 广播认领 基本状态机

§4 路由逻辑(更新 Router

Router 新增路径 5:按 task_type 匹配 Pipeline

路径1: 本地操作 (action_type ∈ LOCAL_ACTIONS)
路径2: retry → 原执行者 (action_type == "retry")
Mode B: Agent 声明式交接 (next_capability 有值)
路径3: 生命周期流转 (status ∈ LIFECYCLE_CAPABILITY)
路径4: 有 assignee → 直接给
路径5(新增): task_type 有值 → 查 Pipeline 注册表 → 用 Pipeline 的 default_agent
  - @mention 不覆盖执行者(Pipeline 模式下 @mention 只做通知)
  - mode = "deterministic"
模糊场景: delegate 庞统 → 广播

路径 5 的优先级低于路径 4(assignee)和 Mode B(声明式交接),高于模糊场景。

路径 5 的具体逻辑:

# router.py 新增
def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]:
    task_type = task_info.get("task_type")
    if not task_type:
        return None
    
    pipeline = self.pipeline_registry.get(task_type)
    if not pipeline:
        return None
    
    return RouteDecision(
        agent_id=pipeline.default_agent,
        mode="deterministic",
        confidence=0.85,
        reason=f"Pipeline match: {task_type}{pipeline.default_agent}",
    )

§5 广播定义修正

一轮广播的定义:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。

具体机制:

  • 每个 pending 广播任务,维护一个 notified_agents 追踪已通知和已反馈的 Agent
  • 每次 tick:spawn 空闲且尚未被通知的 Agent
  • Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈
  • Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试
  • 当所有 Agent 都已通知并反馈且没人认领 → 一轮结束 → round_number +1
  • 有人认领 → 广播立即结束(不需要等其他 Agent 反馈)
  • 连续 3 轮无人认领 → 升级庞统

注意:Agent 何时被 spawn 由 Spawner 保证(含超时上限),Ticker 不需要额外超时机制。

详细设计见 §10.2。

§6 API 改动

§6.1 修改现有端点

POST /api/projects/{pid}/tasks

  • task_type 默认改为 None(不再是 "coding"
  • 前端传 task_type 时走 Pipeline,不传时走自由模式

§6.2 新增端点

GET /api/pipelines

  • 返回可用 Pipeline 列表(从注册表加载)
  • 供前端下拉菜单使用
  • 响应格式:
{
  "pipelines": [
    {"task_type": "coding", "label": "编码开发", "description": "多步:开发→审查→完成"},
    {"task_type": "review", "label": "代码审查", "description": "单步:审查→完成"},
    {"task_type": "data", "label": "数据获取", "description": "单步:执行→完成"},
    {"task_type": "deploy", "label": "部署", "description": "多步:部署→验证→完成"},
    {"task_type": "risk_check", "label": "风控检查", "description": "单步:检查→完成"}
  ]
}

§7 前端改动

  1. TaskModal 创建任务时:

    • 新增 Pipeline 下拉选择(默认"自由模式")
    • 选项从 /api/pipelines 动态加载
    • 选中 Pipeline 后显示简要流程说明
  2. Chat 入口:

    • 庞统在需求探索时,判断是否需要指定 Pipeline
    • 拿不准时通过 checkpoint 询问用户

§8 Pipeline 引擎

§8.1 注册表

class PipelineRegistry:
    """Pipeline 注册表"""
    
    def __init__(self):
        self._pipelines: Dict[str, Pipeline] = {}
        self._load_defaults()
    
    def _load_defaults(self):
        """加载内置 Pipeline"""
        for cfg in DEFAULT_PIPELINES:
            p = self._parse_pipeline(cfg)
            self._pipelines[p.task_type] = p
    
    def get(self, task_type: str) -> Optional[Pipeline]:
        return self._pipelines.get(task_type)
    
    def list_all(self) -> List[Pipeline]:
        return list(self._pipelines.values())

§8.2 状态流转校验

class PipelineEngine:
    """Pipeline 执行引擎"""
    
    def __init__(self, registry: PipelineRegistry, bb: Blackboard):
        self.registry = registry
        self.bb = bb
    
    def get_or_create_state(self, task: Task) -> Optional[TaskPipelineState]:
        """获取或创建任务的 Pipeline 运行时状态"""
        pipeline_type = task.task_type
        if not pipeline_type:
            return None  # 自由模式
        pipeline = self.registry.get(pipeline_type)
        if not pipeline:
            return None
        # 从 DB 或内存加载;首次则初始化
        state = self.bb.get_pipeline_state(task.id)
        if not state:
            state = TaskPipelineState(
                task_id=task.id,
                pipeline_type=pipeline_type,
                current_stage=pipeline.entry,
            )
            self.bb.save_pipeline_state(state)
        return state
    
    def peek_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]:
        """纯查询:返回下一个 stage id,不修改 task_state"""
        pipeline = self.registry.get(task_state.pipeline_type)
        if not pipeline:
            return None
        
        current_stage = pipeline.stages.get(task_state.current_stage)
        if not current_stage:
            return None
        
        if outcome == "success":
            return current_stage.on_success
        elif outcome == "failure":
            retries = task_state.stage_retry_counts.get(current_stage.id, 0)
            if current_stage.max_retries > 0 and retries + 1 > current_stage.max_retries:
                return "failed"
            return current_stage.on_failure
        else:
            return "failed"

    def advance_stage(self, task: Task, task_state: TaskPipelineState, outcome: str) -> str:
        """推进 Pipeline stage(有副作用:修改 task_state,同步 assignee"""
        pipeline = self.registry.get(task_state.pipeline_type)
        if not pipeline:
            return task.status
        
        current_stage = pipeline.stages.get(task_state.current_stage)
        if not current_stage:
            return task.status
        
        # 使用 peek 计算下一个 stage
        next_stage_id = self.peek_next_stage(task_state, outcome)
        if not next_stage_id:
            return task.status
        
        # 更新重试计数(仅在 failure 时)
        if outcome == "failure" and current_stage.max_retries > 0:
            retries = task_state.stage_retry_counts.get(current_stage.id, 0) + 1
            task_state.stage_retry_counts[current_stage.id] = retries
        
        # 更新运行时状态
        task_state.current_stage = next_stage_id
        
        # 同步更新 assignee(仅 Pipeline 模式,关键!否则 Router 路径 4 会劫持)
        next_stage = pipeline.stages.get(next_stage_id)
        if next_stage:
            new_agent = pipeline.default_agent if next_stage.agent == "pipeline_default" else next_stage.agent
            # bb.update_assignee(task.id, new_agent)
        
        return next_stage_id
    
    def get_agent_for_stage(self, task: Task) -> Optional[str]:
        """获取当前 stage 应该执行的 Agent"""
        pipeline = self.registry.get(task.task_type)
        if not pipeline:
            return None
        
        current_stage = pipeline.stages.get(task.status)
        if not current_stage:
            return None
        
        if current_stage.agent == "pipeline_default":
            return pipeline.default_agent
        return current_stage.agent
    
    def validate_transition(self, task: Task, new_status: str) -> bool:
        """校验状态流转是否合法(硬约束)"""
        pipeline = self.registry.get(task.task_type)
        if not pipeline:
            return True  # 自由模式不限制
        
        # 异常状态始终允许
        if new_status in ("failed", "paused", "escalated", "blocked", "cancelled"):
            return True
        
        current_stage = pipeline.stages.get(task.status)
        if not current_stage:
            return True
        
        allowed = {current_stage.on_success, current_stage.on_failure}
        return new_status in allowed

§8.3 Ticker 集成

TaskPipelineState 存储方案:建议在 tasks 表新增 pipeline_state_json 列(TEXT),序列化存储 TaskPipelineState。Blackboard 的 get_pipeline_state() / save_pipeline_state() 负责序列化/反序列化。独立表方案亦可,但鉴于 state 结构简单且与任务 1:1,放 tasks 列更方便。

Entry 触发机制

Pipeline 任务的入口阶段不受 Pipeline stages 约束,由 Router 和 Ticker 标准流程处理:

1. 任务创建:status=pending, task_type=coding
2. Ticker 扫到 pending + task_type 有值
3. Router 路径 5 匹配 → pipeline.default_agent(如 zhangfei-dev
4. Dispatcher 返回 mode=deterministic, agent_id=zhangfei-dev
5. assignee 设为 pipeline.default_agent
6. Ticker spawn zhangfei-dev → Agent claim → status 变为 working
7. current_agent = zhangfei-dev
8. TaskPipelineState 创建:current_stage=workingentry stage

代码路径统一:所有任务(Pipeline / 自由模式)都走 pending → claim → working。
忙时处理:assignee 有值但 Agent 忙 → 等下一个 tick 重试(和路径 4 一样的行为)。

状态推进流程

Ticker 在任务状态流转时调用 PipelineEngine

  1. PipelineEngine.get_or_create_state(task) 获取运行时状态
  2. 任务完成(spawn 返回)→ classify_outcome() 判定 success/failure
  3. PipelineEngine.advance_stage(task, task_state, outcome) 推进到下一个 stage
  4. 持久化更新 TaskPipelineStatebb.save_pipeline_state(task_state)
  5. 如果下一个 stage 不是终态 → 更新任务状态 + assignee 已由 advance_stage 同步 → spawn
  6. 如果是终态 → 标记 done/failed

§9 实施路线

Phase 1Bug fix(独立实施)

Step 内容 详细设计
1.1 task_type 默认值改 None §10.1
1.2 广播计数器修正 §10.2

Phase 2Pipeline 全量实施(Phase 1 完成后一口气做)

Step 内容
2.1 Pipeline 注册表 + PipelineEngine(§8.1 + §8.2
2.2 Router 路径 5(§4
2.3 Ticker 集成 PipelineEngine(§8.3
2.4 广播 tracker 反馈回调注入
2.5 API 端点(§6
2.6 前端 Pipeline 下拉(§7
2.7 E2E 测试覆盖

§10 Phase 1 详细设计

§10.1 task_type 默认值改 None

文件src/api/blackboard_routes.py

改动1 行

# 当前(line 133):
task_type=body.get("task_type", "coding"),

# 改为:
task_type=body.get("task_type", None),

影响分析

场景 改前 改后
前端不传 task_type task_type="coding" → Router 不看 → 走广播 task_type=None → Router 不看 → 走广播
前端传 task_type="coding" coding coding(不变)
E2E 测试不传 task_type coding None
现有 DB 数据 不受影响(已写入) 不受影响

风险:低。改前改后对于当前 Router 行为一致(Router 不看 task_type,都是走广播)。区别在于 Phase 2 实施后,task_type=None 的任务会继续走广播,而显式传了 task_type 的会走 Pipeline。

E2E 测试影响:待验证 grep 现有测试中 task_type 相关的硬断言,确认改为 None 不会破坏测试。如果有断言 task_type == "coding" 的测试,需改为 task_type == None 或删除断言。

测试:现有 E2E 测试中不传 task_type 的用例行为不变(都走广播),无需改测试。

§10.2 广播计数器修正

文件src/daemon/ticker.py

当前问题

  • _broadcast_claim 在 line ~1020 执行广播
  • line ~1046 检查 retry_count >= 3 但从不递增 retry_count
  • retry_count 只在 _check_timeouts 的 claimed 超时路径递增(line ~1317
  • 结果:没人认领的广播任务每 tick 广播一次,retry_count 永远为 0,无限循环

修复方案

新增数据结构ticker.py 顶部):

from dataclasses import dataclass, field

@dataclass
class BroadcastRound:
    """追踪单个任务的广播状态"""
    task_id: str
    notified_agents: set = field(default_factory=set)   # 已 spawn 过的 Agent
    responded_agents: set = field(default_factory=set)   # 已返回反馈的 Agent
    round_number: int = 1                                # 当前第几轮

Ticker 类新增属性

class Ticker:
    def __init__(self, ...):
        ...
        self._broadcast_tracker: Dict[str, BroadcastRound] = {}
        self._all_agent_ids: Set[str] = set()  # 从 config 加载

_broadcast_claim 改造(替换 line ~1020 区域):

def _broadcast_claim(self, db_path: Path, broadcast_tasks: list):
    """广播认领:按轮次追踪,所有 Agent 反馈才算一轮"""
    
    for task_info in broadcast_tasks:
        task_id = task_info["id"]
        
        # 获取或创建 tracker
        tracker = self._broadcast_tracker.get(task_id)
        if not tracker:
            tracker = BroadcastRound(task_id=task_id)
            self._broadcast_tracker[task_id] = tracker
        
        # 检查是否已认领(可能上一个 tick 被认领了)
        bb = Blackboard(db_path)
        task = bb.get_task(task_id)
        if task.status != "pending":
            # 已被认领或状态已变,清理 tracker
            self._broadcast_tracker.pop(task_id, None)
            continue
        
        # 获取空闲 Agent
        idle_agents = self._get_idle_agents()
        
        # 过滤已通知过的,只 spawn 尚未通知的
        pending_agents = [a for a in idle_agents if a not in tracker.notified_agents]
        
        if pending_agents:
            # spawn 尚未通知的 Agent
            for agent_id in pending_agents:
                tracker.notified_agents.add(agent_id)
                # ... 现有 spawn 广播逻辑 ...
        else:
            # 所有空闲 Agent 都已通知过
            # 检查是否所有已知 Agent 都通知了
            unnotified = self._all_agent_ids - tracker.notified_agents
            
            if not unnotified:
                # 所有 Agent 都通知过了 → 检查是否全部反馈
                if tracker.responded_agents >= tracker.notified_agents:
                # 全部已通知的 Agent 都反馈了且没人认领 → 一轮结束
                if tracker.round_number >= 3:
                    # 3 轮广播无人认领,升级庞统
                    self._escalate_task(db_path, task_id)
                    del self._broadcast_tracker[task_id]
                    logger.warning(
                        "Broadcast 3 rounds exhausted, escalating: %s", task_id
                    )
                else:
                    # 开始新一轮:重置通知/反馈,轮次+1
                    tracker.round_number += 1
                    tracker.notified_agents.clear()
                    tracker.responded_agents.clear()
                    logger.info(
                        "Broadcast round %d starting for: %s",
                        tracker.round_number, task_id,
                    )
            # else: 还有 Agent 没通知到(可能在忙),等下一个 tick

Agent 反馈回调(通过 Ticker 公共 API,在 spawner.py 的 on_agent_complete 中注入):

# ticker.py
 def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
    """记录 Agent 对广播任务的反馈(公共 API"""
    tracker = self._broadcast_tracker.get(task_id)
    if not tracker:
        return
    if outcome == "claimed":
        self._broadcast_tracker.pop(task_id, None)
    else:
        tracker.responded_agents.add(agent_id)

# spawner.py 中 classify_outcome 之后
# 调用方式:self._ticker.record_broadcast_response(task_id, agent_id, outcome)

注入位置:spawner.py 的 classify_outcome() 返回后、update_status() 调用前。

Ticker 重启恢复

简单方案:重启时清空 _broadcast_tracker,所有广播中的任务从第一轮重新开始。最坏情况多广播一轮,可接受。

# ticker.py 启动时
self._broadcast_tracker.clear()
logger.info("Broadcast tracker cleared (ticker restart)")

测试

# tests/test_broadcast_tracker.py
def test_round_completes_when_all_respond():
    """所有 Agent 反馈后,一轮结束"""
    ...

def test_round_resets_for_next_round():
    """一轮结束后,开始新一轮"""
    ...

def test_escalate_after_3_rounds():
    """3 轮无人认领,升级庞统"""
    ...

def test_claim_clears_tracker():
    """Agent 认领后,tracker 清理"""
    ...

def test_busy_agent_not_counted():
    """忙的 Agent 不算已通知,下 tick 继续尝试"""
    ...

改动文件清单

文件 改动 行数估计
src/daemon/ticker.py BroadcastRound 数据类 + tracker 属性 + _broadcast_claim 改造 + _escalate_task ~60 行新增/修改
src/daemon/spawner.py _on_broadcast_response 回调注入 ~15 行
tests/test_broadcast_tracker.py 新测试文件 ~80 行

§11 待确认

  1. Pipeline 注册表存储方式:内存 dict(当前方案)vs DB table(持久化)vs YAML file(声明式配置)?
  2. 灵活 Pipeline 的 stage 定义格式:用 YAML 声明式还是 Python dict 硬编码?前端需要渲染流程图吗?
  3. coding pipeline 的 stages 是否需要更细粒度?如 plan → implement → review → done
  4. Phase 2 实施时,是否一次性把 YAML 声明式配置也做了,还是先硬编码?
  5. 广播 tracker 的反馈回调:在 spawner 的 classify_outcome 之后注入,还是用独立的事件回调?

§A 评审记录

仲达评审(2026-06-04+ Rebuttal

原始结论1 个必须修 + 6 个建议。 Rebuttal 后结论:3 项接受,2 项不接受,2 项待补充。双方达成一致。

# 问题 仲达原始判定 Rebuttal 结论 理由
1 get_next_stage 副作用 建议 接受(拆 peek + advance 拆方法比单纯改名更清晰
2 retry_counts 清理 建议 不改 清理丢历史,仲达自己说不影响正确性
3 忙 Agent 死锁 必须修 不改 广播轮次定义(所有人收到并反馈 = 一轮)和需求一致。Agent 忙由 Spawner 保证(含超时上限),不是 Ticker 职责
4 pending/claimed 说明 建议 补充
5 entry 触发机制 建议 补充(走 claim Pipeline 任务走标准 claim 流程,代码路径统一,忙时自然排队
6 assignee 同步更新 建议 接受(仅 Pipeline 模式)
7 role_map 具体 reviewer 建议 记录(#11 范围)

§B 后记:强工作流与 AI native 的关系(2026-06-04

设计意图记录

Pipeline 强工作流模式本质上不是 AI native

  • 自由模式(AI native:Agent 自主判断、自主认领、自主决定执行方式。系统只提供任务和约束,Agent 是决策主体
  • Pipeline 强工作流:系统控制流程、指定执行者、规定步骤顺序。Agent 是执行主体,不是决策主体

这两个模式的差异不仅是"要不要 claim"的问题,而是影响到:

  • 引擎提示词(强工作流下提示词要严格约束 Agent 行为边界)
  • Agent 的自主性(强工作流下 Agent 不能自主决定跳过/偏离)
  • 错误处理(强工作流下 Agent 失败是流程回退,不是自主重试)

实施策略

不着急实现 Pipeline 强工作流。先把设计方案考虑清楚,特别是:

  1. 强工作流下提示词模板如何约束 Agent 不产生不可预料分支
  2. 自由模式和强工作流模式如何共存(同一系统、不同任务)
  3. 两种模式对 Spawner/Ticker/Router 的差异化需求

当前优先级:

  • Phase 1bug fixtask_type 默认值改 None + 广播计数器修正
  • Pipeline 强工作流:设计继续深化,等方案成熟后再实施