"""API 路由 — 黑板 CRUD""" from __future__ import annotations import json import os from pathlib import Path from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException, Query from src.blackboard.operations import Blackboard from src.blackboard.models import Task, Review from src.blackboard.queries import Queries from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS, COMMENT_TYPES, OUTPUT_TYPES from src.blackboard.registry import ProjectRegistry from src.utils import get_data_root router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"]) # 虚拟项目白名单(不需要在 registry 注册) _VIRTUAL_PROJECTS = frozenset({"_general", "_mail"}) def _validate_project(project_id: str) -> str: """校验 project_id,已知项目/虚拟项目放行,未知项目返回 400""" if project_id in _VIRTUAL_PROJECTS: return project_id reg = ProjectRegistry(get_data_root()) if reg.get_project(project_id): return project_id raise HTTPException(400, { "ok": False, "error": "project_not_found", "detail": f"Project '{project_id}' is not registered.", "suggestions": [ f"Register first: POST /api/projects with id='{project_id}'", "Or use '_general' for tasks without a specific project", ], }) def _bb(project_id: str) -> Blackboard: _validate_project(project_id) return Blackboard(get_data_root() / project_id / "blackboard.db") def _q(project_id: str) -> Queries: _validate_project(project_id) return Queries(get_data_root() / project_id / "blackboard.db") # --- Tasks --- @router.get("/tasks") async def list_tasks(project_id: str, status: Optional[str] = None, assignee: Optional[str] = None, parent_task: Optional[str] = None): bb = _bb(project_id) tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task) return {"tasks": [_task_to_dict(t) for t in tasks]} @router.get("/tasks/{task_id}") async def get_task(project_id: str, task_id: str, expand: Optional[str] = None): bb = _bb(project_id) task = bb.get_task(task_id) if not task: raise HTTPException(404, f"Task not found: {task_id}") result = _task_to_dict(task) if expand == "all": q = _q(project_id) detail = q.task_detail(task_id) if detail: result["comments_count"] = detail.get("comments_count", 0) result["outputs_count"] = detail.get("outputs_count", 0) result["review_status"] = detail.get("review_status") result["latest_event_detail"] = detail.get("latest_event_detail") result["comments"] = [dict(c.__dict__) for c in bb.get_comments(task_id)] result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)] result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)] result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)] result["events"] = q.task_events(task_id) result["experiences"] = q.task_experiences(task_id) return result @router.post("/tasks") async def create_task(project_id: str, body: Dict[str, Any]): bb = _bb(project_id) # ID: 后端生成 {project_id_prefix}-{date}-{seq} task_id = body.get("id") if not task_id: import re from datetime import datetime prefix = re.sub(r'[^a-z0-9]', '-', project_id.lower()).strip('-')[:20] date_str = datetime.now().strftime('%Y%m%d') # seq: 查当前项目最大 seq import sqlite3 db_path = get_data_root() / project_id / "blackboard.db" try: conn = sqlite3.connect(str(db_path), timeout=5) max_id_row = conn.execute( "SELECT id FROM tasks WHERE id LIKE ? ORDER BY id DESC LIMIT 1", (f"{prefix}-{date_str}-%",) ).fetchone() conn.close() seq = int(max_id_row[0].split('-')[-1]) + 1 if max_id_row else 1 except Exception: seq = 1 task_id = f"{prefix}-{date_str}-{seq:04d}" # Title: LLM 生成(如果前端没传) title = body.get("title", "") if not title or not title.strip(): desc = body.get("description", "") or "" title = await _generate_title(desc) or ((desc[:30] + "…") if len(desc) > 30 else (desc or "新军令")) # #04: assignee 由 @mention 推断 description_text = body.get("description") or "" description_mentions = _extract_mentions(description_text) assignee = description_mentions[0] if description_mentions else None # 兼容:显式传 assignee 且无 @mention 时保留(deprecated) if not assignee and body.get("assignee"): assignee = body.get("assignee") task = Task( id=task_id, title=title, description=body.get("description"), task_type=body.get("task_type", None), priority=body.get("priority", 5), assignee=assignee, assigned_by=body.get("assigned_by", "user"), depends_on=json.dumps(body["depends_on"]) if "depends_on" in body else None, parent_task=body.get("parent_task"), risk_level=body.get("risk_level", "standard"), stage=body.get("stage"), stages_json=body.get("stages_json", "[]"), ) bb.create_task(task) result = {"ok": True, "task_id": task.id, "title": task.title} if body.get("assignee") and not description_mentions: result["warning"] = "assignee parameter is deprecated, use @mention in description" return result async def _generate_title(description: str) -> str | None: """调用 LLM 生成简短标题""" if not description or len(description) < 5: return None try: from openai import OpenAI import json as _json # 从 OpenClaw 配置读 zhipu 凭据 base_url = "https://open.bigmodel.cn/api/paas/v4" api_key = "" model = "glm-4-flash" oc_cfg = Path(os.environ.get( "OPENCLAW_HOME", str(Path.home() / ".openclaw") )) / "openclaw.json" if oc_cfg.exists(): with open(oc_cfg) as f: cfg = _json.load(f) zhipu = cfg.get("models", {}).get("providers", {}).get("zhipu", {}) if zhipu.get("baseUrl"): base_url = zhipu["baseUrl"] if zhipu.get("apiKey"): api_key = zhipu["apiKey"] if not api_key: return None # 没配 API key 就跳过 client = OpenAI(base_url=base_url, api_key=api_key) resp = client.chat.completions.create( model=model, messages=[ {"role": "system", "content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"}, {"role": "user", "content": description[:500]}, ], max_tokens=50, temperature=0.3, timeout=10, ) title = resp.choices[0].message.content.strip().strip('"').strip("'") if title and len(title) <= 30: return title except Exception as e: import logging logging.getLogger("moziplus-v2").warning(f"Title generation failed: {e}") return None @router.get("/tasks/{task_id}/progress") async def task_progress(project_id: str, task_id: str): """Task Stage 进度(含子 Task 统计)""" queries = _q(project_id) progress = queries.parent_task_progress(task_id) if not progress: raise HTTPException(404, "Task not found") return progress @router.post("/tasks/{task_id}/claim") async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) if not bb.claim_task(task_id, body["agent"]): raise HTTPException(409, "Claim failed (already claimed or wrong assignee)") return {"ok": True} @router.post("/tasks/{task_id}/status") async def update_status(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) old_task = bb.get_task(task_id) if not old_task: raise HTTPException(404, { "error": "not_found", "detail": f"Task {task_id} not found in project {project_id}", }) new_status = body.get("status") if not new_status: raise HTTPException(422, { "error": "validation_failed", "detail": "Missing required field: status", "valid_values": {"status": sorted(VALID_STATUSES)}, }) # 检查转换是否合法 from src.blackboard.db import VALID_TRANSITIONS current = old_task.status allowed = VALID_TRANSITIONS.get(current, set()) if new_status not in allowed: raise HTTPException(409, { "error": "invalid_transition", "detail": f"Cannot transition from {current} to {new_status}", "valid_transitions": {current: sorted(allowed)}, "hint": f"From '{current}', valid targets are: {sorted(allowed)}", }) if not bb.update_task_status(task_id, new_status, agent=body.get("agent")): raise HTTPException(409, { "error": "transition_failed", "detail": f"Status update failed for {task_id}", }) # SSE 推送 try: from src.api.sse_routes import get_broker broker = get_broker() broker.publish_sync("task_updated", { "project_id": project_id, "task_id": task_id, "old_status": current, "new_status": new_status, "agent": body.get("agent"), }) except Exception: pass return {"ok": True, "old_status": current, "new_status": new_status} # --- @mention 自动提取(#04) --- _KNOWN_AGENT_IDS: list = [] def _init_agent_ids(): """从配置文件加载 Agent ID 列表""" global _KNOWN_AGENT_IDS if _KNOWN_AGENT_IDS: return try: import yaml cfg_path = os.path.join(os.path.dirname(__file__), "..", "..", "config", "default.yaml") with open(cfg_path) as f: cfg = yaml.safe_load(f) _KNOWN_AGENT_IDS = list(cfg.get("daemon", {}).get("agent_profiles", {}).keys()) except Exception: _KNOWN_AGENT_IDS = [] def _extract_mentions(text: str) -> list: """从文本中自动提取 @agent-id 格式的 mention""" import re _init_agent_ids() candidates = set(re.findall(r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', text)) return [a for a in candidates if a in _KNOWN_AGENT_IDS] # --- Comments --- @router.get("/tasks/{task_id}/comments") async def get_comments(project_id: str, task_id: str, comment_type: Optional[str] = None): bb = _bb(project_id) comments = bb.get_comments(task_id, comment_type=comment_type) return {"comments": [dict(c.__dict__) for c in comments]} @router.post("/tasks/{task_id}/comments") async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) mentions_raw = body.get("mentions") comment_body = body["body"] # #04: 自动从 body 提取 @mention,与显式传的 mentions 取并集 auto_mentions = _extract_mentions(comment_body) if isinstance(mentions_raw, str): try: explicit_mentions = json.loads(mentions_raw) except Exception: explicit_mentions = [] elif isinstance(mentions_raw, list): explicit_mentions = mentions_raw else: explicit_mentions = [] merged_mentions = list(set(explicit_mentions + auto_mentions)) cid = bb.add_comment(task_id, body["author"], comment_body, comment_type=body.get("comment_type", "general"), mentions=merged_mentions) if merged_mentions: bb.record_mentions(cid, task_id, merged_mentions) # #10: SSE 通知前端黑板有新 comment try: from src.api.sse_routes import get_broker broker = get_broker() broker.publish_sync("comment_added", { "project_id": project_id, "task_id": task_id, "comment_id": cid, "author": body["author"], }) except Exception: pass return {"ok": True, "comment_id": cid, "mentions": merged_mentions} # --- Outputs --- @router.get("/tasks/{task_id}/outputs") async def get_outputs(project_id: str, task_id: str): bb = _bb(project_id) return {"outputs": [dict(o.__dict__) for o in bb.get_outputs(task_id)]} @router.post("/tasks/{task_id}/outputs") async def write_output(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) # 字段校验 + Agent-friendly 错误 agent = body.get("agent") if not agent: raise HTTPException(422, { "error": "validation_failed", "detail": "Missing required field: agent", "hint": "Provide your agent ID, e.g. 'zhangfei-dev'", }) # type 字段:接受 type 或 content_type(别名兼容) output_type = body.get("type") or body.get("content_type") valid_types = sorted(OUTPUT_TYPES) if not output_type: raise HTTPException(422, { "error": "validation_failed", "detail": "Missing required field: type", "valid_values": {"type": valid_types}, "hint": "Use 'type' field. Also accepts 'content_type' as alias.", }) if output_type not in OUTPUT_TYPES: raise HTTPException(422, { "error": "validation_failed", "detail": f"Invalid type: '{output_type}'", "valid_values": {"type": valid_types}, }) title = body.get("title") if not title: raise HTTPException(422, { "error": "validation_failed", "detail": "Missing required field: title", "hint": "Provide a brief title describing this output", }) # 内容模式:content(直传)或 content_path(引用) content = body.get("content") content_path = body.get("content_path") or body.get("path") if content and not content_path: # 内容直传模式:自动写文件 import os artifacts_dir = os.path.join( os.path.dirname(bb.db_path), "artifacts", task_id ) os.makedirs(artifacts_dir, exist_ok=True) # 安全文件名 safe_name = "".join(c if c.isalnum() or c in "._-" else "_" for c in title) if not safe_name: safe_name = "output" file_path = os.path.join(artifacts_dir, safe_name) with open(file_path, "w", encoding="utf-8") as f: f.write(content) content_path = file_path oid = bb.write_output( task_id, agent, output_type, title, content_path=content_path, summary=body.get("summary"), metadata=body.get("metadata"), ) return {"ok": True, "output_id": oid} # --- Decisions --- @router.get("/tasks/{task_id}/decisions") async def get_decisions(project_id: str, task_id: str): bb = _bb(project_id) return {"decisions": [dict(d.__dict__) for d in bb.get_decisions(task_id)]} @router.post("/tasks/{task_id}/decisions") async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) did = bb.add_decision(task_id, body["decider"], body["decision"], body["rationale"], alternatives=body.get("alternatives")) return {"ok": True, "decision_id": did} # --- Observations --- @router.post("/tasks/{task_id}/observations") async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) oid = bb.add_observation(task_id, body["observer"], body["body"], severity=body.get("severity", "info")) return {"ok": True, "observation_id": oid} # --- Reviews --- @router.get("/tasks/{task_id}/reviews") async def get_reviews(project_id: str, task_id: str): bb = _bb(project_id) return {"reviews": [dict(r.__dict__) for r in bb.get_reviews(task_id)]} @router.post("/tasks/{task_id}/reviews") async def add_review(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) review = Review( id=body["id"], task_id=task_id, reviewer=body["reviewer"], review_type=body["review_type"], verdict=body["verdict"], summary=body["summary"], confidence=body.get("confidence"), round=body.get("round", 1), max_rounds=body.get("max_rounds", 3), ) bb.add_review(review) return {"ok": True, "review_id": review.id} @router.patch("/tasks/{task_id}") async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]): """更新任务元数据(归档、标题等)""" bb = _bb(project_id) task = bb.get_task(task_id) if not task: raise HTTPException(404, f"Task {task_id} not found") allowed = {"archived", "title", "description", "priority", "risk_level"} updates = {k: v for k, v in body.items() if k in allowed} if not updates: return {"ok": True} # 直接用 SQL 更新 import sqlite3 conn = sqlite3.connect(str(bb.db_path), timeout=5) try: set_clause = ", ".join(f"{k}=?" for k in updates) conn.execute(f"UPDATE tasks SET {set_clause}, updated_at=datetime('now') WHERE id=?", (*updates.values(), task_id)) conn.commit() finally: conn.close() return {"ok": True} # --- Per-task Events & Experiences --- @router.get("/tasks/{task_id}/events") async def get_task_events(project_id: str, task_id: str, limit: int = Query(50, le=200)): q = _q(project_id) return {"events": q.task_events(task_id, limit)} @router.get("/tasks/{task_id}/experiences") async def get_task_experiences(project_id: str, task_id: str): q = _q(project_id) return {"experiences": q.task_experiences(task_id)} # --- Global Events --- @router.get("/events") async def get_events(project_id: str, limit: int = Query(50, le=200)): q = _q(project_id) return {"events": q.recent_events(limit)} # --- Summary --- @router.get("/summary") async def task_summary(project_id: str): q = _q(project_id) return {"summary": q.task_summary()} # --- Archive (v2.8) --- @router.post("/tasks/{task_id}/archive") async def archive_task(project_id: str, task_id: str, body: Optional[Dict[str, Any]] = None): bb = _bb(project_id) task = bb.get_task(task_id) if not task: raise HTTPException(404, f"Task not found: {task_id}") action = (body or {}).get("action", "archive") if action == "unarchive": bb.unarchive_task(task_id) return {"ok": True, "action": "unarchived"} else: bb.archive_task(task_id) return {"ok": True, "action": "archived"} @router.post("/tasks/archive-done") async def archive_done_tasks(project_id: str): bb = _bb(project_id) count = bb.archive_done_tasks() return {"ok": True, "archived_count": count} # --- Helper --- def _task_to_dict(t: Task) -> Dict[str, Any]: d = {k: v for k, v in t.__dict__.items() if v is not None} return d