d58e38d58f
PR #14 从旧分支复制文件导致回退了 PR #10 的 lint 修复。 修复内容: - autoflake 移除未使用导入/变量 - autopep8 修复缩进/空格 - 手动修复 F821(pathlib→Path), F541(f-string), F841(未使用变量) - 所有修复均通过 flake8 --max-line-length=120 --extend-ignore=E501 检查 (0 errors)
573 lines
19 KiB
Python
573 lines
19 KiB
Python
"""API routes — blackboard CRUD."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
from pathlib import Path
|
||
from typing import Any, Dict, Optional
|
||
|
||
from fastapi import APIRouter, HTTPException, Query
|
||
|
||
from src.blackboard.operations import Blackboard
|
||
from src.blackboard.models import Task, Review
|
||
from src.blackboard.queries import Queries
|
||
from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES
|
||
from src.blackboard.registry import ProjectRegistry
|
||
|
||
from src.utils import get_data_root
|
||
|
||
# Router for the blackboard endpoints; every route declared in this module is
# served under /api/projects/{project_id} and tagged "blackboard".
router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"])
|
||
|
||
# Whitelist of virtual projects (they do not need to be registered in the registry).
_VIRTUAL_PROJECTS = frozenset({"_general", "_mail"})
|
||
|
||
|
||
def _validate_project(project_id: str) -> str:
    """Return ``project_id`` when it is a virtual or a registered project.

    Virtual projects are accepted without consulting the registry.

    Raises:
        HTTPException: 400 with a structured ``project_not_found`` payload
            when the project is neither virtual nor registered.
    """
    is_virtual = project_id in _VIRTUAL_PROJECTS
    # Short-circuit keeps the registry untouched for virtual projects.
    if is_virtual or ProjectRegistry(get_data_root()).get_project(project_id):
        return project_id

    error_payload = {
        "ok": False,
        "error": "project_not_found",
        "detail": f"Project '{project_id}' is not registered.",
        "suggestions": [
            f"Register first: POST /api/projects with id='{project_id}'",
            "Or use '_general' for tasks without a specific project",
        ],
    }
    raise HTTPException(400, error_payload)
|
||
|
||
|
||
def _bb(project_id: str) -> Blackboard:
    """Validate ``project_id`` and build a ``Blackboard`` on its ``blackboard.db``."""
    _validate_project(project_id)
    db_file = get_data_root() / project_id / "blackboard.db"
    return Blackboard(db_file)
|
||
|
||
|
||
def _q(project_id: str) -> Queries:
    """Validate ``project_id`` and build a ``Queries`` object on its ``blackboard.db``."""
    _validate_project(project_id)
    db_file = get_data_root() / project_id / "blackboard.db"
    return Queries(db_file)
|
||
|
||
|
||
# --- Tasks ---
|
||
|
||
@router.get("/tasks")
async def list_tasks(project_id: str,
                     status: Optional[str] = None,
                     assignee: Optional[str] = None,
                     parent_task: Optional[str] = None):
    """List the project's tasks, forwarding the optional filters to the blackboard.

    Returns:
        ``{"tasks": [...]}`` with one dict per task (``None`` fields omitted).
    """
    filters = {
        "status": status,
        "assignee": assignee,
        "parent_task": parent_task,
    }
    matching = _bb(project_id).list_tasks(**filters)
    return {"tasks": list(map(_task_to_dict, matching))}
|
||
|
||
|
||
@router.get("/tasks/{task_id}")
async def get_task(project_id: str, task_id: str,
                   expand: Optional[str] = None):
    """Return one task; with ``expand=all`` also embed its related records.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    def _as_dicts(records):
        # Copy each record's attribute dict so the response owns its data.
        return [dict(rec.__dict__) for rec in records]

    bb = _bb(project_id)
    task = bb.get_task(task_id)
    if not task:
        raise HTTPException(404, f"Task not found: {task_id}")

    payload = _task_to_dict(task)
    if expand != "all":
        return payload

    queries = _q(project_id)
    detail = queries.task_detail(task_id)
    if detail:
        # Aggregated fields are only added when the detail query returned a row.
        summary_fields = (
            ("comments_count", 0),
            ("outputs_count", 0),
            ("review_status", None),
            ("latest_event_detail", None),
        )
        for key, default in summary_fields:
            payload[key] = detail.get(key, default)

    payload["comments"] = _as_dicts(bb.get_comments(task_id))
    payload["outputs"] = _as_dicts(bb.get_outputs(task_id))
    payload["reviews"] = _as_dicts(bb.get_reviews(task_id))
    payload["decisions"] = _as_dicts(bb.get_decisions(task_id))
    payload["events"] = queries.task_events(task_id)
    payload["experiences"] = queries.task_experiences(task_id)
    return payload
