From 7792c4ef4c9b2e146f6d17abaf2eadf188428d98 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 17 May 2026 00:36:11 +0800 Subject: [PATCH] auto-sync: 2026-05-17 00:36:11 --- src/blackboard/operations.py | 595 +++++++++++++++++++++++++++++++++++ 1 file changed, 595 insertions(+) create mode 100644 src/blackboard/operations.py diff --git a/src/blackboard/operations.py b/src/blackboard/operations.py new file mode 100644 index 0000000..90a8865 --- /dev/null +++ b/src/blackboard/operations.py @@ -0,0 +1,595 @@ +"""黑板写操作(CRUD)""" + +from __future__ import annotations + +import json +import logging +import sqlite3 +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +from .db import ( + SCHEMA_SQL, + VALID_TRANSITIONS, + VALID_STATUSES, + COMMENT_TYPES, + EVENT_TYPES, + OUTPUT_TYPES, + REVIEW_TYPES, + VERDICT_TYPES, + EXPERIENCE_SOURCES, + EXPERIENCE_CATEGORIES, + EXPERIENCE_STATUSES, + ATTEMPT_OUTCOMES, + TERMINAL_STATUSES, + get_connection, + init_db, +) +from .models import ( + Task, Comment, Output, Decision, Observation, Event, + Review, Experience, +) + +logger = logging.getLogger("moziplus-v2.blackboard") + + +class Blackboard: + """黑板操作封装(per-project)""" + + def __init__(self, db_path: Path): + self.db_path = db_path + init_db(db_path) + + def _conn(self) -> sqlite3.Connection: + return get_connection(self.db_path) + + # =================================================================== + # Task CRUD + # =================================================================== + + def create_task(self, task: Task) -> Task: + """创建任务""" + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + """INSERT INTO tasks + (id, title, description, status, assignee, assigned_by, + depends_on, parent_task, priority, task_type, deadline, + retry_count, max_retries, must_haves, risk_level, + estimated_duration_minutes, escalated) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", + (task.id, task.title, task.description, task.status, + task.assignee, task.assigned_by, + task.depends_on, task.parent_task, task.priority, + task.task_type, task.deadline, + task.retry_count, task.max_retries, task.must_haves, + task.risk_level, task.estimated_duration_minutes, + 1 if task.escalated else 0), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task.id, task.assigned_by, "task_created", + json.dumps({"title": task.title, "task_type": task.task_type})), + ) + conn.commit() + return task + finally: + conn.close() + + def get_task(self, task_id: str) -> Optional[Task]: + """获取单个任务""" + conn = self._conn() + try: + row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone() + return Task.from_row(row) if row else None + finally: + conn.close() + + def update_task_status(self, task_id: str, new_status: str, + agent: Optional[str] = None, + detail: Optional[Dict] = None) -> bool: + """更新任务状态(校验合法转换)""" + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,) + ).fetchone() + if not row: + return False + + old_status = row["status"] + allowed = VALID_TRANSITIONS.get(old_status, set()) + + # 终态不可转换;同状态不变 + if old_status in TERMINAL_STATUSES: + return False + if old_status == new_status: + return True + if new_status not in allowed: + logger.warning("Invalid transition %s → %s for task %s", + old_status, new_status, task_id) + return False + + now = datetime.utcnow().isoformat() + updates = {"status": new_status, "updated_at": now} + if new_status == "claimed": + updates["claimed_at"] = now + if agent: + updates["assignee"] = agent + elif new_status == "working": + updates["started_at"] = now + elif new_status in ("done", "failed", "cancelled"): + updates["completed_at"] = now + elif new_status == "pending" and old_status == "failed": + updates["retry_count"] = ( + conn.execute("SELECT retry_count FROM tasks WHERE id=?", + (task_id,)).fetchone()["retry_count"] + 1 + ) + + set_clause = ", ".join(f"{k}=?" for k in updates) + conn.execute( + f"UPDATE tasks SET {set_clause} WHERE id=?", + (*updates.values(), task_id), + ) + + event_type = f"task_{new_status}" + if event_type not in EVENT_TYPES: + event_type = "daemon_tick" + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, agent, event_type, + json.dumps({"from": old_status, "to": new_status, **(detail or {})})), + ) + conn.commit() + return True + finally: + conn.close() + + def claim_task(self, task_id: str, agent_id: str) -> bool: + """原子 CAS 认领""" + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + cursor = conn.execute( + "UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now'), " + "updated_at=datetime('now') " + "WHERE id=? AND status='pending' AND (assignee IS NULL OR assignee=?)", + (agent_id, task_id, agent_id), + ) + if cursor.rowcount > 0: + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, agent_id, "task_claimed", + json.dumps({"by": agent_id})), + ) + conn.commit() + return True + conn.commit() + return False + finally: + conn.close() + + def list_tasks(self, status: Optional[str] = None, + assignee: Optional[str] = None) -> List[Task]: + """列出任务""" + conn = self._conn() + try: + query = "SELECT * FROM tasks" + conditions = [] + params = [] + if status: + conditions.append("status=?") + params.append(status) + if assignee: + conditions.append("assignee=?") + params.append(assignee) + if conditions: + query += " WHERE " + " AND ".join(conditions) + query += " ORDER BY priority ASC, created_at ASC" + rows = conn.execute(query, params).fetchall() + return [Task.from_row(r) for r in rows] + finally: + conn.close() + + # =================================================================== + # Comment + # =================================================================== + + def add_comment(self, task_id: str, author: str, body: str, + comment_type: str = "general", + mentions: Optional[List[str]] = None) -> int: + """添加评论""" + if comment_type not in COMMENT_TYPES: + raise ValueError(f"Invalid comment_type: {comment_type}") + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + cursor = conn.execute( + "INSERT INTO comments (task_id, author, comment_type, body, mentions) " + "VALUES (?,?,?,?,?)", + (task_id, author, comment_type, body, + json.dumps(mentions or [])), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, author, "comment_added", + json.dumps({"comment_type": comment_type, + "body_preview": body[:100], + "mentions": mentions})), + ) + conn.commit() + return cursor.lastrowid + finally: + conn.close() + + def get_comments(self, task_id: str, + comment_type: Optional[str] = None) -> List[Comment]: + """获取任务评论""" + conn = self._conn() + try: + if comment_type: + rows = conn.execute( + "SELECT * FROM comments WHERE task_id=? AND comment_type=? " + "ORDER BY created_at ASC", + (task_id, comment_type), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM comments WHERE task_id=? ORDER BY created_at ASC", + (task_id,), + ).fetchall() + return [Comment.from_row(r) for r in rows] + finally: + conn.close() + + # =================================================================== + # Output + # =================================================================== + + def write_output(self, task_id: str, agent: str, output_type: str, + title: str, content_path: Optional[str] = None, + summary: Optional[str] = None, + metadata: Optional[Dict] = None, + attempt_number: int = 1) -> int: + """写入产出""" + if output_type not in OUTPUT_TYPES: + raise ValueError(f"Invalid output_type: {output_type}") + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + cursor = conn.execute( + "INSERT INTO outputs " + "(task_id, agent, output_type, title, content_path, summary, metadata, attempt_number) " + "VALUES (?,?,?,?,?,?,?,?)", + (task_id, agent, output_type, title, content_path, summary, + json.dumps(metadata or {}), attempt_number), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, agent, "output_written", + json.dumps({"output_id": title, "type": output_type})), + ) + conn.commit() + return cursor.lastrowid + finally: + conn.close() + + def get_outputs(self, task_id: str) -> List[Output]: + """获取任务产出""" + conn = self._conn() + try: + rows = conn.execute( + "SELECT * FROM outputs WHERE task_id=? ORDER BY created_at ASC", + (task_id,), + ).fetchall() + return [Output.from_row(r) for r in rows] + finally: + conn.close() + + # =================================================================== + # Decision + # =================================================================== + + def add_decision(self, task_id: str, decider: str, decision: str, + rationale: str, + alternatives: Optional[List[str]] = None) -> int: + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + cursor = conn.execute( + "INSERT INTO decisions (task_id, decider, decision, rationale, alternatives) " + "VALUES (?,?,?,?,?)", + (task_id, decider, decision, rationale, + json.dumps(alternatives or [])), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, decider, "decision_recorded", + json.dumps({"decision": decision})), + ) + conn.commit() + return cursor.lastrowid + finally: + conn.close() + + def get_decisions(self, task_id: str) -> List[Decision]: + conn = self._conn() + try: + rows = conn.execute( + "SELECT * FROM decisions WHERE task_id=? ORDER BY created_at ASC", + (task_id,), + ).fetchall() + return [Decision.from_row(r) for r in rows] + finally: + conn.close() + + # =================================================================== + # Observation + # =================================================================== + + def add_observation(self, task_id: str, observer: str, body: str, + severity: str = "info") -> int: + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + cursor = conn.execute( + "INSERT INTO observations (task_id, observer, severity, body) VALUES (?,?,?,?)", + (task_id, observer, severity, body), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, observer, "observation_added", + json.dumps({"severity": severity})), + ) + conn.commit() + return cursor.lastrowid + finally: + conn.close() + + def get_observations(self, task_id: str, + unresolved_only: bool = False) -> List[Observation]: + conn = self._conn() + try: + query = "SELECT * FROM observations WHERE task_id=?" + params: list = [task_id] + if unresolved_only: + query += " AND resolved_by IS NULL" + query += " ORDER BY created_at ASC" + rows = conn.execute(query, params).fetchall() + return [Observation.from_row(r) for r in rows] + finally: + conn.close() + + # =================================================================== + # Review + # =================================================================== + + def add_review(self, review: Review) -> str: + if review.review_type not in REVIEW_TYPES: + raise ValueError(f"Invalid review_type: {review.review_type}") + if review.verdict not in VERDICT_TYPES: + raise ValueError(f"Invalid verdict: {review.verdict}") + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + """INSERT INTO reviews + (id, task_id, output_id, reviewer, review_type, verdict, + confidence, round, max_rounds, consensus_reached, + summary, detail_path) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", + (review.id, review.task_id, review.output_id, review.reviewer, + review.review_type, review.verdict, + review.confidence, review.round, review.max_rounds, + 1 if review.consensus_reached else 0, + review.summary, review.detail_path), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (review.task_id, review.reviewer, "task_reviewed", + json.dumps({"verdict": review.verdict, "confidence": review.confidence})), + ) + conn.commit() + return review.id + finally: + conn.close() + + def get_reviews(self, task_id: str) -> List[Review]: + conn = self._conn() + try: + rows = conn.execute( + "SELECT * FROM reviews WHERE task_id=? ORDER BY created_at ASC", + (task_id,), + ).fetchall() + return [Review.from_row(r) for r in rows] + finally: + conn.close() + + # =================================================================== + # Experience + # =================================================================== + + def add_experience(self, exp: Experience) -> str: + if exp.source not in EXPERIENCE_SOURCES: + raise ValueError(f"Invalid source: {exp.source}") + if exp.category not in EXPERIENCE_CATEGORIES: + raise ValueError(f"Invalid category: {exp.category}") + if exp.status not in EXPERIENCE_STATUSES: + raise ValueError(f"Invalid status: {exp.status}") + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + """INSERT INTO experiences + (experience_id, source, task_id, summary, category, confidence, + status, skill_id, usage_count, last_used_at, created_at, created_by, + updated_at, deprecated_reason) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", + (exp.experience_id, exp.source, exp.task_id, exp.summary, + exp.category, exp.confidence, exp.status, exp.skill_id, + exp.usage_count, exp.last_used_at, + exp.created_at or datetime.utcnow().isoformat(), + exp.created_by, exp.updated_at, exp.deprecated_reason), + ) + for tag in exp.tags: + conn.execute( + "INSERT OR IGNORE INTO experience_tags (experience_id, tag) VALUES (?,?)", + (exp.experience_id, tag), + ) + conn.commit() + return exp.experience_id + finally: + conn.close() + + def query_experiences(self, tags: Optional[List[str]] = None, + status: str = "active", + limit: int = 10) -> List[Experience]: + conn = self._conn() + try: + if tags: + placeholders = ",".join("?" * len(tags)) + rows = conn.execute( + f"""SELECT DISTINCT e.* FROM experiences e + JOIN experience_tags et ON e.experience_id = et.experience_id + WHERE et.tag IN ({placeholders}) AND e.status=? + ORDER BY e.usage_count DESC, e.created_at DESC + LIMIT ?""", + (*tags, status, limit), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM experiences WHERE status=? " + "ORDER BY usage_count DESC, created_at DESC LIMIT ?", + (status, limit), + ).fetchall() + + result = [] + for row in rows: + exp = Experience.from_row(row) + tag_rows = conn.execute( + "SELECT tag FROM experience_tags WHERE experience_id=?", + (exp.experience_id,), + ).fetchall() + exp.tags = [r["tag"] for r in tag_rows] + result.append(exp) + return result + finally: + conn.close() + + def touch_experience(self, experience_id: str) -> None: + """增加引用计数""" + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + "UPDATE experiences SET usage_count=usage_count+1, " + "last_used_at=datetime('now') WHERE experience_id=?", + (experience_id,), + ) + conn.commit() + finally: + conn.close() + + # =================================================================== + # Event + # =================================================================== + + def add_event(self, event_type: str, task_id: Optional[str] = None, + agent: Optional[str] = None, + detail: Optional[Dict] = None) -> int: + conn = self._conn() + try: + cursor = conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, agent, event_type, json.dumps(detail or {})), + ) + conn.commit() + return cursor.lastrowid + finally: + conn.close() + + def get_events(self, task_id: Optional[str] = None, + limit: int = 50) -> List[Event]: + conn = self._conn() + try: + if task_id: + rows = conn.execute( + "SELECT * FROM events WHERE task_id=? ORDER BY created_at DESC LIMIT ?", + (task_id, limit), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM events ORDER BY created_at DESC LIMIT ?", + (limit,), + ).fetchall() + return [Event.from_row(r) for r in rows] + finally: + conn.close() + + # =================================================================== + # Task Attempt + # =================================================================== + + def add_task_attempt(self, task_id: str, attempt_number: int, + agent: str, outcome: str, + metadata: Optional[Dict] = None) -> int: + if outcome not in ATTEMPT_OUTCOMES: + raise ValueError(f"Invalid outcome: {outcome}") + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + cursor = conn.execute( + "INSERT INTO task_attempts " + "(task_id, attempt_number, agent, outcome, metadata) VALUES (?,?,?,?,?)", + (task_id, attempt_number, agent, outcome, + json.dumps(metadata or {})), + ) + conn.commit() + return cursor.lastrowid + finally: + conn.close() + + # =================================================================== + # Agent + # =================================================================== + + def upsert_agent(self, agent_id: str, role: Optional[str] = None, + capabilities: Optional[List[str]] = None) -> None: + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + "INSERT OR REPLACE INTO agents (agent_id, role, capabilities, last_active) " + "VALUES (?,?,?,datetime('now'))", + (agent_id, role, json.dumps(capabilities or [])), + ) + conn.commit() + finally: + conn.close() + + def get_agent(self, agent_id: str) -> Optional[Dict]: + conn = self._conn() + try: + row = conn.execute( + "SELECT * FROM agents WHERE agent_id=?", (agent_id,) + ).fetchone() + return dict(row) if row else None + finally: + conn.close() + + def update_agent_status(self, agent_id: str, status: str, + current_task: Optional[str] = None) -> None: + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + "UPDATE agents SET current_status=?, current_task=?, " + "last_active=datetime('now') WHERE agent_id=?", + (status, current_task, agent_id), + ) + conn.commit() + finally: + conn.close()