Files
sanguo_moziplus_v2/src/api/project_routes.py
T
2026-05-22 22:58:04 +08:00

187 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""API 路由 — 项目管理"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException
from src.blackboard.registry import ProjectRegistry
from src.utils import get_data_root
# Router shared by every endpoint in this module; all routes are registered
# under the /api/projects prefix and tagged "projects".
router = APIRouter(prefix="/api/projects", tags=["projects"])
def _registry() -> ProjectRegistry:
    """Build a fresh ``ProjectRegistry`` rooted at ``get_data_root()``."""
    data_root = get_data_root()
    return ProjectRegistry(data_root)
def _task_counts(db_path: Path) -> Optional[Dict[str, int]]:
    """Read the task counters from one project's ``blackboard.db``.

    Args:
        db_path: Path to an existing SQLite database containing a ``tasks`` table.

    Returns:
        A dict with ``task_count`` (rows whose ``archived`` flag is 0/NULL),
        ``task_count_total`` (rows whose status is not ``'cancelled'``) and
        ``task_count_archived`` (total minus active), or ``None`` when the
        database could not be queried.
    """
    import logging
    import sqlite3
    from contextlib import closing

    try:
        # closing() releases the handle even when a query raises; sqlite3's
        # own context manager only commits/rolls back and never closes.
        with closing(sqlite3.connect(str(db_path), timeout=5)) as conn:
            total = conn.execute(
                "SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'"
            ).fetchone()[0]
            active = conn.execute(
                "SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0"
            ).fetchone()[0]
    except sqlite3.Error:
        # Counting is best-effort: a broken project DB must not fail the
        # whole listing, but the failure should be visible in the logs.
        logging.getLogger(__name__).warning(
            "Could not read task counts from %s", db_path, exc_info=True
        )
        return None

    # NOTE(review): the two queries use different filters (status vs. archived
    # flag), so "total - active" can be negative when cancelled, non-archived
    # tasks exist — confirm whether that is intended.
    return {
        "task_count": active,
        "task_count_total": total,
        "task_count_archived": total - active,
    }


@router.get("")
async def list_projects():
    """List all registered projects, enriched with live task counters.

    For every project whose status is neither ``archived`` nor ``deleted`` and
    whose ``blackboard.db`` exists, ``task_count``, ``task_count_total`` and
    ``task_count_archived`` are written into its info dict. The ``_general``
    project is refreshed whenever its database exists, and is inserted as a
    virtual entry when the registry does not list it.

    Returns:
        ``{"projects": <mapping of project id to info dict>}``. Archived
        projects are not filtered out; the frontend filters on the task-level
        ``archived`` field itself.
    """
    reg = _registry()
    projects = reg.list_projects()
    root = Path(reg.root)

    # Live per-project task statistics.
    for pid, info in projects.items():
        if pid == "_general":
            # Refreshed below regardless of status; avoid querying it twice.
            continue
        if info.get("status") in ("archived", "deleted"):
            continue
        db_path = root / pid / "blackboard.db"
        if not db_path.exists():
            continue
        counts = _task_counts(db_path)
        if counts is not None:
            info.update(counts)

    # Virtual project _general: present whenever its blackboard.db exists.
    general_db = root / "_general" / "blackboard.db"
    if general_db.exists():
        counts = _task_counts(general_db)
        if counts is not None:
            if "_general" in projects:
                projects["_general"].update(counts)
            else:
                projects["_general"] = {
                    "id": "_general", "name": "一般任务", "description": "无项目归属的通用任务",
                    "status": "active", "source": "virtual",
                    **counts,
                }

    return {"projects": projects}
@router.post("")
async def create_project(body: Dict[str, Any]):
    """Create a project from a JSON body.

    Args:
        body: Must contain ``id`` and ``name``; may contain ``agents``
            (defaults to ``[]``) and ``description`` (defaults to ``""``).

    Returns:
        ``{"ok": True, "project_id": <id>}`` on success.

    Raises:
        HTTPException: 422 when ``id`` or ``name`` is missing from the body;
            409 when the registry rejects the request with ``ValueError``.
    """
    # Reject incomplete bodies explicitly instead of letting a KeyError
    # surface as an opaque 500.
    missing = [key for key in ("id", "name") if key not in body]
    if missing:
        raise HTTPException(422, f"Missing required field(s): {', '.join(missing)}")

    reg = _registry()
    try:
        reg.create_project(
            body["id"], body["name"],
            agents=body.get("agents", []),
            description=body.get("description", ""),
        )
    except ValueError as e:
        raise HTTPException(409, str(e)) from e
    return {"ok": True, "project_id": body["id"]}
@router.get("/{project_id}")
async def get_project(project_id: str):
    """Return the registry record for ``project_id``.

    Raises:
        HTTPException: 404 when the registry returns nothing for this id.
    """
    project = _registry().get_project(project_id)
    if project:
        return project
    raise HTTPException(404, f"Project not found: {project_id}")