|
||
|
||
|
||
def _next_task_id(project_id: str) -> str:
    """Generate a backend task ID of the form ``{prefix}-{YYYYMMDD}-{seq:04d}``.

    ``prefix`` is the lower-cased project ID with every character outside
    ``[a-z0-9]`` replaced by ``-``, stripped of leading/trailing dashes and cut
    to 20 characters. ``seq`` is one more than the numeric suffix of the
    lexicographically last stored ID for that prefix and today's date, or 1
    when there is none or the lookup fails.
    """
    import logging
    import re
    import sqlite3
    from contextlib import closing
    from datetime import datetime

    prefix = re.sub(r'[^a-z0-9]', '-', project_id.lower()).strip('-')[:20]
    date_str = datetime.now().strftime('%Y%m%d')
    db_path = get_data_root() / project_id / "blackboard.db"

    seq = 1
    try:
        # closing() guarantees the connection is released even if the query raises.
        with closing(sqlite3.connect(str(db_path), timeout=5)) as conn:
            max_id_row = conn.execute(
                "SELECT id FROM tasks WHERE id LIKE ? ORDER BY id DESC LIMIT 1",
                (f"{prefix}-{date_str}-%",)
            ).fetchone()
        if max_id_row:
            seq = int(max_id_row[0].split('-')[-1]) + 1
    except Exception as e:
        # Best effort: fall back to the first sequence number, but leave a trace.
        logging.getLogger("moziplus-v2").warning(
            f"Task seq lookup failed for {project_id}, falling back to 1: {e}")
        seq = 1
    return f"{prefix}-{date_str}-{seq:04d}"


@router.post("/tasks")
async def create_task(project_id: str, body: Dict[str, Any]):
    """Create a task on the project's blackboard.

    * ``id``: taken from the body, otherwise generated by the backend.
    * ``title``: taken from the body; when blank it is generated from the
      description, falling back to a truncated description or a default.
    * ``assignee``: the first known ``@mention`` in the description; an
      explicit ``assignee`` field is only honoured when there is no mention
      (deprecated, reported through a ``warning`` in the response).

    Returns:
        ``{"ok": True, "task_id": ..., "title": ...}`` plus an optional
        ``warning``.
    """
    bb = _bb(project_id)

    # ID: generated by the backend as {project_id_prefix}-{date}-{seq} if absent.
    task_id = body.get("id") or _next_task_id(project_id)

    description_text = body.get("description") or ""

    # Title: generated by the LLM when the client did not send one.
    title = body.get("title", "")
    if not title or not title.strip():
        title = await _generate_title(description_text) or (
            (description_text[:30] + "…") if len(description_text) > 30
            else (description_text or "新军令"))

    # #04: the assignee is inferred from the @mentions in the description.
    description_mentions = _extract_mentions(description_text)
    assignee = description_mentions[0] if description_mentions else None
    # Compatibility: keep an explicit assignee when there is no @mention (deprecated).
    if not assignee and body.get("assignee"):
        assignee = body.get("assignee")

    task = Task(
        id=task_id, title=title,
        description=body.get("description"),
        task_type=body.get("task_type", None),
        priority=body.get("priority", 5),
        assignee=assignee,
        assigned_by=body.get("assigned_by", "user"),
        depends_on=json.dumps(
            body["depends_on"]) if "depends_on" in body else None,
        parent_task=body.get("parent_task"),
        risk_level=body.get("risk_level", "standard"),
        stage=body.get("stage"),
        stages_json=body.get("stages_json", "[]"),
    )
    bb.create_task(task)

    result = {"ok": True, "task_id": task.id, "title": task.title}
    if body.get("assignee") and not description_mentions:
        result["warning"] = "assignee parameter is deprecated, use @mention in description"
    return result
