diff --git a/docs/design/20-task-type-architecture.md b/docs/design/20-task-type-architecture.md index ae9ee5e..dfd4688 100644 --- a/docs/design/20-task-type-architecture.md +++ b/docs/design/20-task-type-architecture.md @@ -1,9 +1,11 @@ --- title: "TaskTypeRegistry + Handler 架构重构" created: 2026-06-10 -version: v2.0 +version: v3.0 --- +# §1 现状分析(v3.0 更新说明:§1-§13 保留原样,新增 §14-§18,更新 §3/§5/§7) + # §1 现状分析 moziplus v2 的任务调度系统当前通过 `if/else` 硬编码区分两种 task type:普通任务(task)和邮件(mail)。分支逻辑散落在 dispatcher、spawner、ticker 三个核心模块中,新增 task type 需要同时改动三处。 @@ -51,10 +53,11 @@ moziplus v2 的任务调度系统当前通过 `if/else` 硬编码区分两种 ta 定义 Python Protocol,所有 task type handler 必须满足此接口: ```python -from typing import Protocol, Optional, Dict, Any +from typing import Protocol, Optional, Dict, Any, runtime_checkable from pathlib import Path +@runtime_checkable class TaskTypeHandler(Protocol): """所有 task type handler 的统一接口。""" @@ -62,35 +65,12 @@ class TaskTypeHandler(Protocol): task_type: str # 类型标识:'task' | 'mail' | 'toolchain' virtual_project: Optional[str] # 虚拟项目 ID,如 '_mail'、'_toolchain'。普通任务为 None - def build_prompt( - self, - task_id: str, - title: str, - description: str, - must_haves: str, - project_id: str, - agent_id: str, - task: Optional[Dict] = None, - spawn_type: str = "executor", - spawner: Any = None, - ) -> str: - """构建 Agent prompt。""" + def build_prompt(self, context: "PromptContext") -> str: + """构建 Agent prompt(通过 PromptComposer 拼 section)。""" ... - def build_api_section( - self, project_id: str, task_id: str, agent_id: str - ) -> str: - """构建 API 操作指令(success_status 等)。""" - ... - - def skip_guardrail(self, project_id: str) -> bool: - """是否跳过 guardrail 检查。""" - ... - - def pre_spawn( - self, task_id: str, db_path: Path, dispatcher: Any - ) -> Optional[callable]: - """spawn 前回调,返回 on_checks_passed 回调或 None。""" + def pre_spawn(self, task_id: str, db_path: Path) -> bool: + """spawn 前业务准备。默认 True,mail/toolchain override 为 auto_working。""" ... def post_complete( @@ -99,30 +79,15 @@ class TaskTypeHandler(Protocol): agent_id: str, outcome: str, db_path: Path, - must_haves: str, - dispatcher: Any, ) -> None: - """spawn 完成后回调。""" - ... - - def build_retry_prompt( - self, - task_id: str, - agent_id: str, - retry_count: int, - max_retries: int, - retry_field: str, - task_info: Dict, - spawner: Any, - ) -> str: - """构建重试 prompt。""" + """spawn 完成后的业务处理。统一流程:crash→verify→mark→notify。""" ... def check_completion(self, task_id: str, db_path: Path) -> bool: - """检查任务是否已完成(如 mail 的回复检查)。""" + """ticker 级别的完成检查。""" ... - def get_sections(self) -> list['PromptSection']: + def get_sections(self) -> list: """返回此 handler 的 prompt section 列表。 返回有序的 PromptSection 列表,由 PromptComposer 统一拼装。 @@ -131,6 +96,43 @@ class TaskTypeHandler(Protocol): ... ``` +### 基类提供统一流程(§16 详细定义) + +```python +# 基类收敛共性能力,子类只实现差异点 +@dataclass +class VerifyResult: + """验证结果""" + passed: bool + reason: str # "has_output" / "no_reply" / "no_signal" / ... + evidence: str # "output_count=1, comment_count=0" + can_retry: bool = True + retry_count: int = 0 + +class BaseTaskHandler: + """详见 §16""" + CRASH_OUTCOMES = frozenset({ + "crashed", "compact_failed", "process_crash", + "session_stuck", "compact_hanging", + }) + + def post_complete(self, task_id, agent_id, outcome, db_path): + """统一 4 步流程:crash → verify → mark → notify""" + ... + + def verify_completion(self, task_id, db_path) -> VerifyResult: + """子类必须实现""" + ... + + def _rollback_current_agent(self, db_path, task_id, agent_id): + """基类提供""" + ... + + def on_failure(self, task_id, agent_id, db_path, verify): + """子类可 override""" + ... +``` + **设计原则**: - 每个方法在现有代码中都有明确的对应实现点,不存在"悬空"抽象 @@ -203,6 +205,12 @@ TaskTypeRegistry.register(ToolchainHandler()) # §5 三个 Handler 的实现边界 +> 三个 handler 都继承 BaseTaskHandler(§16),共性由基类提供,差异收敛到以下方法: +> - `verify_completion`:各自的验证逻辑 +> - `target_success_status`:done 或 review +> - `on_failure`:各自的失败处理 +> - `pre_spawn`:各自的前置准备 + ## TaskHandler(普通任务) 将现有 default(非 mail)分支封装为 handler,**不替代 BootstrapBuilder**。 @@ -339,34 +347,39 @@ for vp in TaskTypeRegistry.virtual_projects(): 按风险从低到高排列,每步完成后跑 `pytest -m "not e2e"` 全量回归测试。 -### Step 1:注册表 + PromptComposer 基础设施 +### Step 1:注册表 + PromptComposer + BaseTaskHandler 基础设施 - 新建 `src/daemon/task_type_registry.py`:`TaskTypeHandler` Protocol + `TaskTypeRegistry` - 新建 `src/daemon/prompt_composer.py`:`PromptSection` Protocol + `PromptContext` + `PromptComposer` -- 编写单元测试验证:注册/查询、section 排序/去重/条件过滤 +- 新建 `src/daemon/base_task_handler.py`:`BaseTaskHandler` 基类(VerifyResult + post_complete 统一流程 + _rollback_current_agent) +- 编写单元测试验证:注册/查询、section 排序/去重/条件过滤、基类 post_complete 流程 - **风险**:极低,纯新增文件,不改动现有代码 -### Step 2:TaskHandler +### Step 2:TaskHandler(继承 BaseTaskHandler) -- 新建 `src/daemon/task_handler.py` +- 新建 `src/daemon/task_handler.py`,继承 `BaseTaskHandler` - 实现 5 个 section(TaskContext / PriorOutputs / RoleSkill / TaskApi / TaskConstraints) - `build_prompt` 内部用 PromptComposer 拼装 +- 实现 `verify_completion`(三信号检查)和 review 分支 - 注册到 TaskTypeRegistry - 运行全量回归测试,验证普通任务路径不变 - **风险**:低,现有 BootstrapBuilder 逻辑包一层 -### Step 3:ToolchainHandler +### Step 3:ToolchainHandler(继承 BaseTaskHandler) -- 新建 `src/daemon/toolchain_handler.py` +- 新建 `src/daemon/toolchain_handler.py`,继承 `BaseTaskHandler` - 实现 3 个 section(ToolchainContext / ToolchainApi / ToolchainConstraints) +- 实现 `verify_completion`(行动输出检查)和 `on_failure`(通知主公) - 注册到 TaskTypeRegistry - **风险**:低,全新代码 -### Step 4:MailHandler +### Step 4:MailHandler(继承 BaseTaskHandler,含 crash rollback 修复) -- 新建 `src/daemon/mail_handler.py` +- 新建 `src/daemon/mail_handler.py`,继承 `BaseTaskHandler` - 实现 3 个 section(MailContext / MailApi / MailConstraints) - 从 dispatcher / spawner / ticker 三处迁移 mail 逻辑 +- 实现 `verify_completion`(回复检查 + inform/request 区分) +- **补上 crash rollback**(当前缺失,是 bug) - 注册到 TaskTypeRegistry - 重点回归测试 mail 路径:发送、回复、重试、幻觉门控 - **风险**:中 @@ -453,7 +466,20 @@ for vp in TaskTypeRegistry.virtual_projects(): # §12 PromptSection 模式 -基于知识库优秀实践(Hermes 10层有序注入、Microsoft 三层中间件、我们自己的四层加载架构),引入 PromptSection 模式。 +基于知识库优秀实践(Hermes 10层有序注入、Microsoft 三层中间件),引入 PromptSection 模式。 + +> 五层架构定义、L1-L4 去重规则、层间引导详见 **§14**。 + +priority 范围与 L2 注入组件的对应关系: + +| priority | L2 组件 | 说明 | +|----------|---------|------| +| 10-19 | ③ 任务上下文 | 做什么 | +| 20-29 | ④ 前序信息 | 之前做了什么 | +| 30-39 | ① 操作规范 | 怎么做(Skill 索引或全文,handler 决定) | +| 40-49 | ① 操作规范(API 部分) | 怎么回写 | +| 50-59 | ⑤ 约束 + Guardrail | 不能做什么 | +| 60-69 | ⑥⑦ 审查协议/经验 | 扩展 | ## 核心思想 @@ -638,12 +664,346 @@ def get_sections(self) -> list[PromptSection]: src/daemon/ ├── task_type_registry.py # §3 + §4:Protocol + Registry ├── prompt_composer.py # §12 PromptSection + PromptContext + PromptComposer -├── task_handler.py # §13 TaskHandler + 5 sections -├── mail_handler.py # §13 MailHandler + 3 sections -├── toolchain_handler.py # §13 ToolchainHandler + 3 sections +├── base_task_handler.py # §16 BaseTaskHandler 基类 +├── task_handler.py # §13 TaskHandler(继承 BaseTaskHandler)+ 5 sections +├── mail_handler.py # §13 MailHandler(继承 BaseTaskHandler)+ 3 sections +├── toolchain_handler.py # §13 ToolchainHandler(继承 BaseTaskHandler)+ 3 sections ├── dispatcher.py # §6 改动 ├── spawner.py # §6 改动 ├── ticker.py # §6 改动 ├── bootstrap.py # 保留,TaskContextSection 内部调用 └── toolchain_templates.py # 保留,ToolchainContextSection 内部调用 ``` + +--- + +# §14 上下文五层架构统一 + +五层定义(统一设计语言): + +| 层 | 名称 | 机制 | 内容示例 | token | +|---|------|------|---------|-------| +| L0 | 铁律层 | Hook 每轮强制注入 | GATE 铁律、Delegation 铁律 | ~500 | +| L1 | 角色层 | Workspace 自动注入 | SOUL.md、AGENTS.md、TOOLS.md、MEMORY.md | ~2000 | +| L2 | 引擎注入层 | PromptComposer 按 handler 拼装 | 任务上下文、前序产出、角色规范、API 指令、约束 | ~1500 | +| L3 | 被动参考层 | Skills 索引注入,Agent 按需 read 全文 | OpenClaw 42 Skills + moziplus SkillRegistry | 按需 | +| L4 | 检索层 | Agent 运行时主动检索 | wiki 知识库、NAS 文档、Web 搜索 | 按需 | + +**PromptComposer 是 L2 层的拼装机制**。 + +## L1-L4 去重规则 + +当前 L1 和 L2 存在重叠(Agent 身份两处注入、API 操作指令两处注入、状态流转规则两处注入)。重构后: + +| 信息 | 唯一归属 | 其他层怎么处理 | +|------|---------|--------------| +| Agent 身份 | L1 | L2 删除 `_inject_agent_identity` | +| 团队协作规则 | L1 | L2 不重复 | +| API 操作方法 | L2(任务级精简版) | L1 保留黑板概述,L2 只给本次任务的 curl | +| Skill 全文 | L3(Agent 按需 read) | L2 只给索引+引导语,不注入全文 | +| 状态流转规则 | L1(完整版) | L2 只给 success_status(done/review) | +| 安全红线 | L0 | L2 不重复 | +| 任务上下文 | L2 | L1 不涉及 | + +## 层间引导 + +每层只做自己的事,通过层间引导语串联: + +- L2 prompt 末尾追加引导语: + - “需要详细操作规范?用 `read` 读取对应 Skill 文件”(引导到 L3) + - “需要更多知识?查看 wiki 知识库或 Web 搜索”(引导到 L4) + +--- + +# §15 Spawner/Handler 职责边界 + +## Spawner 职责(进程管理层) + +| 职责 | 说明 | +|------|------| +| 进程启动/监控 | spawn subprocess、monitor stdout/stderr | +| 进程退出分类 | `_classify_outcome`(A0-A17 全在 spawner) | +| 重试决策 | `should_retry` + `_do_retry` + cooldown | +| counter 管理 | acquire/release/cooldown | +| attempt 记录 | `_record_attempt` | + +## Handler 职责(业务调度层) + +| 职责 | 说明 | +|------|------| +| prompt 构建 | 通过 PromptComposer 拼 section | +| pre_spawn 业务准备 | auto_working 等 | +| crash 回退 | rollback current_agent | +| 完成验证 | verify_completion | +| 状态标记 | mark success/failed | +| 失败通知 | notify_failure | + +## 关键边界 + +1. **Spawner 不做业务逻辑**:`_build_mail_prompt` 和 `_build_api_section` 迁移到 handler 后,spawner 不再构建 prompt +2. **Handler 不碰进程管理**:handler 不做 exit 分类、不做 retry 决策、不管 counter +3. **状态标记不冲突**:spawner 的 `_mark_task` 处理进程级异常(crash/auth_failed/api_error → failed),handler 的 `mark_task_status` 处理业务级完成(done/review/failed)。两者操作不同 outcome 场景,互斥不重复 +4. **on_complete 是桥梁**:spawner 完成进程级处理后调 `on_complete(outcome)`,handler 收到 outcome 做业务级处理 + +--- + +# §16 BaseTaskHandler 基类设计 + +## 设计原则 + +基类收敛**合理的共性能力**,不是现有代码的归类总结。参考: +- Hermes: "Keep calling tools until complete AND verified" +- Quality Gate: 三阶段门控(机械→语义→共识) +- Edict: stalled→retry→escalate 升级策略 +- OpenAI Agents SDK: Input/Output Guardrail + +## 基类定义 + +```python +@dataclass +class VerifyResult: + """验证结果""" + passed: bool + reason: str # "has_output" / "no_reply" / "no_signal" / ... + evidence: str # "output_count=1, comment_count=0" + can_retry: bool = True + retry_count: int = 0 + +class BaseTaskHandler: + """所有 task type handler 的基类。 + + 职责:L2 引擎注入层的业务逻辑——prompt 构建、完成验证、状态标记。 + 不管:进程生命周期、exit 分类、重试决策(这些归 spawner)。 + """ + + # crash 类 outcome(进程级异常,需要 rollback) + CRASH_OUTCOMES = frozenset({ + "crashed", "compact_failed", "process_crash", + "session_stuck", "compact_hanging", + }) + + # === 子类必须实现 === + task_type: str + virtual_project: Optional[str] + + def build_prompt(self, context: PromptContext) -> str: + """构建 L2 prompt(通过 PromptComposer 拼 section)""" + ... + + def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: + """验证任务完成质量。每个 handler 自己的验证逻辑。""" + ... + + def target_success_status(self) -> str: + """验证通过后的目标状态。task='review', mail/toolchain='done'""" + return "review" + + # === 基类提供统一流程 === + + def pre_spawn(self, task_id: str, db_path: Path) -> bool: + """spawn 前业务准备。默认:True。 + mail/toolchain override 为 auto_working。""" + return True + + def post_complete(self, task_id: str, agent_id: str, + outcome: str, db_path: Path) -> None: + """spawn 完成后的业务处理。统一 4 步流程: + + 1. crash 处理 → rollback current_agent + 2. verify → 验证产出 + 3. mark → 标目标状态 + 4. notify → 失败时通知 + + spawner 已完成进程级处理(exit 分类、重试、counter release)。 + 这里只做业务级处理。 + """ + # 1. crash 处理(基类提供,所有 handler 继承) + if outcome in self.CRASH_OUTCOMES: + self._rollback_current_agent(db_path, task_id, agent_id) + return # crash 不进 verify,不标状态 + + # 2. verify + result = self.verify_completion(task_id, db_path) + + # 3. mark + if result.passed: + mark_task_status(db_path, task_id, self.target_success_status()) + else: + # 4. notify(on_failure 内部处理) + self.on_failure(task_id, agent_id, db_path, result) + + def _rollback_current_agent(self, db_path: Path, task_id: str, agent_id: str) -> None: + """crash 后回退 current_agent → assignee,避免 exclude_current 卡死。 + 从 dispatcher._rollback_current_agent 迁移。""" + ... + + def on_failure(self, task_id: str, agent_id: str, + db_path: Path, verify: VerifyResult) -> None: + """验证失败处理。默认:标 failed。 + 子类可 override 加通知等。""" + mark_task_status(db_path, task_id, "failed") + + def check_completion(self, task_id: str, db_path: Path) -> bool: + """ticker 级别的完成检查。默认:False。""" + return False +``` + +## 为什么删掉了这些方法 + +| 删除的方法 | 原因 | +|-----------|------| +| `skip_guardrail` | guardrail 是系统级安全层,不该由 handler 开关。guardrail 规则自己判断 project_id 是否跳过 | +| `crash_rollback`(独立方法) | 合并到 post_complete 第一步,不需要独立方法 | +| `handle_failure` / `notify_failure`(独立方法) | 合并为 `on_failure`,子类 override 一个方法即可 | +| `build_retry_prompt` | retry 是 spawner 层的职责,handler 不管重试 | + +## 为什么 verify_completion 是每个 handler 必须实现的 + +参考 Hermes 的 "Keep calling tools until complete AND verified"——验证不是可选的,是完成流程的核心环节。每个 handler 的验证逻辑不同(task 看三信号、mail 看回复、toolchain 看行动输出),但**必须验证**这个要求是共性的。 + +--- + +# §17 三个 Handler 的完整执行流程 + +## 统一流程骨架 + +``` +ticker 扫描 → dispatcher.decide → 路由到 agent + │ + ▼ +handler.pre_spawn(task_id, db_path) + │ task: return True(无准备) + │ mail/toolchain: auto_working(pending → working) + ▼ +spawner.spawn_full_agent() + ├── counter acquire + ├── handler.build_prompt(context) ← L2 prompt 拼装 + ├── subprocess 启动 Agent 进程 + ├── monitor + │ + ▼ (Agent 进程退出) +spawner._handle_exit() + ├── _classify_outcome → outcome + ├── should_retry=True → _do_retry(spawner 自己处理,不调 handler) + └── should_retry=False → on_complete(outcome) + │ + ▼ +handler.post_complete(task_id, agent_id, outcome, db_path) + ├── 1. crash? → rollback current_agent → return + ├── 2. verify_completion → VerifyResult + ├── 3. passed? → mark target_success_status() + └── 4. failed? → on_failure() +``` + +## TaskHandler 执行流程 + +| 阶段 | 动作 | 代码来源 | +|------|------|----------| +| pre_spawn | return True | — | +| build_prompt | PromptComposer 拼 5 个 section | BootstrapBuilder | +| post_complete | 见下方 | dispatcher._task_on_complete | +| 1. crash | rollback current_agent | dispatcher._rollback_current_agent | +| 2. verify | 三信号检查(output_count > 0 OR comment_count > 0 OR status 已终态) | dispatcher._task_verify_completion | +| 3. passed → mark | "review" | dispatcher._task_auto_complete | +| 3. failed → on_failure | 留 working(等 ticker 重投) | 当前行为保持 | + +**Task 特殊逻辑**:review 阶段的 on_complete 需要读 verdict → approved 标 done / 非 approved @mention assignee。这是 TaskHandler 的 review 分支,不走 verify 流程。 + +## MailHandler 执行流程 + +| 阶段 | 动作 | 代码来源 | +|------|------|----------| +| pre_spawn | auto_working(pending → working) | dispatcher._mail_auto_working | +| build_prompt | PromptComposer 拼 3 个 section | spawner._build_mail_prompt | +| post_complete | 见下方 | dispatcher._mail_on_complete | +| 1. crash | rollback current_agent(**补上**) | 新增 | +| 2. verify | 区分 inform/request:request 检查是否回复,inform 检查 outcome | dispatcher._mail_auto_complete | +| 3. passed → mark | "done" | dispatcher._mail_auto_complete | +| 3. failed → on_failure | mark "failed" + Mail 通知发件人 | dispatcher._mail_auto_complete | + +**Mail 修复项**:当前 mail crash 时不做 rollback current_agent,可能导致 exclude_current 卡死。重构后补上。 + +## ToolchainHandler 执行流程 + +| 阶段 | 动作 | 代码来源 | +|------|------|----------| +| pre_spawn | auto_working(pending → working) | 新增 | +| build_prompt | PromptComposer 拼 3 个 section | toolchain_templates.py | +| post_complete | 见下方 | 新增 | +| 1. crash | rollback current_agent | 新增 | +| 2. verify | 检查行动输出(output 或 comment 有实质内容) | 新增 | +| 3. passed → mark | "done" | 新增 | +| 3. failed → on_failure | mark "failed" + Mail 通知主公 | 新增 | + +## 三个 handler 差异收敛表 + +| 差异点 | TaskHandler | MailHandler | ToolchainHandler | +|--------|------------|-------------|------------------| +| pre_spawn | 无 | auto_working | auto_working | +| sections 数量 | 5 | 3 | 3 | +| verify 逻辑 | 三信号检查 | 回复检查 + inform/request 区分 | 行动输出检查 | +| target_success_status | review | done | done | +| on_failure | 留 working | 标 failed + 通知发件人 | 标 failed + 通知主公 | +| review 分支 | 有(读 verdict) | 无 | 无 | + +--- + +# §18 设计决策记录 + +本节记录设计过程中的关键讨论和决策,便于未来回顾。 + +## D1: 方案A(独立 task type)vs 方案B(mail 内子分支) + +**决策**:方案A,独立 task type。 + +**理由**: +- toolchain 和 mail 的行为差异越来越大 +- 方案A 数据隔离、生命周期独立、未来演进互不影响 +- 改动量和方案B差不多,但架构语义更好 +- 主公明确表示"不想修修补补" + +## D2: 设计一步到位,实现分步 + +**决策**:PromptSection 模式 + BaseTaskHandler 基类 + 五层架构统一都在设计文档中完整定义,但实施按 5 步渐进。 + +**理由**:避免设计时偷懒、实现时痛苦。设计完整后实施每步有清晰目标。 + +## D3: 三种 handler 不是“子集”关系 + +**讨论**:最初认为 MailHandler/ToolchainHandler 是 TaskHandler 的子集。 + +**结论**:三种 handler 走相同的流程骨架(Protocol 定义),但每一步的实现各自不同。TaskHandler 的 prompt 最复杂(5 sections),但 MailHandler 有独特的幻觉门控和回复检查。差异是真实的,不是简单的“全”和“子集”。 + +## D4: 幻觉门控和 verify 应该所有 handler 都有 + +**发现**:当前只有 mail 有幻觉门控、只有 task 有三信号验证。实际这是所有 handler 都应该有的核心能力。 + +**决策**:verify_completion 成为 BaseTaskHandler 的抽象方法,所有 handler 必须实现。 + +## D5: crash_rollback 放在 handler 基类 + +**讨论**:crash 处理分散在 spawner(进程级 cooldown)和 dispatcher(业务级 rollback current_agent)。 + +**结论**: +- spawner 管进程级:cooldown、counter release +- handler 管业务级:rollback current_agent +- 放在 BaseTaskHandler.post_complete 第一步,所有 handler 都继承,不遗漏 +- 当前 mail 缺少 crash rollback,是 bug,重构后补上 + +## D6: skip_guardrail 从 handler 接口删除 + +**理由**:guardrail 是系统级安全层,不该由 handler 开关。guardrail 规则自己判断 project_id 是否跳过。handler 不需要知道 guardrail 的存在。 + +## D7: spawner 的 prompt 构建迁移到 handler + +**讨论**:当前 `_build_mail_prompt` 和 `_build_api_section` 在 spawner 中,按职责应该归 handler。 + +**结论**:handler 的 build_prompt 通过 PromptComposer 拼 section,spawner 只负责传递 prompt 给 subprocess。spawner 不再做任何 prompt 构建逻辑。 + +## D8: L2 Skill 段最小化 + +**讨论**:当前 BootstrapBuilder 段3 注入 Skill 全文(~800 token),重复了 L3 层的职责。 + +**结论**:L2 的 RoleSkillSection 改为注入索引+引导语(~100 token),引导 Agent 用 `read` 去读 Skill 全文(L3 层)。遵循 Hermes 的渐进式 Skill 加载模式。 + +--- diff --git a/src/daemon/prompt_composer.py b/src/daemon/prompt_composer.py new file mode 100644 index 0000000..1940f10 --- /dev/null +++ b/src/daemon/prompt_composer.py @@ -0,0 +1,127 @@ +""" +prompt_composer.py — PromptSection Protocol + PromptContext + PromptComposer + +拼装器:有序管理 prompt 段落,按优先级排序后合并为最终 prompt。 +""" + +import logging +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Protocol, runtime_checkable + +logger = logging.getLogger("moziplus-v2.prompt_composer") + +# --------------------------------------------------------------------------- +# Section 优先级范围约定 +# --------------------------------------------------------------------------- +PRIORITY_CONTEXT = 10 # 任务上下文 +PRIORITY_PRIOR = 20 # 前序信息 +PRIORITY_ROLE = 30 # 角色规范 +PRIORITY_API = 40 # API 操作指令 +PRIORITY_CONSTRAINTS = 50 # 硬约束 +PRIORITY_EXTENSION = 60 # 扩展段 + + +# --------------------------------------------------------------------------- +# PromptSection Protocol +# --------------------------------------------------------------------------- +@runtime_checkable +class PromptSection(Protocol): + """一个 prompt 段""" + + name: str # 段名(去重用,同名覆盖) + priority: int # 排序优先级(小数字=靠前) + + def render(self, context: "PromptContext") -> str: + """渲染此段的文本内容。返回空字符串表示不注入。""" + ... + + def should_include(self, context: "PromptContext") -> bool: + """是否注入此段(默认 True,条件段可覆盖)。""" + ... + + +# --------------------------------------------------------------------------- +# PromptContext 数据对象 +# --------------------------------------------------------------------------- +@dataclass +class PromptContext: + """Prompt 渲染的统一上下文""" + + task_id: str + title: str + description: str + must_haves: str + project_id: str + agent_id: str + + task: Optional[Dict] = None + role: str = "executor" + spawn_type: str = "executor" + + # mail 专用 + from_agent: str = "" + mail_type: str = "" # inform / request + + # toolchain 专用 + event_type: str = "" # ci_failure / review_request / ... + event_data: Dict = field(default_factory=dict) + + # 前序产出 + depends_on_outputs: Optional[List] = None + + +# --------------------------------------------------------------------------- +# PromptComposer 拼装器 +# --------------------------------------------------------------------------- +class PromptComposer: + """有序拼装 prompt sections""" + + SEPARATOR = "\n\n---\n\n" + TOKEN_BUDGET_WARN = 800 # token 预算警告阈值 + CHARS_PER_TOKEN = 3.5 # 估算比率 + + def __init__(self) -> None: + self._sections: List[Any] = [] # List[PromptSection] + + def add(self, section: Any) -> None: + """添加一个 section(同名覆盖)""" + self._sections = [s for s in self._sections if s.name != section.name] + self._sections.append(section) + + def add_many(self, sections: List[Any]) -> None: + """批量添加""" + for s in sections: + self.add(s) + + def compose(self, context: PromptContext) -> str: + """拼装最终 prompt + + 1. 过滤 should_include=False 的段 + 2. 按 priority 排序 + 3. 逐段 render + 4. 过滤空段 + 5. 用分隔符连接 + 6. Token 预算警告(不截断) + """ + active = [s for s in self._sections if s.should_include(context)] + active.sort(key=lambda s: s.priority) + + parts = [s.render(context) for s in active] + parts = [p for p in parts if p.strip()] + + result = self.SEPARATOR.join(parts) + + # Token 估算 + tokens = max(1, int(len(result) / self.CHARS_PER_TOKEN)) + logger.debug( + "Composed prompt from %d sections, %d tokens", + len(parts), tokens, + ) + + if tokens > self.TOKEN_BUDGET_WARN: + logger.warning( + "Prompt exceeds %d token budget: %d tokens (task_id=%s)", + self.TOKEN_BUDGET_WARN, tokens, context.task_id, + ) + + return result diff --git a/src/daemon/task_type_registry.py b/src/daemon/task_type_registry.py new file mode 100644 index 0000000..061fcd0 --- /dev/null +++ b/src/daemon/task_type_registry.py @@ -0,0 +1,102 @@ +""" +task_type_registry.py — Task type handler Protocol + Registry. + +启动时一次性加载 handler,运行时只读。 +零依赖:不导入项目内其他模块。 +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, runtime_checkable + +if TYPE_CHECKING: + from src.daemon.prompt_composer import PromptContext + +logger = logging.getLogger("moziplus-v2.registry") + + +# --------------------------------------------------------------------------- +# Protocol +# --------------------------------------------------------------------------- + +@runtime_checkable +class TaskTypeHandler(Protocol): + """所有 task type handler 的统一接口。""" + + # 属性(通过 __init__ 设置) + task_type: str # 类型标识:'task' | 'mail' | 'toolchain' + virtual_project: Optional[str] # 虚拟项目 ID,如 '_mail'、'_toolchain'。普通任务为 None + + def build_prompt(self, context: "PromptContext") -> str: + """构建 Agent prompt(通过 PromptComposer 拼 section)。""" + ... + + def pre_spawn(self, task_id: str, db_path: Path) -> bool: + """spawn 前业务准备。默认 True,mail/toolchain override 为 auto_working。""" + ... + + def post_complete( + self, + task_id: str, + agent_id: str, + outcome: str, + db_path: Path, + ) -> None: + """spawn 完成后的业务处理。统一流程:crash→verify→mark→notify。""" + ... + + def check_completion(self, task_id: str, db_path: Path) -> bool: + """ticker 级别的完成检查。""" + ... + + def get_sections(self) -> list: + """返回此 handler 的 prompt section 列表。""" + ... + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + +class TaskTypeRegistry: + """Task type handler 注册表。启动时一次性加载,运行时只读。""" + + _handlers: Dict[str, TaskTypeHandler] = {} + + @classmethod + def register(cls, handler: TaskTypeHandler) -> None: + """注册一个 handler。启动时调用一次。""" + if handler.task_type in cls._handlers: + raise ValueError(f"Task type '{handler.task_type}' already registered") + cls._handlers[handler.task_type] = handler + vp = getattr(handler, "virtual_project", None) + logger.info("Registered task type handler: %s (virtual_project=%s)", handler.task_type, vp) + + @classmethod + def get_by_project(cls, project_id: str) -> Optional[TaskTypeHandler]: + """通过 project_id 查找 handler(匹配 virtual_project)。""" + for h in cls._handlers.values(): + if h.virtual_project == project_id: + return h + return None + + @classmethod + def get(cls, task_type: str) -> Optional[TaskTypeHandler]: + """通过 task_type 标识查找 handler。""" + return cls._handlers.get(task_type) + + @classmethod + def virtual_projects(cls) -> list[str]: + """返回所有已注册的虚拟项目 ID(ticker 自动发现用)。""" + return [ + h.virtual_project + for h in cls._handlers.values() + if h.virtual_project is not None + ] + + @classmethod + def clear(cls) -> None: + """清空注册表(仅测试用)。""" + cls._handlers = {}