Files
sanguo_moziplus_v2/src/api/task_routes.py
T
cfdaily cc2e5aa64c
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 30s
CI / notify-on-failure (pull_request) Successful in 0s
[moz] fix(api): Review M1 修复 — expand=all 保持旧格式 + _toolchain 加入 _VIRTUAL_PROJECTS
- M1: expand=all 保持旧 list 格式(向后兼容 TaskModal .map()/.length)
- 细粒度 expand=comments,events 用新 {items,total_count,limit} 格式
- S1(PR#73): _toolchain 加入 _VIRTUAL_PROJECTS
- S1(PR#72): 移除 _validate_project 未使用 import
2026-06-14 14:22:14 +08:00

332 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Task 核心路由 — CRUD、状态、归档"""
from __future__ import annotations
import json
import os
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException
from src.blackboard.models import Task
from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS
from src.utils import get_data_root
from src.api.shared import (
_bb,
_q,
_task_to_dict,
_extract_mentions,
)
router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"])
# --------------------------------------------------------------------------- #
# Tasks
# --------------------------------------------------------------------------- #
@router.get("/tasks")
async def list_tasks(project_id: str,
status: Optional[str] = None,
assignee: Optional[str] = None,
parent_task: Optional[str] = None,
q: Optional[str] = None):
bb = _bb(project_id)
tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task)
if q:
q_lower = q.lower()
tasks = [t for t in tasks if q_lower in (t.title or "").lower()]
return {"tasks": [_task_to_dict(t) for t in tasks]}
@router.get("/tasks/{task_id}")
async def get_task(project_id: str, task_id: str,
expand: Optional[str] = None):
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task not found: {task_id}")
result = _task_to_dict(task)
if not expand:
return result
# expand=all: 保持旧格式(list + 聚合字段),向后兼容前端 TaskModal
if expand == "all":
q = _q(project_id)
detail = q.task_detail(task_id)
if detail:
result["comments_count"] = detail.get("comments_count", 0)
result["outputs_count"] = detail.get("outputs_count", 0)
result["review_status"] = detail.get("review_status")
result["latest_event_detail"] = detail.get("latest_event_detail")
result["comments"] = [dict(c.__dict__)
for c in bb.get_comments(task_id)]
result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)]
result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)]
result["decisions"] = [dict(d.__dict__)
for d in bb.get_decisions(task_id)]
result["events"] = q.task_events(task_id)
result["experiences"] = q.task_experiences(task_id)
return result
# 细粒度 expand: 新格式(comments/events 带 limit + total_count
expand_list = expand.split(",")
q = _q(project_id)
if "comments" in expand_list:
all_comments = bb.get_comments(task_id)
result["comments"] = {
"items": [dict(c.__dict__) for c in all_comments[-20:]],
"total_count": len(all_comments),
"limit": 20,
}
if "events" in expand_list:
all_events = q.task_events(task_id, limit=99999)
result["events"] = {
"items": all_events[-30:],
"total_count": len(all_events),
"limit": 30,
}
if "outputs" in expand_list:
result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)]
if "reviews" in expand_list:
result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)]
if "decisions" in expand_list:
result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)]
return result
@router.post("/tasks")
async def create_task(project_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
# ID: 后端生成 {project_id_prefix}-{date}-{seq}
task_id = body.get("id")
if not task_id:
import re
prefix = re.sub(r'[^a-z0-9]', '-', project_id.lower()).strip('-')[:20]
date_str = datetime.now().strftime('%Y%m%d')
# seq: 查当前项目最大 seq
db_path = get_data_root() / project_id / "blackboard.db"
try:
conn = sqlite3.connect(str(db_path), timeout=5)
max_id_row = conn.execute(
"SELECT id FROM tasks WHERE id LIKE ? ORDER BY id DESC LIMIT 1",
(f"{prefix}-{date_str}-%",)
).fetchone()
conn.close()
seq = int(max_id_row[0].split('-')[-1]) + 1 if max_id_row else 1
except Exception:
seq = 1
task_id = f"{prefix}-{date_str}-{seq:04d}"
# Title: LLM 生成(如果前端没传)
title = body.get("title", "")
if not title or not title.strip():
desc = body.get("description", "") or ""
title = await _generate_title(desc) or ((desc[:30] + "") if len(desc) > 30 else (desc or "新军令"))
# #04: assignee 由 @mention 推断
description_text = body.get("description") or ""
description_mentions = _extract_mentions(description_text)
assignee = description_mentions[0] if description_mentions else None
# 兼容:显式传 assignee 且无 @mention 时保留(deprecated
if not assignee and body.get("assignee"):
assignee = body.get("assignee")
task = Task(
id=task_id, title=title,
description=body.get("description"),
task_type=body.get("task_type", None),
priority=body.get("priority", 5),
assignee=assignee,
assigned_by=body.get("assigned_by", "user"),
depends_on=json.dumps(
body["depends_on"]) if "depends_on" in body else None,
parent_task=body.get("parent_task"),
risk_level=body.get("risk_level", "standard"),
stage=body.get("stage"),
stages_json=body.get("stages_json", "[]"),
)
bb.create_task(task)
result = {"ok": True, "task_id": task.id, "title": task.title}
if body.get("assignee") and not description_mentions:
result["warning"] = "assignee parameter is deprecated, use @mention in description"
return result
async def _generate_title(description: str) -> str | None:
"""调用 LLM 生成简短标题"""
if not description or len(description) < 5:
return None
try:
from openai import OpenAI
import json as _json
# 从 OpenClaw 配置读 zhipu 凭据
base_url = "https://open.bigmodel.cn/api/paas/v4"
api_key = ""
model = "glm-4-flash"
oc_cfg = Path(os.environ.get(
"OPENCLAW_HOME", str(Path.home() / ".openclaw")
)) / "openclaw.json"
if oc_cfg.exists():
with open(oc_cfg) as f:
cfg = _json.load(f)
zhipu = cfg.get("models", {}).get("providers", {}).get("zhipu", {})
if zhipu.get("baseUrl"):
base_url = zhipu["baseUrl"]
if zhipu.get("apiKey"):
api_key = zhipu["apiKey"]
if not api_key:
return None # 没配 API key 就跳过
client = OpenAI(base_url=base_url, api_key=api_key)
resp = client.chat.completions.create(
model=model,
messages=[
{"role": "system",
"content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"},
{"role": "user", "content": description[:500]},
],
max_tokens=50,
temperature=0.3,
timeout=10,
)
title = resp.choices[0].message.content.strip().strip('"').strip("'")
if title and len(title) <= 30:
return title
except Exception as e:
import logging
logging.getLogger(
"moziplus-v2").warning(f"Title generation failed: {e}")
return None
@router.get("/tasks/{task_id}/progress")
async def task_progress(project_id: str, task_id: str):
"""Task Stage 进度(含子 Task 统计)"""
queries = _q(project_id)
progress = queries.parent_task_progress(task_id)
if not progress:
raise HTTPException(404, "Task not found")
return progress
@router.post("/tasks/{task_id}/claim")
async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
if not bb.claim_task(task_id, body["agent"]):
raise HTTPException(
409, "Claim failed (already claimed or wrong assignee)")
return {"ok": True}
@router.post("/tasks/{task_id}/status")
async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
old_task = bb.get_task(task_id)
if not old_task:
raise HTTPException(404, {
"error": "not_found",
"detail": f"Task {task_id} not found in project {project_id}",
})
new_status = body.get("status")
if not new_status:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: status",
"valid_values": {"status": sorted(VALID_STATUSES)},
})
# 检查转换是否合法
current = old_task.status
allowed = VALID_TRANSITIONS.get(current, set())
if new_status not in allowed:
raise HTTPException(409, {
"error": "invalid_transition",
"detail": f"Cannot transition from {current} to {new_status}",
"valid_transitions": {current: sorted(allowed)},
"hint": f"From '{current}', valid targets are: {sorted(allowed)}",
})
if not bb.update_task_status(task_id, new_status,
agent=body.get("agent")):
raise HTTPException(409, {
"error": "transition_failed",
"detail": f"Status update failed for {task_id}",
})
# SSE 推送
try:
from src.api.sse_routes import get_broker
broker = get_broker()
broker.publish_sync("task_updated", {
"project_id": project_id,
"task_id": task_id,
"old_status": current,
"new_status": new_status,
"agent": body.get("agent"),
})
except Exception:
pass
return {"ok": True, "old_status": current, "new_status": new_status}
@router.patch("/tasks/{task_id}")
async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]):
"""更新任务元数据(归档、标题等)"""
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task {task_id} not found")
allowed = {"archived", "title", "description", "priority", "risk_level"}
updates = {k: v for k, v in body.items() if k in allowed}
if not updates:
return {"ok": True}
# 直接用 SQL 更新
conn = sqlite3.connect(str(bb.db_path), timeout=5)
try:
set_clause = ", ".join(f"{k}=?" for k in updates)
conn.execute(f"UPDATE tasks SET {set_clause}, updated_at=datetime('now') WHERE id=?",
(*updates.values(), task_id))
conn.commit()
finally:
conn.close()
return {"ok": True}
# --------------------------------------------------------------------------- #
# Archive (v2.8)
# --------------------------------------------------------------------------- #
@router.post("/tasks/{task_id}/archive")
async def archive_task(project_id: str, task_id: str,
body: Optional[Dict[str, Any]] = None):
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task not found: {task_id}")
action = (body or {}).get("action", "archive")
if action == "unarchive":
bb.unarchive_task(task_id)
return {"ok": True, "action": "unarchived"}
else:
bb.archive_task(task_id)
return {"ok": True, "action": "archived"}
@router.post("/tasks/archive-done")
async def archive_done_tasks(project_id: str):
bb = _bb(project_id)
count = bb.archive_done_tasks()
return {"ok": True, "archived_count": count}