diff --git a/tests/test_v27_cards.py b/tests/test_v27_cards.py new file mode 100644 index 0000000..b5edfdd --- /dev/null +++ b/tests/test_v27_cards.py @@ -0,0 +1,426 @@ +"""v2.7 测试:Card CRUD + 三级层次结构 + Registry 改造""" + +import json +from pathlib import Path + +import pytest + +from src.blackboard.db import init_db, get_connection +from src.blackboard.models import Task, Card +from src.blackboard.operations import Blackboard, CardOps +from src.blackboard.queries import Queries +from src.blackboard.registry import ProjectRegistry + + +# =================================================================== +# Fixtures +# =================================================================== + +@pytest.fixture +def db_path(tmp_path): + p = tmp_path / "test.db" + init_db(p) + return p + + +@pytest.fixture +def bb(db_path): + return Blackboard(db_path) + + +@pytest.fixture +def card_ops(db_path): + return CardOps(db_path) + + +@pytest.fixture +def registry(tmp_path): + return ProjectRegistry(tmp_path) + + +# =================================================================== +# Card 模型 +# =================================================================== + +class TestCardModel: + def test_card_defaults(self): + c = Card(id="c1", name="Test Card") + assert c.status == "active" + assert c.card_type == "default" + assert c.stages_json == "[]" + assert c.labels_json == "[]" + + def test_card_from_row(self, card_ops): + card_ops.create_card(Card(id="c1", name="Test")) + c = card_ops.get_card("c1") + assert c is not None + assert c.name == "Test" + assert c.status == "active" + + +# =================================================================== +# CardOps CRUD +# =================================================================== + +class TestCardOps: + def test_create_card(self, card_ops): + card = Card(id="c1", name="动量策略v1", card_type="strategy", + stages_json=json.dumps([ + {"id": "research", "name": "因子研究"}, + {"id": "coding", "name": "策略编码"}, + ])) + result = card_ops.create_card(card) + assert result.id == "c1" + assert result.name == "动量策略v1" + + def test_get_card(self, card_ops): + card_ops.create_card(Card(id="c1", name="Test")) + c = card_ops.get_card("c1") + assert c is not None + assert c.name == "Test" + + def test_get_nonexistent_card(self, card_ops): + assert card_ops.get_card("nope") is None + + def test_list_cards(self, card_ops): + card_ops.create_card(Card(id="c1", name="A")) + card_ops.create_card(Card(id="c2", name="B")) + cards = card_ops.list_cards() + assert len(cards) == 3 # default + c1 + c2 + + def test_list_cards_by_status(self, card_ops): + card_ops.create_card(Card(id="c1", name="A")) + card_ops.create_card(Card(id="c2", name="B")) + card_ops.update_card("c2", status="archived") + active = card_ops.list_cards(status="active") + assert len(active) == 2 # default + c1 + + def test_update_card(self, card_ops): + card_ops.create_card(Card(id="c1", name="Old")) + assert card_ops.update_card("c1", name="New", description="Updated") + c = card_ops.get_card("c1") + assert c.name == "New" + assert c.description == "Updated" + + def test_update_card_stages(self, card_ops): + card_ops.create_card(Card(id="c1", name="Test")) + new_stages = json.dumps([{"id": "s1", "name": "Stage 1"}]) + card_ops.update_card("c1", stages_json=new_stages) + c = card_ops.get_card("c1") + assert json.loads(c.stages_json) == [{"id": "s1", "name": "Stage 1"}] + + def test_archive_card(self, card_ops): + card_ops.create_card(Card(id="c1", name="Test")) + card_ops.update_card("c1", status="archived") + c = card_ops.get_card("c1") + assert c.status == "archived" + + def test_delete_card(self, card_ops): + card_ops.create_card(Card(id="c1", name="Test")) + # 只能删除 archived/deleted 状态的 + assert not card_ops.delete_card("c1") + card_ops.update_card("c1", status="archived") + assert card_ops.delete_card("c1") + assert card_ops.get_card("c1") is None + + def test_fork_card(self, card_ops): + card_ops.create_card(Card( + id="c1", name="Strategy v1", card_type="strategy", + stages_json=json.dumps([{"id": "s1", "name": "Research"}]), + )) + forked = card_ops.fork_card("c1", "c2", "Strategy v2") + assert forked is not None + assert forked.id == "c2" + assert forked.name == "Strategy v2" + assert forked.card_type == "strategy" + assert json.loads(forked.stages_json) == [{"id": "s1", "name": "Research"}] + + def test_fork_nonexistent(self, card_ops): + assert card_ops.fork_card("nope", "c2", "New") is None + + +# =================================================================== +# Card 状态聚合 +# =================================================================== + +class TestCardStatusAggregation: + def test_empty_card_is_active(self, card_ops, bb): + card_ops.create_card(Card(id="c1", name="Empty")) + status = card_ops.compute_card_status("c1") + assert status == "active" + + def test_all_pending(self, card_ops, bb): + card_ops.create_card(Card(id="c1", name="Test")) + bb.create_task(Task(id="t1", title="T1", card_id="c1")) + bb.create_task(Task(id="t2", title="T2", card_id="c1")) + status = card_ops.compute_card_status("c1") + assert status == "pending" + + def test_has_working(self, card_ops, bb): + card_ops.create_card(Card(id="c1", name="Test")) + bb.create_task(Task(id="t1", title="T1", card_id="c1")) + bb.update_task_status("t1", "claimed", agent="test") + bb.update_task_status("t1", "working", agent="test") + status = card_ops.compute_card_status("c1") + assert status == "working" + + def test_has_review(self, card_ops, bb): + card_ops.create_card(Card(id="c1", name="Test")) + bb.create_task(Task(id="t1", title="T1", card_id="c1")) + bb.update_task_status("t1", "claimed", agent="test") + bb.update_task_status("t1", "working", agent="test") + bb.update_task_status("t1", "review", agent="test") + status = card_ops.compute_card_status("c1") + assert status == "review" + + def test_all_done(self, card_ops, bb): + card_ops.create_card(Card(id="c1", name="Test")) + bb.create_task(Task(id="t1", title="T1", card_id="c1")) + bb.update_task_status("t1", "claimed", agent="test") + bb.update_task_status("t1", "working", agent="test") + bb.update_task_status("t1", "review", agent="test") + bb.update_task_status("t1", "done", agent="test") + status = card_ops.compute_card_status("c1") + assert status == "done" + + def test_cancelled_not_in_aggregation(self, card_ops, bb): + """cancelled Task 不参与聚合""" + card_ops.create_card(Card(id="c1", name="Test")) + bb.create_task(Task(id="t1", title="T1", card_id="c1")) + bb.update_task_status("t1", "cancelled", agent="test") + status = card_ops.compute_card_status("c1") + assert status == "active" # 空(cancelled 排除)→ active + + def test_manual_status_not_overridden(self, card_ops, bb): + """手动状态(archived)不参与聚合""" + card_ops.create_card(Card(id="c1", name="Test")) + card_ops.update_card("c1", status="archived") + bb.create_task(Task(id="t1", title="T1", card_id="c1")) + bb.update_task_status("t1", "claimed", agent="test") + bb.update_task_status("t1", "working", agent="test") + status = card_ops.compute_card_status("c1") + assert status == "archived" + + def test_refresh_card_status(self, card_ops, bb): + card_ops.create_card(Card(id="c1", name="Test")) + bb.create_task(Task(id="t1", title="T1", card_id="c1")) + bb.update_task_status("t1", "claimed", agent="test") + bb.update_task_status("t1", "working", agent="test") + new_status = card_ops.refresh_card_status("c1") + assert new_status == "working" + c = card_ops.get_card("c1") + assert c.status == "working" + + +# =================================================================== +# Card 进度 +# =================================================================== + +class TestCardProgress: + def test_empty_card_progress(self, card_ops): + card_ops.create_card(Card(id="c1", name="Test")) + progress = card_ops.card_progress("c1") + assert progress["total_tasks"] == 0 + assert progress["progress_pct"] == 0 + + def test_card_progress_with_stages(self, card_ops, bb): + stages = [ + {"id": "research", "name": "因子研究"}, + {"id": "coding", "name": "策略编码"}, + ] + card_ops.create_card(Card( + id="c1", name="Test", + stages_json=json.dumps(stages), + )) + # 2 个 research task,1 done + bb.create_task(Task(id="t1", title="T1", card_id="c1", stage="research")) + bb.create_task(Task(id="t2", title="T2", card_id="c1", stage="research")) + bb.update_task_status("t1", "claimed", agent="test") + bb.update_task_status("t1", "working", agent="test") + bb.update_task_status("t1", "review", agent="test") + bb.update_task_status("t1", "done", agent="test") + + progress = card_ops.card_progress("c1") + assert progress["total_tasks"] == 2 + assert progress["done_tasks"] == 1 + assert progress["progress_pct"] == 50 + + # Stage progress + stages_prog = progress["stages"] + research_stage = next(s for s in stages_prog if s["id"] == "research") + assert research_stage["total"] == 2 + assert research_stage["done"] == 1 + + +# =================================================================== +# Task card_id / stage +# =================================================================== + +class TestTaskCardId: + def test_task_default_card_id(self, bb): + bb.create_task(Task(id="t1", title="T1")) + t = bb.get_task("t1") + # 默认 card_id 由 DB DEFAULT 设为 'default' + # 但 Task model 初始为 None,from_row 读 DB 的值 + assert t.card_id == "default" + + def test_task_with_card_id(self, bb): + bb.create_task(Task(id="t1", title="T1", card_id="c1")) + t = bb.get_task("t1") + assert t.card_id == "c1" + + def test_task_with_stage(self, bb): + bb.create_task(Task(id="t1", title="T1", card_id="c1", stage="research")) + t = bb.get_task("t1") + assert t.stage == "research" + + def test_list_tasks_by_card(self, bb): + bb.create_task(Task(id="t1", title="T1", card_id="c1")) + bb.create_task(Task(id="t2", title="T2", card_id="c2")) + bb.create_task(Task(id="t3", title="T3", card_id="c1")) + tasks = bb.list_tasks(card_id="c1") + assert len(tasks) == 2 + assert all(t.card_id == "c1" for t in tasks) + + +# =================================================================== +# Registry 改造 +# =================================================================== + +class TestRegistryV2: + def test_create_project_sqlite(self, registry): + info = registry.create_project("p1", "Project 1", + agents=["a1", "a2"]) + assert info["name"] == "Project 1" + assert info["agents"] == ["a1", "a2"] + assert info["status"] == "active" + + def test_registry_persists(self, tmp_path): + r1 = ProjectRegistry(tmp_path) + r1.create_project("p1", "P1") + r2 = ProjectRegistry(tmp_path) + assert r2.get_project("p1") is not None + + def test_auto_discover(self, tmp_path): + # 创建含 blackboard.db 的目录 + (tmp_path / "discovered-proj").mkdir() + init_db(tmp_path / "discovered-proj" / "blackboard.db") + + registry = ProjectRegistry(tmp_path) + found = registry.discover_projects() + assert "discovered-proj" in found + info = registry.get_project("discovered-proj") + assert info["source"] == "auto_discovered" + + def test_auto_discover_skips_no_db(self, tmp_path): + (tmp_path / "empty-dir").mkdir() + registry = ProjectRegistry(tmp_path) + found = registry.discover_projects() + assert "empty-dir" not in found + + def test_auto_discover_skips_special(self, tmp_path): + (tmp_path / "_internal").mkdir() + (tmp_path / "_internal" / "blackboard.db").touch() + registry = ProjectRegistry(tmp_path) + found = registry.discover_projects() + assert "_internal" not in found + + def test_archive_no_move(self, tmp_path): + registry = ProjectRegistry(tmp_path) + registry.create_project("p1", "P1") + # 创建目录和文件 + (tmp_path / "p1" / "test.txt").write_text("data") + registry.archive_project("p1") + # 目录仍然存在 + assert (tmp_path / "p1").exists() + assert (tmp_path / "p1" / "test.txt").exists() + # 状态改为 archived + info = registry.get_project("p1") + assert info["status"] == "archived" + + def test_migrate_from_yaml(self, tmp_path): + import yaml + yaml_path = tmp_path / "_registry.yaml" + with open(yaml_path, "w") as f: + yaml.dump({ + "projects": { + "yaml-proj": { + "name": "YAML Project", + "description": "From YAML", + "status": "active", + "agents": ["a1"], + } + } + }, f) + + registry = ProjectRegistry(tmp_path) + count = registry.migrate_from_yaml(yaml_path) + assert count == 1 + info = registry.get_project("yaml-proj") + assert info is not None + assert info["name"] == "YAML Project" + + +# =================================================================== +# v2.7 迁移 +# =================================================================== + +class TestV27Migration: + def test_migrate_adds_card_id(self, tmp_path): + """旧 DB 迁移后所有 task 有 card_id""" + db_path = tmp_path / "old.db" + conn = get_connection(db_path) + try: + # 先 init(创建旧版表) + from src.blackboard.db import _SCHEMA_STATEMENTS + for stmt in _SCHEMA_STATEMENTS: + try: + conn.execute(stmt) + except Exception: + pass + conn.commit() + + # 插入旧版 task(无 card_id) + conn.execute( + "INSERT INTO tasks (id, title, status) VALUES ('t1', 'Old Task', 'pending')" + ) + conn.commit() + finally: + conn.close() + + # init_db 触发迁移 + init_db(db_path) + + # 验证 card_id 已回填 + conn = get_connection(db_path) + try: + row = conn.execute("SELECT card_id FROM tasks WHERE id='t1'").fetchone() + assert row["card_id"] == "default" + + # 验证 default Card 存在 + card = conn.execute("SELECT * FROM cards WHERE id='default'").fetchone() + assert card is not None + assert card["name"] == "Default" + finally: + conn.close() + + def test_new_db_has_card_id(self, tmp_path): + """新 DB 直接包含 card_id 和 stage""" + db_path = tmp_path / "new.db" + init_db(db_path) + + conn = get_connection(db_path) + try: + # tasks 表有 card_id 和 stage + cols = [r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()] + assert "card_id" in cols + assert "stage" in cols + + # cards 表存在 + cols = [r[1] for r in conn.execute("PRAGMA table_info(cards)").fetchall()] + assert "id" in cols + assert "name" in cols + assert "status" in cols + assert "stages_json" in cols + finally: + conn.close()