Files
sanguo_moziplus_v2/src/daemon/task_handler.py
T
pangtong-fujunshi 1089991455
Deploy / ci (push) Successful in 10s
Deploy / deploy (push) Successful in 11s
Deploy / notify-deploy-failure (push) Successful in 1s
fix(lint): resolve all 37 flake8 issues (#33)
2026-06-11 02:34:50 +00:00

388 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""task_handler.py — 黑板任务 handlertask_type='task')。
标准黑板任务:三信号验证 → review 状态。
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import Dict, Optional
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.blackboard.db import get_connection
# Module-level logger shared by every handler class in this file.
logger = logging.getLogger("moziplus-v2.handler")

# Task statuses that the completion check treats as terminal.
TERMINAL_STATES = frozenset(("review", "done", "failed", "cancelled"))

# ---------------------------------------------------------------------------
# Role → Skill mapping (decision D8: L2 only gets an index plus guidance text,
# the full skill text is not injected)
# ---------------------------------------------------------------------------
# Every skill directory is named "blackboard-<role>", so the mapping is
# derived from the ordered list of known roles.
ROLE_SKILL_MAP: Dict[str, str] = {
    role: f"blackboard-{role}"
    for role in (
        "executor",
        "reviewer",
        "reviewer-simayi",
        "reviewer-pangtong",
        "planner",
        "claim",
    )
}

# Root directory holding one sub-directory per skill; can be overridden with
# the MOZI_SKILL_PATH environment variable.
SKILL_BASE_PATH = os.getenv(
    "MOZI_SKILL_PATH",
    "/Users/chufeng/.sanguo_projects/sanguo_mozi/skills",
)
# ---------------------------------------------------------------------------
# PromptSection 实现
# ---------------------------------------------------------------------------
class TaskContextSection:
    """Prompt section 1: task context (title / desc / must_haves / status)."""

    name: str = "task_context"
    priority: int = 10

    def render(self, context: PromptContext) -> str:
        """Render the task-context block as newline-joined text.

        The heading is always emitted. Task id, title, description and
        must-haves each contribute one ``label: value`` line when truthy, and
        the current status is appended when ``context.task`` holds a truthy
        ``"status"`` entry.
        """
        labelled_fields = (
            ("任务ID", context.task_id),
            ("标题", context.title),
            ("描述", context.description),
            ("必须完成", context.must_haves),
        )
        lines = ["## 任务上下文"]
        lines.extend(
            f"{label}: {value}" for label, value in labelled_fields if value
        )

        task = context.task
        if task and task.get("status"):
            lines.append(f"当前状态: {task['status']}")
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """Include the section when the context has a task id or a title."""
        return bool(context.task_id) or bool(context.title)
class PriorOutputsSection:
    """Prompt section 2: summary of upstream outputs (when depends_on is set)."""

    name: str = "prior_outputs"
    priority: int = 20

    def render(self, context: PromptContext) -> str:
        """Render one bullet per upstream output below the section heading.

        Each bullet shows the entry's ``task_id`` (``?`` when absent) and its
        ``summary`` (a placeholder text when absent).
        """
        upstream = context.depends_on_outputs or []
        bullets = [
            f"- [{entry.get('task_id', '?')}] {entry.get('summary', '无摘要')}"
            for entry in upstream
        ]
        return "\n".join(["## 前序产出", *bullets])

    def should_include(self, context: PromptContext) -> bool:
        """Include the section only when there is at least one upstream output."""
        return True if context.depends_on_outputs else False
