feat: Step 2-4 — Task/Mail/Toolchain Handlers + 11 PromptSections + BaseTaskHandler #25

Merged
pangtong-fujunshi merged 2 commits from feat/task-type-handlers-step2-4 into main 2026-06-10 13:47:04 +00:00
4 changed files with 971 additions and 0 deletions
+179
View File
@@ -0,0 +1,179 @@
"""base_task_handler.py — Task type handler 基类。
收敛合理的共性能力(crash rollback + verify + mark + notify),
子类只实现差异点。
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from src.daemon.prompt_composer import PromptContext, PromptComposer, PromptSection
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler")
@dataclass
class VerifyResult:
"""验证结果"""
passed: bool
reason: str # "has_output" / "no_reply" / "no_signal" / ...
evidence: str # "output_count=1, comment_count=0"
can_retry: bool = True
retry_count: int = 0
class BaseTaskHandler:
"""所有 task type handler 的基类。
职责:L2 引擎注入层的业务逻辑——prompt 构建、完成验证、状态标记。
不管:进程生命周期、exit 分类、重试决策(这些归 spawner)。
"""
# crash 类 outcome(进程级异常,需要 rollback)
CRASH_OUTCOMES = frozenset({
"crashed", "compact_failed", "process_crash",
"session_stuck", "compact_hanging",
})
task_type: str = ""
virtual_project: Optional[str] = None
# === 子类必须实现 ===
def build_prompt(self, context: PromptContext) -> str:
"""构建 L2 prompt(通过 PromptComposer 拼 section)。子类实现。"""
raise NotImplementedError
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""验证任务完成质量。每个 handler 自己的验证逻辑。子类实现。"""
raise NotImplementedError
def target_success_status(self) -> str:
"""验证通过后的目标状态。task='review', mail/toolchain='done'"""
return "review"
def get_sections(self) -> list[PromptSection]:
"""返回此 handler 的 prompt section 列表。子类实现。"""
return []
# === 基类提供统一流程 ===
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""spawn 前业务准备。默认 True。
mail/toolchain override 为 auto_working。"""
return True
def post_complete(self, task_id: str, agent_id: str,
outcome: str, db_path: Path) -> None:
"""spawn 完成后的业务处理。统一 4 步流程:
1. crash 处理 → rollback current_agent
2. verify → 验证产出
3. mark → 标目标状态
4. notify → 失败时 on_failure
"""
# 1. crash 处理(基类提供,所有 handler 继承)
if outcome in self.CRASH_OUTCOMES:
self._rollback_current_agent(db_path, task_id, agent_id)
return
# 2. verify
result = self.verify_completion(task_id, db_path)
# 3. mark
if result.passed:
self._mark_task_status(db_path, task_id, self.target_success_status())
logger.info("Task %s: verify passed (%s), marked %s",
task_id, result.reason, self.target_success_status())
else:
# 4. notify
self.on_failure(task_id, agent_id, db_path, result)
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""验证失败处理。默认:标 failed。子类可 override。"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Task %s: verify failed (%s), marked failed",
task_id, verify.reason)
def check_completion(self, task_id: str, db_path: Path) -> bool:
"""ticker 级别的完成检查。默认:False。"""
return False
# === 内部工具方法 ===
def _rollback_current_agent(self, db_path: Path, task_id: str, agent_id: str) -> None:
"""crash 后回退 current_agent → assignee,避免 exclude_current 卡死。
从 dispatcher._rollback_current_agent 迁移。"""
try:
conn = get_connection(db_path)
try:
conn.execute(
"UPDATE tasks SET current_agent = "
"(SELECT assignee FROM tasks WHERE id=?) "
"WHERE id=? AND current_agent=?",
(task_id, task_id, agent_id)
)
conn.commit()
finally:
conn.close()
logger.info("Task %s: rolled back current_agent from %s to assignee",
task_id, agent_id)
except Exception as e:
logger.warning("Task %s: failed to rollback current_agent: %s",
task_id, e)
def _mark_task_status(self, db_path: Path, task_id: str, status: str) -> None:
"""更新任务状态 + 写审计事件。
从 dispatcher._mark_task_status 迁移。"""
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
old_row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
old_status = old_row["status"] if old_row else "unknown"
conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
(status, task_id),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, payload) "
"VALUES (?, 'handler', 'status_change', ?)",
(task_id,
f'{{"from": "{old_status}", "to": "{status}", '
f'"source": "{self.task_type}_handler"}}'),
)
conn.commit()
finally:
conn.close()
except Exception as e:
logger.error("Task %s: mark status error: %s", task_id, e)
def _auto_mark_working(self, task_id: str, db_path: Path) -> bool:
"""pending → workingmail/toolchain 通用)。"""
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row or row["status"] not in ("pending", "claimed"):
logger.warning("Task %s: cannot mark working (status=%s)",
task_id, row["status"] if row else "not found")
return False
conn.execute(
"UPDATE tasks SET status='working', updated_at=datetime('now') "
"WHERE id=?", (task_id,))
conn.commit()
logger.info("Task %s: auto-marked working", task_id)
return True
finally:
conn.close()
except Exception as e:
logger.error("Task %s: failed to mark working: %s", task_id, e)
return False
+206
View File
@@ -0,0 +1,206 @@
"""mail_handler.py — Mail 任务 handler。
处理 Agent 间通信(飞鸽传书),含 inform 和 request 两种类型。
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Dict, Optional
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.mail")
class MailHandler(BaseTaskHandler):
"""Mail 任务 handler。"""
task_type = "mail"
virtual_project = "_mail"
def target_success_status(self) -> str:
return "done"
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""auto_workingpending → working"""
return self._auto_mark_working(task_id, db_path)
def build_prompt(self, context: PromptContext) -> str:
"""通过 PromptComposer 拼装 3 个 section。"""
composer = PromptComposer()
composer.add_many(self.get_sections())
return composer.compose(context)
def get_sections(self) -> list:
return [MailContextSection(), MailApiSection(), MailConstraintsSection()]
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""Mail 完成验证:区分 inform/request。
- inform: 始终通过(通知已阅即 done,不需要检查产出)
- request: 检查是否已回复
"""
performative = self._parse_performative(task_id, db_path)
if performative == "inform":
return VerifyResult(True, "inform_auto", f"performative={performative}")
# request: 检查是否已回复
has_reply = self._check_reply(task_id, db_path)
if has_reply:
return VerifyResult(True, "has_reply", f"performative={performative}")
return VerifyResult(False, "no_reply", f"performative={performative}")
# post_complete 由基类 BaseTaskHandler 统一处理(crash→verify→mark→notify
# inform: verify 始终通过 → 基类 mark done ✅
# request 有回复: verify 通过 → 基类 mark done ✅
# request 无回复: verify 失败 → 基类调 on_failure ✅
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""request 验证失败 → 标 failed + 通知发件人"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Mail %s: request verify failed (%s), marked failed",
task_id, verify.reason)
# 通知发件人
try:
from src.daemon.mail_notify import notify_mail_failed
notify_mail_failed(db_path, task_id, "no_reply_found")
except Exception as e:
logger.warning("Mail %s: failed to send notification: %s", task_id, e)
# === 内部方法 ===
def _parse_performative(self, task_id: str, db_path: Path) -> str:
"""解析 mail 类型(inform/request"""
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row and row["must_haves"]:
meta = json.loads(row["must_haves"])
return meta.get("performative", meta.get("type", "request"))
finally:
conn.close()
except Exception:
pass
return "request"
def _check_reply(self, task_id: str, db_path: Path) -> bool:
"""检查是否已回复(从 dispatcher._mail_check_reply 迁移)"""
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT COUNT(*) as cnt FROM comments "
"WHERE task_id=? AND author != 'daemon' "
"AND comment_type != 'system'",
(task_id,)
).fetchone()
count = row["cnt"] if row else 0
return count > 0
finally:
conn.close()
except Exception as e:
logger.error("Mail %s: check reply error: %s", task_id, e)
return False
def check_completion(self, task_id: str, db_path: Path) -> bool:
"""ticker 级别的完成检查:检查是否已回复"""
return self._check_reply(task_id, db_path)
# ===================================================================
# Mail PromptSections
# ===================================================================
class MailContextSection:
"""邮件上下文段 — 发件人/收件人/主题/内容,区分 inform/request。"""
name: str = "mail_context"
priority: int = 10
def render(self, context: PromptContext) -> str:
if context.mail_type == "inform":
return self._render_inform(context)
return self._render_request(context)
def should_include(self, context: PromptContext) -> bool: # noqa: ARG002
return True
@staticmethod
def _render_inform(context: PromptContext) -> str:
return (
f"你收到一封飞鸽传书(纯通知)。\n\n"
f"发件者: {context.from_agent}\n"
f"主题: {context.title}\n"
f"内容: {context.description}\n\n"
f"已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。\n"
f"⚠️ 不要执行任何状态转换命令。"
)
@staticmethod
def _render_request(context: PromptContext) -> str:
return (
f"你收到一封飞鸽传书,需要你处理并回复。\n\n"
f"发件者: {context.from_agent}\n"
f"主题: {context.title}\n"
f"内容: {context.description}\n\n"
f"### 如何回复发件者\n\n"
f'curl -s -X POST http://localhost:8083/api/mail \\\n'
f" -H 'Content-Type: application/json' \\\n"
f' -d \'{{"from": "{context.agent_id}", '
f'"in_reply_to": "{context.task_id}", '
f'"title": "回复: {context.title}", '
f'"text": "你的回复内容"}}\'\n\n'
f"⚠️ 不需要填 \"to\",系统自动回复给发件者。"
)
class MailApiSection:
"""Mail API 操作指令段。"""
name: str = "mail_api"
priority: int = 40
def render(self, context: PromptContext) -> str:
return (
f"### 如何给其他人发新邮件\n\n"
f'curl -s -X POST http://localhost:8083/api/mail \\\n'
f" -H 'Content-Type: application/json' \\\n"
f' -d \'{{"from": "{context.agent_id}", '
f'"to": "对方agent-id", '
f'"title": "标题", '
f'"text": "正文", '
f'"type": "inform"}}\'\n\n'
f"⚠️ to 必须是有效的 agent id\n"
f"⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)"
)
def should_include(self, context: PromptContext) -> bool:
return context.mail_type == "request"
class MailConstraintsSection:
"""Mail 硬约束段。"""
name: str = "mail_constraints"
priority: int = 50
def render(self, context: PromptContext) -> str: # noqa: ARG002
return (
"## 硬约束\n\n"
"1. ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。\n"
"2. ⚠️ 不能给自己发邮件\n"
"3. ⚠️ 发邮件时 to 必须是有效的 agent id\n"
"4. ⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)"
)
def should_include(self, context: PromptContext) -> bool: # noqa: ARG002
return True
+330
View File
@@ -0,0 +1,330 @@
"""task_handler.py — 黑板任务 handlertask_type='task')。
标准黑板任务:三信号验证 → review 状态。
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Dict, List, Optional
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler")
TERMINAL_STATES = frozenset({"review", "done", "failed", "cancelled"})
# ---------------------------------------------------------------------------
# Role → Skill 映射(D8 决策:L2 只给索引+引导语,不注全文)
# ---------------------------------------------------------------------------
ROLE_SKILL_MAP: Dict[str, str] = {
"executor": "blackboard-executor",
"reviewer": "blackboard-reviewer",
"reviewer-simayi": "blackboard-reviewer-simayi",
"reviewer-pangtong": "blackboard-reviewer-pangtong",
"planner": "blackboard-planner",
"claim": "blackboard-claim",
}
SKILL_BASE_PATH = "/Users/chufeng/.sanguo_projects/sanguo_mozi/skills"
# ---------------------------------------------------------------------------
# PromptSection 实现
# ---------------------------------------------------------------------------
class TaskContextSection:
"""段 1:任务上下文(title / desc / must_haves / status)。"""
name: str = "task_context"
priority: int = 10
def render(self, context: PromptContext) -> str:
parts = ["## 任务上下文"]
if context.task_id:
parts.append(f"任务ID: {context.task_id}")
if context.title:
parts.append(f"标题: {context.title}")
if context.description:
parts.append(f"描述: {context.description}")
if context.must_haves:
parts.append(f"必须完成: {context.must_haves}")
if context.task and context.task.get("status"):
parts.append(f"当前状态: {context.task['status']}")
return "\n".join(parts)
def should_include(self, context: PromptContext) -> bool:
return bool(context.task_id or context.title)
class PriorOutputsSection:
"""段 2:前序产出摘要(depends_on 非空时注入)。"""
name: str = "prior_outputs"
priority: int = 20
def render(self, context: PromptContext) -> str:
outputs = context.depends_on_outputs or []
parts = ["## 前序产出"]
for out in outputs:
tid = out.get("task_id", "?")
summary = out.get("summary", "无摘要")
parts.append(f"- [{tid}] {summary}")
return "\n".join(parts)
def should_include(self, context: PromptContext) -> bool:
return bool(context.depends_on_outputs)
class RoleSkillSection:
"""段 3:角色 Skill 索引+引导语(D8 决策:不注全文)。"""
name: str = "role_skill"
priority: int = 30
def render(self, context: PromptContext) -> str:
skill_name = ROLE_SKILL_MAP.get(context.role, "")
lines = [
"## 角色操作规范",
f"你的角色:{context.role}",
]
if skill_name:
lines.append(f"对应 Skill{skill_name}")
lines.append(
f"请用 read 工具读取 {SKILL_BASE_PATH}/{skill_name}/SKILL.md "
"获取完整操作规范。"
)
else:
lines.append("无对应 Skill 文件,按通用规范执行。")
return "\n".join(lines)
def should_include(self, context: PromptContext) -> bool:
return True
class TaskApiSection:
"""段 4API 操作指令。"""
name: str = "task_api"
priority: int = 40
API_HOST = "localhost"
API_PORT = 8083
def render(self, context: PromptContext) -> str:
pid = context.project_id
tid = context.task_id
aid = context.agent_id
success_status = '"review"'
base = f"http://{self.API_HOST}:{self.API_PORT}/api/projects/{pid}/tasks/{tid}"
return (
"## 操作指令\n"
"### 状态回写\n"
f"开始工作:\n"
f'curl -X POST {base}/status \\\n'
f' -H "Content-Type: application/json" \\\n'
f' -d \'{{"status": "working", "agent": "{aid}"}}\'\n\n'
"### 写入产出\n"
f'curl -X POST {base}/outputs \\\n'
f' -H "Content-Type: application/json" \\\n'
f" -d '{{\"type\": \"text\", \"content\": \"<your output>\"}}'\n\n"
"### 完成后\n"
f"成功: status → {success_status} | 失败: status → \"failed\""
)
def should_include(self, context: PromptContext) -> bool:
return True
class TaskConstraintsSection:
"""段 5:硬约束。"""
name: str = "task_constraints"
priority: int = 50
def render(self, context: PromptContext) -> str:
constraints = ["## 硬约束"]
role = context.role
if role == "executor":
constraints.extend([
"- 完成后必须标 review",
"- 产出物不能为空(系统会验证)",
"- handoff comment ≥ 50 字符",
])
elif role.startswith("reviewer"):
constraints.extend([
"- 审查结果必须明确 pass/fail",
"- 评审意见须附证据(文件:行号)",
])
elif role == "planner":
constraints.extend([
"- 需求不清时提问,不要猜",
"- 子任务必须有明确的终态定义",
])
else:
constraints.append("- 按规范完成 assigned 任务")
return "\n".join(constraints)
def should_include(self, context: PromptContext) -> bool:
return True
class TaskHandler(BaseTaskHandler):
"""黑板标准任务 handler。
- verify: 三信号检查(output / comment / terminal status
- 成功 → review
- 失败 → 保持 working,让 ticker 重试
- review 完成 → 读取 verdictapproved 则 mark done
"""
task_type: str = "task"
virtual_project: Optional[str] = None
# === 子类实现 ===
def target_success_status(self) -> str:
"""task 类型验证通过后进 review。"""
return "review"
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""三信号验证:output / comment / terminal status。"""
try:
conn = get_connection(db_path)
try:
# 信号 1terminal status
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
if not row:
return VerifyResult(False, "not_found", "task not found",
can_retry=False)
status = row["status"]
if status in TERMINAL_STATES:
return VerifyResult(
True, "terminal_status",
f"status={status}", can_retry=False
)
# 信号 2outputs
output_count = conn.execute(
"SELECT COUNT(*) as cnt FROM outputs WHERE task_id=?",
(task_id,)
).fetchone()["cnt"]
if output_count > 0:
return VerifyResult(
True, "has_output",
f"output_count={output_count}"
)
# 信号 3:非 system 且内容 >= 50 字的 comment
comment_count = conn.execute(
"SELECT COUNT(*) as cnt FROM comments "
"WHERE task_id=? AND author != 'system' "
"AND LENGTH(content) >= 50",
(task_id,)
).fetchone()["cnt"]
if comment_count > 0:
return VerifyResult(
True, "has_comment",
f"comment_count={comment_count}"
)
# 无信号
return VerifyResult(
False, "no_signal",
f"output=0, comment=0, status={status}"
)
finally:
conn.close()
except Exception as e:
logger.error("Task %s: verify error: %s", task_id, e)
return VerifyResult(False, "verify_error", str(e))
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""task 类型不需要 pre_spawn 逻辑。"""
return True
def get_sections(self) -> list:
"""返回 5 个 PromptSection 实例。"""
return [
TaskContextSection(),
PriorOutputsSection(),
RoleSkillSection(),
TaskApiSection(),
TaskConstraintsSection(),
]
def build_prompt(self, context: PromptContext) -> str:
"""通过 PromptComposer 拼装 prompt sections。"""
composer = PromptComposer()
composer.add_many(self.get_sections())
return composer.compose(context)
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""验证失败:不标 failed,保持 working 让 ticker 重试。"""
logger.info(
"Task %s: verify failed (%s, evidence=%s), leaving working for ticker retry",
task_id, verify.reason, verify.evidence
)
# === Review 流程 ===
def handle_review_complete(self, task_id: str, db_path: Path) -> None:
"""Review 完成后处理:读取 verdict → approved 则 mark done
否则 @mention assignee via blackboard comment。"""
try:
conn = get_connection(db_path)
try:
# 读取最新 review
review_row = conn.execute(
"SELECT verdict, reviewer, comment FROM reviews "
"WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
(task_id,)
).fetchone()
if not review_row:
logger.warning("Task %s: no review found", task_id)
return
verdict = review_row["verdict"]
reviewer = review_row["reviewer"]
review_comment = review_row["comment"] or ""
# 获取 assignee
task_row = conn.execute(
"SELECT assignee FROM tasks WHERE id=?", (task_id,)
).fetchone()
if not task_row:
logger.warning("Task %s: task not found for review", task_id)
return
assignee = task_row["assignee"]
if verdict == "approved":
self._mark_task_status(db_path, task_id, "done")
logger.info("Task %s: review approved by %s, marked done",
task_id, reviewer)
else:
# 非 approved:通过 blackboard comment @mention assignee
conn.execute(
"INSERT INTO comments (task_id, author, content) "
"VALUES (?, 'system', ?)",
(task_id,
f"@{assignee} review 未通过 (verdict={verdict}, "
f"reviewer={reviewer}): {review_comment}")
)
conn.commit()
# 回到 working 让 assignee 重新处理
self._mark_task_status(db_path, task_id, "working")
logger.info(
"Task %s: review not approved (%s by %s), "
"@mentioned assignee %s, back to working",
task_id, verdict, reviewer, assignee
)
finally:
conn.close()
except Exception as e:
logger.error("Task %s: handle_review_complete error: %s", task_id, e)
+256
View File
@@ -0,0 +1,256 @@
"""toolchain_handler.py — 工具链事件 handler。
处理 Gitea Webhook 事件(CI 失败、Review 请求、Issue 指派等)。
"""
from __future__ import annotations
import json
import logging
import urllib.request
from pathlib import Path
from typing import Dict
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.daemon.toolchain_templates import render_template, _TEMPLATE_MAP
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.toolchain")
# ---------------------------------------------------------------------------
# Toolchain PromptSections
# ---------------------------------------------------------------------------
class ToolchainContextSection:
"""事件类型 + 事件详情(priority=10"""
name: str = "toolchain_context"
priority: int = 10
def render(self, context: PromptContext) -> str:
event_type = context.event_type
event_data: Dict = context.event_data or {}
if event_type in _TEMPLATE_MAP:
# 使用模板引擎渲染已知事件
variables = {k: str(v) for k, v in event_data.items()}
return render_template(event_type, variables)
# fallback:通用事件描述
lines = [f"## 工具链事件", f""]
lines.append(f"- **事件类型**: {event_type or '未知'}")
if event_data:
lines.append(f"- **事件详情**:")
for key, value in event_data.items():
lines.append(f" - {key}: {value}")
lines.append(f"")
return "\n".join(lines)
def should_include(self, context: PromptContext) -> bool:
return True
class ToolchainApiSection:
"""API 操作指令(priority=40),success_status=done"""
name: str = "toolchain_api"
priority: int = 40
API_HOST = "localhost:8083"
def render(self, context: PromptContext) -> str:
lines = [
"## API 操作指令",
"",
f"项目 ID: `{context.project_id}`",
f"任务 ID: `{context.task_id}`",
"",
"### 完成后必须更新任务状态",
"完成后务必通过以下命令将任务标记为 **done**:",
"```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/status" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"status": "done"}\'',
"```",
"",
"### 提交产出",
"如有产出(如 review 结果、修复方案),提交到任务 outputs:",
"```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/outputs" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"content": "<你的产出内容>", "type": "text"}\'',
"```",
"",
]
return "\n".join(lines)
def should_include(self, context: PromptContext) -> bool:
return True
class ToolchainConstraintsSection:
"""硬约束(priority=50"""
name: str = "toolchain_constraints"
priority: int = 50
def render(self, context: PromptContext) -> str:
lines = [
"## 硬约束",
"",
"1. **必须标 done**:处理完成后必须通过 API 将任务状态更新为 `done`,否则视为未完成",
"2. **产出不能为空**:必须提交有意义的产出(output 或 comment),不能只改状态",
"3. **单一职责**:只处理本次事件相关的操作,不要越界执行无关任务",
"4. **出错即报告**:如果无法处理(如权限不足、资源不存在),在 comment 中说明原因并标 done",
"5. **不要创建新任务**:工具链事件只处理当前事件,不衍生新任务",
"",
]
return "\n".join(lines)
def should_include(self, context: PromptContext) -> bool:
return True
# ---------------------------------------------------------------------------
# ToolchainHandler
# ---------------------------------------------------------------------------
class ToolchainHandler(BaseTaskHandler):
"""工具链事件 handler。"""
task_type = "toolchain"
virtual_project = "_toolchain"
def target_success_status(self) -> str:
return "done"
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""auto_workingpending → working"""
return self._auto_mark_working(task_id, db_path)
def get_sections(self) -> list:
"""返回 3 个 Toolchain PromptSection 实例"""
return [
ToolchainContextSection(),
ToolchainApiSection(),
ToolchainConstraintsSection(),
]
def build_prompt(self, context: PromptContext) -> str:
"""通过 PromptComposer 拼装 sections 为最终 prompt"""
composer = PromptComposer()
composer.add_many(self.get_sections())
return composer.compose(context)
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""检查行动输出(output 或 comment 有实质内容)"""
try:
conn = get_connection(db_path)
try:
# 检查 output
output_count = conn.execute(
"SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,)
).fetchone()[0]
if output_count > 0:
return VerifyResult(True, "has_output", f"output_count={output_count}")
# 检查 comment(非系统、有实质内容)
comment_count = conn.execute(
"SELECT COUNT(*) FROM comments WHERE task_id=? "
"AND author != 'system' AND LENGTH(content) >= 20",
(task_id,)
).fetchone()[0]
if comment_count > 0:
return VerifyResult(True, "has_comment", f"comment_count={comment_count}")
return VerifyResult(False, "no_action", "output=0, comment=0")
finally:
conn.close()
except Exception as e:
logger.error("Toolchain %s: verify error: %s", task_id, e)
return VerifyResult(False, "verify_error", str(e))
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""验证失败 → 标 failed + Mail API 通知主公"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Toolchain %s: verify failed (%s), marked failed", task_id, verify.reason)
# 从 db 读取事件上下文
event_type = ""
event_data: Dict = {}
try:
conn = get_connection(db_path)
row = conn.execute(
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row and row["must_haves"]:
meta = json.loads(row["must_haves"])
event_type = meta.get("event_type", "")
raw = meta.get("event_data", "{}")
event_data = json.loads(raw) if isinstance(raw, str) else raw
conn.close()
except Exception:
pass
self._notify_via_mail_api(
task_id, verify.reason, verify.evidence,
event_type, event_data,
)
def _notify_via_mail_api(
self,
task_id: str,
reason: str,
evidence: str,
event_type: str,
event_data: Dict,
) -> None:
"""通过 Mail API 发送丰富的失败通知给主公。"""
# 构建行动指引
action_hint = "请检查黑板任务并手动处理。"
et_lower = event_type.lower()
if "ci" in et_lower or "deploy" in et_lower:
action_hint = "建议创建任务派给 jiangwei-infra 检查 CI/部署问题。"
elif "review" in et_lower:
action_hint = "建议查看 PR review 状态,必要时通知相关开发者。"
elif "issue" in et_lower:
action_hint = "建议创建任务派给对应开发者处理 Issue。"
# 构建事件详情
event_details = ""
if event_data:
event_details = "\n".join(
f" - {k}: {v}" for k, v in event_data.items()
)
title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
text = (
f"任务 {task_id} 验证失败\n\n"
f"事件类型: {event_type or '未知'}\n"
f"事件详情:\n{event_details or ' (无)'}\n\n"
f"失败原因: {reason}\n"
f"证据: {evidence}\n\n"
f"黑板任务: http://localhost:8083/ → 项目 _toolchain → 任务 {task_id}\n\n"
f"行动指引: {action_hint}"
)
payload = json.dumps({
"from": "daemon",
"to": "pangtong-fujunshi",
"title": title,
"text": text,
"type": "inform",
}, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
"http://localhost:8083/api/mail",
data=payload,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=5)
logger.info("Toolchain %s: sent failure notification via Mail API", task_id)
except Exception as e:
logger.warning("Toolchain %s: failed to notify via Mail API: %s", task_id, e)