Files
sanguo_moziplus_v2/src/blackboard/registry.py
T
2026-05-18 00:24:37 +08:00

287 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""多项目管理 — registry.db (SQLite) + 自动发现"""
from __future__ import annotations
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# Module-level logger; ProjectRegistry reports created, auto-discovered and
# migrated projects through it.
logger = logging.getLogger("moziplus-v2.registry")
class ProjectRegistry:
    """Project registry stored in a SQLite database (``registry.db``).

    Each row of the ``projects`` table describes one project. Every method
    opens its own short-lived connection through ``_connect`` and closes it
    before returning; no connection is kept on the instance.
    """

    def __init__(self, root: Path):
        """Bind the registry to *root* and make sure the schema exists.

        Args:
            root: Directory that holds ``registry.db`` and one sub-directory
                per project created through ``create_project``.
        """
        self.root = root
        self.db_path = root / "registry.db"
        self._init_db()

    def _init_db(self) -> None:
        """Create the ``projects`` table and its status index if missing."""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = self._connect()
        try:
            conn.execute("""CREATE TABLE IF NOT EXISTS projects (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                description TEXT DEFAULT '',
                status TEXT DEFAULT 'active',
                source TEXT DEFAULT 'manual',
                card_count INTEGER DEFAULT 0,
                task_count INTEGER DEFAULT 0,
                config_json TEXT DEFAULT '{}',
                created_at TEXT NOT NULL,
                updated_at TEXT,
                archived_at TEXT
            )""")
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_projects_status ON projects(status)")
            conn.commit()
        finally:
            conn.close()

    def _connect(self) -> sqlite3.Connection:
        """Open a connection with ``Row`` access, WAL journal and a 5 s busy timeout.

        The caller owns the returned connection and must close it.
        """
        conn = sqlite3.connect(str(self.db_path))
        try:
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA busy_timeout=5000")
        except Exception:
            # Do not leak the handle when a PRAGMA fails (e.g. corrupt file).
            conn.close()
            raise
        return conn

    def _now(self) -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        from datetime import timezone

        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the stored text identical in shape to what utcnow() produced
        # (no "+00:00" suffix), so existing rows and new rows stay comparable.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _iso_text(value: Any) -> Any:
        """Return ``value.isoformat()`` for date/datetime-like objects, else *value*."""
        return value.isoformat() if hasattr(value, "isoformat") else value

    # ===================================================================
    # CRUD
    # ===================================================================
    def create_project(self, project_id: str, name: str,
                       description: str = "",
                       source: str = "manual",
                       agents: Optional[List[str]] = None) -> Dict[str, Any]:
        """Register a new project and create its directory skeleton.

        Inserts the row, creates ``<root>/<project_id>`` with the
        ``artifacts``, ``experiences``, ``skills`` and ``config``
        sub-directories, writes ``config/project.yaml`` when it does not
        exist yet, and only then commits.

        Args:
            project_id: Primary key of the project; also used as the
                directory name under ``root``.
            name: Project name.
            description: Optional description.
            source: Value stored in the ``source`` column.
            agents: Agent names stored in ``config_json`` and in
                ``project.yaml``; ``None`` is treated as an empty list.

        Returns:
            The stored row as a dict, plus an ``"agents"`` key.

        Raises:
            ValueError: If a project with *project_id* already exists.
        """
        import json

        # Imported before any side effect so that a missing YAML package does
        # not leave half-created directories behind.
        import yaml

        agent_list = agents or []
        conn = self._connect()
        try:
            # Take the write lock up front so the existence check and the
            # insert cannot interleave with another writer.
            conn.execute("BEGIN IMMEDIATE")
            existing = conn.execute(
                "SELECT id FROM projects WHERE id=?", (project_id,)
            ).fetchone()
            if existing:
                raise ValueError(f"Project '{project_id}' already exists")

            config = json.dumps({"agents": agent_list})
            now = self._now()
            conn.execute(
                """INSERT INTO projects (id, name, description, status, source, config_json, created_at)
                VALUES (?,?,?,?,?,?,?)""",
                (project_id, name, description, "active", source, config, now),
            )

            # Create the project directory tree.
            # NOTE(review): project_id is used verbatim as a path component;
            # confirm callers never pass values such as "../x".
            project_dir = self.root / project_id
            project_dir.mkdir(parents=True, exist_ok=True)
            for subdir in ("artifacts", "experiences", "skills", "config"):
                (project_dir / subdir).mkdir(exist_ok=True)

            # Write the per-project config skeleton. The file is written as
            # UTF-8 explicitly because allow_unicode=True emits non-ASCII
            # characters unescaped.
            project_yaml = project_dir / "config" / "project.yaml"
            if not project_yaml.exists():
                with open(project_yaml, "w", encoding="utf-8") as f:
                    yaml.dump({
                        "project": {
                            "name": name,
                            "description": description,
                            "agents": agent_list,
                        }
                    }, f, default_flow_style=False, allow_unicode=True)

            conn.commit()
            logger.info("Project created: %s (%s)", project_id, name)
            result = self.get_project(project_id) or {}
            result["agents"] = agent_list
            return result
        finally:
            # Closing without a commit discards the pending transaction.
            conn.close()

    def get_project(self, project_id: str) -> Optional[Dict[str, Any]]:
        """Return the row for *project_id* as a dict, or ``None`` if absent."""
        conn = self._connect()
        try:
            row = conn.execute(
                "SELECT * FROM projects WHERE id=?", (project_id,)
            ).fetchone()
            return dict(row) if row else None
        finally:
            conn.close()

    def list_projects(self, status: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
        """Return projects keyed by id, optionally filtered by *status*.

        A falsy *status* (``None`` or ``""``) returns every project.
        """
        conn = self._connect()
        try:
            if status:
                rows = conn.execute(
                    "SELECT * FROM projects WHERE status=?", (status,)
                ).fetchall()
            else:
                rows = conn.execute("SELECT * FROM projects").fetchall()
            return {r["id"]: dict(r) for r in rows}
        finally:
            conn.close()

    def update_project(self, project_id: str, **kwargs) -> bool:
        """Update metadata columns of a project.

        Only ``name``, ``description``, ``status``, ``config_json``,
        ``card_count``, ``task_count``, ``updated_at`` and ``archived_at``
        are accepted; other keyword arguments are ignored. ``updated_at`` is
        always overwritten with the current time when anything is updated.

        Returns:
            ``False`` if the project does not exist, ``True`` otherwise
            (including when no accepted field was supplied).
        """
        conn = self._connect()
        try:
            conn.execute("BEGIN IMMEDIATE")
            existing = conn.execute(
                "SELECT id FROM projects WHERE id=?", (project_id,)
            ).fetchone()
            if not existing:
                return False

            allowed_fields = {"name", "description", "status", "config_json",
                              "card_count", "task_count", "updated_at", "archived_at"}
            updates = {k: v for k, v in kwargs.items() if k in allowed_fields}
            if not updates:
                return True

            updates["updated_at"] = self._now()
            # Column names come from the allow-list above, never from the
            # caller directly, so interpolating them into the SQL is safe.
            set_clause = ", ".join(f"{k}=?" for k in updates)
            conn.execute(
                f"UPDATE projects SET {set_clause} WHERE id=?",
                (*updates.values(), project_id),
            )
            conn.commit()
            return True
        finally:
            conn.close()

    def archive_project(self, project_id: str) -> bool:
        """Mark a project as archived (status only; directories are not moved)."""
        return self.update_project(
            project_id,
            status="archived",
            archived_at=self._now(),
        )

    def delete_project(self, project_id: str) -> bool:
        """Remove a project from the registry only.

        Returns:
            ``True`` if a row was deleted, ``False`` if none matched.
        """
        conn = self._connect()
        try:
            conn.execute("BEGIN IMMEDIATE")
            cursor = conn.execute(
                "DELETE FROM projects WHERE id=?", (project_id,)
            )
            conn.commit()
            return cursor.rowcount > 0
        finally:
            conn.close()

    # ===================================================================
    # Auto-discovery
    # ===================================================================
    def discover_projects(self, scan_dir: Optional[Path] = None) -> List[str]:
        """Scan a directory and register unknown projects found in it.

        A child directory is registered only when it contains a
        ``blackboard.db`` file, its name does not start with ``_`` or ``.``,
        and no project with that id exists yet.

        Args:
            scan_dir: Directory to scan; defaults to ``root``.

        Returns:
            Ids of the newly registered projects, in sorted order. Empty when
            *scan_dir* is missing or is not a directory.
        """
        scan_dir = scan_dir or self.root
        discovered: List[str] = []
        # is_dir() also covers the "does not exist" case and avoids a
        # NotADirectoryError from iterdir() when a file path is passed.
        if not scan_dir.is_dir():
            return discovered

        conn = self._connect()
        try:
            for child in sorted(scan_dir.iterdir()):
                if not child.is_dir():
                    continue
                # Skip special directories.
                if child.name.startswith(("_", ".")):
                    continue
                # Only directories that contain blackboard.db are projects.
                if not (child / "blackboard.db").exists():
                    continue
                # Skip projects that are already registered.
                existing = conn.execute(
                    "SELECT id FROM projects WHERE id=?", (child.name,)
                ).fetchone()
                if existing:
                    continue
                # Register the project.
                now = self._now()
                conn.execute(
                    """INSERT INTO projects (id, name, description, status, source, created_at)
                    VALUES (?,?,?,?,?,?)""",
                    (child.name, child.name, "auto_discovered", "active",
                     "auto_discovered", now),
                )
                discovered.append(child.name)
                logger.info("Auto-discovered project: %s", child.name)
            if discovered:
                conn.commit()
        finally:
            conn.close()
        return discovered

    # ===================================================================
    # Migration (from _registry.yaml)
    # ===================================================================
    def migrate_from_yaml(self, yaml_path: Optional[Path] = None) -> int:
        """Import projects from a legacy ``_registry.yaml`` file.

        Projects whose id is already registered are skipped. Missing or null
        fields fall back to: name -> project id, description -> ``""``,
        status -> ``"active"``, created_at -> current time.

        Args:
            yaml_path: File to read; defaults to ``<root>/_registry.yaml``.

        Returns:
            Number of projects inserted (0 when the file is missing or has no
            usable ``projects`` mapping).
        """
        import json

        import yaml

        yaml_path = yaml_path or self.root / "_registry.yaml"
        if not yaml_path.exists():
            return 0
        with open(yaml_path, encoding="utf-8") as f:
            data = yaml.safe_load(f)

        # Tolerate an empty file, a non-mapping document and "projects: null".
        projects = data.get("projects") if isinstance(data, dict) else None
        if not projects:
            return 0
        if not isinstance(projects, dict):
            logger.warning("Ignoring malformed 'projects' section in %s", yaml_path)
            return 0

        migrated = 0
        conn = self._connect()
        try:
            for pid, info in projects.items():
                existing = conn.execute(
                    "SELECT id FROM projects WHERE id=?", (pid,)
                ).fetchone()
                if existing:
                    continue
                # An entry written as "pid:" with no body loads as None.
                if not isinstance(info, dict):
                    info = {}
                now = self._now()
                name = info.get("name")
                if name is None:
                    name = pid
                # YAML may parse unquoted timestamps into date/datetime
                # objects; store them as ISO text like every other timestamp.
                created_at = self._iso_text(info.get("created_at")) or now
                archived_at = self._iso_text(info.get("archived_at"))
                conn.execute(
                    """INSERT INTO projects (id, name, description, status, source, config_json, created_at, archived_at)
                    VALUES (?,?,?,?,?,?,?,?)""",
                    (pid, name, info.get("description") or "",
                     info.get("status") or "active", "yaml_migration",
                     json.dumps({"agents": info.get("agents") or []}),
                     created_at,
                     archived_at),
                )
                migrated += 1
            if migrated:
                conn.commit()
                logger.info("Migrated %d projects from YAML", migrated)
        finally:
            conn.close()
        return migrated

    def reload(self) -> None:
        """Kept for compatibility with the old interface; SQLite needs no cache reload."""
        pass