auto-sync: 2026-05-17 00:36:33

This commit is contained in:
cfdaily
2026-05-17 00:36:33 +08:00
parent 7792c4ef4c
commit fac7444a7d
+114
View File
@@ -0,0 +1,114 @@
"""黑板读操作(查询封装)"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from .db import get_connection
from .models import Task
class Queries:
"""黑板查询(只读)"""
def __init__(self, db_path: Path):
self.db_path = db_path
def _conn(self):
return get_connection(self.db_path)
def task_summary(self) -> Dict[str, int]:
"""任务状态汇总"""
conn = self._conn()
try:
rows = conn.execute(
"SELECT status, COUNT(*) as cnt FROM tasks GROUP BY status"
).fetchall()
return {r["status"]: r["cnt"] for r in rows}
finally:
conn.close()
def tasks_by_assignee(self, assignee: str) -> List[Task]:
"""查询某 Agent 的任务"""
conn = self._conn()
try:
rows = conn.execute(
"SELECT * FROM tasks WHERE assignee=? ORDER BY priority ASC",
(assignee,),
).fetchall()
return [Task.from_row(r) for r in rows]
finally:
conn.close()
def blocked_tasks_with_deps(self) -> List[Dict[str, Any]]:
"""查询 blocked 任务及其依赖"""
conn = self._conn()
try:
rows = conn.execute(
"SELECT * FROM tasks WHERE status='blocked'"
).fetchall()
result = []
for r in rows:
deps = json.loads(r["depends_on"] or "[]")
if deps:
dep_rows = conn.execute(
f"SELECT id, status FROM tasks WHERE id IN ({','.join('?' * len(deps))})",
deps,
).fetchall()
dep_info = {dr["id"]: dr["status"] for dr in dep_rows}
all_done = all(s == "done" for s in dep_info.values())
else:
dep_info = {}
all_done = True
result.append({
"task_id": r["id"],
"title": r["title"],
"depends_on": deps,
"dep_status": dep_info,
"all_deps_done": all_done,
})
return result
finally:
conn.close()
def pending_dispatchable(self) -> List[Task]:
"""查询可调度的 pending 任务(依赖已满足)"""
conn = self._conn()
try:
rows = conn.execute(
"SELECT * FROM tasks WHERE status='pending' ORDER BY priority ASC"
).fetchall()
result = []
for r in rows:
deps = json.loads(r["depends_on"] or "[]")
if not deps:
result.append(Task.from_row(r))
continue
# 检查依赖是否全部完成
placeholders = ",".join("?" * len(deps))
dep_rows = conn.execute(
f"SELECT id FROM tasks WHERE id IN ({placeholders}) AND status='done'",
deps,
).fetchall()
if len(dep_rows) == len(deps):
result.append(Task.from_row(r))
return result
finally:
conn.close()
def recent_events(self, limit: int = 20) -> List[Dict[str, Any]]:
"""最近事件"""
conn = self._conn()
try:
rows = conn.execute(
"SELECT * FROM events ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def db_size_bytes(self) -> int:
return self.db_path.stat().st_size if self.db_path.exists() else 0