auto-sync: 2026-06-04 20:13:20

This commit is contained in:
cfdaily
2026-06-04 20:13:20 +08:00
parent f3e8d85b13
commit 3408c5bc26
+48 -12
View File
@@ -55,8 +55,16 @@ class PipelineStage:
on_success: str # 成功后转到哪个 stage(或 "done"/"failed"
on_failure: str # 失败后转到哪个 stage(可指向前序 stage = 退回)
max_retries: int = 0 # 最大退回重试次数(0=不限,超限转 failed)
retry_count: int = 0 # 当前已重试次数(运行时状态)
is_terminal: bool = False # 是否终态
# 注意:retry_count 不在此处——PipelineStage 是注册表模板,所有同类型任务共享
@dataclass
class TaskPipelineState:
"""每个任务的 Pipeline 运行时状态(存在 DB 或内存)"""
task_id: str
pipeline_type: str
current_stage: str
stage_retry_counts: Dict[str, int] = field(default_factory=dict) # stage_id → 已重试次数
@dataclass
class Pipeline:
@@ -73,7 +81,7 @@ class Pipeline:
| **顺序** | `on_success` 指向下一个 stage | working → review → done |
| **条件分支** | `on_success` / `on_failure` 指向不同 stage | review 通过→donereview 拒绝→working |
| **退回** | `on_failure` 指向前序 stage | review 失败退回 working |
| **循环/重试** | `max_retries` + `retry_count` 控制 | review 最多退回 3 次,超过转 failed |
| **循环/重试** | `max_retries` + `TaskPipelineState.stage_retry_counts` 控制 | review 最多退回 3 次,超过转 failed |
异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。
@@ -313,26 +321,50 @@ class PipelineEngine:
self.registry = registry
self.bb = bb
def get_next_stage(self, task: Task, outcome: str) -> Optional[str]:
"""根据当前 stage 和执行结果,计算下一个 stage"""
pipeline = self.registry.get(task.task_type)
def get_or_create_state(self, task: Task) -> Optional[TaskPipelineState]:
"""获取或创建任务的 Pipeline 运行时状态"""
pipeline_type = task.task_type
if not pipeline_type:
return None # 自由模式
pipeline = self.registry.get(pipeline_type)
if not pipeline:
return None
# 从 DB 或内存加载;首次则初始化
state = self.bb.get_pipeline_state(task.id)
if not state:
state = TaskPipelineState(
task_id=task.id,
pipeline_type=pipeline_type,
current_stage=pipeline.entry,
)
self.bb.save_pipeline_state(state)
return state
def get_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]:
"""根据当前 stage 和执行结果,计算下一个 stage(纯函数,不修改 PipelineStage 模板)"""
pipeline = self.registry.get(task_state.pipeline_type)
if not pipeline:
return None # 自由模式,不干预
current_stage = pipeline.stages.get(task.status)
current_stage = pipeline.stages.get(task_state.current_stage)
if not current_stage:
return None
if outcome == "success":
task_state.current_stage = current_stage.on_success
return current_stage.on_success
elif outcome == "failure":
# 检查 max_retries
if current_stage.max_retries > 0:
current_stage.retry_count += 1
if current_stage.retry_count > current_stage.max_retries:
retries = task_state.stage_retry_counts.get(current_stage.id, 0)
retries += 1
task_state.stage_retry_counts[current_stage.id] = retries
if retries > current_stage.max_retries:
task_state.current_stage = "failed"
return "failed"
return current_stage.on_failure
else:
task_state.current_stage = "failed"
return "failed"
def get_agent_for_stage(self, task: Task) -> Optional[str]:
@@ -369,11 +401,15 @@ class PipelineEngine:
### §8.3 Ticker 集成
**TaskPipelineState 存储方案**:建议在 tasks 表新增 `pipeline_state_json` 列(TEXT),序列化存储 `TaskPipelineState`。Blackboard 的 `get_pipeline_state()` / `save_pipeline_state()` 负责序列化/反序列化。独立表方案亦可,但鉴于 state 结构简单且与任务 1:1,放 tasks 列更方便。
Ticker 在任务状态流转时调用 PipelineEngine
1. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure
2. `PipelineEngine.get_next_stage()` 计算下一个 stage
3. 如果下一个 stage 不是终态 → 更新任务状态 + `PipelineEngine.get_agent_for_stage()` 获取下一个 Agent → spawn
4. 如果是终态 → 标记 done/failed
1. `PipelineEngine.get_or_create_state(task)` 获取运行时状态
2. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure
3. `PipelineEngine.get_next_stage(task_state, outcome)` 计算下一个 stage
4. 持久化更新 `TaskPipelineState``bb.save_pipeline_state(task_state)`
5. 如果下一个 stage 不是终态 → 更新任务状态 + `PipelineEngine.get_agent_for_stage()` 获取下一个 Agent → spawn
6. 如果是终态 → 标记 done/failed
## §9 实施路线