# 2f1cb5c277
# - Remove 7 unused imports (F401)
# - Fix 4 f-strings without placeholders (F541)
# - Fix indentation and blank line issues (E127/E302/E402)
# - Remove trailing whitespace on 22 blank lines (W293)
# Pure formatting changes, no logic modifications.
# 388 lines | 14 KiB | Python
"""task_handler.py — blackboard task handler (task_type='task').

Standard blackboard task: three-signal verification → review status.
"""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
from pathlib import Path
|
||
from typing import Dict, Optional
|
||
|
||
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
|
||
from src.daemon.prompt_composer import PromptComposer, PromptContext
|
||
from src.blackboard.db import get_connection
|
||
|
||
# Module-level logger shared by everything in this file.
logger = logging.getLogger("moziplus-v2.handler")

# Task statuses that TaskHandler.verify_completion accepts as a completion
# signal on their own.
TERMINAL_STATES = frozenset(("review", "done", "failed", "cancelled"))

# ---------------------------------------------------------------------------
# Role -> skill mapping (decision D8: L2 only gets an index plus a lead-in,
# the full skill text is not injected).
# NOTE(review): RoleSkillSection.render reads SKILL.md and appends its whole
# content, which looks at odds with the decision quoted above — confirm which
# behavior is intended.
# ---------------------------------------------------------------------------
# Every skill directory is named "blackboard-<role>", so the map is derived
# from the list of known roles.
ROLE_SKILL_MAP: Dict[str, str] = {
    role: f"blackboard-{role}"
    for role in (
        "executor",
        "reviewer",
        "reviewer-simayi",
        "reviewer-pangtong",
        "planner",
        "claim",
    )
}

