"""黑板写操作(CRUD)""" from __future__ import annotations import json import logging import sqlite3 from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from .db import ( VALID_TRANSITIONS, VALID_STATUSES, COMMENT_TYPES, EVENT_TYPES, OUTPUT_TYPES, REVIEW_TYPES, VERDICT_TYPES, EXPERIENCE_SOURCES, EXPERIENCE_CATEGORIES, EXPERIENCE_STATUSES, ATTEMPT_OUTCOMES, TERMINAL_STATUSES, get_connection, init_db, ) from .models import ( Task, Comment, Output, Decision, Observation, Event, Review, Experience, ) logger = logging.getLogger("moziplus-v2.blackboard") class Blackboard: """黑板操作封装(per-project)""" def __init__(self, db_path: Path): self.db_path = db_path init_db(db_path) def _conn(self) -> sqlite3.Connection: return get_connection(self.db_path) # =================================================================== # Task CRUD # =================================================================== def create_task(self, task: Task) -> Task: """创建任务""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( """INSERT INTO tasks (id, title, description, status, assignee, assigned_by, depends_on, parent_task, priority, task_type, deadline, retry_count, max_retries, must_haves, risk_level, estimated_duration_minutes, escalated, stage, stages_json, archived, archived_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", (task.id, task.title, task.description, task.status, task.assignee, task.assigned_by, task.depends_on, task.parent_task, task.priority, task.task_type, task.deadline, task.retry_count, task.max_retries, task.must_haves, task.risk_level, task.estimated_duration_minutes, 1 if task.escalated else 0, task.stage, task.stages_json, 1 if task.archived else 0, task.archived_at), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task.id, task.assigned_by, "task_created", json.dumps({"title": task.title, "task_type": task.task_type})), ) conn.commit() return task finally: conn.close() def get_task(self, task_id: str) -> Optional[Task]: """获取单个任务""" conn = self._conn() try: row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone() return Task.from_row(row) if row else None finally: conn.close() def update_task_status(self, task_id: str, new_status: str, agent: Optional[str] = None, detail: Optional[Dict] = None) -> bool: """更新任务状态(校验合法转换)""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") row = conn.execute( "SELECT status FROM tasks WHERE id=?", (task_id,) ).fetchone() if not row: return False old_status = row["status"] allowed = VALID_TRANSITIONS.get(old_status, set()) # 终态不可转换;同状态不变 if old_status in TERMINAL_STATUSES: return False if old_status == new_status: return True if new_status not in allowed: logger.warning("Invalid transition %s → %s for task %s", old_status, new_status, task_id) return False now = datetime.utcnow().isoformat() updates = {"status": new_status, "updated_at": now} if new_status == "claimed": updates["claimed_at"] = now if agent: updates["assignee"] = agent elif new_status == "working": updates["started_at"] = now elif new_status in ("done", "failed", "cancelled"): updates["completed_at"] = now elif new_status == "paused": updates["completed_at"] = now # paused 也记录时间用于恢复 updates["resumed_from"] = old_status # 记录暂停前状态 elif new_status == "pending": # 所有 →pending 转换都清空 assignee(与 ticker._transition_status L414 对齐) updates["assignee"] = None updates["claimed_at"] = None updates["current_agent"] = None if old_status == "failed": # 仅 failed→pending 递增 retry_count updates["retry_count"] = ( conn.execute("SELECT retry_count FROM tasks WHERE id=?", (task_id,)).fetchone()["retry_count"] + 1 ) set_clause = ", ".join(f"{k}=?" for k in updates) conn.execute( f"UPDATE tasks SET {set_clause} WHERE id=?", (*updates.values(), task_id), ) event_type = f"task_{new_status}" if event_type not in EVENT_TYPES: event_type = "daemon_tick" conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, agent, event_type, json.dumps({"from": old_status, "to": new_status, **(detail or {})})), ) conn.commit() return True finally: conn.close() def claim_task(self, task_id: str, agent_id: str) -> bool: """原子 CAS 认领""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") cursor = conn.execute( "UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now'), " "updated_at=datetime('now') " "WHERE id=? AND status='pending' AND (assignee IS NULL OR assignee=?)", (agent_id, task_id, agent_id), ) if cursor.rowcount > 0: conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, agent_id, "task_claimed", json.dumps({"by": agent_id})), ) conn.commit() return True conn.commit() return False finally: conn.close() def list_tasks(self, status: Optional[str] = None, assignee: Optional[str] = None, assigned_by: Optional[str] = None, parent_task: Optional[str] = None) -> List[Task]: """列出任务""" conn = self._conn() try: query = "SELECT * FROM tasks" conditions = [] params = [] if status: conditions.append("status=?") params.append(status) if assignee: conditions.append("assignee=?") params.append(assignee) if assigned_by: conditions.append("assigned_by=?") params.append(assigned_by) if parent_task is not None: conditions.append("parent_task=?") params.append(parent_task) if conditions: query += " WHERE " + " AND ".join(conditions) query += " ORDER BY priority ASC, created_at ASC" rows = conn.execute(query, params).fetchall() return [Task.from_row(r) for r in rows] finally: conn.close() def update_must_haves(self, task_id: str, must_haves: str) -> bool: """更新 Task 的 must_haves 字段(用于 Mail 元数据等)""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET must_haves=?, updated_at=datetime('now') WHERE id=?", (must_haves, task_id), ) conn.commit() return True finally: conn.close() # =================================================================== # Comment # =================================================================== def add_comment(self, task_id: str, author: str, body: str, comment_type: str = "general", mentions: Optional[List[str]] = None) -> int: """添加评论""" if comment_type not in COMMENT_TYPES: raise ValueError(f"Invalid comment_type: {comment_type}") conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") cursor = conn.execute( "INSERT INTO comments (task_id, author, comment_type, body, mentions) " "VALUES (?,?,?,?,?)", (task_id, author, comment_type, body, json.dumps(mentions or [])), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, author, "comment_added", json.dumps({"comment_type": comment_type, "body_preview": body[:100], "mentions": mentions})), ) conn.commit() return cursor.lastrowid finally: conn.close() def get_comments(self, task_id: str, comment_type: Optional[str] = None) -> List[Comment]: """获取任务评论""" conn = self._conn() try: if comment_type: rows = conn.execute( "SELECT * FROM comments WHERE task_id=? AND comment_type=? " "ORDER BY created_at ASC", (task_id, comment_type), ).fetchall() else: rows = conn.execute( "SELECT * FROM comments WHERE task_id=? ORDER BY created_at ASC", (task_id,), ).fetchall() return [Comment.from_row(r) for r in rows] finally: conn.close() # =================================================================== # Output # =================================================================== def write_output(self, task_id: str, agent: str, output_type: str, title: str, content_path: Optional[str] = None, summary: Optional[str] = None, metadata: Optional[Dict] = None, attempt_number: int = 1) -> int: """写入产出""" if output_type not in OUTPUT_TYPES: raise ValueError(f"Invalid output_type: {output_type}") conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") cursor = conn.execute( "INSERT INTO outputs " "(task_id, agent, output_type, title, content_path, summary, metadata, attempt_number) " "VALUES (?,?,?,?,?,?,?,?)", (task_id, agent, output_type, title, content_path, summary, json.dumps(metadata or {}), attempt_number), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, agent, "output_written", json.dumps({"output_id": title, "type": output_type})), ) conn.commit() return cursor.lastrowid finally: conn.close() def get_outputs(self, task_id: str) -> List[Output]: """获取任务产出""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM outputs WHERE task_id=? ORDER BY created_at ASC", (task_id,), ).fetchall() return [Output.from_row(r) for r in rows] finally: conn.close() # =================================================================== # Decision # =================================================================== def add_decision(self, task_id: str, decider: str, decision: str, rationale: str, alternatives: Optional[List[str]] = None) -> int: conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") cursor = conn.execute( "INSERT INTO decisions (task_id, decider, decision, rationale, alternatives) " "VALUES (?,?,?,?,?)", (task_id, decider, decision, rationale, json.dumps(alternatives or [])), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, decider, "decision_recorded", json.dumps({"decision": decision})), ) conn.commit() return cursor.lastrowid finally: conn.close() def get_decisions(self, task_id: str) -> List[Decision]: conn = self._conn() try: rows = conn.execute( "SELECT * FROM decisions WHERE task_id=? ORDER BY created_at ASC", (task_id,), ).fetchall() return [Decision.from_row(r) for r in rows] finally: conn.close() # =================================================================== # Observation # =================================================================== def add_observation(self, task_id: Optional[str], observer: str, body: str, severity: str = "info") -> int: conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") cursor = conn.execute( "INSERT INTO observations (task_id, observer, severity, body) VALUES (?,?,?,?)", (task_id, observer, severity, body), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, observer, "observation_added", json.dumps({"severity": severity})), ) conn.commit() return cursor.lastrowid finally: conn.close() def get_observations(self, task_id: str, unresolved_only: bool = False) -> List[Observation]: conn = self._conn() try: query = "SELECT * FROM observations WHERE task_id=?" params: list = [task_id] if unresolved_only: query += " AND resolved_by IS NULL" query += " ORDER BY created_at ASC" rows = conn.execute(query, params).fetchall() return [Observation.from_row(r) for r in rows] finally: conn.close() # =================================================================== # Review # =================================================================== def add_review(self, review: Review) -> str: if review.review_type not in REVIEW_TYPES: raise ValueError(f"Invalid review_type: {review.review_type}") if review.verdict not in VERDICT_TYPES: raise ValueError(f"Invalid verdict: {review.verdict}") conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( """INSERT INTO reviews (id, task_id, output_id, reviewer, review_type, verdict, confidence, round, max_rounds, consensus_reached, summary, detail_path) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", (review.id, review.task_id, review.output_id, review.reviewer, review.review_type, review.verdict, review.confidence, review.round, review.max_rounds, 1 if review.consensus_reached else 0, review.summary, review.detail_path), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (review.task_id, review.reviewer, "task_reviewed", json.dumps({"verdict": review.verdict, "confidence": review.confidence})), ) conn.commit() return review.id finally: conn.close() def get_reviews(self, task_id: str) -> List[Review]: conn = self._conn() try: rows = conn.execute( "SELECT * FROM reviews WHERE task_id=? ORDER BY created_at ASC", (task_id,), ).fetchall() return [Review.from_row(r) for r in rows] finally: conn.close() # =================================================================== # Experience # =================================================================== def add_experience(self, exp: Experience) -> str: if exp.source not in EXPERIENCE_SOURCES: raise ValueError(f"Invalid source: {exp.source}") if exp.category not in EXPERIENCE_CATEGORIES: raise ValueError(f"Invalid category: {exp.category}") if exp.status not in EXPERIENCE_STATUSES: raise ValueError(f"Invalid status: {exp.status}") conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( """INSERT INTO experiences (experience_id, source, task_id, summary, category, confidence, status, skill_id, usage_count, last_used_at, created_at, created_by, updated_at, deprecated_reason) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", (exp.experience_id, exp.source, exp.task_id, exp.summary, exp.category, exp.confidence, exp.status, exp.skill_id, exp.usage_count, exp.last_used_at, exp.created_at or datetime.utcnow().isoformat(), exp.created_by, exp.updated_at, exp.deprecated_reason), ) for tag in exp.tags: conn.execute( "INSERT OR IGNORE INTO experience_tags (experience_id, tag) VALUES (?,?)", (exp.experience_id, tag), ) conn.commit() return exp.experience_id finally: conn.close() def query_experiences(self, tags: Optional[List[str]] = None, status: str = "active", limit: int = 10) -> List[Experience]: conn = self._conn() try: if tags: placeholders = ",".join("?" * len(tags)) rows = conn.execute( f"""SELECT DISTINCT e.* FROM experiences e JOIN experience_tags et ON e.experience_id = et.experience_id WHERE et.tag IN ({placeholders}) AND e.status=? ORDER BY e.usage_count DESC, e.created_at DESC LIMIT ?""", (*tags, status, limit), ).fetchall() else: rows = conn.execute( "SELECT * FROM experiences WHERE status=? " "ORDER BY usage_count DESC, created_at DESC LIMIT ?", (status, limit), ).fetchall() result = [] for row in rows: exp = Experience.from_row(row) tag_rows = conn.execute( "SELECT tag FROM experience_tags WHERE experience_id=?", (exp.experience_id,), ).fetchall() exp.tags = [r["tag"] for r in tag_rows] result.append(exp) return result finally: conn.close() def touch_experience(self, experience_id: str) -> None: """增加引用计数""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE experiences SET usage_count=usage_count+1, " "last_used_at=datetime('now') WHERE experience_id=?", (experience_id,), ) conn.commit() finally: conn.close() # =================================================================== # Event # =================================================================== def add_event(self, event_type: str, task_id: Optional[str] = None, agent: Optional[str] = None, detail: Optional[Dict] = None) -> int: conn = self._conn() try: cursor = conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, agent, event_type, json.dumps(detail or {})), ) conn.commit() return cursor.lastrowid finally: conn.close() def get_events(self, task_id: Optional[str] = None, limit: int = 50) -> List[Event]: conn = self._conn() try: if task_id: rows = conn.execute( "SELECT * FROM events WHERE task_id=? ORDER BY created_at DESC LIMIT ?", (task_id, limit), ).fetchall() else: rows = conn.execute( "SELECT * FROM events ORDER BY created_at DESC LIMIT ?", (limit,), ).fetchall() return [Event.from_row(r) for r in rows] finally: conn.close() # =================================================================== # Task Attempt # =================================================================== def add_task_attempt(self, task_id: str, attempt_number: int, agent: str, outcome: str, metadata: Optional[Dict] = None) -> int: if outcome not in ATTEMPT_OUTCOMES: raise ValueError(f"Invalid outcome: {outcome}") conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") cursor = conn.execute( "INSERT INTO task_attempts " "(task_id, attempt_number, agent, outcome, metadata) VALUES (?,?,?,?,?)", (task_id, attempt_number, agent, outcome, json.dumps(metadata or {})), ) conn.commit() return cursor.lastrowid finally: conn.close() # =================================================================== # Agent # =================================================================== def upsert_agent(self, agent_id: str, role: Optional[str] = None, capabilities: Optional[List[str]] = None) -> None: conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( "INSERT OR REPLACE INTO agents (agent_id, role, capabilities, last_active) " "VALUES (?,?,?,datetime('now'))", (agent_id, role, json.dumps(capabilities or [])), ) conn.commit() finally: conn.close() def get_agent(self, agent_id: str) -> Optional[Dict]: conn = self._conn() try: row = conn.execute( "SELECT * FROM agents WHERE agent_id=?", (agent_id,) ).fetchone() return dict(row) if row else None finally: conn.close() def update_agent_status(self, agent_id: str, status: str, current_task: Optional[str] = None) -> None: conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE agents SET current_status=?, current_task=?, " "last_active=datetime('now') WHERE agent_id=?", (status, current_task, agent_id), ) conn.commit() finally: conn.close() # =================================================================== # Archive (v2.8) # =================================================================== def archive_task(self, task_id: str) -> bool: """归档单个任务""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET archived=1, archived_at=datetime('now'), " "updated_at=datetime('now') WHERE id=?", (task_id,), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, "daemon", "task_archived", json.dumps({"action": "archive"})), ) conn.commit() return True finally: conn.close() def unarchive_task(self, task_id: str) -> bool: """取消归档""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET archived=0, archived_at=NULL, " "updated_at=datetime('now') WHERE id=?", (task_id,), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, "daemon", "task_unarchived", json.dumps({"action": "unarchive"})), ) conn.commit() return True finally: conn.close() def archive_done_tasks(self) -> int: """一键归档所有 done 状态的任务""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") cursor = conn.execute( "UPDATE tasks SET archived=1, archived_at=datetime('now'), " "updated_at=datetime('now') " "WHERE status='done' AND archived=0" ) count = cursor.rowcount if count > 0: conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (None, "daemon", "task_archived", json.dumps({"action": "archive_done_batch", "count": count})), ) conn.commit() return count finally: conn.close() # ── Checkpoint CRUD(M3) ── def create_checkpoint( self, task_id: str, cp_type: str, title: str, payload: dict, description: str | None = None, checkpoint_id: str | None = None, ) -> dict: """创建 Checkpoint""" import uuid # BUG-33: 校验 payload 结构必须含 version 字段 if not isinstance(payload, dict) or "version" not in payload: raise ValueError("payload must be a dict containing 'version' field") cp_id = checkpoint_id or f"cp-{uuid.uuid4().hex[:8]}" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( "INSERT INTO checkpoints (id, task_id, type, title, description, payload) " "VALUES (?,?,?,?,?,?)", (cp_id, task_id, cp_type, title, description, json.dumps(payload)), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, "daemon", "checkpoint_created", json.dumps({"checkpoint_id": cp_id, "type": cp_type, "title": title})), ) conn.commit() return {"id": cp_id, "task_id": task_id, "type": cp_type, "title": title, "status": "pending"} finally: conn.close() def list_checkpoints(self, task_id: str) -> list[dict]: """列出 task 的所有 checkpoint""" conn = self._conn() try: rows = conn.execute( "SELECT * FROM checkpoints WHERE task_id=? ORDER BY created_at", (task_id,), ).fetchall() return [dict(r) for r in rows] finally: conn.close() def get_checkpoint(self, checkpoint_id: str) -> dict | None: """获取单个 checkpoint""" conn = self._conn() try: row = conn.execute( "SELECT * FROM checkpoints WHERE id=?", (checkpoint_id,) ).fetchone() return dict(row) if row else None finally: conn.close() def resolve_checkpoint( self, checkpoint_id: str, action: str, resolved_by: str = "user", note: str | None = None, ) -> dict | None: """通过/驳回 checkpoint,并自动推进 task 状态 approve: verify → done, decision/action → working reject: 一律 → working """ conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") row = conn.execute( "SELECT * FROM checkpoints WHERE id=?", (checkpoint_id,) ).fetchone() if not row: return None cp = dict(row) if cp["status"] != "pending": return {"error": f"Checkpoint already {cp['status']}"} # 更新 checkpoint 状态 new_status = "approved" if action == "approve" else "rejected" conn.execute( "UPDATE checkpoints SET status=?, resolved_at=datetime('now'), " "resolved_by=?, resolve_note=? WHERE id=?", (new_status, resolved_by, note, checkpoint_id), ) # 推进 task 状态 task_id = cp["task_id"] cp_type = cp["type"] task_row = conn.execute( "SELECT status FROM tasks WHERE id=?", (task_id,) ).fetchone() if not task_row: conn.commit() return {"error": "Task not found"} task_status = task_row["status"] # BUG-32: 安全校验 — task 必须处于 waiting_human 才能推进 if task_status != "waiting_human": conn.commit() return {"error": f"Task is {task_status}, expected waiting_human"} if action == "approve": if cp_type == "verify": new_task_status = "done" else: new_task_status = "working" else: new_task_status = "working" conn.execute( "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?", (new_task_status, task_id), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, resolved_by, f"checkpoint_{new_status}", json.dumps({ "checkpoint_id": checkpoint_id, "action": action, "task_status": new_task_status, "note": note, })), ) conn.commit() return { "checkpoint_id": checkpoint_id, "checkpoint_status": new_status, "task_id": task_id, "task_status": new_task_status, } finally: conn.close() # =================================================================== # 四相循环 (v2.9 #01) # =================================================================== def get_subtasks_summary(self, parent_id: str) -> Optional[Dict[str, Any]]: """获取 parent task 下所有 sub task 的状态摘要 Returns: {"total": N, "done": N, "failed": N, "cancelled": N, "other": N, "all_terminal": bool} 如果 parent 不存在或没有 sub task,返回 None """ conn = self._conn() try: # 确认 parent 存在 parent = conn.execute( "SELECT id, status, round_count FROM tasks WHERE id=?", (parent_id,) ).fetchone() if not parent: return None rows = conn.execute( "SELECT status, COUNT(*) as cnt FROM tasks WHERE parent_task=? GROUP BY status", (parent_id,) ).fetchall() if not rows: return None summary = {"parent_id": parent_id, "parent_status": parent["status"], "round_count": parent["round_count"], "total": 0, "done": 0, "failed": 0, "cancelled": 0, "other": 0} for row in rows: cnt = row["cnt"] summary["total"] += cnt if row["status"] in ("done", "failed", "cancelled"): summary[row["status"]] += cnt else: summary["other"] += cnt summary["all_terminal"] = summary["other"] == 0 and summary["total"] > 0 return summary finally: conn.close() def get_aggregate_outputs(self, parent_id: str) -> List[Dict[str, Any]]: """聚合 parent task 下所有 sub task 的 outputs""" conn = self._conn() try: sub_ids = conn.execute( "SELECT id FROM tasks WHERE parent_task=?", (parent_id,) ).fetchall() if not sub_ids: return [] ids = [r["id"] for r in sub_ids] placeholders = ",".join("?" * len(ids)) rows = conn.execute( f"SELECT o.*, t.title as task_title FROM outputs o " f"JOIN tasks t ON o.task_id = t.id " f"WHERE o.task_id IN ({placeholders}) " f"ORDER BY o.created_at ASC", ids ).fetchall() return [dict(r) for r in rows] finally: conn.close() def get_round_comments(self, parent_id: str) -> List[Dict[str, Any]]: """获取 parent task 及其 sub task 的所有 comments""" conn = self._conn() try: # parent 自身的 comments + 所有 sub 的 comments rows = conn.execute( """SELECT c.*, t.title as task_title FROM comments c JOIN tasks t ON c.task_id = t.id WHERE c.task_id = ? OR c.task_id IN (SELECT id FROM tasks WHERE parent_task = ?) ORDER BY c.created_at ASC""", (parent_id, parent_id) ).fetchall() return [dict(r) for r in rows] finally: conn.close() def increment_round_count(self, parent_id: str) -> int: """递增 parent task 的 round_count,返回新值""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET round_count = round_count + 1, updated_at=datetime('now') WHERE id=?", (parent_id,) ) row = conn.execute( "SELECT round_count FROM tasks WHERE id=?", (parent_id,) ).fetchone() conn.commit() return row["round_count"] if row else 0 finally: conn.close() # =================================================================== # Mention Queue (v2.9 #01) # =================================================================== def record_mentions(self, comment_id: int, task_id: str, mentioned_agents: List[str]) -> int: """写入 mention_queue 记录(comment 创建时调用)""" if not mentioned_agents: return 0 conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") count = 0 for agent_id in mentioned_agents: exists = conn.execute( "SELECT id FROM mention_queue WHERE comment_id=? AND mentioned_agent=?", (comment_id, agent_id) ).fetchone() if not exists: conn.execute( "INSERT INTO mention_queue (comment_id, task_id, mentioned_agent, status) " "VALUES (?,?,?,'pending')", (comment_id, task_id, agent_id) ) count += 1 conn.commit() return count finally: conn.close() def get_pending_mentions(self, max_retries: int = 5) -> List[Dict[str, Any]]: """获取所有 pending 且未超过重试上限的 mentions""" conn = self._conn() try: rows = conn.execute( """SELECT mq.*, c.body as comment_body, c.author as comment_author FROM mention_queue mq JOIN comments c ON mq.comment_id = c.id WHERE mq.status = 'pending' AND mq.retry_count < ? ORDER BY mq.created_at ASC""", (max_retries,) ).fetchall() return [dict(r) for r in rows] finally: conn.close() def mark_mention_notified(self, mention_id: int) -> bool: """标记 mention 为已通知""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE mention_queue SET status='notified', notified_at=datetime('now') WHERE id=?", (mention_id,) ) conn.commit() return True finally: conn.close() def mark_mention_retry(self, mention_id: int) -> bool: """递增 mention 重试计数""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute("UPDATE mention_queue SET retry_count=retry_count+1 WHERE id=?", (mention_id,)) conn.commit() return True finally: conn.close() def mark_mention_failed(self, mention_id: int) -> bool: """标记 mention 为失败(超过重试上限)""" conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") conn.execute("UPDATE mention_queue SET status='failed' WHERE id=?", (mention_id,)) conn.commit() return True finally: conn.close()