|
||
|
||
|
||
async def _generate_title(description: str) -> str | None:
    """Ask the LLM for a short title describing ``description``.

    Returns ``None`` when the description is shorter than 5 characters, when
    no API key is configured, when the model's answer is empty or longer than
    30 characters, or when anything goes wrong (the failure is logged).
    """
    if not description or len(description) < 5:
        return None
    try:
        import asyncio

        # The OpenAI client used here is synchronous; running it in the default
        # executor keeps the (up to 10 s) HTTP call from stalling the event loop.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _request_title, description)
    except Exception as e:
        import logging
        logging.getLogger(
            "moziplus-v2").warning(f"Title generation failed: {e}")
    return None


def _request_title(description: str) -> str | None:
    """Blocking helper of ``_generate_title``: read credentials and call the model.

    Credentials come from the ``zhipu`` provider entry of ``openclaw.json``
    under ``$OPENCLAW_HOME`` (default ``~/.openclaw``). Exceptions propagate to
    the caller, which logs them.
    """
    import json as _json

    base_url = "https://open.bigmodel.cn/api/paas/v4"
    api_key = ""
    model = "glm-4-flash"

    oc_cfg = Path(os.environ.get(
        "OPENCLAW_HOME", str(Path.home() / ".openclaw")
    )) / "openclaw.json"
    if oc_cfg.exists():
        with open(oc_cfg, encoding="utf-8") as f:
            cfg = _json.load(f)
        zhipu = cfg.get("models", {}).get("providers", {}).get("zhipu", {})
        if zhipu.get("baseUrl"):
            base_url = zhipu["baseUrl"]
        if zhipu.get("apiKey"):
            api_key = zhipu["apiKey"]
    if not api_key:
        return None  # no API key configured: skip title generation

    # Imported only once a key is known, so the dependency is optional otherwise.
    from openai import OpenAI

    client = OpenAI(base_url=base_url, api_key=api_key)
    resp = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system",
             "content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"},
            {"role": "user", "content": description[:500]},
        ],
        max_tokens=50,
        temperature=0.3,
        timeout=10,
    )
    # The message content may be None; treat that like an empty answer.
    content = resp.choices[0].message.content or ""
    title = content.strip().strip('"').strip("'")
    if title and len(title) <= 30:
        return title
    return None
|
||
|
||
|
||
@router.get("/tasks/{task_id}/progress")
async def task_progress(project_id: str, task_id: str):
    """Return the task's stage progress (including sub-task statistics).

    Raises:
        HTTPException: 404 when the progress query returns nothing.
    """
    result = _q(project_id).parent_task_progress(task_id)
    if result:
        return result
    raise HTTPException(404, "Task not found")
|
||
|
||
|
||
@router.post("/tasks/{task_id}/claim")
async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]):
    """Claim a task for the agent named in ``body["agent"]``.

    Raises:
        HTTPException: 422 when ``agent`` is missing or empty; 409 when the
            blackboard refuses the claim.
    """
    bb = _bb(project_id)

    agent = body.get("agent")
    if not agent:
        # Previously a missing key surfaced as an unhandled KeyError (HTTP 500).
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": "Missing required field: agent",
            "hint": "Provide your agent ID, e.g. 'zhangfei-dev'",
        })

    if not bb.claim_task(task_id, agent):
        raise HTTPException(
            409, "Claim failed (already claimed or wrong assignee)")
    return {"ok": True}
