diff --git a/src/blackboard/queries.py b/src/blackboard/queries.py new file mode 100644 index 0000000..3b7470d --- /dev/null +++ b/src/blackboard/queries.py @@ -0,0 +1,114 @@ +"""黑板读操作(查询封装)""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, List, Optional + +from .db import get_connection +from .models import Task + + +class Queries: + """黑板查询(只读)""" + + def __init__(self, db_path: Path): + self.db_path = db_path + + def _conn(self): + return get_connection(self.db_path) + + def task_summary(self) -> Dict[str, int]: + """任务状态汇总""" + conn = self._conn() + try: + rows = conn.execute( + "SELECT status, COUNT(*) as cnt FROM tasks GROUP BY status" + ).fetchall() + return {r["status"]: r["cnt"] for r in rows} + finally: + conn.close() + + def tasks_by_assignee(self, assignee: str) -> List[Task]: + """查询某 Agent 的任务""" + conn = self._conn() + try: + rows = conn.execute( + "SELECT * FROM tasks WHERE assignee=? ORDER BY priority ASC", + (assignee,), + ).fetchall() + return [Task.from_row(r) for r in rows] + finally: + conn.close() + + def blocked_tasks_with_deps(self) -> List[Dict[str, Any]]: + """查询 blocked 任务及其依赖""" + conn = self._conn() + try: + rows = conn.execute( + "SELECT * FROM tasks WHERE status='blocked'" + ).fetchall() + result = [] + for r in rows: + deps = json.loads(r["depends_on"] or "[]") + if deps: + dep_rows = conn.execute( + f"SELECT id, status FROM tasks WHERE id IN ({','.join('?' * len(deps))})", + deps, + ).fetchall() + dep_info = {dr["id"]: dr["status"] for dr in dep_rows} + all_done = all(s == "done" for s in dep_info.values()) + else: + dep_info = {} + all_done = True + result.append({ + "task_id": r["id"], + "title": r["title"], + "depends_on": deps, + "dep_status": dep_info, + "all_deps_done": all_done, + }) + return result + finally: + conn.close() + + def pending_dispatchable(self) -> List[Task]: + """查询可调度的 pending 任务(依赖已满足)""" + conn = self._conn() + try: + rows = conn.execute( + "SELECT * FROM tasks WHERE status='pending' ORDER BY priority ASC" + ).fetchall() + result = [] + for r in rows: + deps = json.loads(r["depends_on"] or "[]") + if not deps: + result.append(Task.from_row(r)) + continue + # 检查依赖是否全部完成 + placeholders = ",".join("?" * len(deps)) + dep_rows = conn.execute( + f"SELECT id FROM tasks WHERE id IN ({placeholders}) AND status='done'", + deps, + ).fetchall() + if len(dep_rows) == len(deps): + result.append(Task.from_row(r)) + return result + finally: + conn.close() + + def recent_events(self, limit: int = 20) -> List[Dict[str, Any]]: + """最近事件""" + conn = self._conn() + try: + rows = conn.execute( + "SELECT * FROM events ORDER BY created_at DESC LIMIT ?", + (limit,), + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + def db_size_bytes(self) -> int: + return self.db_path.stat().st_size if self.db_path.exists() else 0