@router.post("/{project_id}/archive")
async def archive_project(project_id: str):
    """Archive ``project_id`` through the registry.

    Raises:
        HTTPException: 404 when the registry reports a falsy result.
    """
    was_archived = _registry().archive_project(project_id)
    if was_archived:
        return {"ok": True}
    raise HTTPException(404, f"Project not found: {project_id}")
@router.delete("/{project_id}")
async def delete_project(project_id: str):
    """Logically delete a project (status -> deleted).

    Raises:
        HTTPException: 404 when the project is unknown to the registry;
            500 when the registry's delete call reports failure.
    """
    registry = _registry()
    # Look the project up first so that an unknown id yields 404 rather
    # than the generic 500 below.
    if not registry.get_project(project_id):
        raise HTTPException(404, f"Project not found: {project_id}")
    deleted = registry.delete_project(project_id)
    if deleted:
        return {"ok": True}
    raise HTTPException(500, "Delete failed")
@router.patch("/{project_id}")
async def update_project(project_id: str, body: Dict[str, Any]):
    """Update project metadata.

    Only the ``name``, ``description`` and ``status`` keys of ``body`` are
    forwarded to the registry; every other key is ignored. A body without
    any of these keys is a no-op that still answers ``{"ok": True}``.

    Raises:
        HTTPException: 404 when the registry's update reports a falsy result.
    """
    registry = _registry()
    permitted = ("name", "description", "status")
    changes = {}
    for field, value in body.items():
        if field in permitted:
            changes[field] = value
    if changes:
        applied = registry.update_project(project_id, **changes)
        if not applied:
            raise HTTPException(404, f"Project not found: {project_id}")
    return {"ok": True}
def _pending_copy(task):
    """Return a new task model mirroring ``task`` with status reset to ``"pending"``.

    Only the fields listed below are carried over; any other attribute of the
    source task is not copied.
    """
    # Imported locally, matching how this module imports blackboard models.
    from src.blackboard.models import Task as TaskModel

    return TaskModel(
        id=task.id, title=task.title, description=task.description,
        status="pending", assignee=task.assignee, assigned_by=task.assigned_by,
        priority=task.priority, task_type=task.task_type,
        risk_level=task.risk_level, parent_task=task.parent_task,
        stage=task.stage, stages_json=task.stages_json,
        depends_on=task.depends_on, must_haves=task.must_haves,
    )


@router.post("/{project_id}/tasks/{task_id}/move")
async def move_task(project_id: str, task_id: str, body: Dict[str, Any]):
    """Move a task, together with its child tasks, to another project.

    Each moved task is re-created in the target project's blackboard with
    status ``"pending"`` and then marked ``"cancelled"`` in the source
    blackboard with the detail ``"Moved to <target>"``.

    Args:
        project_id: Source project id (taken from the URL).
        task_id: Id of the task to move.
        body: Must contain ``target_project_id``.

    Returns:
        ``{"ok": True, "moved_to": <target>, "moved_ids": [task_id, *child_ids]}``.

    Raises:
        HTTPException: 422 when ``target_project_id`` is missing or equals the
            source project; 404 when the target project is missing or not
            active, or when the task does not exist in the source project.
    """
    target_project = body.get("target_project_id")
    if not target_project:
        raise HTTPException(422, "Missing target_project_id")
    # Moving onto itself would re-create the same ids in the same database
    # and then cancel them; reject it up front.
    if target_project == project_id:
        raise HTTPException(422, "Target project must differ from the source project")

    reg = _registry()
    # The target project must exist and be active.
    target = reg.get_project(target_project)
    if not target or target.get("status") != "active":
        raise HTTPException(404, f"Target project not found: {target_project}")

    from src.blackboard.operations import Blackboard

    root = Path(reg.root)
    # Read the task from the source project.
    src_bb = Blackboard(root / project_id / "blackboard.db")
    task = src_bb.get_task(task_id)
    if not task:
        raise HTTPException(404, f"Task not found: {task_id}")

    # NOTE(review): only what list_tasks(parent_task=...) returns is moved;
    # whether that includes deeper descendants is not visible here — confirm.
    children = src_bb.list_tasks(parent_task=task_id)
    tgt_bb = Blackboard(root / target_project / "blackboard.db")
    detail = f"Moved to {target_project}"

    # The parent id is reported first even though children are moved first.
    moved_ids = [task_id]

    # NOTE(review): the copy/cancel pairs span two databases and are not
    # atomic; a failure midway leaves a partially moved tree.
    for child in children:
        tgt_bb.create_task(_pending_copy(child))
        src_bb.update_task_status(child.id, "cancelled", detail=detail)
        moved_ids.append(child.id)

    # Finally move the parent task itself.
    tgt_bb.create_task(_pending_copy(task))
    src_bb.update_task_status(task_id, "cancelled", detail=detail)

    return {"ok": True, "moved_to": target_project, "moved_ids": moved_ids}