|
||
|
||
|
||
@router.post("/tasks/{task_id}/status")
async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
    """Move a task to ``body["status"]`` if the transition is allowed.

    After a successful update a ``task_updated`` event is published on the SSE
    broker; a publishing failure is logged and does not fail the request.

    Raises:
        HTTPException: 404 when the task does not exist; 422 when ``status``
            is missing or not a string; 409 when the transition is not
            allowed or the blackboard rejects the update.
    """
    bb = _bb(project_id)
    old_task = bb.get_task(task_id)
    if not old_task:
        raise HTTPException(404, {
            "error": "not_found",
            "detail": f"Task {task_id} not found in project {project_id}",
        })

    new_status = body.get("status")
    if not new_status:
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": "Missing required field: status",
            "valid_values": {"status": sorted(VALID_STATUSES)},
        })
    if not isinstance(new_status, str):
        # A list/dict value would otherwise crash the membership test below.
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": "Field 'status' must be a string",
            "valid_values": {"status": sorted(VALID_STATUSES)},
        })

    # Check that the transition is legal.
    from src.blackboard.db import VALID_TRANSITIONS
    current = old_task.status
    allowed = VALID_TRANSITIONS.get(current, set())
    if new_status not in allowed:
        raise HTTPException(409, {
            "error": "invalid_transition",
            "detail": f"Cannot transition from {current} to {new_status}",
            "valid_transitions": {current: sorted(allowed)},
            "hint": f"From '{current}', valid targets are: {sorted(allowed)}",
        })

    if not bb.update_task_status(task_id, new_status,
                                 agent=body.get("agent")):
        raise HTTPException(409, {
            "error": "transition_failed",
            "detail": f"Status update failed for {task_id}",
        })

    # SSE push (best effort: the status change is already persisted).
    try:
        from src.api.sse_routes import get_broker
        broker = get_broker()
        broker.publish_sync("task_updated", {
            "project_id": project_id,
            "task_id": task_id,
            "old_status": current,
            "new_status": new_status,
            "agent": body.get("agent"),
        })
    except Exception as e:
        import logging
        logging.getLogger("moziplus-v2").warning(
            f"SSE publish failed for task_updated ({task_id}): {e}")
    return {"ok": True, "old_status": current, "new_status": new_status}
|
||
|
||
|
||
# --- @mention auto-extraction (#04) ---
|
||
# Agent IDs accepted as @mentions; filled by _init_agent_ids().
_KNOWN_AGENT_IDS: list = []


def _init_agent_ids():
    """Load the agent ID list from ``config/default.yaml``.

    The IDs are the keys of ``daemon.agent_profiles``. Nothing is done when a
    non-empty list is already loaded; any failure leaves the list empty.
    """
    global _KNOWN_AGENT_IDS
    if _KNOWN_AGENT_IDS:
        return
    try:
        import yaml
        cfg_path = os.path.join(
            os.path.dirname(__file__), "..", "..", "config", "default.yaml")
        # Explicit encoding so the result does not depend on the host locale.
        with open(cfg_path, encoding="utf-8") as f:
            cfg = yaml.safe_load(f)
        _KNOWN_AGENT_IDS = list(
            cfg.get("daemon", {}).get("agent_profiles", {}).keys())
    except Exception as e:
        # Best effort: without a readable config no mention is recognised.
        import logging
        logging.getLogger("moziplus-v2").debug(
            f"Agent ID list could not be loaded: {e}")
        _KNOWN_AGENT_IDS = []


def _extract_mentions(text: str) -> list:
    """Extract the known ``@agent-id`` mentions contained in ``text``.

    Returns:
        The mentioned agent IDs that are in the known-agent list, without
        duplicates and in order of first appearance. The order matters:
        ``create_task`` uses the first mention as the assignee.
    """
    import re
    _init_agent_ids()
    if not text:
        return []
    known = set(_KNOWN_AGENT_IDS)
    # dict.fromkeys de-duplicates while keeping first-appearance order; a plain
    # set would make the result order vary between interpreter runs.
    candidates = dict.fromkeys(
        re.findall(r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', text))
    return [a for a in candidates if a in known]
|
||
|
||
|
||
# --- Comments ---
|
||
|
||
@router.get("/tasks/{task_id}/comments")
async def get_comments(project_id: str, task_id: str,
                       comment_type: Optional[str] = None):
    """Return the task's comments, optionally restricted to one ``comment_type``."""
    rows = _bb(project_id).get_comments(task_id, comment_type=comment_type)
    serialised = []
    for row in rows:
        serialised.append(dict(row.__dict__))
    return {"comments": serialised}
|
||
|
||
|
||
@router.post("/tasks/{task_id}/comments")
async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]):
    """Add a comment to a task and record its mentions.

    Mentions are the union of the explicit ``mentions`` field (a list, or a
    JSON-encoded list) and the known ``@agent-id`` mentions found in the
    comment text, de-duplicated with explicit mentions first. A
    ``comment_added`` SSE event is published afterwards; a publishing failure
    is logged and does not fail the request.

    Raises:
        HTTPException: 422 when ``author`` is missing or ``body`` is missing
            or not a string.
    """
    bb = _bb(project_id)

    # Validate up front; these used to surface as KeyError/TypeError (HTTP 500).
    author = body.get("author")
    if not author:
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": "Missing required field: author",
            "hint": "Provide your agent ID, e.g. 'zhangfei-dev'",
        })
    comment_body = body.get("body")
    if not isinstance(comment_body, str):
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": "Missing required field: body",
            "hint": "Provide the comment text as a string",
        })

    # #04: extract @mentions from the text and merge them with the explicit ones.
    auto_mentions = _extract_mentions(comment_body)
    mentions_raw = body.get("mentions")
    if isinstance(mentions_raw, str):
        try:
            mentions_raw = json.loads(mentions_raw)
        except (ValueError, RecursionError):
            mentions_raw = []
    # Anything that is not a list (including JSON that decoded to a non-list) is ignored.
    explicit_mentions = mentions_raw if isinstance(mentions_raw, list) else []
    # Keep only string entries and preserve order while removing duplicates.
    merged_mentions = list(dict.fromkeys(
        m for m in [*explicit_mentions, *auto_mentions] if isinstance(m, str)))

    cid = bb.add_comment(task_id, author, comment_body,
                         comment_type=body.get("comment_type", "general"),
                         mentions=merged_mentions)
    if merged_mentions:
        bb.record_mentions(cid, task_id, merged_mentions)

    # #10: notify the front end through SSE that the blackboard has a new comment.
    try:
        from src.api.sse_routes import get_broker
        broker = get_broker()
        broker.publish_sync("comment_added", {
            "project_id": project_id,
            "task_id": task_id,
            "comment_id": cid,
            "author": author,
        })
    except Exception as e:
        import logging
        logging.getLogger("moziplus-v2").warning(
            f"SSE publish failed for comment_added ({task_id}): {e}")

    return {"ok": True, "comment_id": cid, "mentions": merged_mentions}
