Files
sanguo_moziplus_v2/src/api/blackboard_routes.py
T
2026-05-18 11:42:03 +08:00

324 lines
11 KiB
Python

"""API 路由 — 黑板 CRUD"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.operations import Blackboard
from src.blackboard.models import Task, Review
from src.blackboard.queries import Queries
from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS, COMMENT_TYPES, OUTPUT_TYPES
from src.utils import get_data_root
router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"])
def _bb(project_id: str) -> Blackboard:
return Blackboard(get_data_root() / project_id / "blackboard.db")
def _q(project_id: str) -> Queries:
return Queries(get_data_root() / project_id / "blackboard.db")
# --- Tasks ---
@router.get("/tasks")
async def list_tasks(project_id: str,
status: Optional[str] = None,
assignee: Optional[str] = None,
parent_task: Optional[str] = None):
bb = _bb(project_id)
tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task)
return {"tasks": [_task_to_dict(t) for t in tasks]}
@router.get("/tasks/{task_id}")
async def get_task(project_id: str, task_id: str,
expand: Optional[str] = None):
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task not found: {task_id}")
result = _task_to_dict(task)
if expand == "all":
q = _q(project_id)
detail = q.task_detail(task_id)
if detail:
result["comments_count"] = detail.get("comments_count", 0)
result["outputs_count"] = detail.get("outputs_count", 0)
result["review_status"] = detail.get("review_status")
result["latest_event_detail"] = detail.get("latest_event_detail")
result["comments"] = [dict(c.__dict__) for c in bb.get_comments(task_id)]
result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)]
result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)]
result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)]
result["events"] = q.task_events(task_id)
result["experiences"] = q.task_experiences(task_id)
return result
@router.post("/tasks")
async def create_task(project_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
task = Task(
id=body["id"], title=body["title"],
description=body.get("description"),
task_type=body.get("task_type", "coding"),
priority=body.get("priority", 5),
assignee=body.get("assignee"),
assigned_by=body.get("assigned_by", "user"),
depends_on=json.dumps(body["depends_on"]) if "depends_on" in body else None,
parent_task=body.get("parent_task"),
risk_level=body.get("risk_level", "standard"),
stage=body.get("stage"),
)
bb.create_task(task)
return {"ok": True, "task_id": task.id}
@router.post("/tasks/{task_id}/claim")
async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
if not bb.claim_task(task_id, body["agent"]):
raise HTTPException(409, "Claim failed (already claimed or wrong assignee)")
return {"ok": True}
@router.post("/tasks/{task_id}/status")
async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
old_task = bb.get_task(task_id)
if not old_task:
raise HTTPException(404, {
"error": "not_found",
"detail": f"Task {task_id} not found in project {project_id}",
})
new_status = body.get("status")
if not new_status:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: status",
"valid_values": {"status": sorted(VALID_STATUSES)},
})
# 检查转换是否合法
from src.blackboard.db import VALID_TRANSITIONS
current = old_task.status
allowed = VALID_TRANSITIONS.get(current, set())
if new_status not in allowed:
raise HTTPException(409, {
"error": "invalid_transition",
"detail": f"Cannot transition from {current} to {new_status}",
"valid_transitions": {current: sorted(allowed)},
"hint": f"From '{current}', valid targets are: {sorted(allowed)}",
})
if not bb.update_task_status(task_id, new_status,
agent=body.get("agent")):
raise HTTPException(409, {
"error": "transition_failed",
"detail": f"Status update failed for {task_id}",
})
# SSE 推送
try:
from src.api.sse_routes import get_broker
broker = get_broker()
broker.publish_sync("task_updated", {
"project_id": project_id,
"task_id": task_id,
"old_status": current,
"new_status": new_status,
"agent": body.get("agent"),
})
except Exception:
pass
return {"ok": True, "old_status": current, "new_status": new_status}
# --- Comments ---
@router.get("/tasks/{task_id}/comments")
async def get_comments(project_id: str, task_id: str,
comment_type: Optional[str] = None):
bb = _bb(project_id)
comments = bb.get_comments(task_id, comment_type=comment_type)
return {"comments": [dict(c.__dict__) for c in comments]}
@router.post("/tasks/{task_id}/comments")
async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
cid = bb.add_comment(task_id, body["author"], body["body"],
comment_type=body.get("comment_type", "general"),
mentions=body.get("mentions"))
return {"ok": True, "comment_id": cid}
# --- Outputs ---
@router.get("/tasks/{task_id}/outputs")
async def get_outputs(project_id: str, task_id: str):
bb = _bb(project_id)
return {"outputs": [dict(o.__dict__) for o in bb.get_outputs(task_id)]}
@router.post("/tasks/{task_id}/outputs")
async def write_output(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
# 字段校验 + Agent-friendly 错误
agent = body.get("agent")
if not agent:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: agent",
"hint": "Provide your agent ID, e.g. 'zhangfei-dev'",
})
# type 字段:接受 type 或 content_type(别名兼容)
output_type = body.get("type") or body.get("content_type")
valid_types = sorted(OUTPUT_TYPES)
if not output_type:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: type",
"valid_values": {"type": valid_types},
"hint": "Use 'type' field. Also accepts 'content_type' as alias.",
})
if output_type not in OUTPUT_TYPES:
raise HTTPException(422, {
"error": "validation_failed",
"detail": f"Invalid type: '{output_type}'",
"valid_values": {"type": valid_types},
})
title = body.get("title")
if not title:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: title",
"hint": "Provide a brief title describing this output",
})
# 内容模式:content(直传)或 content_path(引用)
content = body.get("content")
content_path = body.get("content_path") or body.get("path")
if content and not content_path:
# 内容直传模式:自动写文件
import os
artifacts_dir = os.path.join(
os.path.dirname(bb.db_path), "artifacts", task_id
)
os.makedirs(artifacts_dir, exist_ok=True)
# 安全文件名
safe_name = "".join(c if c.isalnum() or c in "._-" else "_" for c in title)
if not safe_name:
safe_name = "output"
file_path = os.path.join(artifacts_dir, safe_name)
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
content_path = file_path
oid = bb.write_output(
task_id, agent, output_type, title,
content_path=content_path,
summary=body.get("summary"),
metadata=body.get("metadata"),
)
return {"ok": True, "output_id": oid}
# --- Decisions ---
@router.get("/tasks/{task_id}/decisions")
async def get_decisions(project_id: str, task_id: str):
bb = _bb(project_id)
return {"decisions": [dict(d.__dict__) for d in bb.get_decisions(task_id)]}
@router.post("/tasks/{task_id}/decisions")
async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
did = bb.add_decision(task_id, body["decider"], body["decision"],
body["rationale"],
alternatives=body.get("alternatives"))
return {"ok": True, "decision_id": did}
# --- Observations ---
@router.post("/tasks/{task_id}/observations")
async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
oid = bb.add_observation(task_id, body["observer"], body["body"],
severity=body.get("severity", "info"))
return {"ok": True, "observation_id": oid}
# --- Reviews ---
@router.get("/tasks/{task_id}/reviews")
async def get_reviews(project_id: str, task_id: str):
bb = _bb(project_id)
return {"reviews": [dict(r.__dict__) for r in bb.get_reviews(task_id)]}
@router.post("/tasks/{task_id}/reviews")
async def add_review(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
review = Review(
id=body["id"], task_id=task_id, reviewer=body["reviewer"],
review_type=body["review_type"], verdict=body["verdict"],
summary=body["summary"], confidence=body.get("confidence"),
round=body.get("round", 1), max_rounds=body.get("max_rounds", 3),
)
bb.add_review(review)
return {"ok": True, "review_id": review.id}
# --- Per-task Events & Experiences ---
@router.get("/tasks/{task_id}/events")
async def get_task_events(project_id: str, task_id: str,
limit: int = Query(50, le=200)):
q = _q(project_id)
return {"events": q.task_events(task_id, limit)}
@router.get("/tasks/{task_id}/experiences")
async def get_task_experiences(project_id: str, task_id: str):
q = _q(project_id)
return {"experiences": q.task_experiences(task_id)}
# --- Global Events ---
@router.get("/events")
async def get_events(project_id: str, limit: int = Query(50, le=200)):
q = _q(project_id)
return {"events": q.recent_events(limit)}
# --- Summary ---
@router.get("/summary")
async def task_summary(project_id: str):
q = _q(project_id)
return {"summary": q.task_summary()}
# --- Helper ---
def _task_to_dict(t: Task) -> Dict[str, Any]:
d = {k: v for k, v in t.__dict__.items() if v is not None}
return d