"""Task 关联路由 — comments / outputs / decisions / observations / reviews / events / experiences / summary""" from __future__ import annotations import json import os from typing import Any, Dict, Optional from fastapi import APIRouter, HTTPException, Query from src.blackboard.models import Review from src.blackboard.db import OUTPUT_TYPES from src.api.shared import ( _bb, _q, _extract_mentions, ) router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"]) # --------------------------------------------------------------------------- # # Comments # --------------------------------------------------------------------------- # @router.get("/tasks/{task_id}/comments") async def get_comments(project_id: str, task_id: str, comment_type: Optional[str] = None): bb = _bb(project_id) comments = bb.get_comments(task_id, comment_type=comment_type) return {"comments": [dict(c.__dict__) for c in comments]} @router.post("/tasks/{task_id}/comments") async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) mentions_raw = body.get("mentions") comment_body = body["body"] # #04: 自动从 body 提取 @mention,与显式传的 mentions 取并集 auto_mentions = _extract_mentions(comment_body) if isinstance(mentions_raw, str): try: explicit_mentions = json.loads(mentions_raw) except Exception: explicit_mentions = [] elif isinstance(mentions_raw, list): explicit_mentions = mentions_raw else: explicit_mentions = [] merged_mentions = list(set(explicit_mentions + auto_mentions)) cid = bb.add_comment(task_id, body["author"], comment_body, comment_type=body.get("comment_type", "general"), mentions=merged_mentions) if merged_mentions: bb.record_mentions(cid, task_id, merged_mentions) # #10: SSE 通知前端黑板有新 comment try: from src.api.sse_routes import get_broker broker = get_broker() broker.publish_sync("comment_added", { "project_id": project_id, "task_id": task_id, "comment_id": cid, "author": body["author"], }) except Exception: pass return {"ok": True, "comment_id": cid, "mentions": merged_mentions} # --------------------------------------------------------------------------- # # Outputs # --------------------------------------------------------------------------- # @router.get("/tasks/{task_id}/outputs") async def get_outputs(project_id: str, task_id: str): bb = _bb(project_id) return {"outputs": [dict(o.__dict__) for o in bb.get_outputs(task_id)]} @router.post("/tasks/{task_id}/outputs") async def write_output(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) # 字段校验 + Agent-friendly 错误 agent = body.get("agent") if not agent: raise HTTPException(422, { "error": "validation_failed", "detail": "Missing required field: agent", "hint": "Provide your agent ID, e.g. 'zhangfei-dev'", }) # type 字段:接受 type 或 content_type(别名兼容) output_type = body.get("type") or body.get("content_type") valid_types = sorted(OUTPUT_TYPES) if not output_type: raise HTTPException(422, { "error": "validation_failed", "detail": "Missing required field: type", "valid_values": {"type": valid_types}, "hint": "Use 'type' field. Also accepts 'content_type' as alias.", }) if output_type not in OUTPUT_TYPES: raise HTTPException(422, { "error": "validation_failed", "detail": f"Invalid type: '{output_type}'", "valid_values": {"type": valid_types}, }) title = body.get("title") if not title: raise HTTPException(422, { "error": "validation_failed", "detail": "Missing required field: title", "hint": "Provide a brief title describing this output", }) # 内容模式:content(直传)或 content_path(引用) content = body.get("content") content_path = body.get("content_path") or body.get("path") if content and not content_path: # 内容直传模式:自动写文件 artifacts_dir = os.path.join( os.path.dirname(bb.db_path), "artifacts", task_id ) os.makedirs(artifacts_dir, exist_ok=True) # 安全文件名 safe_name = "".join( c if c.isalnum() or c in "._-" else "_" for c in title) if not safe_name: safe_name = "output" file_path = os.path.join(artifacts_dir, safe_name) with open(file_path, "w", encoding="utf-8") as f: f.write(content) content_path = file_path oid = bb.write_output( task_id, agent, output_type, title, content_path=content_path, summary=body.get("summary"), metadata=body.get("metadata"), ) return {"ok": True, "output_id": oid} # --------------------------------------------------------------------------- # # Decisions # --------------------------------------------------------------------------- # @router.get("/tasks/{task_id}/decisions") async def get_decisions(project_id: str, task_id: str): bb = _bb(project_id) return {"decisions": [dict(d.__dict__) for d in bb.get_decisions(task_id)]} @router.post("/tasks/{task_id}/decisions") async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) did = bb.add_decision(task_id, body["decider"], body["decision"], body["rationale"], alternatives=body.get("alternatives")) return {"ok": True, "decision_id": did} # --------------------------------------------------------------------------- # # Observations # --------------------------------------------------------------------------- # @router.post("/tasks/{task_id}/observations") async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) oid = bb.add_observation(task_id, body["observer"], body["body"], severity=body.get("severity", "info")) return {"ok": True, "observation_id": oid} # --------------------------------------------------------------------------- # # Reviews # --------------------------------------------------------------------------- # @router.get("/tasks/{task_id}/reviews") async def get_reviews(project_id: str, task_id: str): bb = _bb(project_id) return {"reviews": [dict(r.__dict__) for r in bb.get_reviews(task_id)]} @router.post("/tasks/{task_id}/reviews") async def add_review(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) review = Review( id=body["id"], task_id=task_id, reviewer=body["reviewer"], review_type=body["review_type"], verdict=body["verdict"], summary=body["summary"], confidence=body.get("confidence"), round=body.get("round", 1), max_rounds=body.get("max_rounds", 3), ) bb.add_review(review) return {"ok": True, "review_id": review.id} # --------------------------------------------------------------------------- # # Per-task Events & Experiences # --------------------------------------------------------------------------- # @router.get("/tasks/{task_id}/events") async def get_task_events(project_id: str, task_id: str, limit: int = Query(50, le=200)): q = _q(project_id) return {"events": q.task_events(task_id, limit)} @router.get("/tasks/{task_id}/experiences") async def get_task_experiences(project_id: str, task_id: str): q = _q(project_id) return {"experiences": q.task_experiences(task_id)} # --------------------------------------------------------------------------- # # Global Events # --------------------------------------------------------------------------- # @router.get("/events") async def get_events(project_id: str, limit: int = Query(50, le=200)): q = _q(project_id) return {"events": q.recent_events(limit)} # --------------------------------------------------------------------------- # # Summary # --------------------------------------------------------------------------- # @router.get("/summary") async def task_summary(project_id: str): q = _q(project_id) return {"summary": q.task_summary()}