|
||
|
||
|
||
# --- Outputs ---
|
||
|
||
@router.get("/tasks/{task_id}/outputs")
async def get_outputs(project_id: str, task_id: str):
    """Return the outputs recorded for a task, one dict per output."""
    records = _bb(project_id).get_outputs(task_id)
    return {"outputs": [dict(rec.__dict__) for rec in records]}
|
||
|
||
|
||
def _is_inside(root: str, candidate: str) -> bool:
    """Return True when ``candidate`` resolves to a location strictly below ``root``."""
    root_real = os.path.realpath(root)
    candidate_real = os.path.realpath(candidate)
    if candidate_real == root_real:
        return False
    try:
        return os.path.commonpath([root_real, candidate_real]) == root_real
    except ValueError:
        # Raised e.g. for paths on different drives: certainly not inside root.
        return False


@router.post("/tasks/{task_id}/outputs")
async def write_output(project_id: str, task_id: str, body: Dict[str, Any]):
    """Record an output for a task.

    Required body fields: ``agent``, ``type`` (alias ``content_type``) and
    ``title``. The payload is either referenced through ``content_path``
    (alias ``path``) or sent inline as ``content``; inline content is written
    to ``artifacts/<task_id>/<sanitised title>`` next to the blackboard
    database and that file path is recorded.

    Raises:
        HTTPException: 422 for a missing/invalid field, for non-string inline
            content, or for a ``task_id`` that would place the artifact
            outside the artifacts directory.
    """
    bb = _bb(project_id)

    # Field validation with agent-friendly errors.
    agent = body.get("agent")
    if not agent:
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": "Missing required field: agent",
            "hint": "Provide your agent ID, e.g. 'zhangfei-dev'",
        })

    # The type may be sent as "type" or as its alias "content_type".
    output_type = body.get("type") or body.get("content_type")
    valid_types = sorted(OUTPUT_TYPES)
    if not output_type:
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": "Missing required field: type",
            "valid_values": {"type": valid_types},
            "hint": "Use 'type' field. Also accepts 'content_type' as alias.",
        })
    if output_type not in OUTPUT_TYPES:
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": f"Invalid type: '{output_type}'",
            "valid_values": {"type": valid_types},
        })

    title = body.get("title")
    if not title:
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": "Missing required field: title",
            "hint": "Provide a brief title describing this output",
        })

    # Content mode: "content" (inline) or "content_path" (reference).
    content = body.get("content")
    content_path = body.get("content_path") or body.get("path")

    if content and not content_path:
        # Inline mode: persist the content as an artifact file.
        if not isinstance(content, str):
            raise HTTPException(422, {
                "error": "validation_failed",
                "detail": "Field 'content' must be a string",
                "hint": "Send text content, or use 'content_path' to reference a file",
            })

        artifacts_root = os.path.join(os.path.dirname(bb.db_path), "artifacts")
        artifacts_dir = os.path.join(artifacts_root, task_id)
        # task_id comes from the URL: refuse values such as ".." that would
        # let the write escape the artifacts directory.
        if not _is_inside(artifacts_root, artifacts_dir):
            raise HTTPException(422, {
                "error": "validation_failed",
                "detail": f"Invalid task_id for artifact storage: '{task_id}'",
            })
        os.makedirs(artifacts_dir, exist_ok=True)

        # Safe file name: anything but alphanumerics and "._-" becomes "_".
        safe_name = "".join(
            c if c.isalnum() or c in "._-" else "_" for c in str(title))
        # Dot-only names ("." / "..") would resolve to a directory.
        if not safe_name.strip("."):
            safe_name = "output"
        file_path = os.path.join(artifacts_dir, safe_name)
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(content)
        content_path = file_path

    oid = bb.write_output(
        task_id, agent, output_type, title,
        content_path=content_path,
        summary=body.get("summary"),
        metadata=body.get("metadata"),
    )
    return {"ok": True, "output_id": oid}
