diff --git a/docs/design/12-pipeline-design.md b/docs/design/12-pipeline-design.md index 68be849..035db20 100644 --- a/docs/design/12-pipeline-design.md +++ b/docs/design/12-pipeline-design.md @@ -55,8 +55,16 @@ class PipelineStage: on_success: str # 成功后转到哪个 stage(或 "done"/"failed") on_failure: str # 失败后转到哪个 stage(可指向前序 stage = 退回) max_retries: int = 0 # 最大退回重试次数(0=不限,超限转 failed) - retry_count: int = 0 # 当前已重试次数(运行时状态) is_terminal: bool = False # 是否终态 + # 注意:retry_count 不在此处——PipelineStage 是注册表模板,所有同类型任务共享 + +@dataclass +class TaskPipelineState: + """每个任务的 Pipeline 运行时状态(存在 DB 或内存)""" + task_id: str + pipeline_type: str + current_stage: str + stage_retry_counts: Dict[str, int] = field(default_factory=dict) # stage_id → 已重试次数 @dataclass class Pipeline: @@ -73,7 +81,7 @@ class Pipeline: | **顺序** | `on_success` 指向下一个 stage | working → review → done | | **条件分支** | `on_success` / `on_failure` 指向不同 stage | review 通过→done,review 拒绝→working | | **退回** | `on_failure` 指向前序 stage | review 失败退回 working | -| **循环/重试** | `max_retries` + `retry_count` 控制 | review 最多退回 3 次,超过转 failed | +| **循环/重试** | `max_retries` + `TaskPipelineState.stage_retry_counts` 控制 | review 最多退回 3 次,超过转 failed | 异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。 @@ -313,26 +321,50 @@ class PipelineEngine: self.registry = registry self.bb = bb - def get_next_stage(self, task: Task, outcome: str) -> Optional[str]: - """根据当前 stage 和执行结果,计算下一个 stage""" - pipeline = self.registry.get(task.task_type) + def get_or_create_state(self, task: Task) -> Optional[TaskPipelineState]: + """获取或创建任务的 Pipeline 运行时状态""" + pipeline_type = task.task_type + if not pipeline_type: + return None # 自由模式 + pipeline = self.registry.get(pipeline_type) + if not pipeline: + return None + # 从 DB 或内存加载;首次则初始化 + state = self.bb.get_pipeline_state(task.id) + if not state: + state = TaskPipelineState( + task_id=task.id, + pipeline_type=pipeline_type, + current_stage=pipeline.entry, + ) + self.bb.save_pipeline_state(state) + return state + + def get_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]: + """根据当前 stage 和执行结果,计算下一个 stage(纯函数,不修改 PipelineStage 模板)""" + pipeline = self.registry.get(task_state.pipeline_type) if not pipeline: return None # 自由模式,不干预 - current_stage = pipeline.stages.get(task.status) + current_stage = pipeline.stages.get(task_state.current_stage) if not current_stage: return None if outcome == "success": + task_state.current_stage = current_stage.on_success return current_stage.on_success elif outcome == "failure": # 检查 max_retries if current_stage.max_retries > 0: - current_stage.retry_count += 1 - if current_stage.retry_count > current_stage.max_retries: + retries = task_state.stage_retry_counts.get(current_stage.id, 0) + retries += 1 + task_state.stage_retry_counts[current_stage.id] = retries + if retries > current_stage.max_retries: + task_state.current_stage = "failed" return "failed" return current_stage.on_failure else: + task_state.current_stage = "failed" return "failed" def get_agent_for_stage(self, task: Task) -> Optional[str]: @@ -369,11 +401,15 @@ class PipelineEngine: ### §8.3 Ticker 集成 +**TaskPipelineState 存储方案**:建议在 tasks 表新增 `pipeline_state_json` 列(TEXT),序列化存储 `TaskPipelineState`。Blackboard 的 `get_pipeline_state()` / `save_pipeline_state()` 负责序列化/反序列化。独立表方案亦可,但鉴于 state 结构简单且与任务 1:1,放 tasks 列更方便。 + Ticker 在任务状态流转时调用 PipelineEngine: -1. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure -2. `PipelineEngine.get_next_stage()` 计算下一个 stage -3. 如果下一个 stage 不是终态 → 更新任务状态 + `PipelineEngine.get_agent_for_stage()` 获取下一个 Agent → spawn -4. 如果是终态 → 标记 done/failed +1. `PipelineEngine.get_or_create_state(task)` 获取运行时状态 +2. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure +3. `PipelineEngine.get_next_stage(task_state, outcome)` 计算下一个 stage +4. 持久化更新 `TaskPipelineState`(`bb.save_pipeline_state(task_state)`) +5. 如果下一个 stage 不是终态 → 更新任务状态 + `PipelineEngine.get_agent_for_stage()` 获取下一个 Agent → spawn +6. 如果是终态 → 标记 done/failed ## §9 实施路线