"""黑板数据库连接管理(per-project SQLite)""" from __future__ import annotations import sqlite3 from pathlib import Path from typing import Optional def init_db(db_path: Path) -> None: """初始化数据库,创建全部表""" db_path.parent.mkdir(parents=True, exist_ok=True) conn = _connect(db_path) try: for stmt in _SCHEMA_STATEMENTS: conn.execute(stmt) # v2.6.1 迁移:旧 DB 可能缺少新字段 _migrate_v261(conn) # v2.7 迁移:stages_json _migrate_v27(conn) # v2.8 迁移:新状态 + 归档字段 _migrate_v28(conn) conn.commit() finally: conn.close() def _migrate_v261(conn: sqlite3.Connection) -> None: """v2.6.1 数据迁移:为旧 DB 添加新字段/新表(安全 ALTER)""" # tasks 表新增字段 _safe_add_column(conn, "tasks", "current_agent", "TEXT") _safe_add_column(conn, "tasks", "previous_agent", "TEXT") _safe_add_column(conn, "tasks", "next_capability", "TEXT") # routing_decisions 表(如果不存在,CREATE TABLE IF NOT EXISTS 已处理) # idx_tasks_current_agent 索引 try: conn.execute( "CREATE INDEX IF NOT EXISTS idx_tasks_current_agent ON tasks(current_agent)") except sqlite3.OperationalError: pass def _migrate_v27(conn: sqlite3.Connection) -> None: """v2.7 数据迁移:stages_json 字段(动态 Stage 定义)""" _safe_add_column(conn, "tasks", "stages_json", "TEXT DEFAULT '[]'") # 确保 stage 列存在(旧 v2.7 迁移可能已添加) _safe_add_column(conn, "tasks", "stage", "TEXT") def _migrate_v28(conn: sqlite3.Connection) -> None: """v2.8 数据迁移:3 新状态 + 归档字段 + CHECK 约束更新""" # 1. 添加归档字段 _safe_add_column(conn, "tasks", "archived", "INTEGER DEFAULT 0") _safe_add_column(conn, "tasks", "archived_at", "TEXT") # 2. CHECK 约束需要重建表(SQLite 不支持 ALTER CHECK) # 先检查当前 CHECK 是否已包含新状态 row = conn.execute( "SELECT sql FROM sqlite_master WHERE type='table' AND name='tasks'" ).fetchone() if row and "paused" not in row["sql"]: # 重建 tasks 表以更新 CHECK 约束 conn.executescript(""" CREATE TABLE tasks_v28 ( id TEXT PRIMARY KEY, title TEXT NOT NULL, description TEXT, status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending','claimed','working','review','paused','escalated','waiting_human','done','failed','blocked','cancelled')), assignee TEXT, assigned_by TEXT, depends_on TEXT, parent_task TEXT, priority INTEGER NOT NULL DEFAULT 5, task_type TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')), claimed_at TEXT, started_at TEXT, completed_at TEXT, deadline TEXT, retry_count INTEGER NOT NULL DEFAULT 0, max_retries INTEGER NOT NULL DEFAULT 2, must_haves TEXT, risk_level TEXT DEFAULT 'standard', estimated_duration_minutes INTEGER, escalated INTEGER DEFAULT 0, current_agent TEXT, previous_agent TEXT, next_capability TEXT, stage TEXT, stages_json TEXT DEFAULT '[]', archived INTEGER DEFAULT 0, archived_at TEXT ); INSERT INTO tasks_v28 SELECT * FROM tasks; DROP TABLE tasks; ALTER TABLE tasks_v28 RENAME TO tasks; """) # 重建索引 conn.executescript(""" CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status); CREATE INDEX IF NOT EXISTS idx_tasks_assignee ON tasks(assignee); CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_task); CREATE INDEX IF NOT EXISTS idx_tasks_current_agent ON tasks(current_agent); """) column: str, col_type: str) -> None: """安全添加列(已存在则跳过)""" try: conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {col_type}") except sqlite3.OperationalError as e: if "duplicate column" not in str(e).lower(): raise def get_connection(db_path: Path) -> sqlite3.Connection: """获取数据库连接(WAL + busy_timeout + foreign_keys)""" return _connect(db_path) def _connect(db_path: Path) -> sqlite3.Connection: conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") conn.execute("PRAGMA busy_timeout=5000") return conn # --------------------------------------------------------------------------- # 状态机 # --------------------------------------------------------------------------- VALID_STATUSES = frozenset({ "pending", "claimed", "working", "review", "paused", "escalated", "waiting_human", "done", "failed", "blocked", "cancelled", }) TERMINAL_STATUSES = frozenset({"done"}) # 手动状态(不参与聚合推导) MANUAL_STATUSES = frozenset({"cancelled", "paused"}) VALID_TRANSITIONS = { "pending": {"claimed", "cancelled"}, "claimed": {"working", "paused", "pending", "cancelled"}, "working": {"review", "blocked", "failed", "paused", "escalated", "waiting_human", "cancelled"}, "paused": {"working", "cancelled"}, "review": {"done", "pending", "failed", "escalated", "waiting_human", "cancelled"}, "blocked": {"pending", "escalated", "cancelled"}, "failed": {"pending", "escalated", "cancelled"}, "escalated": {"working", "pending", "cancelled"}, "waiting_human": {"working", "done", "cancelled"}, "done": set(), "cancelled": set(), } COMMENT_TYPES = frozenset({ "general", "handoff", "observation", "rebuttal", "rebuttal_response", "debate_argument", "debate_rebuttal", "debate_judgment", }) SEVERITY_LEVELS = frozenset({"blocking", "warning", "info", "audit"}) EVENT_TYPES = frozenset({ "task_created", "task_claimed", "task_started", "task_completed", "task_failed", "task_blocked", "task_unblocked", "task_reviewed", "task_cancelled", "task_retried", "comment_added", "output_written", "observation_added", "decision_recorded", "agent_spawned", "agent_completed", "agent_zombie_detected", "session_spawned", "session_archived", "session_cleanup", "daemon_tick", "daemon_manual_tick", }) OUTPUT_TYPES = frozenset({"code", "document", "data", "config", "other"}) REVIEW_TYPES = frozenset({"plan_review", "output_review", "guardrail", "final_review"}) VERDICT_TYPES = frozenset({"approved", "rejected", "needs_revision"}) EXPERIENCE_SOURCES = frozenset({ "task_completion", "error_correction", "review_finding", "manual", }) EXPERIENCE_CATEGORIES = frozenset({ "pitfall", "best_practice", "pattern", "anti_pattern", }) EXPERIENCE_STATUSES = frozenset({"draft", "active", "deprecated"}) ATTEMPT_OUTCOMES = frozenset({ "completed", "blocked", "crashed", "timed_out", "spawn_failed", "reclaimed", }) # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- _SCHEMA_STATEMENTS = [ # tasks """CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, title TEXT NOT NULL, description TEXT, status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending','claimed','working','review','paused','escalated','waiting_human','done','failed','blocked','cancelled')), assignee TEXT, assigned_by TEXT, depends_on TEXT, parent_task TEXT, priority INTEGER NOT NULL DEFAULT 5, task_type TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')), claimed_at TEXT, started_at TEXT, completed_at TEXT, deadline TEXT, retry_count INTEGER NOT NULL DEFAULT 0, max_retries INTEGER NOT NULL DEFAULT 2, must_haves TEXT, risk_level TEXT DEFAULT 'standard', estimated_duration_minutes INTEGER, escalated INTEGER DEFAULT 0, current_agent TEXT, previous_agent TEXT, next_capability TEXT, stage TEXT, stages_json TEXT DEFAULT '[]' )""", "CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)", "CREATE INDEX IF NOT EXISTS idx_tasks_assignee ON tasks(assignee)", "CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_task)", # comments """CREATE TABLE IF NOT EXISTS comments ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL REFERENCES tasks(id), author TEXT NOT NULL, comment_type TEXT NOT NULL DEFAULT 'general' CHECK (comment_type IN ('general','handoff','observation','rebuttal','rebuttal_response','debate_argument','debate_rebuttal','debate_judgment')), body TEXT NOT NULL, mentions TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) )""", "CREATE INDEX IF NOT EXISTS idx_comments_task ON comments(task_id)", "CREATE INDEX IF NOT EXISTS idx_comments_type ON comments(task_id, comment_type)", "CREATE INDEX IF NOT EXISTS idx_comments_author ON comments(author)", # outputs """CREATE TABLE IF NOT EXISTS outputs ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL REFERENCES tasks(id), agent TEXT NOT NULL, output_type TEXT NOT NULL CHECK (output_type IN ('code','document','data','config','other')), title TEXT NOT NULL, content_path TEXT, summary TEXT, metadata TEXT, attempt_number INTEGER DEFAULT 1, created_at TEXT NOT NULL DEFAULT (datetime('now')) )""", "CREATE INDEX IF NOT EXISTS idx_outputs_task ON outputs(task_id)", # decisions """CREATE TABLE IF NOT EXISTS decisions ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL REFERENCES tasks(id), decider TEXT NOT NULL, decision TEXT NOT NULL, rationale TEXT NOT NULL, alternatives TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) )""", "CREATE INDEX IF NOT EXISTS idx_decisions_task ON decisions(task_id)", # observations """CREATE TABLE IF NOT EXISTS observations ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT REFERENCES tasks(id), observer TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'info' CHECK (severity IN ('blocking','warning','info','audit')), body TEXT NOT NULL, resolved_by TEXT, resolved_at TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) )""", # events """CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT, agent TEXT, event_type TEXT NOT NULL, detail TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) )""", "CREATE INDEX IF NOT EXISTS idx_events_task ON events(task_id)", "CREATE INDEX IF NOT EXISTS idx_events_time ON events(created_at)", # agents """CREATE TABLE IF NOT EXISTS agents ( agent_id TEXT PRIMARY KEY, role TEXT, current_status TEXT DEFAULT 'idle', current_task TEXT, last_active TEXT, capabilities TEXT )""", # task_attempts """CREATE TABLE IF NOT EXISTS task_attempts ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL REFERENCES tasks(id), attempt_number INTEGER NOT NULL, agent TEXT NOT NULL, outcome TEXT NOT NULL CHECK (outcome IN ('completed','blocked','crashed','timed_out','spawn_failed','reclaimed')), exit_code INTEGER, log_path TEXT, summary TEXT, metadata TEXT, started_at TEXT NOT NULL DEFAULT (datetime('now')), completed_at TEXT )""", "CREATE INDEX IF NOT EXISTS idx_attempts_task ON task_attempts(task_id)", # reviews """CREATE TABLE IF NOT EXISTS reviews ( id TEXT PRIMARY KEY, task_id TEXT NOT NULL REFERENCES tasks(id), output_id TEXT, reviewer TEXT NOT NULL, review_type TEXT NOT NULL CHECK (review_type IN ('plan_review','output_review','guardrail','final_review')), verdict TEXT NOT NULL CHECK (verdict IN ('approved','rejected','needs_revision')), confidence REAL, round INTEGER NOT NULL DEFAULT 1, max_rounds INTEGER NOT NULL DEFAULT 3, consensus_reached INTEGER DEFAULT 0, summary TEXT NOT NULL, detail_path TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) )""", "CREATE INDEX IF NOT EXISTS idx_reviews_task ON reviews(task_id)", "CREATE INDEX IF NOT EXISTS idx_reviews_output ON reviews(output_id)", # experiences """CREATE TABLE IF NOT EXISTS experiences ( experience_id TEXT PRIMARY KEY, source TEXT NOT NULL CHECK (source IN ('task_completion','error_correction','review_finding','manual')), task_id TEXT, summary TEXT NOT NULL, category TEXT NOT NULL CHECK (category IN ('pitfall','best_practice','pattern','anti_pattern')), confidence REAL DEFAULT 0.8, status TEXT DEFAULT 'active' CHECK (status IN ('draft','active','deprecated')), skill_id TEXT, usage_count INTEGER DEFAULT 0, last_used_at TEXT, created_at TEXT NOT NULL, created_by TEXT NOT NULL, updated_at TEXT, deprecated_reason TEXT )""", # experience_tags """CREATE TABLE IF NOT EXISTS experience_tags ( experience_id TEXT NOT NULL REFERENCES experiences(experience_id), tag TEXT NOT NULL, PRIMARY KEY (experience_id, tag) )""", "CREATE INDEX IF NOT EXISTS idx_exptags_tag ON experience_tags(tag)", # routing_decisions — 路由审计日志(v2.6.1) """CREATE TABLE IF NOT EXISTS routing_decisions ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL REFERENCES tasks(id), from_status TEXT, to_status TEXT, mode TEXT NOT NULL CHECK (mode IN ('deterministic','agent_handoff','llm_route','fallback')), selected_agent TEXT NOT NULL, previous_agent TEXT, reason TEXT, confidence REAL, model TEXT, latency_ms INTEGER, task_type TEXT, requested_capability TEXT, outcome TEXT, detail TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) )""", "CREATE INDEX IF NOT EXISTS idx_routing_task ON routing_decisions(task_id)", "CREATE INDEX IF NOT EXISTS idx_routing_mode ON routing_decisions(mode)", "CREATE INDEX IF NOT EXISTS idx_routing_time ON routing_decisions(created_at)", ]