|
||
|
||
|
||
# --- Decisions ---
|
||
|
||
@router.get("/tasks/{task_id}/decisions")
async def get_decisions(project_id: str, task_id: str):
    """Return the decisions recorded for a task, one dict per decision."""
    records = _bb(project_id).get_decisions(task_id)
    return {"decisions": [dict(rec.__dict__) for rec in records]}
|
||
|
||
|
||
@router.post("/tasks/{task_id}/decisions")
async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
    """Record a decision on a task.

    Required body fields: ``decider``, ``decision``, ``rationale``;
    ``alternatives`` is optional.

    Raises:
        HTTPException: 422 listing the required fields that are missing.
    """
    bb = _bb(project_id)

    # Report missing keys as a 422 instead of an unhandled KeyError (HTTP 500).
    missing = [f for f in ("decider", "decision", "rationale") if f not in body]
    if missing:
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": f"Missing required field(s): {', '.join(missing)}",
        })

    did = bb.add_decision(task_id, body["decider"], body["decision"],
                          body["rationale"],
                          alternatives=body.get("alternatives"))
    return {"ok": True, "decision_id": did}
|
||
|
||
|
||
# --- Observations ---
|
||
|
||
@router.post("/tasks/{task_id}/observations")
async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]):
    """Record an observation on a task.

    Required body fields: ``observer`` and ``body``; ``severity`` defaults to
    ``"info"``.

    Raises:
        HTTPException: 422 listing the required fields that are missing.
    """
    bb = _bb(project_id)

    # Report missing keys as a 422 instead of an unhandled KeyError (HTTP 500).
    missing = [f for f in ("observer", "body") if f not in body]
    if missing:
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": f"Missing required field(s): {', '.join(missing)}",
        })

    oid = bb.add_observation(task_id, body["observer"], body["body"],
                             severity=body.get("severity", "info"))
    return {"ok": True, "observation_id": oid}
|
||
|
||
|
||
# --- Reviews ---
|
||
|
||
@router.get("/tasks/{task_id}/reviews")
async def get_reviews(project_id: str, task_id: str):
    """Return the reviews recorded for a task, one dict per review."""
    records = _bb(project_id).get_reviews(task_id)
    return {"reviews": [dict(rec.__dict__) for rec in records]}
|
||
|
||
|
||
@router.post("/tasks/{task_id}/reviews")
async def add_review(project_id: str, task_id: str, body: Dict[str, Any]):
    """Record a review on a task.

    Required body fields: ``id``, ``reviewer``, ``review_type``, ``verdict``
    and ``summary``. ``confidence`` is optional; ``round`` defaults to 1 and
    ``max_rounds`` to 3.

    Raises:
        HTTPException: 422 listing the required fields that are missing.
    """
    bb = _bb(project_id)

    # Report missing keys as a 422 instead of an unhandled KeyError (HTTP 500).
    required = ("id", "reviewer", "review_type", "verdict", "summary")
    missing = [f for f in required if f not in body]
    if missing:
        raise HTTPException(422, {
            "error": "validation_failed",
            "detail": f"Missing required field(s): {', '.join(missing)}",
        })

    review = Review(
        id=body["id"], task_id=task_id, reviewer=body["reviewer"],
        review_type=body["review_type"], verdict=body["verdict"],
        summary=body["summary"], confidence=body.get("confidence"),
        round=body.get("round", 1), max_rounds=body.get("max_rounds", 3),
    )
    bb.add_review(review)
    return {"ok": True, "review_id": review.id}