class RoleSkillSection:
    """Prompt section 3: inject the full text of the role's SKILL.md file."""

    name: str = "role_skill"
    priority: int = 30

    def render(self, context: PromptContext) -> str:
        """Render the role heading followed by the role's skill text.

        The skill name is looked up in ``ROLE_SKILL_MAP`` and read from
        ``<SKILL_BASE_PATH>/<skill_name>/SKILL.md``. Rendering never raises
        because of the skill file: a missing, empty or unreadable file is
        reported as a one-line notice inside the section instead.
        """
        skill_name = ROLE_SKILL_MAP.get(context.role, "")
        lines = [
            "## 角色操作规范",
            f"你的角色:{context.role}",
        ]
        if not skill_name:
            lines.append("无对应 Skill 文件,按通用规范执行。")
            return "\n".join(lines)

        skill_path = os.path.join(SKILL_BASE_PATH, skill_name, "SKILL.md")
        try:
            with open(skill_path, encoding="utf-8") as f:
                skill_content = f.read()
        except FileNotFoundError:
            lines.append(f"Skill 文件不存在:{skill_name}")
        except (OSError, UnicodeDecodeError) as exc:
            # Permission problems, a directory in place of the file, or bytes
            # that are not valid UTF-8 must not abort prompt composition.
            logger.warning("Skill file %s unreadable: %s", skill_path, exc)
            lines.append(f"Skill 文件读取失败:{skill_name}")
        else:
            if skill_content:
                lines.append(skill_content)
            else:
                lines.append(f"Skill 文件为空:{skill_name}")
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """This section is always part of the prompt."""
        return True
class TaskApiSection:
    """Prompt section 4: API operation instructions."""

    name: str = "task_api"
    priority: int = 40
    API_HOST = "localhost"
    API_PORT = 8083

    def render(self, context: PromptContext) -> str:
        """Render the curl commands an agent uses to report on its task.

        The commands target
        ``http://<API_HOST>:<API_PORT>/api/projects/<project>/tasks/<task>``
        and cover the status write-back, the output upload, and the status
        values to set when the work is finished.
        """
        endpoint = (
            f"http://{self.API_HOST}:{self.API_PORT}"
            f"/api/projects/{context.project_id}/tasks/{context.task_id}"
        )
        agent = context.agent_id
        done_status = '"review"'
        # Shared header line; the trailing backslash continues the shell command.
        json_header = ' -H "Content-Type: application/json" \\'

        lines = [
            "## 操作指令",
            "### 状态回写",
            "开始工作:",
            f"curl -X POST {endpoint}/status \\",
            json_header,
            f' -d \'{{"status": "working", "agent": "{agent}"}}\'',
            "",
            "### 写入产出",
            f"curl -X POST {endpoint}/outputs \\",
            json_header,
            ' -d \'{"type": "text", "content": "<your output>"}\'',
            "",
            "### 完成后",
            f'成功: status → {done_status} | 失败: status → "failed"',
        ]
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """This section is always part of the prompt."""
        return True
class TaskConstraintsSection:
    """Prompt section 5: hard constraints for the current role."""

    name: str = "task_constraints"
    priority: int = 50

    def render(self, context: PromptContext) -> str:
        """Render the hard-constraint bullet list for ``context.role``.

        ``executor``, any role starting with ``reviewer``, and ``planner``
        each get their own bullets; every other role gets one generic bullet.
        A missing role (``None`` or empty) is treated as an unknown role
        rather than raising.
        """
        constraints = ["## 硬约束"]
        # Normalise a missing role so the prefix check below cannot fail
        # with AttributeError.
        role = context.role or ""
        if role == "executor":
            constraints.extend([
                "- 完成后必须标 review",
                "- 产出物不能为空(系统会验证)",
                "- handoff comment ≥ 50 字符",
            ])
        elif role.startswith("reviewer"):
            constraints.extend([
                "- 审查结果必须明确 pass/fail",
                "- 评审意见须附证据(文件:行号)",
            ])
        elif role == "planner":
            constraints.extend([
                "- 需求不清时提问,不要猜",
                "- 子任务必须有明确的终态定义",
            ])
        else:
            constraints.append("- 按规范完成 assigned 任务")
        return "\n".join(constraints)

    def should_include(self, context: PromptContext) -> bool:
        """This section is always part of the prompt."""
        return True
