[moz] refactor(api): 拆分 blackboard_routes → task_routes + task_relation_routes + shared + expand 细粒度聚合
CI / lint (pull_request) Failing after 9s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 2s

This commit is contained in:
cfdaily
2026-06-14 14:02:59 +08:00
parent e70816a69f
commit 5db4c89fe7
5 changed files with 632 additions and 574 deletions
+315
View File
@@ -0,0 +1,315 @@
"""Task 核心路由 — CRUD、状态、归档"""
from __future__ import annotations
import json
import os
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException
from src.blackboard.models import Task
from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS
from src.utils import get_data_root
from src.api.shared import (
_bb,
_q,
_task_to_dict,
_validate_project,
_extract_mentions,
)
router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"])
# --------------------------------------------------------------------------- #
# Tasks
# --------------------------------------------------------------------------- #
@router.get("/tasks")
async def list_tasks(project_id: str,
status: Optional[str] = None,
assignee: Optional[str] = None,
parent_task: Optional[str] = None,
q: Optional[str] = None):
bb = _bb(project_id)
tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task)
if q:
q_lower = q.lower()
tasks = [t for t in tasks if q_lower in (t.title or "").lower()]
return {"tasks": [_task_to_dict(t) for t in tasks]}
@router.get("/tasks/{task_id}")
async def get_task(project_id: str, task_id: str,
expand: Optional[str] = None):
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task not found: {task_id}")
result = _task_to_dict(task)
if not expand:
return result
expand_list = expand.split(",") if expand != "all" else [
"comments", "outputs", "reviews", "events", "decisions"
]
q = _q(project_id)
if "comments" in expand_list:
all_comments = bb.get_comments(task_id)
result["comments"] = {
"items": [dict(c.__dict__) for c in all_comments[-20:]],
"total_count": len(all_comments),
"limit": 20,
}
if "events" in expand_list:
all_events = q.task_events(task_id, limit=99999)
result["events"] = {
"items": all_events[-30:],
"total_count": len(all_events),
"limit": 30,
}
if "outputs" in expand_list:
result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)]
if "reviews" in expand_list:
result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)]
if "decisions" in expand_list:
result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)]
return result
@router.post("/tasks")
async def create_task(project_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
# ID: 后端生成 {project_id_prefix}-{date}-{seq}
task_id = body.get("id")
if not task_id:
import re
prefix = re.sub(r'[^a-z0-9]', '-', project_id.lower()).strip('-')[:20]
date_str = datetime.now().strftime('%Y%m%d')
# seq: 查当前项目最大 seq
db_path = get_data_root() / project_id / "blackboard.db"
try:
conn = sqlite3.connect(str(db_path), timeout=5)
max_id_row = conn.execute(
"SELECT id FROM tasks WHERE id LIKE ? ORDER BY id DESC LIMIT 1",
(f"{prefix}-{date_str}-%",)
).fetchone()
conn.close()
seq = int(max_id_row[0].split('-')[-1]) + 1 if max_id_row else 1
except Exception:
seq = 1
task_id = f"{prefix}-{date_str}-{seq:04d}"
# Title: LLM 生成(如果前端没传)
title = body.get("title", "")
if not title or not title.strip():
desc = body.get("description", "") or ""
title = await _generate_title(desc) or ((desc[:30] + "") if len(desc) > 30 else (desc or "新军令"))
# #04: assignee 由 @mention 推断
description_text = body.get("description") or ""
description_mentions = _extract_mentions(description_text)
assignee = description_mentions[0] if description_mentions else None
# 兼容:显式传 assignee 且无 @mention 时保留(deprecated
if not assignee and body.get("assignee"):
assignee = body.get("assignee")
task = Task(
id=task_id, title=title,
description=body.get("description"),
task_type=body.get("task_type", None),
priority=body.get("priority", 5),
assignee=assignee,
assigned_by=body.get("assigned_by", "user"),
depends_on=json.dumps(
body["depends_on"]) if "depends_on" in body else None,
parent_task=body.get("parent_task"),
risk_level=body.get("risk_level", "standard"),
stage=body.get("stage"),
stages_json=body.get("stages_json", "[]"),
)
bb.create_task(task)
result = {"ok": True, "task_id": task.id, "title": task.title}
if body.get("assignee") and not description_mentions:
result["warning"] = "assignee parameter is deprecated, use @mention in description"
return result
async def _generate_title(description: str) -> str | None:
"""调用 LLM 生成简短标题"""
if not description or len(description) < 5:
return None
try:
from openai import OpenAI
import json as _json
# 从 OpenClaw 配置读 zhipu 凭据
base_url = "https://open.bigmodel.cn/api/paas/v4"
api_key = ""
model = "glm-4-flash"
oc_cfg = Path(os.environ.get(
"OPENCLAW_HOME", str(Path.home() / ".openclaw")
)) / "openclaw.json"
if oc_cfg.exists():
with open(oc_cfg) as f:
cfg = _json.load(f)
zhipu = cfg.get("models", {}).get("providers", {}).get("zhipu", {})
if zhipu.get("baseUrl"):
base_url = zhipu["baseUrl"]
if zhipu.get("apiKey"):
api_key = zhipu["apiKey"]
if not api_key:
return None # 没配 API key 就跳过
client = OpenAI(base_url=base_url, api_key=api_key)
resp = client.chat.completions.create(
model=model,
messages=[
{"role": "system",
"content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"},
{"role": "user", "content": description[:500]},
],
max_tokens=50,
temperature=0.3,
timeout=10,
)
title = resp.choices[0].message.content.strip().strip('"').strip("'")
if title and len(title) <= 30:
return title
except Exception as e:
import logging
logging.getLogger(
"moziplus-v2").warning(f"Title generation failed: {e}")
return None
@router.get("/tasks/{task_id}/progress")
async def task_progress(project_id: str, task_id: str):
"""Task Stage 进度(含子 Task 统计)"""
queries = _q(project_id)
progress = queries.parent_task_progress(task_id)
if not progress:
raise HTTPException(404, "Task not found")
return progress
@router.post("/tasks/{task_id}/claim")
async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
if not bb.claim_task(task_id, body["agent"]):
raise HTTPException(
409, "Claim failed (already claimed or wrong assignee)")
return {"ok": True}
@router.post("/tasks/{task_id}/status")
async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
old_task = bb.get_task(task_id)
if not old_task:
raise HTTPException(404, {
"error": "not_found",
"detail": f"Task {task_id} not found in project {project_id}",
})
new_status = body.get("status")
if not new_status:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: status",
"valid_values": {"status": sorted(VALID_STATUSES)},
})
# 检查转换是否合法
current = old_task.status
allowed = VALID_TRANSITIONS.get(current, set())
if new_status not in allowed:
raise HTTPException(409, {
"error": "invalid_transition",
"detail": f"Cannot transition from {current} to {new_status}",
"valid_transitions": {current: sorted(allowed)},
"hint": f"From '{current}', valid targets are: {sorted(allowed)}",
})
if not bb.update_task_status(task_id, new_status,
agent=body.get("agent")):
raise HTTPException(409, {
"error": "transition_failed",
"detail": f"Status update failed for {task_id}",
})
# SSE 推送
try:
from src.api.sse_routes import get_broker
broker = get_broker()
broker.publish_sync("task_updated", {
"project_id": project_id,
"task_id": task_id,
"old_status": current,
"new_status": new_status,
"agent": body.get("agent"),
})
except Exception:
pass
return {"ok": True, "old_status": current, "new_status": new_status}
@router.patch("/tasks/{task_id}")
async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]):
"""更新任务元数据(归档、标题等)"""
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task {task_id} not found")
allowed = {"archived", "title", "description", "priority", "risk_level"}
updates = {k: v for k, v in body.items() if k in allowed}
if not updates:
return {"ok": True}
# 直接用 SQL 更新
conn = sqlite3.connect(str(bb.db_path), timeout=5)
try:
set_clause = ", ".join(f"{k}=?" for k in updates)
conn.execute(f"UPDATE tasks SET {set_clause}, updated_at=datetime('now') WHERE id=?",
(*updates.values(), task_id))
conn.commit()
finally:
conn.close()
return {"ok": True}
# --------------------------------------------------------------------------- #
# Archive (v2.8)
# --------------------------------------------------------------------------- #
@router.post("/tasks/{task_id}/archive")
async def archive_task(project_id: str, task_id: str,
body: Optional[Dict[str, Any]] = None):
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task not found: {task_id}")
action = (body or {}).get("action", "archive")
if action == "unarchive":
bb.unarchive_task(task_id)
return {"ok": True, "action": "unarchived"}
else:
bb.archive_task(task_id)
return {"ok": True, "action": "archived"}
@router.post("/tasks/archive-done")
async def archive_done_tasks(project_id: str):
bb = _bb(project_id)
count = bb.archive_done_tasks()
return {"ok": True, "archived_count": count}