|
||
|
||
|
||
@router.patch("/tasks/{task_id}")
async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]):
    """Update task metadata (archive flag, title, etc.).

    Only ``archived``, ``title``, ``description``, ``priority`` and
    ``risk_level`` are applied; other body keys are ignored. ``updated_at`` is
    refreshed whenever at least one field is written.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    import sqlite3

    bb = _bb(project_id)
    if not bb.get_task(task_id):
        raise HTTPException(404, f"Task {task_id} not found")

    editable = ("archived", "title", "description", "priority", "risk_level")
    changes = {}
    for field, value in body.items():
        if field in editable:
            changes[field] = value
    if not changes:
        return {"ok": True}

    # Column names come from the whitelist above; values are bound as parameters.
    set_clause = ", ".join([f"{column}=?" for column in changes])
    params = tuple(changes.values()) + (task_id,)

    # Written directly with SQL.
    conn = sqlite3.connect(str(bb.db_path), timeout=5)
    try:
        conn.execute(f"UPDATE tasks SET {set_clause}, updated_at=datetime('now') WHERE id=?",
                     params)
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
|
||
|
||
|
||
# --- Per-task Events & Experiences ---
|
||
|
||
@router.get("/tasks/{task_id}/events")
async def get_task_events(project_id: str, task_id: str,
                          limit: int = Query(50, le=200)):
    """Return up to ``limit`` events of a task (default 50, at most 200)."""
    events = _q(project_id).task_events(task_id, limit)
    return {"events": events}
|
||
|
||
|
||
@router.get("/tasks/{task_id}/experiences")
async def get_task_experiences(project_id: str, task_id: str):
    """Return the experiences recorded for a task."""
    experiences = _q(project_id).task_experiences(task_id)
    return {"experiences": experiences}
|
||
|
||
|
||
# --- Global Events ---
|
||
|
||
@router.get("/events")
async def get_events(project_id: str, limit: int = Query(50, le=200)):
    """Return up to ``limit`` recent events of the project (default 50, at most 200)."""
    events = _q(project_id).recent_events(limit)
    return {"events": events}
|
||
|
||
|
||
# --- Summary ---
|
||
|
||
@router.get("/summary")
async def task_summary(project_id: str):
    """Return the project's task summary as produced by the query layer."""
    summary = _q(project_id).task_summary()
    return {"summary": summary}
|
||
|
||
|
||
# --- Archive (v2.8) ---
|
||
|
||
@router.post("/tasks/{task_id}/archive")
async def archive_task(project_id: str, task_id: str,
                       body: Optional[Dict[str, Any]] = None):
    """Archive a task, or unarchive it when ``body["action"] == "unarchive"``.

    Any other (or missing) action archives the task.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    bb = _bb(project_id)
    if not bb.get_task(task_id):
        raise HTTPException(404, f"Task not found: {task_id}")

    requested = body.get("action", "archive") if body else "archive"
    if requested != "unarchive":
        bb.archive_task(task_id)
        return {"ok": True, "action": "archived"}

    bb.unarchive_task(task_id)
    return {"ok": True, "action": "unarchived"}
|
||
|
||
|
||
@router.post("/tasks/archive-done")
async def archive_done_tasks(project_id: str):
    """Run the blackboard's bulk archive of done tasks and report its count."""
    archived = _bb(project_id).archive_done_tasks()
    return {"ok": True, "archived_count": archived}
|
||
|
||
|
||
# --- Helper ---
|
||
|
||
def _task_to_dict(t: Task) -> Dict[str, Any]:
    """Return the task's instance attributes as a dict, omitting ``None`` values."""
    populated = filter(lambda item: item[1] is not None, t.__dict__.items())
    return dict(populated)
|