"""v2.7 测试:Card CRUD + 三级层次结构 + Registry 改造""" import json from pathlib import Path import pytest from src.blackboard.db import init_db, get_connection from src.blackboard.models import Task, Card from src.blackboard.operations import Blackboard, CardOps from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry # =================================================================== # Fixtures # =================================================================== @pytest.fixture def db_path(tmp_path): p = tmp_path / "test.db" init_db(p) return p @pytest.fixture def bb(db_path): return Blackboard(db_path) @pytest.fixture def card_ops(db_path): return CardOps(db_path) @pytest.fixture def registry(tmp_path): return ProjectRegistry(tmp_path) # =================================================================== # Card 模型 # =================================================================== class TestCardModel: def test_card_defaults(self): c = Card(id="c1", name="Test Card") assert c.status == "active" assert c.card_type == "default" assert c.stages_json == "[]" assert c.labels_json == "[]" def test_card_from_row(self, card_ops): card_ops.create_card(Card(id="c1", name="Test")) c = card_ops.get_card("c1") assert c is not None assert c.name == "Test" assert c.status == "active" # =================================================================== # CardOps CRUD # =================================================================== class TestCardOps: def test_create_card(self, card_ops): card = Card(id="c1", name="动量策略v1", card_type="strategy", stages_json=json.dumps([ {"id": "research", "name": "因子研究"}, {"id": "coding", "name": "策略编码"}, ])) result = card_ops.create_card(card) assert result.id == "c1" assert result.name == "动量策略v1" def test_get_card(self, card_ops): card_ops.create_card(Card(id="c1", name="Test")) c = card_ops.get_card("c1") assert c is not None assert c.name == "Test" def test_get_nonexistent_card(self, card_ops): assert card_ops.get_card("nope") is None def test_list_cards(self, card_ops): card_ops.create_card(Card(id="c1", name="A")) card_ops.create_card(Card(id="c2", name="B")) cards = card_ops.list_cards() assert len(cards) == 3 # default + c1 + c2 def test_list_cards_by_status(self, card_ops): card_ops.create_card(Card(id="c1", name="A")) card_ops.create_card(Card(id="c2", name="B")) card_ops.update_card("c2", status="archived") active = card_ops.list_cards(status="active") assert len(active) == 2 # default + c1 def test_update_card(self, card_ops): card_ops.create_card(Card(id="c1", name="Old")) assert card_ops.update_card("c1", name="New", description="Updated") c = card_ops.get_card("c1") assert c.name == "New" assert c.description == "Updated" def test_update_card_stages(self, card_ops): card_ops.create_card(Card(id="c1", name="Test")) new_stages = json.dumps([{"id": "s1", "name": "Stage 1"}]) card_ops.update_card("c1", stages_json=new_stages) c = card_ops.get_card("c1") assert json.loads(c.stages_json) == [{"id": "s1", "name": "Stage 1"}] def test_archive_card(self, card_ops): card_ops.create_card(Card(id="c1", name="Test")) card_ops.update_card("c1", status="archived") c = card_ops.get_card("c1") assert c.status == "archived" def test_delete_card(self, card_ops): card_ops.create_card(Card(id="c1", name="Test")) # 只能删除 archived/deleted 状态的 assert not card_ops.delete_card("c1") card_ops.update_card("c1", status="archived") assert card_ops.delete_card("c1") assert card_ops.get_card("c1") is None def test_fork_card(self, card_ops): card_ops.create_card(Card( id="c1", name="Strategy v1", card_type="strategy", stages_json=json.dumps([{"id": "s1", "name": "Research"}]), )) forked = card_ops.fork_card("c1", "c2", "Strategy v2") assert forked is not None assert forked.id == "c2" assert forked.name == "Strategy v2" assert forked.card_type == "strategy" assert json.loads(forked.stages_json) == [{"id": "s1", "name": "Research"}] def test_fork_nonexistent(self, card_ops): assert card_ops.fork_card("nope", "c2", "New") is None # =================================================================== # Card 状态聚合 # =================================================================== class TestCardStatusAggregation: def test_empty_card_is_active(self, card_ops, bb): card_ops.create_card(Card(id="c1", name="Empty")) status = card_ops.compute_card_status("c1") assert status == "active" def test_all_pending(self, card_ops, bb): card_ops.create_card(Card(id="c1", name="Test")) bb.create_task(Task(id="t1", title="T1", card_id="c1")) bb.create_task(Task(id="t2", title="T2", card_id="c1")) status = card_ops.compute_card_status("c1") assert status == "pending" def test_has_working(self, card_ops, bb): card_ops.create_card(Card(id="c1", name="Test")) bb.create_task(Task(id="t1", title="T1", card_id="c1")) bb.update_task_status("t1", "claimed", agent="test") bb.update_task_status("t1", "working", agent="test") status = card_ops.compute_card_status("c1") assert status == "working" def test_has_review(self, card_ops, bb): card_ops.create_card(Card(id="c1", name="Test")) bb.create_task(Task(id="t1", title="T1", card_id="c1")) bb.update_task_status("t1", "claimed", agent="test") bb.update_task_status("t1", "working", agent="test") bb.update_task_status("t1", "review", agent="test") status = card_ops.compute_card_status("c1") assert status == "review" def test_all_done(self, card_ops, bb): card_ops.create_card(Card(id="c1", name="Test")) bb.create_task(Task(id="t1", title="T1", card_id="c1")) bb.update_task_status("t1", "claimed", agent="test") bb.update_task_status("t1", "working", agent="test") bb.update_task_status("t1", "review", agent="test") bb.update_task_status("t1", "done", agent="test") status = card_ops.compute_card_status("c1") assert status == "done" def test_cancelled_not_in_aggregation(self, card_ops, bb): """cancelled Task 不参与聚合""" card_ops.create_card(Card(id="c1", name="Test")) bb.create_task(Task(id="t1", title="T1", card_id="c1")) bb.update_task_status("t1", "cancelled", agent="test") status = card_ops.compute_card_status("c1") assert status == "active" # 空(cancelled 排除)→ active def test_manual_status_not_overridden(self, card_ops, bb): """手动状态(archived)不参与聚合""" card_ops.create_card(Card(id="c1", name="Test")) card_ops.update_card("c1", status="archived") bb.create_task(Task(id="t1", title="T1", card_id="c1")) bb.update_task_status("t1", "claimed", agent="test") bb.update_task_status("t1", "working", agent="test") status = card_ops.compute_card_status("c1") assert status == "archived" def test_refresh_card_status(self, card_ops, bb): card_ops.create_card(Card(id="c1", name="Test")) bb.create_task(Task(id="t1", title="T1", card_id="c1")) bb.update_task_status("t1", "claimed", agent="test") bb.update_task_status("t1", "working", agent="test") new_status = card_ops.refresh_card_status("c1") assert new_status == "working" c = card_ops.get_card("c1") assert c.status == "working" # =================================================================== # Card 进度 # =================================================================== class TestCardProgress: def test_empty_card_progress(self, card_ops): card_ops.create_card(Card(id="c1", name="Test")) progress = card_ops.card_progress("c1") assert progress["total_tasks"] == 0 assert progress["progress_pct"] == 0 def test_card_progress_with_stages(self, card_ops, bb): stages = [ {"id": "research", "name": "因子研究"}, {"id": "coding", "name": "策略编码"}, ] card_ops.create_card(Card( id="c1", name="Test", stages_json=json.dumps(stages), )) # 2 个 research task,1 done bb.create_task(Task(id="t1", title="T1", card_id="c1", stage="research")) bb.create_task(Task(id="t2", title="T2", card_id="c1", stage="research")) bb.update_task_status("t1", "claimed", agent="test") bb.update_task_status("t1", "working", agent="test") bb.update_task_status("t1", "review", agent="test") bb.update_task_status("t1", "done", agent="test") progress = card_ops.card_progress("c1") assert progress["total_tasks"] == 2 assert progress["done_tasks"] == 1 assert progress["progress_pct"] == 50 # Stage progress stages_prog = progress["stages"] research_stage = next(s for s in stages_prog if s["id"] == "research") assert research_stage["total"] == 2 assert research_stage["done"] == 1 # =================================================================== # Task card_id / stage # =================================================================== class TestTaskCardId: def test_task_default_card_id(self, bb): bb.create_task(Task(id="t1", title="T1")) t = bb.get_task("t1") # 默认 card_id 由 DB DEFAULT 设为 'default' # 但 Task model 初始为 None,from_row 读 DB 的值 assert t.card_id == "default" def test_task_with_card_id(self, bb): bb.create_task(Task(id="t1", title="T1", card_id="c1")) t = bb.get_task("t1") assert t.card_id == "c1" def test_task_with_stage(self, bb): bb.create_task(Task(id="t1", title="T1", card_id="c1", stage="research")) t = bb.get_task("t1") assert t.stage == "research" def test_list_tasks_by_card(self, bb): bb.create_task(Task(id="t1", title="T1", card_id="c1")) bb.create_task(Task(id="t2", title="T2", card_id="c2")) bb.create_task(Task(id="t3", title="T3", card_id="c1")) tasks = bb.list_tasks(card_id="c1") assert len(tasks) == 2 assert all(t.card_id == "c1" for t in tasks) # =================================================================== # Registry 改造 # =================================================================== class TestRegistryV2: def test_create_project_sqlite(self, registry): info = registry.create_project("p1", "Project 1", agents=["a1", "a2"]) assert info["name"] == "Project 1" assert info["agents"] == ["a1", "a2"] assert info["status"] == "active" def test_registry_persists(self, tmp_path): r1 = ProjectRegistry(tmp_path) r1.create_project("p1", "P1") r2 = ProjectRegistry(tmp_path) assert r2.get_project("p1") is not None def test_auto_discover(self, tmp_path): # 创建含 blackboard.db 的目录 (tmp_path / "discovered-proj").mkdir() init_db(tmp_path / "discovered-proj" / "blackboard.db") registry = ProjectRegistry(tmp_path) found = registry.discover_projects() assert "discovered-proj" in found info = registry.get_project("discovered-proj") assert info["source"] == "auto_discovered" def test_auto_discover_skips_no_db(self, tmp_path): (tmp_path / "empty-dir").mkdir() registry = ProjectRegistry(tmp_path) found = registry.discover_projects() assert "empty-dir" not in found def test_auto_discover_skips_special(self, tmp_path): (tmp_path / "_internal").mkdir() (tmp_path / "_internal" / "blackboard.db").touch() registry = ProjectRegistry(tmp_path) found = registry.discover_projects() assert "_internal" not in found def test_archive_no_move(self, tmp_path): registry = ProjectRegistry(tmp_path) registry.create_project("p1", "P1") # 创建目录和文件 (tmp_path / "p1" / "test.txt").write_text("data") registry.archive_project("p1") # 目录仍然存在 assert (tmp_path / "p1").exists() assert (tmp_path / "p1" / "test.txt").exists() # 状态改为 archived info = registry.get_project("p1") assert info["status"] == "archived" def test_migrate_from_yaml(self, tmp_path): import yaml yaml_path = tmp_path / "_registry.yaml" with open(yaml_path, "w") as f: yaml.dump({ "projects": { "yaml-proj": { "name": "YAML Project", "description": "From YAML", "status": "active", "agents": ["a1"], } } }, f) registry = ProjectRegistry(tmp_path) count = registry.migrate_from_yaml(yaml_path) assert count == 1 info = registry.get_project("yaml-proj") assert info is not None assert info["name"] == "YAML Project" # =================================================================== # v2.7 迁移 # =================================================================== class TestV27Migration: def test_migrate_adds_card_id(self, tmp_path): """旧 DB 迁移后所有 task 有 card_id""" db_path = tmp_path / "old.db" conn = get_connection(db_path) try: # 先 init(创建旧版表) from src.blackboard.db import _SCHEMA_STATEMENTS for stmt in _SCHEMA_STATEMENTS: try: conn.execute(stmt) except Exception: pass conn.commit() # 插入旧版 task(无 card_id) conn.execute( "INSERT INTO tasks (id, title, status) VALUES ('t1', 'Old Task', 'pending')" ) conn.commit() finally: conn.close() # init_db 触发迁移 init_db(db_path) # 验证 card_id 已回填 conn = get_connection(db_path) try: row = conn.execute("SELECT card_id FROM tasks WHERE id='t1'").fetchone() assert row["card_id"] == "default" # 验证 default Card 存在 card = conn.execute("SELECT * FROM cards WHERE id='default'").fetchone() assert card is not None assert card["name"] == "Default" finally: conn.close() def test_new_db_has_card_id(self, tmp_path): """新 DB 直接包含 card_id 和 stage""" db_path = tmp_path / "new.db" init_db(db_path) conn = get_connection(db_path) try: # tasks 表有 card_id 和 stage cols = [r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()] assert "card_id" in cols assert "stage" in cols # cards 表存在 cols = [r[1] for r in conn.execute("PRAGMA table_info(cards)").fetchall()] assert "id" in cols assert "name" in cols assert "status" in cols assert "stages_json" in cols finally: conn.close() # =================================================================== # Mail API # =================================================================== class TestMailRoutes: """Mail Tab API 测试(用 Blackboard 直接操作 _mail DB)""" def test_mail_create_and_list(self, tmp_path): from src.blackboard.operations import Blackboard from src.blackboard.db import init_db mail_db = tmp_path / "_mail" / "blackboard.db" mail_db.parent.mkdir(parents=True) init_db(mail_db) bb = Blackboard(mail_db) meta = {"from": "pangtong", "type": "review_request", "is_read": False} bb.create_task(Task( id="mail-001", title="请评审v2.7代码", description="代码已提交,请评审", assignee="simayi-challenger", assigned_by="pangtong-fujunshi", must_haves=json.dumps(meta), card_id="default", task_type="mail", )) tasks = bb.list_tasks() assert len(tasks) == 1 assert tasks[0].title == "请评审v2.7代码" assert tasks[0].assignee == "simayi-challenger" # 检查 must_haves 元数据 t = bb.get_task("mail-001") m = json.loads(t.must_haves) assert m["from"] == "pangtong" assert m["is_read"] is False def test_mail_mark_read(self, tmp_path): from src.blackboard.operations import Blackboard from src.blackboard.db import init_db mail_db = tmp_path / "_mail" / "blackboard.db" mail_db.parent.mkdir(parents=True) init_db(mail_db) bb = Blackboard(mail_db) meta = {"from": "zhangfei", "is_read": False} bb.create_task(Task( id="mail-002", title="测试", must_haves=json.dumps(meta), card_id="default", )) # 更新 is_read conn = bb._conn() try: meta["is_read"] = True conn.execute("UPDATE tasks SET must_haves=? WHERE id=?", (json.dumps(meta), "mail-002")) conn.commit() finally: conn.close() t = bb.get_task("mail-002") assert json.loads(t.must_haves)["is_read"] is True def test_mail_filter_by_to(self, tmp_path): from src.blackboard.operations import Blackboard from src.blackboard.db import init_db mail_db = tmp_path / "_mail" / "blackboard.db" mail_db.parent.mkdir(parents=True) init_db(mail_db) bb = Blackboard(mail_db) bb.create_task(Task(id="m1", title="A", assignee="simayi", card_id="default")) bb.create_task(Task(id="m2", title="B", assignee="zhangfei", card_id="default")) bb.create_task(Task(id="m3", title="C", assignee="simayi", card_id="default")) tasks = bb.list_tasks(assignee="simayi") assert len(tasks) == 2