"""黑板读操作(查询封装)""" from __future__ import annotations import json from pathlib import Path from typing import Any, Dict, List, Optional from .db import get_connection, MANUAL_STATUSES from .models import Task class Queries: """黑板查询(只读)""" def __init__(self, db_path: Path): self.db_path = db_path def _conn(self): return get_connection(self.db_path) def task_summary(self) -> Dict[str, int]: """任务状态汇总""" conn = self._conn() try: rows = conn.execute( "SELECT status, COUNT(*) as cnt FROM tasks GROUP BY status" ).fetchall() return {r["status"]: r["cnt"] for r in rows} finally: conn.close() def tasks_by_assignee(self, assignee: str) -> List[Task]: """查询某 Agent 的任务""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM tasks WHERE assignee=? ORDER BY priority ASC", (assignee,), ).fetchall() return [Task.from_row(r) for r in rows] finally: conn.close() def blocked_tasks_with_deps(self) -> List[Dict[str, Any]]: """查询 blocked 任务及其依赖""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM tasks WHERE status='blocked'" ).fetchall() result = [] for r in rows: deps = json.loads(r["depends_on"] or "[]") if deps: dep_rows = conn.execute( f"SELECT id, status FROM tasks WHERE id IN ({','.join('?' * len(deps))})", deps, ).fetchall() dep_info = {dr["id"]: dr["status"] for dr in dep_rows} all_done = all(s == "done" for s in dep_info.values()) else: dep_info = {} all_done = True result.append({ "task_id": r["id"], "title": r["title"], "depends_on": deps, "dep_status": dep_info, "all_deps_done": all_done, }) return result finally: conn.close() def tasks_by_status(self, status: str) -> List[Task]: """查询指定状态的所有任务""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM tasks WHERE status=? ORDER BY priority ASC", (status,), ).fetchall() return [Task.from_row(r) for r in rows] finally: conn.close() def pending_dispatchable(self) -> List[Task]: """查询可调度的 pending 任务(依赖已满足)""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM tasks WHERE status='pending' ORDER BY priority ASC" ).fetchall() result = [] for r in rows: deps = json.loads(r["depends_on"] or "[]") if not deps: result.append(Task.from_row(r)) continue # 检查依赖是否全部完成 placeholders = ",".join("?" * len(deps)) dep_rows = conn.execute( f"SELECT id FROM tasks WHERE id IN ({placeholders}) AND status='done'", deps, ).fetchall() if len(dep_rows) == len(deps): result.append(Task.from_row(r)) return result finally: conn.close() def recent_events(self, limit: int = 20) -> List[Dict[str, Any]]: """最近事件""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM events ORDER BY created_at DESC LIMIT ?", (limit,), ).fetchall() return [dict(r) for r in rows] finally: conn.close() def task_detail(self, task_id: str) -> Optional[Dict[str, Any]]: """任务详情聚合(含关联数据)""" conn = self._conn() try: row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: return None task = dict(row) # 关联评论数 + 产出数 task["comments_count"] = conn.execute( "SELECT COUNT(*) FROM comments WHERE task_id=?", (task_id,) ).fetchone()[0] task["outputs_count"] = conn.execute( "SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,) ).fetchone()[0] # 最新审查状态 rev_row = conn.execute( "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1", (task_id,), ).fetchone() task["review_status"] = rev_row["verdict"] if rev_row else None # 最新事件 evt_row = conn.execute( "SELECT detail FROM events WHERE task_id=? ORDER BY created_at DESC LIMIT 1", (task_id,), ).fetchone() task["latest_event_detail"] = evt_row["detail"] if evt_row else None return task finally: conn.close() def task_events(self, task_id: str, limit: int = 50) -> List[Dict[str, Any]]: """任务事件列表""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM events WHERE task_id=? ORDER BY created_at DESC LIMIT ?", (task_id, limit), ).fetchall() return [dict(r) for r in rows] finally: conn.close() def task_experiences(self, task_id: str) -> List[Dict[str, Any]]: """任务关联经验""" conn = self._conn() try: rows = conn.execute( """SELECT e.*, GROUP_CONCAT(et.tag) as tags FROM experiences e LEFT JOIN experience_tags et ON e.experience_id = et.experience_id WHERE e.task_id=? GROUP BY e.experience_id ORDER BY e.created_at DESC""", (task_id,), ).fetchall() return [dict(r) for r in rows] finally: conn.close() # =================================================================== # v2.7 父子关系查询 # =================================================================== def list_subtasks(self, parent_task_id: str) -> List[Task]: """列出某个父 Task 的所有子 Task""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM tasks WHERE parent_task=? ORDER BY priority ASC, created_at ASC", (parent_task_id,), ).fetchall() return [Task.from_row(r) for r in rows] finally: conn.close() def top_level_tasks(self) -> List[Task]: """列出所有顶层 Task(parent_task IS NULL)""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM tasks WHERE parent_task IS NULL ORDER BY priority ASC, created_at ASC" ).fetchall() return [Task.from_row(r) for r in rows] finally: conn.close() def compute_parent_status(self, parent_task_id: str) -> Optional[str]: """从子 Task 聚合推导父 Task 状态 优先级:review > working > pending > blocked > failed 手动状态(cancelled)不参与聚合 """ conn = self._conn() try: # 检查父 Task 是否有手动状态 parent_row = conn.execute( "SELECT status FROM tasks WHERE id=?", (parent_task_id,) ).fetchone() if not parent_row: return None if parent_row["status"] in MANUAL_STATUSES: return parent_row["status"] # 聚合子 Task 状态(排除 cancelled) rows = conn.execute( "SELECT status, COUNT(*) as cnt FROM tasks " "WHERE parent_task=? AND status != 'cancelled' " "GROUP BY status", (parent_task_id,), ).fetchall() if not rows: # 无子 Task,保持原状态 return parent_row["status"] status_counts = {r["status"]: r["cnt"] for r in rows} total = sum(status_counts.values()) done_count = status_counts.get("done", 0) # 所有 done → done if done_count == total: return "done" # 有 review → review if status_counts.get("review", 0) > 0: return "review" # 有 working/claimed → working if status_counts.get("working", 0) > 0 or status_counts.get("claimed", 0) > 0: return "working" # 有 pending → pending if status_counts.get("pending", 0) > 0: return "pending" # 有 failed → failed(优先于 blocked:失败比等待更严重) if status_counts.get("failed", 0) > 0: return "failed" # 有 blocked → blocked if status_counts.get("blocked", 0) > 0: return "blocked" return parent_row["status"] finally: conn.close() def parent_task_progress(self, parent_task_id: str) -> Dict[str, Any]: """父 Task 的 Stage 进度信息""" conn = self._conn() try: parent_row = conn.execute( "SELECT * FROM tasks WHERE id=?", (parent_task_id,) ).fetchone() if not parent_row: return {} parent = dict(parent_row) stages = json.loads(parent.get("stages_json") or "[]") # 子 Task 统计 total_row = conn.execute( "SELECT COUNT(*) as cnt FROM tasks WHERE parent_task=? AND status != 'cancelled'", (parent_task_id,), ).fetchone() done_row = conn.execute( "SELECT COUNT(*) as cnt FROM tasks WHERE parent_task=? AND status='done'", (parent_task_id,), ).fetchone() total = total_row["cnt"] if total_row else 0 done = done_row["cnt"] if done_row else 0 # 按 stage 分组 stage_progress = [] for stage in stages: stage_id = stage.get("id", "") s_total = conn.execute( "SELECT COUNT(*) as cnt FROM tasks WHERE parent_task=? AND stage=? AND status != 'cancelled'", (parent_task_id, stage_id), ).fetchone()["cnt"] s_done = conn.execute( "SELECT COUNT(*) as cnt FROM tasks WHERE parent_task=? AND stage=? AND status='done'", (parent_task_id, stage_id), ).fetchone()["cnt"] s_active = conn.execute( "SELECT COUNT(*) as cnt FROM tasks WHERE parent_task=? AND stage=? AND status IN ('working','review','claimed')", (parent_task_id, stage_id), ).fetchone()["cnt"] stage_progress.append({ "id": stage_id, "label": stage.get("label", stage_id), "order": stage.get("order", 0), "total": s_total, "done": s_done, "active": s_active, }) # 当前活跃 stage active_stage = None for sp in stage_progress: if sp["active"] > 0 or (sp["total"] > 0 and sp["done"] < sp["total"]): if not active_stage and sp["done"] < sp["total"]: active_stage = sp["label"] return { "task_id": parent_task_id, "title": parent.get("title", ""), "total_subtasks": total, "done_subtasks": done, "active_stage": active_stage, "stages": stage_progress, } finally: conn.close() def db_size_bytes(self) -> int: return self.db_path.stat().st_size if self.db_path.exists() else 0