class TaskHandler(BaseTaskHandler):
    """Handler for standard blackboard tasks.

    - verify: three-signal check (output / comment / terminal status)
    - success → ``review``
    - failure → stay ``working`` so the ticker can retry
    - review finished → read the verdict; ``approved`` marks the task ``done``
    """

    task_type: str = "task"
    virtual_project: Optional[str] = None
    display_name = "黑板任务"

    # === Subclass hooks ===

    def post_complete(self, task_id: str, agent_id: str,
                      outcome: str, db_path: Path) -> None:
        """Handle the end of an agent session for ``task_id``.

        Flow:
        - crash outcome → roll back the current agent and stop;
        - task currently in ``review`` → run the review-complete flow for
          normal outcomes, otherwise keep ``review``;
        - task already ``done`` / ``failed`` / ``cancelled`` → leave it alone;
        - anything else → executor flow: verify, then mark the success status.

        Args:
            task_id: Id of the task whose agent session ended.
            agent_id: Id of the agent that ran the session.
            outcome: Session outcome reported by the caller.
            db_path: Path of the blackboard database.
        """
        # Crash handling is shared by every task type.
        if outcome in self.CRASH_OUTCOMES:
            self._rollback_current_agent(db_path, task_id, agent_id)
            return

        task_status = self._read_task_status(task_id, db_path)

        if task_status == "review":
            # Review-complete flow: only normal outcomes are processed.
            if outcome in ("completed", "session_revived"):
                self.handle_review_complete(task_id, db_path)
            else:
                logger.warning(
                    "Task %s: review agent %s abnormal outcome=%s, keeping review",
                    task_id, agent_id, outcome)
            return

        if task_status in TERMINAL_STATES:
            # verify_completion() counts a terminal status as success, which
            # would flip a done/failed/cancelled task back to review. The
            # status the task already carries wins.
            logger.info(
                "Task %s: already in terminal status %s, not re-marking",
                task_id, task_status)
            return

        # Executor flow: verify, then mark.
        result = self.verify_completion(task_id, db_path)
        if result.passed:
            success_status = self.target_success_status()
            self._mark_task_status(db_path, task_id, success_status)
            logger.info("Task %s: verify passed (%s), marked %s",
                        task_id, result.reason, success_status)
        else:
            logger.info(
                "Task %s: verify not passed (%s), leaving working",
                task_id, result.reason)
            # NOTE: a failed executor verification does not mark the task
            # failed; it stays working. The agent may still be producing
            # output (later rounds of the hallucination gate), and the
            # ticker's timeout check is the fallback. on_failure is
            # deliberately not called to avoid a false verdict.

    def _read_task_status(self, task_id: str, db_path: Path) -> str:
        """Return the task's current status, or ``"unknown"``.

        ``"unknown"`` is returned when the task row does not exist or when
        the lookup raises; the error is logged instead of being swallowed.
        """
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                return row["status"] if row else "unknown"
            finally:
                conn.close()
        except Exception as exc:
            logger.warning(
                "Task %s: status lookup failed (%s), treating as unknown",
                task_id, exc)
            return "unknown"

    def target_success_status(self) -> str:
        """Return the status set after a passed verification: ``review``."""
        return "review"

    def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
        """Run the three-signal check: terminal status / output / comment.

        The signals are tested in that order and the first hit yields a
        passing result. A missing task, no signal at all, or any exception
        during the lookup yields a failing result.

        Args:
            task_id: Id of the task to verify.
            db_path: Path of the blackboard database.

        Returns:
            A ``VerifyResult`` describing which signal, if any, was found.
        """
        try:
            conn = get_connection(db_path)
            try:
                # Signal 1: terminal status.
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if not row:
                    return VerifyResult(False, "not_found", "task not found",
                                        can_retry=False)
                status = row["status"]
                if status in TERMINAL_STATES:
                    return VerifyResult(
                        True, "terminal_status",
                        f"status={status}", can_retry=False
                    )

                # Signal 2: at least one output row.
                output_count = conn.execute(
                    "SELECT COUNT(*) as cnt FROM outputs WHERE task_id=?",
                    (task_id,)
                ).fetchone()["cnt"]
                if output_count > 0:
                    return VerifyResult(
                        True, "has_output",
                        f"output_count={output_count}"
                    )

                # Signal 3: a non-system comment of at least 50 characters.
                comment_count = conn.execute(
                    "SELECT COUNT(*) as cnt FROM comments "
                    "WHERE task_id=? AND author != 'system' "
                    "AND LENGTH(content) >= 50",
                    (task_id,)
                ).fetchone()["cnt"]
                if comment_count > 0:
                    return VerifyResult(
                        True, "has_comment",
                        f"comment_count={comment_count}"
                    )

                # No signal found.
                return VerifyResult(
                    False, "no_signal",
                    f"output=0, comment=0, status={status}"
                )
            finally:
                conn.close()
        except Exception as e:
            logger.error("Task %s: verify error: %s", task_id, e)
            return VerifyResult(False, "verify_error", str(e))

    def pre_spawn(self, task_id: str, db_path: Path) -> bool:
        """The task type needs no pre-spawn logic; always returns ``True``."""
        return True

    def get_sections(self) -> list:
        """Return fresh instances of the five prompt sections."""
        return [
            TaskContextSection(),
            PriorOutputsSection(),
            RoleSkillSection(),
            TaskApiSection(),
            TaskConstraintsSection(),
        ]

    def build_prompt(self, context: PromptContext) -> str:
        """Assemble the prompt by feeding the sections to a PromptComposer."""
        composer = PromptComposer()
        composer.add_many(self.get_sections())
        return composer.compose(context)

    def on_failure(self, task_id: str, agent_id: str,
                   db_path: Path, verify: VerifyResult) -> None:
        """Log a failed verification; the task is left ``working``.

        The status is not changed to ``failed`` so the ticker can retry.
        """
        logger.info(
            "Task %s: verify failed (%s, evidence=%s), leaving working for ticker retry",
            task_id, verify.reason, verify.evidence
        )

    # === Review flow ===

    def handle_review_complete(self, task_id: str, db_path: Path) -> None:
        """Process a finished review for ``task_id``.

        Reads the newest row from ``reviews``. An ``approved`` verdict marks
        the task ``done``. Any other verdict inserts a system comment that
        @mentions the assignee and leaves the task in ``review``. A missing
        review or task row is logged and ignored. Exceptions are logged, not
        raised.
        """
        try:
            conn = get_connection(db_path)
            try:
                # Read the latest review.
                review_row = conn.execute(
                    "SELECT verdict, reviewer, comment FROM reviews "
                    "WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
                    (task_id,)
                ).fetchone()
                if not review_row:
                    logger.warning("Task %s: no review found", task_id)
                    return
                verdict = review_row["verdict"]
                reviewer = review_row["reviewer"]
                review_comment = review_row["comment"] or ""

                # Look up the assignee.
                task_row = conn.execute(
                    "SELECT assignee FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if not task_row:
                    logger.warning("Task %s: task not found for review", task_id)
                    return
                assignee = task_row["assignee"]

                if verdict == "approved":
                    self._mark_task_status(db_path, task_id, "done")
                    logger.info("Task %s: review approved by %s, marked done",
                                task_id, reviewer)
                else:
                    # Not approved: @mention the assignee through a blackboard
                    # comment and keep the review status, so the assignee
                    # decides the next step.
                    conn.execute(
                        "INSERT INTO comments (task_id, author, content, comment_type) "
                        "VALUES (?, 'system', ?, 'review')",
                        (task_id,
                         f"@{assignee} review 未通过 (verdict={verdict}, "
                         f"reviewer={reviewer}): {review_comment}")
                    )
                    conn.commit()
                    logger.info(
                        "Task %s: review not approved (%s by %s), "
                        "@mentioned assignee %s, keeping review status",
                        task_id, verdict, reviewer, assignee
                    )
            finally:
                conn.close()
        except Exception as e:
            logger.error("Task %s: handle_review_complete error: %s", task_id, e)