auto-sync: 2026-05-18 00:30:52

This commit is contained in:
cfdaily
2026-05-18 00:30:52 +08:00
parent d20cb243e0
commit 1fb3068587
+426
View File
@@ -0,0 +1,426 @@
"""v2.7 测试:Card CRUD + 三级层次结构 + Registry 改造"""
import json
from pathlib import Path
import pytest
from src.blackboard.db import init_db, get_connection
from src.blackboard.models import Task, Card
from src.blackboard.operations import Blackboard, CardOps
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
# ===================================================================
# Fixtures
# ===================================================================
@pytest.fixture
def db_path(tmp_path):
p = tmp_path / "test.db"
init_db(p)
return p
@pytest.fixture
def bb(db_path):
return Blackboard(db_path)
@pytest.fixture
def card_ops(db_path):
return CardOps(db_path)
@pytest.fixture
def registry(tmp_path):
return ProjectRegistry(tmp_path)
# ===================================================================
# Card 模型
# ===================================================================
class TestCardModel:
def test_card_defaults(self):
c = Card(id="c1", name="Test Card")
assert c.status == "active"
assert c.card_type == "default"
assert c.stages_json == "[]"
assert c.labels_json == "[]"
def test_card_from_row(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
c = card_ops.get_card("c1")
assert c is not None
assert c.name == "Test"
assert c.status == "active"
# ===================================================================
# CardOps CRUD
# ===================================================================
class TestCardOps:
def test_create_card(self, card_ops):
card = Card(id="c1", name="动量策略v1", card_type="strategy",
stages_json=json.dumps([
{"id": "research", "name": "因子研究"},
{"id": "coding", "name": "策略编码"},
]))
result = card_ops.create_card(card)
assert result.id == "c1"
assert result.name == "动量策略v1"
def test_get_card(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
c = card_ops.get_card("c1")
assert c is not None
assert c.name == "Test"
def test_get_nonexistent_card(self, card_ops):
assert card_ops.get_card("nope") is None
def test_list_cards(self, card_ops):
card_ops.create_card(Card(id="c1", name="A"))
card_ops.create_card(Card(id="c2", name="B"))
cards = card_ops.list_cards()
assert len(cards) == 3 # default + c1 + c2
def test_list_cards_by_status(self, card_ops):
card_ops.create_card(Card(id="c1", name="A"))
card_ops.create_card(Card(id="c2", name="B"))
card_ops.update_card("c2", status="archived")
active = card_ops.list_cards(status="active")
assert len(active) == 2 # default + c1
def test_update_card(self, card_ops):
card_ops.create_card(Card(id="c1", name="Old"))
assert card_ops.update_card("c1", name="New", description="Updated")
c = card_ops.get_card("c1")
assert c.name == "New"
assert c.description == "Updated"
def test_update_card_stages(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
new_stages = json.dumps([{"id": "s1", "name": "Stage 1"}])
card_ops.update_card("c1", stages_json=new_stages)
c = card_ops.get_card("c1")
assert json.loads(c.stages_json) == [{"id": "s1", "name": "Stage 1"}]
def test_archive_card(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
card_ops.update_card("c1", status="archived")
c = card_ops.get_card("c1")
assert c.status == "archived"
def test_delete_card(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
# 只能删除 archived/deleted 状态的
assert not card_ops.delete_card("c1")
card_ops.update_card("c1", status="archived")
assert card_ops.delete_card("c1")
assert card_ops.get_card("c1") is None
def test_fork_card(self, card_ops):
card_ops.create_card(Card(
id="c1", name="Strategy v1", card_type="strategy",
stages_json=json.dumps([{"id": "s1", "name": "Research"}]),
))
forked = card_ops.fork_card("c1", "c2", "Strategy v2")
assert forked is not None
assert forked.id == "c2"
assert forked.name == "Strategy v2"
assert forked.card_type == "strategy"
assert json.loads(forked.stages_json) == [{"id": "s1", "name": "Research"}]
def test_fork_nonexistent(self, card_ops):
assert card_ops.fork_card("nope", "c2", "New") is None
# ===================================================================
# Card 状态聚合
# ===================================================================
class TestCardStatusAggregation:
def test_empty_card_is_active(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Empty"))
status = card_ops.compute_card_status("c1")
assert status == "active"
def test_all_pending(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.create_task(Task(id="t2", title="T2", card_id="c1"))
status = card_ops.compute_card_status("c1")
assert status == "pending"
def test_has_working(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "working"
def test_has_review(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
bb.update_task_status("t1", "review", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "review"
def test_all_done(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
bb.update_task_status("t1", "review", agent="test")
bb.update_task_status("t1", "done", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "done"
def test_cancelled_not_in_aggregation(self, card_ops, bb):
"""cancelled Task 不参与聚合"""
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "cancelled", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "active" # 空(cancelled 排除)→ active
def test_manual_status_not_overridden(self, card_ops, bb):
"""手动状态(archived)不参与聚合"""
card_ops.create_card(Card(id="c1", name="Test"))
card_ops.update_card("c1", status="archived")
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "archived"
def test_refresh_card_status(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
new_status = card_ops.refresh_card_status("c1")
assert new_status == "working"
c = card_ops.get_card("c1")
assert c.status == "working"
# ===================================================================
# Card 进度
# ===================================================================
class TestCardProgress:
def test_empty_card_progress(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
progress = card_ops.card_progress("c1")
assert progress["total_tasks"] == 0
assert progress["progress_pct"] == 0
def test_card_progress_with_stages(self, card_ops, bb):
stages = [
{"id": "research", "name": "因子研究"},
{"id": "coding", "name": "策略编码"},
]
card_ops.create_card(Card(
id="c1", name="Test",
stages_json=json.dumps(stages),
))
# 2 个 research task1 done
bb.create_task(Task(id="t1", title="T1", card_id="c1", stage="research"))
bb.create_task(Task(id="t2", title="T2", card_id="c1", stage="research"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
bb.update_task_status("t1", "review", agent="test")
bb.update_task_status("t1", "done", agent="test")
progress = card_ops.card_progress("c1")
assert progress["total_tasks"] == 2
assert progress["done_tasks"] == 1
assert progress["progress_pct"] == 50
# Stage progress
stages_prog = progress["stages"]
research_stage = next(s for s in stages_prog if s["id"] == "research")
assert research_stage["total"] == 2
assert research_stage["done"] == 1
# ===================================================================
# Task card_id / stage
# ===================================================================
class TestTaskCardId:
def test_task_default_card_id(self, bb):
bb.create_task(Task(id="t1", title="T1"))
t = bb.get_task("t1")
# 默认 card_id 由 DB DEFAULT 设为 'default'
# 但 Task model 初始为 Nonefrom_row 读 DB 的值
assert t.card_id == "default"
def test_task_with_card_id(self, bb):
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
t = bb.get_task("t1")
assert t.card_id == "c1"
def test_task_with_stage(self, bb):
bb.create_task(Task(id="t1", title="T1", card_id="c1", stage="research"))
t = bb.get_task("t1")
assert t.stage == "research"
def test_list_tasks_by_card(self, bb):
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.create_task(Task(id="t2", title="T2", card_id="c2"))
bb.create_task(Task(id="t3", title="T3", card_id="c1"))
tasks = bb.list_tasks(card_id="c1")
assert len(tasks) == 2
assert all(t.card_id == "c1" for t in tasks)
# ===================================================================
# Registry 改造
# ===================================================================
class TestRegistryV2:
def test_create_project_sqlite(self, registry):
info = registry.create_project("p1", "Project 1",
agents=["a1", "a2"])
assert info["name"] == "Project 1"
assert info["agents"] == ["a1", "a2"]
assert info["status"] == "active"
def test_registry_persists(self, tmp_path):
r1 = ProjectRegistry(tmp_path)
r1.create_project("p1", "P1")
r2 = ProjectRegistry(tmp_path)
assert r2.get_project("p1") is not None
def test_auto_discover(self, tmp_path):
# 创建含 blackboard.db 的目录
(tmp_path / "discovered-proj").mkdir()
init_db(tmp_path / "discovered-proj" / "blackboard.db")
registry = ProjectRegistry(tmp_path)
found = registry.discover_projects()
assert "discovered-proj" in found
info = registry.get_project("discovered-proj")
assert info["source"] == "auto_discovered"
def test_auto_discover_skips_no_db(self, tmp_path):
(tmp_path / "empty-dir").mkdir()
registry = ProjectRegistry(tmp_path)
found = registry.discover_projects()
assert "empty-dir" not in found
def test_auto_discover_skips_special(self, tmp_path):
(tmp_path / "_internal").mkdir()
(tmp_path / "_internal" / "blackboard.db").touch()
registry = ProjectRegistry(tmp_path)
found = registry.discover_projects()
assert "_internal" not in found
def test_archive_no_move(self, tmp_path):
registry = ProjectRegistry(tmp_path)
registry.create_project("p1", "P1")
# 创建目录和文件
(tmp_path / "p1" / "test.txt").write_text("data")
registry.archive_project("p1")
# 目录仍然存在
assert (tmp_path / "p1").exists()
assert (tmp_path / "p1" / "test.txt").exists()
# 状态改为 archived
info = registry.get_project("p1")
assert info["status"] == "archived"
def test_migrate_from_yaml(self, tmp_path):
import yaml
yaml_path = tmp_path / "_registry.yaml"
with open(yaml_path, "w") as f:
yaml.dump({
"projects": {
"yaml-proj": {
"name": "YAML Project",
"description": "From YAML",
"status": "active",
"agents": ["a1"],
}
}
}, f)
registry = ProjectRegistry(tmp_path)
count = registry.migrate_from_yaml(yaml_path)
assert count == 1
info = registry.get_project("yaml-proj")
assert info is not None
assert info["name"] == "YAML Project"
# ===================================================================
# v2.7 迁移
# ===================================================================
class TestV27Migration:
def test_migrate_adds_card_id(self, tmp_path):
"""旧 DB 迁移后所有 task 有 card_id"""
db_path = tmp_path / "old.db"
conn = get_connection(db_path)
try:
# 先 init(创建旧版表)
from src.blackboard.db import _SCHEMA_STATEMENTS
for stmt in _SCHEMA_STATEMENTS:
try:
conn.execute(stmt)
except Exception:
pass
conn.commit()
# 插入旧版 task(无 card_id
conn.execute(
"INSERT INTO tasks (id, title, status) VALUES ('t1', 'Old Task', 'pending')"
)
conn.commit()
finally:
conn.close()
# init_db 触发迁移
init_db(db_path)
# 验证 card_id 已回填
conn = get_connection(db_path)
try:
row = conn.execute("SELECT card_id FROM tasks WHERE id='t1'").fetchone()
assert row["card_id"] == "default"
# 验证 default Card 存在
card = conn.execute("SELECT * FROM cards WHERE id='default'").fetchone()
assert card is not None
assert card["name"] == "Default"
finally:
conn.close()
def test_new_db_has_card_id(self, tmp_path):
"""新 DB 直接包含 card_id 和 stage"""
db_path = tmp_path / "new.db"
init_db(db_path)
conn = get_connection(db_path)
try:
# tasks 表有 card_id 和 stage
cols = [r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()]
assert "card_id" in cols
assert "stage" in cols
# cards 表存在
cols = [r[1] for r in conn.execute("PRAGMA table_info(cards)").fetchall()]
assert "id" in cols
assert "name" in cols
assert "status" in cols
assert "stages_json" in cols
finally:
conn.close()