From 5db4c89fe70a6ebe782eb5bc50191a37159b57f5 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 14 Jun 2026 14:02:59 +0800 Subject: [PATCH 1/3] =?UTF-8?q?[moz]=20refactor(api):=20=E6=8B=86=E5=88=86?= =?UTF-8?q?=20blackboard=5Froutes=20=E2=86=92=20task=5Froutes=20+=20task?= =?UTF-8?q?=5Frelation=5Froutes=20+=20shared=20+=20expand=20=E7=BB=86?= =?UTF-8?q?=E7=B2=92=E5=BA=A6=E8=81=9A=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/api/blackboard_routes.py | 572 -------------------------------- src/api/shared.py | 73 ++++ src/api/task_relation_routes.py | 240 ++++++++++++++ src/api/task_routes.py | 315 ++++++++++++++++++ src/main.py | 6 +- 5 files changed, 632 insertions(+), 574 deletions(-) delete mode 100644 src/api/blackboard_routes.py create mode 100644 src/api/shared.py create mode 100644 src/api/task_relation_routes.py create mode 100644 src/api/task_routes.py diff --git a/src/api/blackboard_routes.py b/src/api/blackboard_routes.py deleted file mode 100644 index 89aecc6..0000000 --- a/src/api/blackboard_routes.py +++ /dev/null @@ -1,572 +0,0 @@ -"""API 路由 — 黑板 CRUD""" - -from __future__ import annotations - -import json -import os -from pathlib import Path -from typing import Any, Dict, Optional - -from fastapi import APIRouter, HTTPException, Query - -from src.blackboard.operations import Blackboard -from src.blackboard.models import Task, Review -from src.blackboard.queries import Queries -from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES -from src.blackboard.registry import ProjectRegistry - -from src.utils import get_data_root - -router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"]) - -# 虚拟项目白名单(不需要在 registry 注册) -_VIRTUAL_PROJECTS = frozenset({"_general", "_mail"}) - - -def _validate_project(project_id: str) -> str: - """校验 project_id,已知项目/虚拟项目放行,未知项目返回 400""" - if project_id in _VIRTUAL_PROJECTS: - return project_id - reg = ProjectRegistry(get_data_root()) - if reg.get_project(project_id): - return project_id - raise HTTPException(400, { - "ok": False, - "error": "project_not_found", - "detail": f"Project '{project_id}' is not registered.", - "suggestions": [ - f"Register first: POST /api/projects with id='{project_id}'", - "Or use '_general' for tasks without a specific project", - ], - }) - - -def _bb(project_id: str) -> Blackboard: - _validate_project(project_id) - return Blackboard(get_data_root() / project_id / "blackboard.db") - - -def _q(project_id: str) -> Queries: - _validate_project(project_id) - return Queries(get_data_root() / project_id / "blackboard.db") - - -# --- Tasks --- - -@router.get("/tasks") -async def list_tasks(project_id: str, - status: Optional[str] = None, - assignee: Optional[str] = None, - parent_task: Optional[str] = None): - bb = _bb(project_id) - tasks = bb.list_tasks( - status=status, - assignee=assignee, - parent_task=parent_task) - return {"tasks": [_task_to_dict(t) for t in tasks]} - - -@router.get("/tasks/{task_id}") -async def get_task(project_id: str, task_id: str, - expand: Optional[str] = None): - bb = _bb(project_id) - task = bb.get_task(task_id) - if not task: - raise HTTPException(404, f"Task not found: {task_id}") - result = _task_to_dict(task) - if expand == "all": - q = _q(project_id) - detail = q.task_detail(task_id) - if detail: - result["comments_count"] = detail.get("comments_count", 0) - result["outputs_count"] = detail.get("outputs_count", 0) - result["review_status"] = detail.get("review_status") - result["latest_event_detail"] = detail.get("latest_event_detail") - result["comments"] = [dict(c.__dict__) - for c in bb.get_comments(task_id)] - result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)] - result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)] - result["decisions"] = [dict(d.__dict__) - for d in bb.get_decisions(task_id)] - result["events"] = q.task_events(task_id) - result["experiences"] = q.task_experiences(task_id) - return result - - -@router.post("/tasks") -async def create_task(project_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - # ID: 后端生成 {project_id_prefix}-{date}-{seq} - task_id = body.get("id") - if not task_id: - import re - from datetime import datetime - prefix = re.sub(r'[^a-z0-9]', '-', project_id.lower()).strip('-')[:20] - date_str = datetime.now().strftime('%Y%m%d') - # seq: 查当前项目最大 seq - import sqlite3 - db_path = get_data_root() / project_id / "blackboard.db" - try: - conn = sqlite3.connect(str(db_path), timeout=5) - max_id_row = conn.execute( - "SELECT id FROM tasks WHERE id LIKE ? ORDER BY id DESC LIMIT 1", - (f"{prefix}-{date_str}-%",) - ).fetchone() - conn.close() - seq = int(max_id_row[0].split('-')[-1]) + 1 if max_id_row else 1 - except Exception: - seq = 1 - task_id = f"{prefix}-{date_str}-{seq:04d}" - - # Title: LLM 生成(如果前端没传) - title = body.get("title", "") - if not title or not title.strip(): - desc = body.get("description", "") or "" - title = await _generate_title(desc) or ((desc[:30] + "…") if len(desc) > 30 else (desc or "新军令")) - - # #04: assignee 由 @mention 推断 - description_text = body.get("description") or "" - description_mentions = _extract_mentions(description_text) - assignee = description_mentions[0] if description_mentions else None - # 兼容:显式传 assignee 且无 @mention 时保留(deprecated) - if not assignee and body.get("assignee"): - assignee = body.get("assignee") - - task = Task( - id=task_id, title=title, - description=body.get("description"), - task_type=body.get("task_type", None), - priority=body.get("priority", 5), - assignee=assignee, - assigned_by=body.get("assigned_by", "user"), - depends_on=json.dumps( - body["depends_on"]) if "depends_on" in body else None, - parent_task=body.get("parent_task"), - risk_level=body.get("risk_level", "standard"), - stage=body.get("stage"), - stages_json=body.get("stages_json", "[]"), - ) - bb.create_task(task) - result = {"ok": True, "task_id": task.id, "title": task.title} - if body.get("assignee") and not description_mentions: - result["warning"] = "assignee parameter is deprecated, use @mention in description" - return result - - -async def _generate_title(description: str) -> str | None: - """调用 LLM 生成简短标题""" - if not description or len(description) < 5: - return None - try: - from openai import OpenAI - import json as _json - # 从 OpenClaw 配置读 zhipu 凭据 - base_url = "https://open.bigmodel.cn/api/paas/v4" - api_key = "" - model = "glm-4-flash" - oc_cfg = Path(os.environ.get( - "OPENCLAW_HOME", str(Path.home() / ".openclaw") - )) / "openclaw.json" - if oc_cfg.exists(): - with open(oc_cfg) as f: - cfg = _json.load(f) - zhipu = cfg.get("models", {}).get("providers", {}).get("zhipu", {}) - if zhipu.get("baseUrl"): - base_url = zhipu["baseUrl"] - if zhipu.get("apiKey"): - api_key = zhipu["apiKey"] - if not api_key: - return None # 没配 API key 就跳过 - client = OpenAI(base_url=base_url, api_key=api_key) - resp = client.chat.completions.create( - model=model, - messages=[ - {"role": "system", - "content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"}, - {"role": "user", "content": description[:500]}, - ], - max_tokens=50, - temperature=0.3, - timeout=10, - ) - title = resp.choices[0].message.content.strip().strip('"').strip("'") - if title and len(title) <= 30: - return title - except Exception as e: - import logging - logging.getLogger( - "moziplus-v2").warning(f"Title generation failed: {e}") - return None - - -@router.get("/tasks/{task_id}/progress") -async def task_progress(project_id: str, task_id: str): - """Task Stage 进度(含子 Task 统计)""" - queries = _q(project_id) - progress = queries.parent_task_progress(task_id) - if not progress: - raise HTTPException(404, "Task not found") - return progress - - -@router.post("/tasks/{task_id}/claim") -async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - if not bb.claim_task(task_id, body["agent"]): - raise HTTPException( - 409, "Claim failed (already claimed or wrong assignee)") - return {"ok": True} - - -@router.post("/tasks/{task_id}/status") -async def update_status(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - old_task = bb.get_task(task_id) - if not old_task: - raise HTTPException(404, { - "error": "not_found", - "detail": f"Task {task_id} not found in project {project_id}", - }) - - new_status = body.get("status") - if not new_status: - raise HTTPException(422, { - "error": "validation_failed", - "detail": "Missing required field: status", - "valid_values": {"status": sorted(VALID_STATUSES)}, - }) - - # 检查转换是否合法 - from src.blackboard.db import VALID_TRANSITIONS - current = old_task.status - allowed = VALID_TRANSITIONS.get(current, set()) - if new_status not in allowed: - raise HTTPException(409, { - "error": "invalid_transition", - "detail": f"Cannot transition from {current} to {new_status}", - "valid_transitions": {current: sorted(allowed)}, - "hint": f"From '{current}', valid targets are: {sorted(allowed)}", - }) - - if not bb.update_task_status(task_id, new_status, - agent=body.get("agent")): - raise HTTPException(409, { - "error": "transition_failed", - "detail": f"Status update failed for {task_id}", - }) - - # SSE 推送 - try: - from src.api.sse_routes import get_broker - broker = get_broker() - broker.publish_sync("task_updated", { - "project_id": project_id, - "task_id": task_id, - "old_status": current, - "new_status": new_status, - "agent": body.get("agent"), - }) - except Exception: - pass - return {"ok": True, "old_status": current, "new_status": new_status} - - -# --- @mention 自动提取(#04) --- -_KNOWN_AGENT_IDS: list = [] - - -def _init_agent_ids(): - """从配置文件加载 Agent ID 列表""" - global _KNOWN_AGENT_IDS - if _KNOWN_AGENT_IDS: - return - try: - import yaml - cfg_path = os.path.join( - os.path.dirname(__file__), - "..", - "..", - "config", - "default.yaml") - with open(cfg_path) as f: - cfg = yaml.safe_load(f) - _KNOWN_AGENT_IDS = list( - cfg.get( - "daemon", - {}).get( - "agent_profiles", - {}).keys()) - except Exception: - _KNOWN_AGENT_IDS = [] - - -def _extract_mentions(text: str) -> list: - """从文本中自动提取 @agent-id 格式的 mention""" - import re - _init_agent_ids() - candidates = set( - re.findall( - r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', - text)) - return [a for a in candidates if a in _KNOWN_AGENT_IDS] - - -# --- Comments --- - -@router.get("/tasks/{task_id}/comments") -async def get_comments(project_id: str, task_id: str, - comment_type: Optional[str] = None): - bb = _bb(project_id) - comments = bb.get_comments(task_id, comment_type=comment_type) - return {"comments": [dict(c.__dict__) for c in comments]} - - -@router.post("/tasks/{task_id}/comments") -async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - mentions_raw = body.get("mentions") - comment_body = body["body"] - - # #04: 自动从 body 提取 @mention,与显式传的 mentions 取并集 - auto_mentions = _extract_mentions(comment_body) - if isinstance(mentions_raw, str): - try: - explicit_mentions = json.loads(mentions_raw) - except Exception: - explicit_mentions = [] - elif isinstance(mentions_raw, list): - explicit_mentions = mentions_raw - else: - explicit_mentions = [] - merged_mentions = list(set(explicit_mentions + auto_mentions)) - - cid = bb.add_comment(task_id, body["author"], comment_body, - comment_type=body.get("comment_type", "general"), - mentions=merged_mentions) - if merged_mentions: - bb.record_mentions(cid, task_id, merged_mentions) - # #10: SSE 通知前端黑板有新 comment - try: - from src.api.sse_routes import get_broker - broker = get_broker() - broker.publish_sync("comment_added", { - "project_id": project_id, - "task_id": task_id, - "comment_id": cid, - "author": body["author"], - }) - except Exception: - pass - - return {"ok": True, "comment_id": cid, "mentions": merged_mentions} - - -# --- Outputs --- - -@router.get("/tasks/{task_id}/outputs") -async def get_outputs(project_id: str, task_id: str): - bb = _bb(project_id) - return {"outputs": [dict(o.__dict__) for o in bb.get_outputs(task_id)]} - - -@router.post("/tasks/{task_id}/outputs") -async def write_output(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - - # 字段校验 + Agent-friendly 错误 - agent = body.get("agent") - if not agent: - raise HTTPException(422, { - "error": "validation_failed", - "detail": "Missing required field: agent", - "hint": "Provide your agent ID, e.g. 'zhangfei-dev'", - }) - - # type 字段:接受 type 或 content_type(别名兼容) - output_type = body.get("type") or body.get("content_type") - valid_types = sorted(OUTPUT_TYPES) - if not output_type: - raise HTTPException(422, { - "error": "validation_failed", - "detail": "Missing required field: type", - "valid_values": {"type": valid_types}, - "hint": "Use 'type' field. Also accepts 'content_type' as alias.", - }) - if output_type not in OUTPUT_TYPES: - raise HTTPException(422, { - "error": "validation_failed", - "detail": f"Invalid type: '{output_type}'", - "valid_values": {"type": valid_types}, - }) - - title = body.get("title") - if not title: - raise HTTPException(422, { - "error": "validation_failed", - "detail": "Missing required field: title", - "hint": "Provide a brief title describing this output", - }) - - # 内容模式:content(直传)或 content_path(引用) - content = body.get("content") - content_path = body.get("content_path") or body.get("path") - - if content and not content_path: - # 内容直传模式:自动写文件 - import os - artifacts_dir = os.path.join( - os.path.dirname(bb.db_path), "artifacts", task_id - ) - os.makedirs(artifacts_dir, exist_ok=True) - # 安全文件名 - safe_name = "".join( - c if c.isalnum() or c in "._-" else "_" for c in title) - if not safe_name: - safe_name = "output" - file_path = os.path.join(artifacts_dir, safe_name) - with open(file_path, "w", encoding="utf-8") as f: - f.write(content) - content_path = file_path - - oid = bb.write_output( - task_id, agent, output_type, title, - content_path=content_path, - summary=body.get("summary"), - metadata=body.get("metadata"), - ) - return {"ok": True, "output_id": oid} - - -# --- Decisions --- - -@router.get("/tasks/{task_id}/decisions") -async def get_decisions(project_id: str, task_id: str): - bb = _bb(project_id) - return {"decisions": [dict(d.__dict__) for d in bb.get_decisions(task_id)]} - - -@router.post("/tasks/{task_id}/decisions") -async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - did = bb.add_decision(task_id, body["decider"], body["decision"], - body["rationale"], - alternatives=body.get("alternatives")) - return {"ok": True, "decision_id": did} - - -# --- Observations --- - -@router.post("/tasks/{task_id}/observations") -async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - oid = bb.add_observation(task_id, body["observer"], body["body"], - severity=body.get("severity", "info")) - return {"ok": True, "observation_id": oid} - - -# --- Reviews --- - -@router.get("/tasks/{task_id}/reviews") -async def get_reviews(project_id: str, task_id: str): - bb = _bb(project_id) - return {"reviews": [dict(r.__dict__) for r in bb.get_reviews(task_id)]} - - -@router.post("/tasks/{task_id}/reviews") -async def add_review(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - review = Review( - id=body["id"], task_id=task_id, reviewer=body["reviewer"], - review_type=body["review_type"], verdict=body["verdict"], - summary=body["summary"], confidence=body.get("confidence"), - round=body.get("round", 1), max_rounds=body.get("max_rounds", 3), - ) - bb.add_review(review) - return {"ok": True, "review_id": review.id} - - -@router.patch("/tasks/{task_id}") -async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]): - """更新任务元数据(归档、标题等)""" - bb = _bb(project_id) - task = bb.get_task(task_id) - if not task: - raise HTTPException(404, f"Task {task_id} not found") - allowed = {"archived", "title", "description", "priority", "risk_level"} - updates = {k: v for k, v in body.items() if k in allowed} - if not updates: - return {"ok": True} - # 直接用 SQL 更新 - import sqlite3 - conn = sqlite3.connect(str(bb.db_path), timeout=5) - try: - set_clause = ", ".join(f"{k}=?" for k in updates) - conn.execute(f"UPDATE tasks SET {set_clause}, updated_at=datetime('now') WHERE id=?", - (*updates.values(), task_id)) - conn.commit() - finally: - conn.close() - return {"ok": True} - - -# --- Per-task Events & Experiences --- - -@router.get("/tasks/{task_id}/events") -async def get_task_events(project_id: str, task_id: str, - limit: int = Query(50, le=200)): - q = _q(project_id) - return {"events": q.task_events(task_id, limit)} - - -@router.get("/tasks/{task_id}/experiences") -async def get_task_experiences(project_id: str, task_id: str): - q = _q(project_id) - return {"experiences": q.task_experiences(task_id)} - - -# --- Global Events --- - -@router.get("/events") -async def get_events(project_id: str, limit: int = Query(50, le=200)): - q = _q(project_id) - return {"events": q.recent_events(limit)} - - -# --- Summary --- - -@router.get("/summary") -async def task_summary(project_id: str): - q = _q(project_id) - return {"summary": q.task_summary()} - - -# --- Archive (v2.8) --- - -@router.post("/tasks/{task_id}/archive") -async def archive_task(project_id: str, task_id: str, - body: Optional[Dict[str, Any]] = None): - bb = _bb(project_id) - task = bb.get_task(task_id) - if not task: - raise HTTPException(404, f"Task not found: {task_id}") - action = (body or {}).get("action", "archive") - if action == "unarchive": - bb.unarchive_task(task_id) - return {"ok": True, "action": "unarchived"} - else: - bb.archive_task(task_id) - return {"ok": True, "action": "archived"} - - -@router.post("/tasks/archive-done") -async def archive_done_tasks(project_id: str): - bb = _bb(project_id) - count = bb.archive_done_tasks() - return {"ok": True, "archived_count": count} - - -# --- Helper --- - -def _task_to_dict(t: Task) -> Dict[str, Any]: - d = {k: v for k, v in t.__dict__.items() if v is not None} - return d diff --git a/src/api/shared.py b/src/api/shared.py new file mode 100644 index 0000000..8bddf0c --- /dev/null +++ b/src/api/shared.py @@ -0,0 +1,73 @@ +"""共享 helper 和常量""" + +from pathlib import Path +from typing import Any, Dict +from fastapi import HTTPException + +from src.blackboard.operations import Blackboard +from src.blackboard.queries import Queries +from src.blackboard.models import Task +from src.blackboard.registry import ProjectRegistry +from src.utils import get_data_root + +# 虚拟项目白名单 +_VIRTUAL_PROJECTS = frozenset({"_general", "_mail"}) + + +def _validate_project(project_id: str) -> str: + """校验 project_id""" + if project_id in _VIRTUAL_PROJECTS: + return project_id + reg = ProjectRegistry(get_data_root()) + if reg.get_project(project_id): + return project_id + raise HTTPException(400, { + "ok": False, + "error": "project_not_found", + "detail": f"Project '{project_id}' is not registered.", + "suggestions": [ + f"Register first: POST /api/projects with id='{project_id}'", + "Or use '_general' for tasks without a specific project", + ], + }) + + +def _bb(project_id: str) -> Blackboard: + _validate_project(project_id) + return Blackboard(get_data_root() / project_id / "blackboard.db") + + +def _q(project_id: str) -> Queries: + _validate_project(project_id) + return Queries(get_data_root() / project_id / "blackboard.db") + + +def _task_to_dict(t: Task) -> Dict[str, Any]: + d = {k: v for k, v in t.__dict__.items() if v is not None} + return d + + +_KNOWN_AGENT_IDS: list = [] + + +def _init_agent_ids(): + """从配置文件加载 Agent ID 列表""" + global _KNOWN_AGENT_IDS + if _KNOWN_AGENT_IDS: + return + try: + import yaml, os + cfg_path = os.path.join(os.path.dirname(__file__), "..", "..", "config", "default.yaml") + with open(cfg_path) as f: + cfg = yaml.safe_load(f) + _KNOWN_AGENT_IDS = list(cfg.get("daemon", {}).get("agent_profiles", {}).keys()) + except Exception: + _KNOWN_AGENT_IDS = [] + + +def _extract_mentions(text: str) -> list: + """从文本中自动提取 @agent-id 格式的 mention""" + import re + _init_agent_ids() + candidates = set(re.findall(r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', text)) + return [a for a in candidates if a in _KNOWN_AGENT_IDS] diff --git a/src/api/task_relation_routes.py b/src/api/task_relation_routes.py new file mode 100644 index 0000000..58357e9 --- /dev/null +++ b/src/api/task_relation_routes.py @@ -0,0 +1,240 @@ +"""Task 关联路由 — comments / outputs / decisions / observations / reviews / events / experiences / summary""" + +from __future__ import annotations + +import json +import os +from typing import Any, Dict, Optional + +from fastapi import APIRouter, HTTPException, Query + +from src.blackboard.models import Review +from src.blackboard.db import OUTPUT_TYPES + +from src.api.shared import ( + _bb, + _q, + _extract_mentions, +) + +router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"]) + + +# --------------------------------------------------------------------------- # +# Comments +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/comments") +async def get_comments(project_id: str, task_id: str, + comment_type: Optional[str] = None): + bb = _bb(project_id) + comments = bb.get_comments(task_id, comment_type=comment_type) + return {"comments": [dict(c.__dict__) for c in comments]} + + +@router.post("/tasks/{task_id}/comments") +async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + mentions_raw = body.get("mentions") + comment_body = body["body"] + + # #04: 自动从 body 提取 @mention,与显式传的 mentions 取并集 + auto_mentions = _extract_mentions(comment_body) + if isinstance(mentions_raw, str): + try: + explicit_mentions = json.loads(mentions_raw) + except Exception: + explicit_mentions = [] + elif isinstance(mentions_raw, list): + explicit_mentions = mentions_raw + else: + explicit_mentions = [] + merged_mentions = list(set(explicit_mentions + auto_mentions)) + + cid = bb.add_comment(task_id, body["author"], comment_body, + comment_type=body.get("comment_type", "general"), + mentions=merged_mentions) + if merged_mentions: + bb.record_mentions(cid, task_id, merged_mentions) + # #10: SSE 通知前端黑板有新 comment + try: + from src.api.sse_routes import get_broker + broker = get_broker() + broker.publish_sync("comment_added", { + "project_id": project_id, + "task_id": task_id, + "comment_id": cid, + "author": body["author"], + }) + except Exception: + pass + + return {"ok": True, "comment_id": cid, "mentions": merged_mentions} + + +# --------------------------------------------------------------------------- # +# Outputs +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/outputs") +async def get_outputs(project_id: str, task_id: str): + bb = _bb(project_id) + return {"outputs": [dict(o.__dict__) for o in bb.get_outputs(task_id)]} + + +@router.post("/tasks/{task_id}/outputs") +async def write_output(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + + # 字段校验 + Agent-friendly 错误 + agent = body.get("agent") + if not agent: + raise HTTPException(422, { + "error": "validation_failed", + "detail": "Missing required field: agent", + "hint": "Provide your agent ID, e.g. 'zhangfei-dev'", + }) + + # type 字段:接受 type 或 content_type(别名兼容) + output_type = body.get("type") or body.get("content_type") + valid_types = sorted(OUTPUT_TYPES) + if not output_type: + raise HTTPException(422, { + "error": "validation_failed", + "detail": "Missing required field: type", + "valid_values": {"type": valid_types}, + "hint": "Use 'type' field. Also accepts 'content_type' as alias.", + }) + if output_type not in OUTPUT_TYPES: + raise HTTPException(422, { + "error": "validation_failed", + "detail": f"Invalid type: '{output_type}'", + "valid_values": {"type": valid_types}, + }) + + title = body.get("title") + if not title: + raise HTTPException(422, { + "error": "validation_failed", + "detail": "Missing required field: title", + "hint": "Provide a brief title describing this output", + }) + + # 内容模式:content(直传)或 content_path(引用) + content = body.get("content") + content_path = body.get("content_path") or body.get("path") + + if content and not content_path: + # 内容直传模式:自动写文件 + artifacts_dir = os.path.join( + os.path.dirname(bb.db_path), "artifacts", task_id + ) + os.makedirs(artifacts_dir, exist_ok=True) + # 安全文件名 + safe_name = "".join( + c if c.isalnum() or c in "._-" else "_" for c in title) + if not safe_name: + safe_name = "output" + file_path = os.path.join(artifacts_dir, safe_name) + with open(file_path, "w", encoding="utf-8") as f: + f.write(content) + content_path = file_path + + oid = bb.write_output( + task_id, agent, output_type, title, + content_path=content_path, + summary=body.get("summary"), + metadata=body.get("metadata"), + ) + return {"ok": True, "output_id": oid} + + +# --------------------------------------------------------------------------- # +# Decisions +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/decisions") +async def get_decisions(project_id: str, task_id: str): + bb = _bb(project_id) + return {"decisions": [dict(d.__dict__) for d in bb.get_decisions(task_id)]} + + +@router.post("/tasks/{task_id}/decisions") +async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + did = bb.add_decision(task_id, body["decider"], body["decision"], + body["rationale"], + alternatives=body.get("alternatives")) + return {"ok": True, "decision_id": did} + + +# --------------------------------------------------------------------------- # +# Observations +# --------------------------------------------------------------------------- # + +@router.post("/tasks/{task_id}/observations") +async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + oid = bb.add_observation(task_id, body["observer"], body["body"], + severity=body.get("severity", "info")) + return {"ok": True, "observation_id": oid} + + +# --------------------------------------------------------------------------- # +# Reviews +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/reviews") +async def get_reviews(project_id: str, task_id: str): + bb = _bb(project_id) + return {"reviews": [dict(r.__dict__) for r in bb.get_reviews(task_id)]} + + +@router.post("/tasks/{task_id}/reviews") +async def add_review(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + review = Review( + id=body["id"], task_id=task_id, reviewer=body["reviewer"], + review_type=body["review_type"], verdict=body["verdict"], + summary=body["summary"], confidence=body.get("confidence"), + round=body.get("round", 1), max_rounds=body.get("max_rounds", 3), + ) + bb.add_review(review) + return {"ok": True, "review_id": review.id} + + +# --------------------------------------------------------------------------- # +# Per-task Events & Experiences +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/events") +async def get_task_events(project_id: str, task_id: str, + limit: int = Query(50, le=200)): + q = _q(project_id) + return {"events": q.task_events(task_id, limit)} + + +@router.get("/tasks/{task_id}/experiences") +async def get_task_experiences(project_id: str, task_id: str): + q = _q(project_id) + return {"experiences": q.task_experiences(task_id)} + + +# --------------------------------------------------------------------------- # +# Global Events +# --------------------------------------------------------------------------- # + +@router.get("/events") +async def get_events(project_id: str, limit: int = Query(50, le=200)): + q = _q(project_id) + return {"events": q.recent_events(limit)} + + +# --------------------------------------------------------------------------- # +# Summary +# --------------------------------------------------------------------------- # + +@router.get("/summary") +async def task_summary(project_id: str): + q = _q(project_id) + return {"summary": q.task_summary()} diff --git a/src/api/task_routes.py b/src/api/task_routes.py new file mode 100644 index 0000000..a0e04af --- /dev/null +++ b/src/api/task_routes.py @@ -0,0 +1,315 @@ +"""Task 核心路由 — CRUD、状态、归档""" + +from __future__ import annotations + +import json +import os +import sqlite3 +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Optional + +from fastapi import APIRouter, HTTPException + +from src.blackboard.models import Task +from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS +from src.utils import get_data_root + +from src.api.shared import ( + _bb, + _q, + _task_to_dict, + _validate_project, + _extract_mentions, +) + +router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"]) + + +# --------------------------------------------------------------------------- # +# Tasks +# --------------------------------------------------------------------------- # + +@router.get("/tasks") +async def list_tasks(project_id: str, + status: Optional[str] = None, + assignee: Optional[str] = None, + parent_task: Optional[str] = None, + q: Optional[str] = None): + bb = _bb(project_id) + tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task) + if q: + q_lower = q.lower() + tasks = [t for t in tasks if q_lower in (t.title or "").lower()] + return {"tasks": [_task_to_dict(t) for t in tasks]} + + +@router.get("/tasks/{task_id}") +async def get_task(project_id: str, task_id: str, + expand: Optional[str] = None): + bb = _bb(project_id) + task = bb.get_task(task_id) + if not task: + raise HTTPException(404, f"Task not found: {task_id}") + result = _task_to_dict(task) + + if not expand: + return result + + expand_list = expand.split(",") if expand != "all" else [ + "comments", "outputs", "reviews", "events", "decisions" + ] + + q = _q(project_id) + + if "comments" in expand_list: + all_comments = bb.get_comments(task_id) + result["comments"] = { + "items": [dict(c.__dict__) for c in all_comments[-20:]], + "total_count": len(all_comments), + "limit": 20, + } + + if "events" in expand_list: + all_events = q.task_events(task_id, limit=99999) + result["events"] = { + "items": all_events[-30:], + "total_count": len(all_events), + "limit": 30, + } + + if "outputs" in expand_list: + result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)] + + if "reviews" in expand_list: + result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)] + + if "decisions" in expand_list: + result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)] + + return result + + +@router.post("/tasks") +async def create_task(project_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + # ID: 后端生成 {project_id_prefix}-{date}-{seq} + task_id = body.get("id") + if not task_id: + import re + prefix = re.sub(r'[^a-z0-9]', '-', project_id.lower()).strip('-')[:20] + date_str = datetime.now().strftime('%Y%m%d') + # seq: 查当前项目最大 seq + db_path = get_data_root() / project_id / "blackboard.db" + try: + conn = sqlite3.connect(str(db_path), timeout=5) + max_id_row = conn.execute( + "SELECT id FROM tasks WHERE id LIKE ? ORDER BY id DESC LIMIT 1", + (f"{prefix}-{date_str}-%",) + ).fetchone() + conn.close() + seq = int(max_id_row[0].split('-')[-1]) + 1 if max_id_row else 1 + except Exception: + seq = 1 + task_id = f"{prefix}-{date_str}-{seq:04d}" + + # Title: LLM 生成(如果前端没传) + title = body.get("title", "") + if not title or not title.strip(): + desc = body.get("description", "") or "" + title = await _generate_title(desc) or ((desc[:30] + "…") if len(desc) > 30 else (desc or "新军令")) + + # #04: assignee 由 @mention 推断 + description_text = body.get("description") or "" + description_mentions = _extract_mentions(description_text) + assignee = description_mentions[0] if description_mentions else None + # 兼容:显式传 assignee 且无 @mention 时保留(deprecated) + if not assignee and body.get("assignee"): + assignee = body.get("assignee") + + task = Task( + id=task_id, title=title, + description=body.get("description"), + task_type=body.get("task_type", None), + priority=body.get("priority", 5), + assignee=assignee, + assigned_by=body.get("assigned_by", "user"), + depends_on=json.dumps( + body["depends_on"]) if "depends_on" in body else None, + parent_task=body.get("parent_task"), + risk_level=body.get("risk_level", "standard"), + stage=body.get("stage"), + stages_json=body.get("stages_json", "[]"), + ) + bb.create_task(task) + result = {"ok": True, "task_id": task.id, "title": task.title} + if body.get("assignee") and not description_mentions: + result["warning"] = "assignee parameter is deprecated, use @mention in description" + return result + + +async def _generate_title(description: str) -> str | None: + """调用 LLM 生成简短标题""" + if not description or len(description) < 5: + return None + try: + from openai import OpenAI + import json as _json + # 从 OpenClaw 配置读 zhipu 凭据 + base_url = "https://open.bigmodel.cn/api/paas/v4" + api_key = "" + model = "glm-4-flash" + oc_cfg = Path(os.environ.get( + "OPENCLAW_HOME", str(Path.home() / ".openclaw") + )) / "openclaw.json" + if oc_cfg.exists(): + with open(oc_cfg) as f: + cfg = _json.load(f) + zhipu = cfg.get("models", {}).get("providers", {}).get("zhipu", {}) + if zhipu.get("baseUrl"): + base_url = zhipu["baseUrl"] + if zhipu.get("apiKey"): + api_key = zhipu["apiKey"] + if not api_key: + return None # 没配 API key 就跳过 + client = OpenAI(base_url=base_url, api_key=api_key) + resp = client.chat.completions.create( + model=model, + messages=[ + {"role": "system", + "content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"}, + {"role": "user", "content": description[:500]}, + ], + max_tokens=50, + temperature=0.3, + timeout=10, + ) + title = resp.choices[0].message.content.strip().strip('"').strip("'") + if title and len(title) <= 30: + return title + except Exception as e: + import logging + logging.getLogger( + "moziplus-v2").warning(f"Title generation failed: {e}") + return None + + +@router.get("/tasks/{task_id}/progress") +async def task_progress(project_id: str, task_id: str): + """Task Stage 进度(含子 Task 统计)""" + queries = _q(project_id) + progress = queries.parent_task_progress(task_id) + if not progress: + raise HTTPException(404, "Task not found") + return progress + + +@router.post("/tasks/{task_id}/claim") +async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + if not bb.claim_task(task_id, body["agent"]): + raise HTTPException( + 409, "Claim failed (already claimed or wrong assignee)") + return {"ok": True} + + +@router.post("/tasks/{task_id}/status") +async def update_status(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + old_task = bb.get_task(task_id) + if not old_task: + raise HTTPException(404, { + "error": "not_found", + "detail": f"Task {task_id} not found in project {project_id}", + }) + + new_status = body.get("status") + if not new_status: + raise HTTPException(422, { + "error": "validation_failed", + "detail": "Missing required field: status", + "valid_values": {"status": sorted(VALID_STATUSES)}, + }) + + # 检查转换是否合法 + current = old_task.status + allowed = VALID_TRANSITIONS.get(current, set()) + if new_status not in allowed: + raise HTTPException(409, { + "error": "invalid_transition", + "detail": f"Cannot transition from {current} to {new_status}", + "valid_transitions": {current: sorted(allowed)}, + "hint": f"From '{current}', valid targets are: {sorted(allowed)}", + }) + + if not bb.update_task_status(task_id, new_status, + agent=body.get("agent")): + raise HTTPException(409, { + "error": "transition_failed", + "detail": f"Status update failed for {task_id}", + }) + + # SSE 推送 + try: + from src.api.sse_routes import get_broker + broker = get_broker() + broker.publish_sync("task_updated", { + "project_id": project_id, + "task_id": task_id, + "old_status": current, + "new_status": new_status, + "agent": body.get("agent"), + }) + except Exception: + pass + return {"ok": True, "old_status": current, "new_status": new_status} + + +@router.patch("/tasks/{task_id}") +async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]): + """更新任务元数据(归档、标题等)""" + bb = _bb(project_id) + task = bb.get_task(task_id) + if not task: + raise HTTPException(404, f"Task {task_id} not found") + allowed = {"archived", "title", "description", "priority", "risk_level"} + updates = {k: v for k, v in body.items() if k in allowed} + if not updates: + return {"ok": True} + # 直接用 SQL 更新 + conn = sqlite3.connect(str(bb.db_path), timeout=5) + try: + set_clause = ", ".join(f"{k}=?" for k in updates) + conn.execute(f"UPDATE tasks SET {set_clause}, updated_at=datetime('now') WHERE id=?", + (*updates.values(), task_id)) + conn.commit() + finally: + conn.close() + return {"ok": True} + + +# --------------------------------------------------------------------------- # +# Archive (v2.8) +# --------------------------------------------------------------------------- # + +@router.post("/tasks/{task_id}/archive") +async def archive_task(project_id: str, task_id: str, + body: Optional[Dict[str, Any]] = None): + bb = _bb(project_id) + task = bb.get_task(task_id) + if not task: + raise HTTPException(404, f"Task not found: {task_id}") + action = (body or {}).get("action", "archive") + if action == "unarchive": + bb.unarchive_task(task_id) + return {"ok": True, "action": "unarchived"} + else: + bb.archive_task(task_id) + return {"ok": True, "action": "archived"} + + +@router.post("/tasks/archive-done") +async def archive_done_tasks(project_id: str): + bb = _bb(project_id) + count = bb.archive_done_tasks() + return {"ok": True, "archived_count": count} diff --git a/src/main.py b/src/main.py index 7a167df..7f97dfa 100644 --- a/src/main.py +++ b/src/main.py @@ -7,7 +7,8 @@ from src.api.sse_routes import router as sse_router from src.api.project_routes import router as project_router from src.api.daemon_routes import router as daemon_router from src.api.checkpoint_routes import router as checkpoint_router -from src.api.blackboard_routes import router as blackboard_router +from src.api.task_routes import router as task_router +from src.api.task_relation_routes import router as task_relation_router import logging from contextlib import asynccontextmanager @@ -273,7 +274,8 @@ app.add_middleware( # --------------------------------------------------------------------------- -app.include_router(blackboard_router) +app.include_router(task_router) +app.include_router(task_relation_router) app.include_router(checkpoint_router) app.include_router(daemon_router) app.include_router(project_router) -- 2.45.4 From d09fd4a173fa40b2cc318e3a9e9da309d64b8805 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 14 Jun 2026 14:20:24 +0800 Subject: [PATCH 2/3] =?UTF-8?q?[moz]=20fix(api):=20flake8=20lint=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20=E2=80=94=20=E7=A7=BB=E9=99=A4=E6=9C=AA?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=20import?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/api/shared.py | 4 ++-- src/api/task_routes.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/api/shared.py b/src/api/shared.py index 8bddf0c..0b4d161 100644 --- a/src/api/shared.py +++ b/src/api/shared.py @@ -1,6 +1,5 @@ """共享 helper 和常量""" -from pathlib import Path from typing import Any, Dict from fastapi import HTTPException @@ -56,7 +55,8 @@ def _init_agent_ids(): if _KNOWN_AGENT_IDS: return try: - import yaml, os + import yaml + import os cfg_path = os.path.join(os.path.dirname(__file__), "..", "..", "config", "default.yaml") with open(cfg_path) as f: cfg = yaml.safe_load(f) diff --git a/src/api/task_routes.py b/src/api/task_routes.py index a0e04af..9071619 100644 --- a/src/api/task_routes.py +++ b/src/api/task_routes.py @@ -19,7 +19,6 @@ from src.api.shared import ( _bb, _q, _task_to_dict, - _validate_project, _extract_mentions, ) -- 2.45.4 From cc2e5aa64caa1741e68171bc72a6667406528c5a Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 14 Jun 2026 14:22:14 +0800 Subject: [PATCH 3/3] =?UTF-8?q?[moz]=20fix(api):=20Review=20M1=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=20=E2=80=94=20expand=3Dall=20=E4=BF=9D=E6=8C=81?= =?UTF-8?q?=E6=97=A7=E6=A0=BC=E5=BC=8F=20+=20=5Ftoolchain=20=E5=8A=A0?= =?UTF-8?q?=E5=85=A5=20=5FVIRTUAL=5FPROJECTS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - M1: expand=all 保持旧 list 格式(向后兼容 TaskModal .map()/.length) - 细粒度 expand=comments,events 用新 {items,total_count,limit} 格式 - S1(PR#73): _toolchain 加入 _VIRTUAL_PROJECTS - S1(PR#72): 移除 _validate_project 未使用 import --- src/api/shared.py | 2 +- src/api/task_routes.py | 23 ++++++++++++++++++++--- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/api/shared.py b/src/api/shared.py index 0b4d161..77d2d5b 100644 --- a/src/api/shared.py +++ b/src/api/shared.py @@ -10,7 +10,7 @@ from src.blackboard.registry import ProjectRegistry from src.utils import get_data_root # 虚拟项目白名单 -_VIRTUAL_PROJECTS = frozenset({"_general", "_mail"}) +_VIRTUAL_PROJECTS = frozenset({"_general", "_mail", "_toolchain"}) def _validate_project(project_id: str) -> str: diff --git a/src/api/task_routes.py b/src/api/task_routes.py index 9071619..d3a71e5 100644 --- a/src/api/task_routes.py +++ b/src/api/task_routes.py @@ -55,10 +55,27 @@ async def get_task(project_id: str, task_id: str, if not expand: return result - expand_list = expand.split(",") if expand != "all" else [ - "comments", "outputs", "reviews", "events", "decisions" - ] + # expand=all: 保持旧格式(list + 聚合字段),向后兼容前端 TaskModal + if expand == "all": + q = _q(project_id) + detail = q.task_detail(task_id) + if detail: + result["comments_count"] = detail.get("comments_count", 0) + result["outputs_count"] = detail.get("outputs_count", 0) + result["review_status"] = detail.get("review_status") + result["latest_event_detail"] = detail.get("latest_event_detail") + result["comments"] = [dict(c.__dict__) + for c in bb.get_comments(task_id)] + result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)] + result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)] + result["decisions"] = [dict(d.__dict__) + for d in bb.get_decisions(task_id)] + result["events"] = q.task_events(task_id) + result["experiences"] = q.task_experiences(task_id) + return result + # 细粒度 expand: 新格式(comments/events 带 limit + total_count) + expand_list = expand.split(",") q = _q(project_id) if "comments" in expand_list: -- 2.45.4