diff --git a/src/api/shared.py b/src/api/shared.py new file mode 100644 index 0000000..77d2d5b --- /dev/null +++ b/src/api/shared.py @@ -0,0 +1,73 @@ +"""共享 helper 和常量""" + +from typing import Any, Dict +from fastapi import HTTPException + +from src.blackboard.operations import Blackboard +from src.blackboard.queries import Queries +from src.blackboard.models import Task +from src.blackboard.registry import ProjectRegistry +from src.utils import get_data_root + +# 虚拟项目白名单 +_VIRTUAL_PROJECTS = frozenset({"_general", "_mail", "_toolchain"}) + + +def _validate_project(project_id: str) -> str: + """校验 project_id""" + if project_id in _VIRTUAL_PROJECTS: + return project_id + reg = ProjectRegistry(get_data_root()) + if reg.get_project(project_id): + return project_id + raise HTTPException(400, { + "ok": False, + "error": "project_not_found", + "detail": f"Project '{project_id}' is not registered.", + "suggestions": [ + f"Register first: POST /api/projects with id='{project_id}'", + "Or use '_general' for tasks without a specific project", + ], + }) + + +def _bb(project_id: str) -> Blackboard: + _validate_project(project_id) + return Blackboard(get_data_root() / project_id / "blackboard.db") + + +def _q(project_id: str) -> Queries: + _validate_project(project_id) + return Queries(get_data_root() / project_id / "blackboard.db") + + +def _task_to_dict(t: Task) -> Dict[str, Any]: + d = {k: v for k, v in t.__dict__.items() if v is not None} + return d + + +_KNOWN_AGENT_IDS: list = [] + + +def _init_agent_ids(): + """从配置文件加载 Agent ID 列表""" + global _KNOWN_AGENT_IDS + if _KNOWN_AGENT_IDS: + return + try: + import yaml + import os + cfg_path = os.path.join(os.path.dirname(__file__), "..", "..", "config", "default.yaml") + with open(cfg_path) as f: + cfg = yaml.safe_load(f) + _KNOWN_AGENT_IDS = list(cfg.get("daemon", {}).get("agent_profiles", {}).keys()) + except Exception: + _KNOWN_AGENT_IDS = [] + + +def _extract_mentions(text: str) -> list: + """从文本中自动提取 @agent-id 格式的 mention""" + import re + _init_agent_ids() + candidates = set(re.findall(r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', text)) + return [a for a in candidates if a in _KNOWN_AGENT_IDS] diff --git a/src/api/task_relation_routes.py b/src/api/task_relation_routes.py new file mode 100644 index 0000000..58357e9 --- /dev/null +++ b/src/api/task_relation_routes.py @@ -0,0 +1,240 @@ +"""Task 关联路由 — comments / outputs / decisions / observations / reviews / events / experiences / summary""" + +from __future__ import annotations + +import json +import os +from typing import Any, Dict, Optional + +from fastapi import APIRouter, HTTPException, Query + +from src.blackboard.models import Review +from src.blackboard.db import OUTPUT_TYPES + +from src.api.shared import ( + _bb, + _q, + _extract_mentions, +) + +router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"]) + + +# --------------------------------------------------------------------------- # +# Comments +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/comments") +async def get_comments(project_id: str, task_id: str, + comment_type: Optional[str] = None): + bb = _bb(project_id) + comments = bb.get_comments(task_id, comment_type=comment_type) + return {"comments": [dict(c.__dict__) for c in comments]} + + +@router.post("/tasks/{task_id}/comments") +async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + mentions_raw = body.get("mentions") + comment_body = body["body"] + + # #04: 自动从 body 提取 @mention,与显式传的 mentions 取并集 + auto_mentions = _extract_mentions(comment_body) + if isinstance(mentions_raw, str): + try: + explicit_mentions = json.loads(mentions_raw) + except Exception: + explicit_mentions = [] + elif isinstance(mentions_raw, list): + explicit_mentions = mentions_raw + else: + explicit_mentions = [] + merged_mentions = list(set(explicit_mentions + auto_mentions)) + + cid = bb.add_comment(task_id, body["author"], comment_body, + comment_type=body.get("comment_type", "general"), + mentions=merged_mentions) + if merged_mentions: + bb.record_mentions(cid, task_id, merged_mentions) + # #10: SSE 通知前端黑板有新 comment + try: + from src.api.sse_routes import get_broker + broker = get_broker() + broker.publish_sync("comment_added", { + "project_id": project_id, + "task_id": task_id, + "comment_id": cid, + "author": body["author"], + }) + except Exception: + pass + + return {"ok": True, "comment_id": cid, "mentions": merged_mentions} + + +# --------------------------------------------------------------------------- # +# Outputs +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/outputs") +async def get_outputs(project_id: str, task_id: str): + bb = _bb(project_id) + return {"outputs": [dict(o.__dict__) for o in bb.get_outputs(task_id)]} + + +@router.post("/tasks/{task_id}/outputs") +async def write_output(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + + # 字段校验 + Agent-friendly 错误 + agent = body.get("agent") + if not agent: + raise HTTPException(422, { + "error": "validation_failed", + "detail": "Missing required field: agent", + "hint": "Provide your agent ID, e.g. 'zhangfei-dev'", + }) + + # type 字段:接受 type 或 content_type(别名兼容) + output_type = body.get("type") or body.get("content_type") + valid_types = sorted(OUTPUT_TYPES) + if not output_type: + raise HTTPException(422, { + "error": "validation_failed", + "detail": "Missing required field: type", + "valid_values": {"type": valid_types}, + "hint": "Use 'type' field. Also accepts 'content_type' as alias.", + }) + if output_type not in OUTPUT_TYPES: + raise HTTPException(422, { + "error": "validation_failed", + "detail": f"Invalid type: '{output_type}'", + "valid_values": {"type": valid_types}, + }) + + title = body.get("title") + if not title: + raise HTTPException(422, { + "error": "validation_failed", + "detail": "Missing required field: title", + "hint": "Provide a brief title describing this output", + }) + + # 内容模式:content(直传)或 content_path(引用) + content = body.get("content") + content_path = body.get("content_path") or body.get("path") + + if content and not content_path: + # 内容直传模式:自动写文件 + artifacts_dir = os.path.join( + os.path.dirname(bb.db_path), "artifacts", task_id + ) + os.makedirs(artifacts_dir, exist_ok=True) + # 安全文件名 + safe_name = "".join( + c if c.isalnum() or c in "._-" else "_" for c in title) + if not safe_name: + safe_name = "output" + file_path = os.path.join(artifacts_dir, safe_name) + with open(file_path, "w", encoding="utf-8") as f: + f.write(content) + content_path = file_path + + oid = bb.write_output( + task_id, agent, output_type, title, + content_path=content_path, + summary=body.get("summary"), + metadata=body.get("metadata"), + ) + return {"ok": True, "output_id": oid} + + +# --------------------------------------------------------------------------- # +# Decisions +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/decisions") +async def get_decisions(project_id: str, task_id: str): + bb = _bb(project_id) + return {"decisions": [dict(d.__dict__) for d in bb.get_decisions(task_id)]} + + +@router.post("/tasks/{task_id}/decisions") +async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + did = bb.add_decision(task_id, body["decider"], body["decision"], + body["rationale"], + alternatives=body.get("alternatives")) + return {"ok": True, "decision_id": did} + + +# --------------------------------------------------------------------------- # +# Observations +# --------------------------------------------------------------------------- # + +@router.post("/tasks/{task_id}/observations") +async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + oid = bb.add_observation(task_id, body["observer"], body["body"], + severity=body.get("severity", "info")) + return {"ok": True, "observation_id": oid} + + +# --------------------------------------------------------------------------- # +# Reviews +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/reviews") +async def get_reviews(project_id: str, task_id: str): + bb = _bb(project_id) + return {"reviews": [dict(r.__dict__) for r in bb.get_reviews(task_id)]} + + +@router.post("/tasks/{task_id}/reviews") +async def add_review(project_id: str, task_id: str, body: Dict[str, Any]): + bb = _bb(project_id) + review = Review( + id=body["id"], task_id=task_id, reviewer=body["reviewer"], + review_type=body["review_type"], verdict=body["verdict"], + summary=body["summary"], confidence=body.get("confidence"), + round=body.get("round", 1), max_rounds=body.get("max_rounds", 3), + ) + bb.add_review(review) + return {"ok": True, "review_id": review.id} + + +# --------------------------------------------------------------------------- # +# Per-task Events & Experiences +# --------------------------------------------------------------------------- # + +@router.get("/tasks/{task_id}/events") +async def get_task_events(project_id: str, task_id: str, + limit: int = Query(50, le=200)): + q = _q(project_id) + return {"events": q.task_events(task_id, limit)} + + +@router.get("/tasks/{task_id}/experiences") +async def get_task_experiences(project_id: str, task_id: str): + q = _q(project_id) + return {"experiences": q.task_experiences(task_id)} + + +# --------------------------------------------------------------------------- # +# Global Events +# --------------------------------------------------------------------------- # + +@router.get("/events") +async def get_events(project_id: str, limit: int = Query(50, le=200)): + q = _q(project_id) + return {"events": q.recent_events(limit)} + + +# --------------------------------------------------------------------------- # +# Summary +# --------------------------------------------------------------------------- # + +@router.get("/summary") +async def task_summary(project_id: str): + q = _q(project_id) + return {"summary": q.task_summary()} diff --git a/src/api/blackboard_routes.py b/src/api/task_routes.py similarity index 50% rename from src/api/blackboard_routes.py rename to src/api/task_routes.py index 89aecc6..d3a71e5 100644 --- a/src/api/blackboard_routes.py +++ b/src/api/task_routes.py @@ -1,68 +1,45 @@ -"""API 路由 — 黑板 CRUD""" +"""Task 核心路由 — CRUD、状态、归档""" from __future__ import annotations import json import os +import sqlite3 +from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional -from fastapi import APIRouter, HTTPException, Query - -from src.blackboard.operations import Blackboard -from src.blackboard.models import Task, Review -from src.blackboard.queries import Queries -from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES -from src.blackboard.registry import ProjectRegistry +from fastapi import APIRouter, HTTPException +from src.blackboard.models import Task +from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS from src.utils import get_data_root +from src.api.shared import ( + _bb, + _q, + _task_to_dict, + _extract_mentions, +) + router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"]) -# 虚拟项目白名单(不需要在 registry 注册) -_VIRTUAL_PROJECTS = frozenset({"_general", "_mail"}) - -def _validate_project(project_id: str) -> str: - """校验 project_id,已知项目/虚拟项目放行,未知项目返回 400""" - if project_id in _VIRTUAL_PROJECTS: - return project_id - reg = ProjectRegistry(get_data_root()) - if reg.get_project(project_id): - return project_id - raise HTTPException(400, { - "ok": False, - "error": "project_not_found", - "detail": f"Project '{project_id}' is not registered.", - "suggestions": [ - f"Register first: POST /api/projects with id='{project_id}'", - "Or use '_general' for tasks without a specific project", - ], - }) - - -def _bb(project_id: str) -> Blackboard: - _validate_project(project_id) - return Blackboard(get_data_root() / project_id / "blackboard.db") - - -def _q(project_id: str) -> Queries: - _validate_project(project_id) - return Queries(get_data_root() / project_id / "blackboard.db") - - -# --- Tasks --- +# --------------------------------------------------------------------------- # +# Tasks +# --------------------------------------------------------------------------- # @router.get("/tasks") async def list_tasks(project_id: str, status: Optional[str] = None, assignee: Optional[str] = None, - parent_task: Optional[str] = None): + parent_task: Optional[str] = None, + q: Optional[str] = None): bb = _bb(project_id) - tasks = bb.list_tasks( - status=status, - assignee=assignee, - parent_task=parent_task) + tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task) + if q: + q_lower = q.lower() + tasks = [t for t in tasks if q_lower in (t.title or "").lower()] return {"tasks": [_task_to_dict(t) for t in tasks]} @@ -74,6 +51,11 @@ async def get_task(project_id: str, task_id: str, if not task: raise HTTPException(404, f"Task not found: {task_id}") result = _task_to_dict(task) + + if not expand: + return result + + # expand=all: 保持旧格式(list + 聚合字段),向后兼容前端 TaskModal if expand == "all": q = _q(project_id) detail = q.task_detail(task_id) @@ -90,6 +72,37 @@ async def get_task(project_id: str, task_id: str, for d in bb.get_decisions(task_id)] result["events"] = q.task_events(task_id) result["experiences"] = q.task_experiences(task_id) + return result + + # 细粒度 expand: 新格式(comments/events 带 limit + total_count) + expand_list = expand.split(",") + q = _q(project_id) + + if "comments" in expand_list: + all_comments = bb.get_comments(task_id) + result["comments"] = { + "items": [dict(c.__dict__) for c in all_comments[-20:]], + "total_count": len(all_comments), + "limit": 20, + } + + if "events" in expand_list: + all_events = q.task_events(task_id, limit=99999) + result["events"] = { + "items": all_events[-30:], + "total_count": len(all_events), + "limit": 30, + } + + if "outputs" in expand_list: + result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)] + + if "reviews" in expand_list: + result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)] + + if "decisions" in expand_list: + result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)] + return result @@ -100,11 +113,9 @@ async def create_task(project_id: str, body: Dict[str, Any]): task_id = body.get("id") if not task_id: import re - from datetime import datetime prefix = re.sub(r'[^a-z0-9]', '-', project_id.lower()).strip('-')[:20] date_str = datetime.now().strftime('%Y%m%d') # seq: 查当前项目最大 seq - import sqlite3 db_path = get_data_root() / project_id / "blackboard.db" try: conn = sqlite3.connect(str(db_path), timeout=5) @@ -237,7 +248,6 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]): }) # 检查转换是否合法 - from src.blackboard.db import VALID_TRANSITIONS current = old_task.status allowed = VALID_TRANSITIONS.get(current, set()) if new_status not in allowed: @@ -271,220 +281,6 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]): return {"ok": True, "old_status": current, "new_status": new_status} -# --- @mention 自动提取(#04) --- -_KNOWN_AGENT_IDS: list = [] - - -def _init_agent_ids(): - """从配置文件加载 Agent ID 列表""" - global _KNOWN_AGENT_IDS - if _KNOWN_AGENT_IDS: - return - try: - import yaml - cfg_path = os.path.join( - os.path.dirname(__file__), - "..", - "..", - "config", - "default.yaml") - with open(cfg_path) as f: - cfg = yaml.safe_load(f) - _KNOWN_AGENT_IDS = list( - cfg.get( - "daemon", - {}).get( - "agent_profiles", - {}).keys()) - except Exception: - _KNOWN_AGENT_IDS = [] - - -def _extract_mentions(text: str) -> list: - """从文本中自动提取 @agent-id 格式的 mention""" - import re - _init_agent_ids() - candidates = set( - re.findall( - r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', - text)) - return [a for a in candidates if a in _KNOWN_AGENT_IDS] - - -# --- Comments --- - -@router.get("/tasks/{task_id}/comments") -async def get_comments(project_id: str, task_id: str, - comment_type: Optional[str] = None): - bb = _bb(project_id) - comments = bb.get_comments(task_id, comment_type=comment_type) - return {"comments": [dict(c.__dict__) for c in comments]} - - -@router.post("/tasks/{task_id}/comments") -async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - mentions_raw = body.get("mentions") - comment_body = body["body"] - - # #04: 自动从 body 提取 @mention,与显式传的 mentions 取并集 - auto_mentions = _extract_mentions(comment_body) - if isinstance(mentions_raw, str): - try: - explicit_mentions = json.loads(mentions_raw) - except Exception: - explicit_mentions = [] - elif isinstance(mentions_raw, list): - explicit_mentions = mentions_raw - else: - explicit_mentions = [] - merged_mentions = list(set(explicit_mentions + auto_mentions)) - - cid = bb.add_comment(task_id, body["author"], comment_body, - comment_type=body.get("comment_type", "general"), - mentions=merged_mentions) - if merged_mentions: - bb.record_mentions(cid, task_id, merged_mentions) - # #10: SSE 通知前端黑板有新 comment - try: - from src.api.sse_routes import get_broker - broker = get_broker() - broker.publish_sync("comment_added", { - "project_id": project_id, - "task_id": task_id, - "comment_id": cid, - "author": body["author"], - }) - except Exception: - pass - - return {"ok": True, "comment_id": cid, "mentions": merged_mentions} - - -# --- Outputs --- - -@router.get("/tasks/{task_id}/outputs") -async def get_outputs(project_id: str, task_id: str): - bb = _bb(project_id) - return {"outputs": [dict(o.__dict__) for o in bb.get_outputs(task_id)]} - - -@router.post("/tasks/{task_id}/outputs") -async def write_output(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - - # 字段校验 + Agent-friendly 错误 - agent = body.get("agent") - if not agent: - raise HTTPException(422, { - "error": "validation_failed", - "detail": "Missing required field: agent", - "hint": "Provide your agent ID, e.g. 'zhangfei-dev'", - }) - - # type 字段:接受 type 或 content_type(别名兼容) - output_type = body.get("type") or body.get("content_type") - valid_types = sorted(OUTPUT_TYPES) - if not output_type: - raise HTTPException(422, { - "error": "validation_failed", - "detail": "Missing required field: type", - "valid_values": {"type": valid_types}, - "hint": "Use 'type' field. Also accepts 'content_type' as alias.", - }) - if output_type not in OUTPUT_TYPES: - raise HTTPException(422, { - "error": "validation_failed", - "detail": f"Invalid type: '{output_type}'", - "valid_values": {"type": valid_types}, - }) - - title = body.get("title") - if not title: - raise HTTPException(422, { - "error": "validation_failed", - "detail": "Missing required field: title", - "hint": "Provide a brief title describing this output", - }) - - # 内容模式:content(直传)或 content_path(引用) - content = body.get("content") - content_path = body.get("content_path") or body.get("path") - - if content and not content_path: - # 内容直传模式:自动写文件 - import os - artifacts_dir = os.path.join( - os.path.dirname(bb.db_path), "artifacts", task_id - ) - os.makedirs(artifacts_dir, exist_ok=True) - # 安全文件名 - safe_name = "".join( - c if c.isalnum() or c in "._-" else "_" for c in title) - if not safe_name: - safe_name = "output" - file_path = os.path.join(artifacts_dir, safe_name) - with open(file_path, "w", encoding="utf-8") as f: - f.write(content) - content_path = file_path - - oid = bb.write_output( - task_id, agent, output_type, title, - content_path=content_path, - summary=body.get("summary"), - metadata=body.get("metadata"), - ) - return {"ok": True, "output_id": oid} - - -# --- Decisions --- - -@router.get("/tasks/{task_id}/decisions") -async def get_decisions(project_id: str, task_id: str): - bb = _bb(project_id) - return {"decisions": [dict(d.__dict__) for d in bb.get_decisions(task_id)]} - - -@router.post("/tasks/{task_id}/decisions") -async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - did = bb.add_decision(task_id, body["decider"], body["decision"], - body["rationale"], - alternatives=body.get("alternatives")) - return {"ok": True, "decision_id": did} - - -# --- Observations --- - -@router.post("/tasks/{task_id}/observations") -async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - oid = bb.add_observation(task_id, body["observer"], body["body"], - severity=body.get("severity", "info")) - return {"ok": True, "observation_id": oid} - - -# --- Reviews --- - -@router.get("/tasks/{task_id}/reviews") -async def get_reviews(project_id: str, task_id: str): - bb = _bb(project_id) - return {"reviews": [dict(r.__dict__) for r in bb.get_reviews(task_id)]} - - -@router.post("/tasks/{task_id}/reviews") -async def add_review(project_id: str, task_id: str, body: Dict[str, Any]): - bb = _bb(project_id) - review = Review( - id=body["id"], task_id=task_id, reviewer=body["reviewer"], - review_type=body["review_type"], verdict=body["verdict"], - summary=body["summary"], confidence=body.get("confidence"), - round=body.get("round", 1), max_rounds=body.get("max_rounds", 3), - ) - bb.add_review(review) - return {"ok": True, "review_id": review.id} - - @router.patch("/tasks/{task_id}") async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]): """更新任务元数据(归档、标题等)""" @@ -497,7 +293,6 @@ async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]): if not updates: return {"ok": True} # 直接用 SQL 更新 - import sqlite3 conn = sqlite3.connect(str(bb.db_path), timeout=5) try: set_clause = ", ".join(f"{k}=?" for k in updates) @@ -509,38 +304,9 @@ async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]): return {"ok": True} -# --- Per-task Events & Experiences --- - -@router.get("/tasks/{task_id}/events") -async def get_task_events(project_id: str, task_id: str, - limit: int = Query(50, le=200)): - q = _q(project_id) - return {"events": q.task_events(task_id, limit)} - - -@router.get("/tasks/{task_id}/experiences") -async def get_task_experiences(project_id: str, task_id: str): - q = _q(project_id) - return {"experiences": q.task_experiences(task_id)} - - -# --- Global Events --- - -@router.get("/events") -async def get_events(project_id: str, limit: int = Query(50, le=200)): - q = _q(project_id) - return {"events": q.recent_events(limit)} - - -# --- Summary --- - -@router.get("/summary") -async def task_summary(project_id: str): - q = _q(project_id) - return {"summary": q.task_summary()} - - -# --- Archive (v2.8) --- +# --------------------------------------------------------------------------- # +# Archive (v2.8) +# --------------------------------------------------------------------------- # @router.post("/tasks/{task_id}/archive") async def archive_task(project_id: str, task_id: str, @@ -563,10 +329,3 @@ async def archive_done_tasks(project_id: str): bb = _bb(project_id) count = bb.archive_done_tasks() return {"ok": True, "archived_count": count} - - -# --- Helper --- - -def _task_to_dict(t: Task) -> Dict[str, Any]: - d = {k: v for k, v in t.__dict__.items() if v is not None} - return d diff --git a/src/main.py b/src/main.py index 7a167df..7f97dfa 100644 --- a/src/main.py +++ b/src/main.py @@ -7,7 +7,8 @@ from src.api.sse_routes import router as sse_router from src.api.project_routes import router as project_router from src.api.daemon_routes import router as daemon_router from src.api.checkpoint_routes import router as checkpoint_router -from src.api.blackboard_routes import router as blackboard_router +from src.api.task_routes import router as task_router +from src.api.task_relation_routes import router as task_relation_router import logging from contextlib import asynccontextmanager @@ -273,7 +274,8 @@ app.add_middleware( # --------------------------------------------------------------------------- -app.include_router(blackboard_router) +app.include_router(task_router) +app.include_router(task_relation_router) app.include_router(checkpoint_router) app.include_router(daemon_router) app.include_router(project_router)