Files
sanguo_moziplus_v2/src/api/checkpoint_routes.py
T
cfdaily eaaf42b37d fix(lint): 修复 PR #14 引入的 lint 回退 (119→0)
PR #14 从旧分支复制文件导致回退了 PR #10 的 lint 修复。
修复内容:
- autoflake 移除未使用导入/变量
- autopep8 修复缩进/空格
- 手动修复 F821(pathlib→Path), F541(f-string), F841(未使用变量)
- 所有修复均通过 flake8 --max-line-length=120 --extend-ignore=E501 检查 (0 errors)
2026-06-10 23:37:46 +08:00

135 lines
3.9 KiB
Python

"""Checkpoint API — M3 人工确认点
Agent 创建 checkpoint → 用户 approve/reject → 后端自动推进 task 状态
"""
from __future__ import annotations
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional
from src.blackboard.operations import Blackboard
from src.utils import get_data_root
router = APIRouter(
prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints",
tags=["checkpoints"])
# ── 请求模型 ──
class CreateCheckpointRequest(BaseModel):
type: str # verify | decision | action
title: str
payload: dict
description: Optional[str] = None
id: Optional[str] = None # 可选自定义 ID
class ResolveCheckpointRequest(BaseModel):
resolved_by: str = "user"
note: Optional[str] = None
# ── 工具 ──
def _bb(project_id: str) -> Blackboard:
db_path = get_data_root() / project_id / "blackboard.db"
if not db_path.exists():
raise HTTPException(status_code=404, detail="Project not found")
return Blackboard(db_path)
# ── API ──
@router.get("")
def list_checkpoints(project_id: str, task_id: str):
"""列出 task 的所有 checkpoint"""
bb = _bb(project_id)
cps = bb.list_checkpoints(task_id)
return {"checkpoints": cps}
@router.post("")
def create_checkpoint(project_id: str, task_id: str,
req: CreateCheckpointRequest):
"""Agent 创建 checkpoint"""
if req.type not in ("verify", "decision", "action"):
raise HTTPException(status_code=400,
detail=f"Invalid checkpoint type: {req.type}")
bb = _bb(project_id)
# 验证 task 存在
task = bb.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
result = bb.create_checkpoint(
task_id=task_id,
cp_type=req.type,
title=req.title,
payload=req.payload,
description=req.description,
checkpoint_id=req.id,
)
return {"ok": True, **result}
@router.post("/{checkpoint_id}/approve")
def approve_checkpoint(project_id: str, task_id: str,
checkpoint_id: str, req: ResolveCheckpointRequest):
"""用户通过 checkpoint → 自动推进 task 状态"""
bb = _bb(project_id)
result = bb.resolve_checkpoint(
checkpoint_id,
"approve",
req.resolved_by,
req.note)
if result is None:
raise HTTPException(status_code=404, detail="Checkpoint not found")
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
# #10: SSE 通知前端 checkpoint 已处理
try:
from src.api.sse_routes import get_broker
broker = get_broker()
broker.publish_sync("checkpoint_resolved", {
"project_id": project_id,
"task_id": task_id,
"checkpoint_id": checkpoint_id,
"action": "approve",
})
except Exception:
pass
return {"ok": True, **result}
@router.post("/{checkpoint_id}/reject")
def reject_checkpoint(project_id: str, task_id: str,
checkpoint_id: str, req: ResolveCheckpointRequest):
"""用户驳回 checkpoint → task 回到 working"""
bb = _bb(project_id)
result = bb.resolve_checkpoint(
checkpoint_id,
"reject",
req.resolved_by,
req.note)
if result is None:
raise HTTPException(status_code=404, detail="Checkpoint not found")
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
# #10: SSE 通知前端 checkpoint 已处理
try:
from src.api.sse_routes import get_broker
broker = get_broker()
broker.publish_sync("checkpoint_resolved", {
"project_id": project_id,
"task_id": task_id,
"checkpoint_id": checkpoint_id,
"action": "reject",
})
except Exception:
pass
return {"ok": True, **result}