auto-sync: 2026-05-17 00:36:11

This commit is contained in:
cfdaily
2026-05-17 00:36:11 +08:00
parent 78233c9fbe
commit 7792c4ef4c
+595
View File
@@ -0,0 +1,595 @@
"""黑板写操作(CRUD"""
from __future__ import annotations
import json
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from .db import (
SCHEMA_SQL,
VALID_TRANSITIONS,
VALID_STATUSES,
COMMENT_TYPES,
EVENT_TYPES,
OUTPUT_TYPES,
REVIEW_TYPES,
VERDICT_TYPES,
EXPERIENCE_SOURCES,
EXPERIENCE_CATEGORIES,
EXPERIENCE_STATUSES,
ATTEMPT_OUTCOMES,
TERMINAL_STATUSES,
get_connection,
init_db,
)
from .models import (
Task, Comment, Output, Decision, Observation, Event,
Review, Experience,
)
logger = logging.getLogger("moziplus-v2.blackboard")
class Blackboard:
"""黑板操作封装(per-project"""
def __init__(self, db_path: Path):
self.db_path = db_path
init_db(db_path)
def _conn(self) -> sqlite3.Connection:
return get_connection(self.db_path)
# ===================================================================
# Task CRUD
# ===================================================================
def create_task(self, task: Task) -> Task:
"""创建任务"""
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"""INSERT INTO tasks
(id, title, description, status, assignee, assigned_by,
depends_on, parent_task, priority, task_type, deadline,
retry_count, max_retries, must_haves, risk_level,
estimated_duration_minutes, escalated)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(task.id, task.title, task.description, task.status,
task.assignee, task.assigned_by,
task.depends_on, task.parent_task, task.priority,
task.task_type, task.deadline,
task.retry_count, task.max_retries, task.must_haves,
task.risk_level, task.estimated_duration_minutes,
1 if task.escalated else 0),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task.id, task.assigned_by, "task_created",
json.dumps({"title": task.title, "task_type": task.task_type})),
)
conn.commit()
return task
finally:
conn.close()
def get_task(self, task_id: str) -> Optional[Task]:
"""获取单个任务"""
conn = self._conn()
try:
row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
return Task.from_row(row) if row else None
finally:
conn.close()
def update_task_status(self, task_id: str, new_status: str,
agent: Optional[str] = None,
detail: Optional[Dict] = None) -> bool:
"""更新任务状态(校验合法转换)"""
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
if not row:
return False
old_status = row["status"]
allowed = VALID_TRANSITIONS.get(old_status, set())
# 终态不可转换;同状态不变
if old_status in TERMINAL_STATUSES:
return False
if old_status == new_status:
return True
if new_status not in allowed:
logger.warning("Invalid transition %s%s for task %s",
old_status, new_status, task_id)
return False
now = datetime.utcnow().isoformat()
updates = {"status": new_status, "updated_at": now}
if new_status == "claimed":
updates["claimed_at"] = now
if agent:
updates["assignee"] = agent
elif new_status == "working":
updates["started_at"] = now
elif new_status in ("done", "failed", "cancelled"):
updates["completed_at"] = now
elif new_status == "pending" and old_status == "failed":
updates["retry_count"] = (
conn.execute("SELECT retry_count FROM tasks WHERE id=?",
(task_id,)).fetchone()["retry_count"] + 1
)
set_clause = ", ".join(f"{k}=?" for k in updates)
conn.execute(
f"UPDATE tasks SET {set_clause} WHERE id=?",
(*updates.values(), task_id),
)
event_type = f"task_{new_status}"
if event_type not in EVENT_TYPES:
event_type = "daemon_tick"
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, event_type,
json.dumps({"from": old_status, "to": new_status, **(detail or {})})),
)
conn.commit()
return True
finally:
conn.close()
def claim_task(self, task_id: str, agent_id: str) -> bool:
"""原子 CAS 认领"""
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
cursor = conn.execute(
"UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now'), "
"updated_at=datetime('now') "
"WHERE id=? AND status='pending' AND (assignee IS NULL OR assignee=?)",
(agent_id, task_id, agent_id),
)
if cursor.rowcount > 0:
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent_id, "task_claimed",
json.dumps({"by": agent_id})),
)
conn.commit()
return True
conn.commit()
return False
finally:
conn.close()
def list_tasks(self, status: Optional[str] = None,
assignee: Optional[str] = None) -> List[Task]:
"""列出任务"""
conn = self._conn()
try:
query = "SELECT * FROM tasks"
conditions = []
params = []
if status:
conditions.append("status=?")
params.append(status)
if assignee:
conditions.append("assignee=?")
params.append(assignee)
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY priority ASC, created_at ASC"
rows = conn.execute(query, params).fetchall()
return [Task.from_row(r) for r in rows]
finally:
conn.close()
# ===================================================================
# Comment
# ===================================================================
def add_comment(self, task_id: str, author: str, body: str,
comment_type: str = "general",
mentions: Optional[List[str]] = None) -> int:
"""添加评论"""
if comment_type not in COMMENT_TYPES:
raise ValueError(f"Invalid comment_type: {comment_type}")
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
cursor = conn.execute(
"INSERT INTO comments (task_id, author, comment_type, body, mentions) "
"VALUES (?,?,?,?,?)",
(task_id, author, comment_type, body,
json.dumps(mentions or [])),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, author, "comment_added",
json.dumps({"comment_type": comment_type,
"body_preview": body[:100],
"mentions": mentions})),
)
conn.commit()
return cursor.lastrowid
finally:
conn.close()
def get_comments(self, task_id: str,
comment_type: Optional[str] = None) -> List[Comment]:
"""获取任务评论"""
conn = self._conn()
try:
if comment_type:
rows = conn.execute(
"SELECT * FROM comments WHERE task_id=? AND comment_type=? "
"ORDER BY created_at ASC",
(task_id, comment_type),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM comments WHERE task_id=? ORDER BY created_at ASC",
(task_id,),
).fetchall()
return [Comment.from_row(r) for r in rows]
finally:
conn.close()
# ===================================================================
# Output
# ===================================================================
def write_output(self, task_id: str, agent: str, output_type: str,
title: str, content_path: Optional[str] = None,
summary: Optional[str] = None,
metadata: Optional[Dict] = None,
attempt_number: int = 1) -> int:
"""写入产出"""
if output_type not in OUTPUT_TYPES:
raise ValueError(f"Invalid output_type: {output_type}")
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
cursor = conn.execute(
"INSERT INTO outputs "
"(task_id, agent, output_type, title, content_path, summary, metadata, attempt_number) "
"VALUES (?,?,?,?,?,?,?,?)",
(task_id, agent, output_type, title, content_path, summary,
json.dumps(metadata or {}), attempt_number),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, "output_written",
json.dumps({"output_id": title, "type": output_type})),
)
conn.commit()
return cursor.lastrowid
finally:
conn.close()
def get_outputs(self, task_id: str) -> List[Output]:
"""获取任务产出"""
conn = self._conn()
try:
rows = conn.execute(
"SELECT * FROM outputs WHERE task_id=? ORDER BY created_at ASC",
(task_id,),
).fetchall()
return [Output.from_row(r) for r in rows]
finally:
conn.close()
# ===================================================================
# Decision
# ===================================================================
def add_decision(self, task_id: str, decider: str, decision: str,
rationale: str,
alternatives: Optional[List[str]] = None) -> int:
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
cursor = conn.execute(
"INSERT INTO decisions (task_id, decider, decision, rationale, alternatives) "
"VALUES (?,?,?,?,?)",
(task_id, decider, decision, rationale,
json.dumps(alternatives or [])),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, decider, "decision_recorded",
json.dumps({"decision": decision})),
)
conn.commit()
return cursor.lastrowid
finally:
conn.close()
def get_decisions(self, task_id: str) -> List[Decision]:
conn = self._conn()
try:
rows = conn.execute(
"SELECT * FROM decisions WHERE task_id=? ORDER BY created_at ASC",
(task_id,),
).fetchall()
return [Decision.from_row(r) for r in rows]
finally:
conn.close()
# ===================================================================
# Observation
# ===================================================================
def add_observation(self, task_id: str, observer: str, body: str,
severity: str = "info") -> int:
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
cursor = conn.execute(
"INSERT INTO observations (task_id, observer, severity, body) VALUES (?,?,?,?)",
(task_id, observer, severity, body),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, observer, "observation_added",
json.dumps({"severity": severity})),
)
conn.commit()
return cursor.lastrowid
finally:
conn.close()
def get_observations(self, task_id: str,
unresolved_only: bool = False) -> List[Observation]:
conn = self._conn()
try:
query = "SELECT * FROM observations WHERE task_id=?"
params: list = [task_id]
if unresolved_only:
query += " AND resolved_by IS NULL"
query += " ORDER BY created_at ASC"
rows = conn.execute(query, params).fetchall()
return [Observation.from_row(r) for r in rows]
finally:
conn.close()
# ===================================================================
# Review
# ===================================================================
def add_review(self, review: Review) -> str:
if review.review_type not in REVIEW_TYPES:
raise ValueError(f"Invalid review_type: {review.review_type}")
if review.verdict not in VERDICT_TYPES:
raise ValueError(f"Invalid verdict: {review.verdict}")
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"""INSERT INTO reviews
(id, task_id, output_id, reviewer, review_type, verdict,
confidence, round, max_rounds, consensus_reached,
summary, detail_path)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
(review.id, review.task_id, review.output_id, review.reviewer,
review.review_type, review.verdict,
review.confidence, review.round, review.max_rounds,
1 if review.consensus_reached else 0,
review.summary, review.detail_path),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(review.task_id, review.reviewer, "task_reviewed",
json.dumps({"verdict": review.verdict, "confidence": review.confidence})),
)
conn.commit()
return review.id
finally:
conn.close()
def get_reviews(self, task_id: str) -> List[Review]:
conn = self._conn()
try:
rows = conn.execute(
"SELECT * FROM reviews WHERE task_id=? ORDER BY created_at ASC",
(task_id,),
).fetchall()
return [Review.from_row(r) for r in rows]
finally:
conn.close()
# ===================================================================
# Experience
# ===================================================================
def add_experience(self, exp: Experience) -> str:
if exp.source not in EXPERIENCE_SOURCES:
raise ValueError(f"Invalid source: {exp.source}")
if exp.category not in EXPERIENCE_CATEGORIES:
raise ValueError(f"Invalid category: {exp.category}")
if exp.status not in EXPERIENCE_STATUSES:
raise ValueError(f"Invalid status: {exp.status}")
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"""INSERT INTO experiences
(experience_id, source, task_id, summary, category, confidence,
status, skill_id, usage_count, last_used_at, created_at, created_by,
updated_at, deprecated_reason)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(exp.experience_id, exp.source, exp.task_id, exp.summary,
exp.category, exp.confidence, exp.status, exp.skill_id,
exp.usage_count, exp.last_used_at,
exp.created_at or datetime.utcnow().isoformat(),
exp.created_by, exp.updated_at, exp.deprecated_reason),
)
for tag in exp.tags:
conn.execute(
"INSERT OR IGNORE INTO experience_tags (experience_id, tag) VALUES (?,?)",
(exp.experience_id, tag),
)
conn.commit()
return exp.experience_id
finally:
conn.close()
def query_experiences(self, tags: Optional[List[str]] = None,
status: str = "active",
limit: int = 10) -> List[Experience]:
conn = self._conn()
try:
if tags:
placeholders = ",".join("?" * len(tags))
rows = conn.execute(
f"""SELECT DISTINCT e.* FROM experiences e
JOIN experience_tags et ON e.experience_id = et.experience_id
WHERE et.tag IN ({placeholders}) AND e.status=?
ORDER BY e.usage_count DESC, e.created_at DESC
LIMIT ?""",
(*tags, status, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM experiences WHERE status=? "
"ORDER BY usage_count DESC, created_at DESC LIMIT ?",
(status, limit),
).fetchall()
result = []
for row in rows:
exp = Experience.from_row(row)
tag_rows = conn.execute(
"SELECT tag FROM experience_tags WHERE experience_id=?",
(exp.experience_id,),
).fetchall()
exp.tags = [r["tag"] for r in tag_rows]
result.append(exp)
return result
finally:
conn.close()
def touch_experience(self, experience_id: str) -> None:
"""增加引用计数"""
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"UPDATE experiences SET usage_count=usage_count+1, "
"last_used_at=datetime('now') WHERE experience_id=?",
(experience_id,),
)
conn.commit()
finally:
conn.close()
# ===================================================================
# Event
# ===================================================================
def add_event(self, event_type: str, task_id: Optional[str] = None,
agent: Optional[str] = None,
detail: Optional[Dict] = None) -> int:
conn = self._conn()
try:
cursor = conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, event_type, json.dumps(detail or {})),
)
conn.commit()
return cursor.lastrowid
finally:
conn.close()
def get_events(self, task_id: Optional[str] = None,
limit: int = 50) -> List[Event]:
conn = self._conn()
try:
if task_id:
rows = conn.execute(
"SELECT * FROM events WHERE task_id=? ORDER BY created_at DESC LIMIT ?",
(task_id, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM events ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [Event.from_row(r) for r in rows]
finally:
conn.close()
# ===================================================================
# Task Attempt
# ===================================================================
def add_task_attempt(self, task_id: str, attempt_number: int,
agent: str, outcome: str,
metadata: Optional[Dict] = None) -> int:
if outcome not in ATTEMPT_OUTCOMES:
raise ValueError(f"Invalid outcome: {outcome}")
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
cursor = conn.execute(
"INSERT INTO task_attempts "
"(task_id, attempt_number, agent, outcome, metadata) VALUES (?,?,?,?,?)",
(task_id, attempt_number, agent, outcome,
json.dumps(metadata or {})),
)
conn.commit()
return cursor.lastrowid
finally:
conn.close()
# ===================================================================
# Agent
# ===================================================================
def upsert_agent(self, agent_id: str, role: Optional[str] = None,
capabilities: Optional[List[str]] = None) -> None:
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"INSERT OR REPLACE INTO agents (agent_id, role, capabilities, last_active) "
"VALUES (?,?,?,datetime('now'))",
(agent_id, role, json.dumps(capabilities or [])),
)
conn.commit()
finally:
conn.close()
def get_agent(self, agent_id: str) -> Optional[Dict]:
conn = self._conn()
try:
row = conn.execute(
"SELECT * FROM agents WHERE agent_id=?", (agent_id,)
).fetchone()
return dict(row) if row else None
finally:
conn.close()
def update_agent_status(self, agent_id: str, status: str,
current_task: Optional[str] = None) -> None:
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"UPDATE agents SET current_status=?, current_task=?, "
"last_active=datetime('now') WHERE agent_id=?",
(status, current_task, agent_id),
)
conn.commit()
finally:
conn.close()