# Directory that contains one sub-directory per skill; overridable through the
# MOZI_SKILL_PATH environment variable.
SKILL_BASE_PATH = os.getenv(
    "MOZI_SKILL_PATH",
    "/Users/chufeng/.sanguo_projects/sanguo_mozi/skills",
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# PromptSection implementations
# ---------------------------------------------------------------------------
|
||
|
||
class TaskContextSection:
    """Section 1: task context (task id / title / description / must-haves / status)."""

    name: str = "task_context"
    priority: int = 10

    def render(self, context: PromptContext) -> str:
        """Return the heading followed by one ``label: value`` line per truthy field.

        The current status line is added last, and only when ``context.task``
        is truthy and carries a truthy ``"status"`` entry.
        """
        labelled_fields = (
            ("任务ID", context.task_id),
            ("标题", context.title),
            ("描述", context.description),
            ("必须完成", context.must_haves),
        )
        lines = ["## 任务上下文"]
        lines += [f"{label}: {value}" for label, value in labelled_fields if value]

        task = context.task
        if task and task.get("status"):
            lines.append(f"当前状态: {task['status']}")
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """Include the section as soon as the task has an id or a title."""
        return bool(context.task_id) or bool(context.title)
|
||
|
||
|
||
class PriorOutputsSection:
    """Section 2: summaries of upstream outputs (included only when there are any)."""

    name: str = "prior_outputs"
    priority: int = 20

    def render(self, context: PromptContext) -> str:
        """Return the heading plus one ``- [task_id] summary`` bullet per entry.

        Entries are read with ``.get``; a missing ``task_id`` renders as ``?``
        and a missing ``summary`` as the placeholder text.
        """
        bullets = [
            f"- [{item.get('task_id', '?')}] {item.get('summary', '无摘要')}"
            for item in (context.depends_on_outputs or [])
        ]
        return "\n".join(["## 前序产出", *bullets])

    def should_include(self, context: PromptContext) -> bool:
        """Include the section only when ``depends_on_outputs`` is non-empty."""
        return True if context.depends_on_outputs else False
|
||
|
||
|
||
class RoleSkillSection:
    """Section 3: operating guide for the agent's role.

    Looks the role up in ``ROLE_SKILL_MAP`` and, when a skill is mapped, inlines
    the full text of ``<SKILL_BASE_PATH>/<skill>/SKILL.md`` into the prompt.
    Every failure mode (no mapping, missing file, empty file, unreadable file)
    degrades to a short placeholder line instead of raising, so prompt
    composition never fails because of a skill file.
    """

    name: str = "role_skill"
    priority: int = 30

    def render(self, context: PromptContext) -> str:
        """Return the role heading followed by the skill text or a placeholder."""
        skill_name = ROLE_SKILL_MAP.get(context.role, "")
        lines = [
            "## 角色操作规范",
            f"你的角色:{context.role}",
        ]
        if not skill_name:
            lines.append("无对应 Skill 文件,按通用规范执行。")
            return "\n".join(lines)

        skill_path = os.path.join(SKILL_BASE_PATH, skill_name, "SKILL.md")
        try:
            with open(skill_path, encoding="utf-8") as f:
                skill_content = f.read()
        except FileNotFoundError:
            lines.append(f"(Skill 文件不存在:{skill_name})")
        except (OSError, UnicodeDecodeError):
            # Permission problems, SKILL.md being a directory, or bytes that are
            # not valid UTF-8 must not abort prompt rendering either.
            lines.append(f"(Skill 文件无法读取:{skill_name})")
        else:
            if skill_content:
                lines.append(skill_content)
            else:
                lines.append(f"(Skill 文件为空:{skill_name})")
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """This section is always part of the prompt."""
        return True
|
||
|
||
|
||
class TaskApiSection:
    """Section 4: API instructions (curl examples) the agent uses to report back."""

    name: str = "task_api"
    priority: int = 40

    # Host and port used to build the task URL shown in the curl examples.
    API_HOST = "localhost"
    API_PORT = 8083

    def render(self, context: PromptContext) -> str:
        """Return the instruction text for status write-back and output upload.

        The task URL is built from ``context.project_id`` and
        ``context.task_id``; ``context.agent_id`` is embedded in the sample
        status payload.
        """
        task_url = (
            f"http://{self.API_HOST}:{self.API_PORT}"
            f"/api/projects/{context.project_id}/tasks/{context.task_id}"
        )
        json_header = ' -H "Content-Type: application/json" \\'
        working_payload = f'{{"status": "working", "agent": "{context.agent_id}"}}'
        on_success = '"review"'

        lines = [
            "## 操作指令",
            "### 状态回写",
            "开始工作:",
            f"curl -X POST {task_url}/status \\",
            json_header,
            f" -d '{working_payload}'",
            "",
            "### 写入产出",
            f"curl -X POST {task_url}/outputs \\",
            json_header,
            " -d '{\"type\": \"text\", \"content\": \"<your output>\"}'",
            "",
            "### 完成后",
            f'成功: status → {on_success} | 失败: status → "failed"',
        ]
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """This section is always part of the prompt."""
        return True
|
||
|
||
|
||
class TaskConstraintsSection:
    """Section 5: hard constraints for the agent, selected by role."""

    name: str = "task_constraints"
    priority: int = 50

    def render(self, context: PromptContext) -> str:
        """Return the heading followed by the rule bullets for ``context.role``.

        ``executor``, any role starting with ``reviewer``, and ``planner`` each
        have their own rules; every other role — including a missing one —
        gets a single generic rule.
        """
        # A context without a role (None or "") falls through to the generic
        # rule instead of raising AttributeError on ``None.startswith``.
        role = context.role or ""
        constraints = ["## 硬约束"]
        if role == "executor":
            constraints.extend([
                "- 完成后必须标 review",
                "- 产出物不能为空(系统会验证)",
                "- handoff comment ≥ 50 字符",
            ])
        elif role.startswith("reviewer"):
            constraints.extend([
                "- 审查结果必须明确 pass/fail",
                "- 评审意见须附证据(文件:行号)",
            ])
        elif role == "planner":
            constraints.extend([
                "- 需求不清时提问,不要猜",
                "- 子任务必须有明确的终态定义",
            ])
        else:
            constraints.append("- 按规范完成 assigned 任务")
        return "\n".join(constraints)

    def should_include(self, context: PromptContext) -> bool:
        """This section is always part of the prompt."""
        return True
|
||
|
||
|
||
class TaskHandler(BaseTaskHandler):
    """Handler for standard blackboard tasks (``task_type='task'``).

    Lifecycle implemented in this class:

    - An executor agent finishes: :meth:`verify_completion` looks for a
      completion signal (terminal status, an output row, or a non-system
      comment of at least 50 characters).
    - Signal found: the task is moved to ``review``.
    - No signal: the task is left in ``working`` for the ticker to retry.
    - A review agent finishes: :meth:`handle_review_complete` reads the latest
      verdict and marks the task ``done`` when it is ``approved``.
    """

    task_type: str = "task"
    virtual_project: Optional[str] = None
    display_name = "黑板任务"

    # === Subclass hooks ===

    def post_complete(self, task_id: str, agent_id: str,
                      outcome: str, db_path: Path) -> None:
        """Run after an agent session ends; dispatch on crash / review / executor.

        Args:
            task_id: Id of the task the agent was working on.
            agent_id: Id of the agent whose session ended.
            outcome: Session outcome string. Values in ``self.CRASH_OUTCOMES``
                trigger a rollback; ``"completed"`` and ``"session_revived"``
                are the normal outcomes for a review session.
            db_path: Path of the blackboard database.

        Flow:
            1. Crash outcome: roll back the current agent and stop.
            2. Task currently in ``review``: treat this as the end of a review
               session and call :meth:`handle_review_complete` (normal
               outcomes only).
            3. Otherwise: executor flow — verify, then mark the success
               status. A task that is already in a terminal state is left
               exactly as it is.
        """
        # Crash handling is shared by every kind of session.
        if outcome in self.CRASH_OUTCOMES:
            self._rollback_current_agent(db_path, task_id, agent_id)
            return

        # Read the current status to tell a review session from an executor one.
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                task_status = row["status"] if row else "unknown"
            finally:
                conn.close()
        except Exception as e:
            # Best effort: fall back to the executor flow, but leave a trace of
            # the failure instead of swallowing it silently.
            logger.warning("Task %s: could not read status in post_complete: %s",
                           task_id, e)
            task_status = "unknown"

        if task_status == "review":
            # Review flow: only normal outcomes are processed.
            if outcome in ("completed", "session_revived"):
                self.handle_review_complete(task_id, db_path)
            else:
                logger.warning(
                    "Task %s: review agent %s abnormal outcome=%s, keeping review",
                    task_id, agent_id, outcome)
            return

        # Executor flow: verify, then mark.
        result = self.verify_completion(task_id, db_path)
        if not result.passed:
            # NOTE: a failed verification does not mark the task failed; it
            # stays in working. The agent may still be producing output (later
            # rounds of the hallucination gate) and the ticker timeout check is
            # the fallback. on_failure is not called here to avoid a false
            # failure verdict.
            logger.info(
                "Task %s: verify not passed (%s), leaving working",
                task_id, result.reason)
            return

        if result.reason == "terminal_status":
            # The task already sits in done / failed / cancelled (review was
            # handled above). Writing the success status here would overwrite
            # that final state with "review", so the status is left untouched.
            logger.info(
                "Task %s: already in a terminal state (%s), status left unchanged",
                task_id, result.evidence)
            return

        target_status = self.target_success_status()
        self._mark_task_status(db_path, task_id, target_status)
        logger.info("Task %s: verify passed (%s), marked %s",
                    task_id, result.reason, target_status)

    def target_success_status(self) -> str:
        """Return the status a task receives once verification passes: ``review``."""
        return "review"

    def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
        """Check the three completion signals, in order, and report the first hit.

        Returns a ``VerifyResult`` whose reason is one of:

        - ``not_found``: no row for ``task_id`` (not passed).
        - ``terminal_status``: the status is in ``TERMINAL_STATES`` (passed).
        - ``has_output``: at least one row in ``outputs`` (passed).
        - ``has_comment``: at least one comment whose author is not
          ``system`` and whose ``LENGTH(content)`` is >= 50 (passed).
        - ``no_signal``: none of the above (not passed).
        - ``verify_error``: an exception was raised while checking; it is
          logged and reported as not passed rather than propagated.
        """
        try:
            conn = get_connection(db_path)
            try:
                # Signal 1: terminal status.
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if not row:
                    return VerifyResult(False, "not_found", "task not found",
                                        can_retry=False)
                status = row["status"]
                if status in TERMINAL_STATES:
                    return VerifyResult(
                        True, "terminal_status",
                        f"status={status}", can_retry=False
                    )

                # Signal 2: outputs.
                output_count = conn.execute(
                    "SELECT COUNT(*) as cnt FROM outputs WHERE task_id=?",
                    (task_id,)
                ).fetchone()["cnt"]
                if output_count > 0:
                    return VerifyResult(
                        True, "has_output",
                        f"output_count={output_count}"
                    )

                # Signal 3: a non-system comment with at least 50 characters.
                comment_count = conn.execute(
                    "SELECT COUNT(*) as cnt FROM comments "
                    "WHERE task_id=? AND author != 'system' "
                    "AND LENGTH(content) >= 50",
                    (task_id,)
                ).fetchone()["cnt"]
                if comment_count > 0:
                    return VerifyResult(
                        True, "has_comment",
                        f"comment_count={comment_count}"
                    )

                # No signal at all.
                return VerifyResult(
                    False, "no_signal",
                    f"output=0, comment=0, status={status}"
                )
            finally:
                conn.close()
        except Exception as e:
            logger.error("Task %s: verify error: %s", task_id, e)
            return VerifyResult(False, "verify_error", str(e))

    def pre_spawn(self, task_id: str, db_path: Path) -> bool:
        """Plain tasks need no pre-spawn work; always returns ``True``."""
        return True

    def get_sections(self) -> list:
        """Return fresh instances of the five prompt sections used for a task."""
        return [
            TaskContextSection(),
            PriorOutputsSection(),
            RoleSkillSection(),
            TaskApiSection(),
            TaskConstraintsSection(),
        ]

    def build_prompt(self, context: PromptContext) -> str:
        """Compose the prompt by feeding :meth:`get_sections` to a ``PromptComposer``."""
        composer = PromptComposer()
        composer.add_many(self.get_sections())
        return composer.compose(context)

    def on_failure(self, task_id: str, agent_id: str,
                   db_path: Path, verify: VerifyResult) -> None:
        """Handle a failed verification: only log, leaving the task in working.

        The task is deliberately not marked failed so the ticker can retry it.
        """
        logger.info(
            "Task %s: verify failed (%s, evidence=%s), leaving working for ticker retry",
            task_id, verify.reason, verify.evidence
        )

    # === Review flow ===

    def handle_review_complete(self, task_id: str, db_path: Path) -> None:
        """Act on the most recent review of ``task_id``.

        - Verdict ``approved``: the task is marked ``done``.
        - Any other verdict: a ``system`` comment of type ``review`` that
          @mentions the assignee is inserted, and the task stays in ``review``.
        - No review row or no task row: a warning is logged and nothing changes.

        Exceptions are logged and not propagated.
        """
        try:
            conn = get_connection(db_path)
            try:
                # Latest review for this task.
                review_row = conn.execute(
                    "SELECT verdict, reviewer, comment FROM reviews "
                    "WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
                    (task_id,)
                ).fetchone()

                if not review_row:
                    logger.warning("Task %s: no review found", task_id)
                    return

                verdict = review_row["verdict"]
                reviewer = review_row["reviewer"]
                review_comment = review_row["comment"] or ""

                # Assignee to notify when the review is not approved.
                task_row = conn.execute(
                    "SELECT assignee FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if not task_row:
                    logger.warning("Task %s: task not found for review", task_id)
                    return
                assignee = task_row["assignee"]

                if verdict == "approved":
                    self._mark_task_status(db_path, task_id, "done")
                    logger.info("Task %s: review approved by %s, marked done",
                                task_id, reviewer)
                else:
                    # Not approved: @mention the assignee through a blackboard
                    # comment and keep the review status, leaving the next step
                    # to the assignee.
                    conn.execute(
                        "INSERT INTO comments (task_id, author, content, comment_type) "
                        "VALUES (?, 'system', ?, 'review')",
                        (task_id,
                         f"@{assignee} review 未通过 (verdict={verdict}, "
                         f"reviewer={reviewer}): {review_comment}")
                    )
                    conn.commit()
                    logger.info(
                        "Task %s: review not approved (%s by %s), "
                        "@mentioned assignee %s, keeping review status",
                        task_id, verdict, reviewer, assignee
                    )
            finally:
                conn.close()
        except Exception as e:
            logger.error("Task %s: handle_review_complete error: %s", task_id, e)
|