auto-sync: 2026-05-18 11:37:38

This commit is contained in:
cfdaily
2026-05-18 11:37:38 +08:00
parent 59f3f74a95
commit 52b5321248
3 changed files with 0 additions and 967 deletions
-318
View File
@@ -1,318 +0,0 @@
"""API 路由 — Card CRUD + Task 嵌套(v2.7"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.operations import Blackboard, CardOps
from src.blackboard.models import Task, Card, CARD_VALID_STATUSES, CARD_TYPE_SET
from src.blackboard.queries import Queries
from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS, COMMENT_TYPES, OUTPUT_TYPES
from src.utils import get_data_root
router = APIRouter(prefix="/api/projects/{project_id}", tags=["cards"])
def _db_path(project_id: str) -> Path:
return get_data_root() / project_id / "blackboard.db"
def _bb(project_id: str) -> Blackboard:
return Blackboard(_db_path(project_id))
def _card_ops(project_id: str) -> CardOps:
from src.blackboard.db import init_db
db = _db_path(project_id)
init_db(db) # 确保 DB 已迁移
return CardOps(db)
def _q(project_id: str) -> Queries:
return Queries(_db_path(project_id))
# ===================================================================
# Card CRUD
# ===================================================================
@router.get("/cards")
async def list_cards(project_id: str, status: Optional[str] = None):
ops = _card_ops(project_id)
cards = ops.list_cards(status=status)
result = []
for c in cards:
d = _card_to_dict(c)
# 附带进度信息
progress = ops.card_progress(c.id)
d["progress"] = progress
result.append(d)
return {"cards": result}
@router.post("/cards")
async def create_card(project_id: str, body: Dict[str, Any]):
ops = _card_ops(project_id)
card = Card(
id=body["id"],
name=body["name"],
description=body.get("description", ""),
card_type=body.get("card_type", "default"),
stages_json=json.dumps(body.get("stages", [])),
labels_json=json.dumps(body.get("labels", [])),
)
ops.create_card(card)
return {"ok": True, "card_id": card.id}
@router.get("/cards/{card_id}")
async def get_card(project_id: str, card_id: str):
ops = _card_ops(project_id)
card = ops.get_card(card_id)
if not card:
raise HTTPException(404, f"Card not found: {card_id}")
result = _card_to_dict(card)
result["progress"] = ops.card_progress(card_id)
return result
@router.patch("/cards/{card_id}")
async def update_card(project_id: str, card_id: str, body: Dict[str, Any]):
ops = _card_ops(project_id)
updates = {}
if "name" in body:
updates["name"] = body["name"]
if "description" in body:
updates["description"] = body["description"]
if "card_type" in body:
updates["card_type"] = body["card_type"]
if "stages" in body:
updates["stages_json"] = json.dumps(body["stages"])
if "labels" in body:
updates["labels_json"] = json.dumps(body["labels"])
if "status" in body:
updates["status"] = body["status"]
if not ops.update_card(card_id, **updates):
raise HTTPException(404, f"Card not found: {card_id}")
return {"ok": True}
@router.post("/cards/{card_id}/archive")
async def archive_card(project_id: str, card_id: str):
ops = _card_ops(project_id)
if not ops.update_card(card_id, status="archived"):
raise HTTPException(404, f"Card not found: {card_id}")
return {"ok": True}
@router.post("/cards/{card_id}/delete")
async def delete_card(project_id: str, card_id: str):
ops = _card_ops(project_id)
if not ops.delete_card(card_id):
raise HTTPException(404, f"Card not found or not archivable: {card_id}")
return {"ok": True}
@router.post("/cards/{card_id}/fork")
async def fork_card(project_id: str, card_id: str, body: Dict[str, Any]):
"""Fork Card:复制 stages + 元数据,不复制 Task"""
ops = _card_ops(project_id)
new_card = ops.fork_card(
card_id,
new_card_id=body["new_card_id"],
new_name=body["new_name"],
)
if not new_card:
raise HTTPException(404, f"Source card not found: {card_id}")
return {"ok": True, "card_id": new_card.id}
@router.post("/cards/{card_id}/refresh-status")
async def refresh_card_status(project_id: str, card_id: str):
"""手动刷新 Card 状态(从 Task 聚合推导)"""
ops = _card_ops(project_id)
new_status = ops.refresh_card_status(card_id)
return {"ok": True, "status": new_status}
# ===================================================================
# Card 内 Task CRUD
# ===================================================================
@router.get("/cards/{card_id}/tasks")
async def list_card_tasks(project_id: str, card_id: str,
status: Optional[str] = None):
bb = _bb(project_id)
tasks = bb.list_tasks(status=status, card_id=card_id)
return {"tasks": [_task_to_dict(t) for t in tasks]}
@router.post("/cards/{card_id}/tasks")
async def create_card_task(project_id: str, card_id: str,
body: Dict[str, Any]):
bb = _bb(project_id)
task = Task(
id=body["id"],
title=body["title"],
description=body.get("description"),
task_type=body.get("task_type", "coding"),
priority=body.get("priority", 5),
assignee=body.get("assignee"),
assigned_by=body.get("assigned_by", "user"),
depends_on=json.dumps(body["depends_on"]) if "depends_on" in body else None,
risk_level=body.get("risk_level", "standard"),
card_id=card_id,
stage=body.get("stage"),
)
bb.create_task(task)
return {"ok": True, "task_id": task.id}
@router.get("/cards/{card_id}/tasks/{task_id}")
async def get_card_task(project_id: str, card_id: str, task_id: str,
expand: Optional[str] = None):
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task not found: {task_id}")
if task.card_id != card_id:
raise HTTPException(404, f"Task {task_id} not in card {card_id}")
result = _task_to_dict(task)
if expand == "all":
q = _q(project_id)
detail = q.task_detail(task_id)
if detail:
result["comments_count"] = detail.get("comments_count", 0)
result["outputs_count"] = detail.get("outputs_count", 0)
result["review_status"] = detail.get("review_status")
result["comments"] = [dict(c.__dict__) for c in bb.get_comments(task_id)]
result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)]
result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)]
return result
@router.post("/cards/{card_id}/tasks/{task_id}/status")
async def update_card_task_status(project_id: str, card_id: str,
task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task not found: {task_id}")
if task.card_id != card_id:
raise HTTPException(404, f"Task {task_id} not in card {card_id}")
new_status = body.get("status")
if not new_status:
raise HTTPException(422, "Missing required field: status")
from src.blackboard.db import VALID_TRANSITIONS
current = task.status
allowed = VALID_TRANSITIONS.get(current, set())
if new_status not in allowed:
raise HTTPException(409, {
"error": "invalid_transition",
"detail": f"Cannot transition from {current} to {new_status}",
"valid_transitions": {current: sorted(allowed)},
})
if not bb.update_task_status(task_id, new_status, agent=body.get("agent")):
raise HTTPException(409, "Status update failed")
# 刷新 Card 状态
ops = _card_ops(project_id)
ops.refresh_card_status(card_id)
# SSE 推送
try:
from src.api.sse_routes import get_broker
broker = get_broker()
broker.publish_sync("task_updated", {
"project_id": project_id,
"card_id": card_id,
"task_id": task_id,
"old_status": current,
"new_status": new_status,
})
except Exception:
pass
return {"ok": True, "old_status": current, "new_status": new_status}
# ===================================================================
# Card 内其他操作(comments, outputs, reviews 等)
# ===================================================================
@router.post("/cards/{card_id}/tasks/{task_id}/outputs")
async def write_card_task_output(project_id: str, card_id: str,
task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
agent = body.get("agent")
if not agent:
raise HTTPException(422, "Missing required field: agent")
output_type = body.get("type") or body.get("content_type")
if not output_type:
raise HTTPException(422, "Missing required field: type")
if output_type not in OUTPUT_TYPES:
raise HTTPException(422, f"Invalid type: {output_type}")
title = body.get("title")
if not title:
raise HTTPException(422, "Missing required field: title")
content = body.get("content")
content_path = body.get("content_path") or body.get("path")
if content and not content_path:
import os
artifacts_dir = os.path.join(
os.path.dirname(bb.db_path), "artifacts", task_id
)
os.makedirs(artifacts_dir, exist_ok=True)
safe_name = "".join(c if c.isalnum() or c in "._-" else "_" for c in title)
if not safe_name:
safe_name = "output"
file_path = os.path.join(artifacts_dir, safe_name)
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
content_path = file_path
oid = bb.write_output(
task_id, agent, output_type, title,
content_path=content_path,
summary=body.get("summary"),
metadata=body.get("metadata"),
)
return {"ok": True, "output_id": oid}
# ===================================================================
# Helper
# ===================================================================
def _card_to_dict(c: Card) -> Dict[str, Any]:
d = {k: v for k, v in c.__dict__.items() if v is not None}
# 解析 JSON 字段
if "stages_json" in d:
try:
d["stages"] = json.loads(d.pop("stages_json"))
except (json.JSONDecodeError, KeyError):
d["stages"] = []
if "labels_json" in d:
try:
d["labels"] = json.loads(d.pop("labels_json"))
except (json.JSONDecodeError, KeyError):
d["labels"] = []
return d
def _task_to_dict(t: Task) -> Dict[str, Any]:
d = {k: v for k, v in t.__dict__.items() if v is not None}
return d
-143
View File
@@ -1,143 +0,0 @@
"""API 路由 — Mail Tabv2.7
Mail 是一个特殊的 Project (_mail),每个 Mail 是一个点对点的双节点 Task。
显示形式为独立 Tab,以列表形式展示。
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.operations import Blackboard
from src.blackboard.models import Task
from src.blackboard.db import init_db
from src.utils import get_data_root
router = APIRouter(prefix="/api/mail", tags=["mail"])
MAIL_PROJECT_ID = "_mail"
def _db_path() -> Path:
root = get_data_root()
db = root / MAIL_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
return db
def _bb() -> Blackboard:
return Blackboard(_db_path())
@router.get("")
async def list_mail(status: Optional[str] = None,
_from: Optional[str] = None,
to: Optional[str] = None,
limit: int = Query(50, le=200)):
"""Mail 列表"""
bb = _bb()
tasks = bb.list_tasks(status=status, assignee=to)
result = []
for t in tasks:
# 解析 Mail 元数据
meta = json.loads(t.must_haves or "{}") if t.must_haves else {}
result.append({
"id": t.id,
"title": t.title,
"from": meta.get("from", t.assigned_by),
"to": t.assignee,
"status": t.status,
"type": meta.get("type", "inform"),
"is_read": meta.get("is_read", False),
"created_at": t.created_at,
"description": t.description,
})
return {"mails": result, "total": len(result)}
@router.get("/{mail_id}")
async def get_mail(mail_id: str):
"""Mail 详情"""
bb = _bb()
task = bb.get_task(mail_id)
if not task:
raise HTTPException(404, f"Mail not found: {mail_id}")
meta = json.loads(task.must_haves or "{}") if task.must_haves else {}
return {
"id": task.id,
"title": task.title,
"from": meta.get("from", task.assigned_by),
"to": task.assignee,
"status": task.status,
"type": meta.get("type", "inform"),
"is_read": meta.get("is_read", False),
"created_at": task.created_at,
"description": task.description,
"outputs": [dict(o.__dict__) for o in bb.get_outputs(mail_id)],
"comments": [dict(c.__dict__) for c in bb.get_comments(mail_id)],
}
@router.patch("/{mail_id}")
async def update_mail(mail_id: str, body: Dict[str, Any]):
"""更新 Mail(标记已读/已执行)"""
bb = _bb()
task = bb.get_task(mail_id)
if not task:
raise HTTPException(404, f"Mail not found: {mail_id}")
# 更新 must_haves 里的 is_read
meta = json.loads(task.must_haves or "{}") if task.must_haves else {}
if "is_read" in body:
meta["is_read"] = body["is_read"]
# 如果标记为已执行,更新状态
if body.get("mark_executed"):
meta["is_read"] = True
if task.status not in ("done", "cancelled"):
bb.update_task_status(mail_id, "done", agent="mail-api")
# 写回 must_haves
conn = bb._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"UPDATE tasks SET must_haves=? WHERE id=?",
(json.dumps(meta), mail_id),
)
conn.commit()
finally:
conn.close()
return {"ok": True}
@router.post("")
async def send_mail(body: Dict[str, Any]):
"""发送 Mail(创建 Task"""
bb = _bb()
meta = {
"from": body.get("from", "user"),
"type": body.get("type", "inform"),
"is_read": False,
}
task = Task(
id=body["id"],
title=body["title"],
description=body.get("description", ""),
assignee=body.get("to"),
assigned_by=body.get("from", "user"),
must_haves=json.dumps(meta),
card_id="default",
task_type="mail",
)
bb.create_task(task)
return {"ok": True, "mail_id": task.id}
-506
View File
@@ -1,506 +0,0 @@
"""v2.7 测试:Card CRUD + 三级层次结构 + Registry 改造"""
import json
from pathlib import Path
import pytest
from src.blackboard.db import init_db, get_connection
from src.blackboard.models import Task, Card
from src.blackboard.operations import Blackboard, CardOps
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
# ===================================================================
# Fixtures
# ===================================================================
@pytest.fixture
def db_path(tmp_path):
p = tmp_path / "test.db"
init_db(p)
return p
@pytest.fixture
def bb(db_path):
return Blackboard(db_path)
@pytest.fixture
def card_ops(db_path):
return CardOps(db_path)
@pytest.fixture
def registry(tmp_path):
return ProjectRegistry(tmp_path)
# ===================================================================
# Card 模型
# ===================================================================
class TestCardModel:
def test_card_defaults(self):
c = Card(id="c1", name="Test Card")
assert c.status == "active"
assert c.card_type == "default"
assert c.stages_json == "[]"
assert c.labels_json == "[]"
def test_card_from_row(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
c = card_ops.get_card("c1")
assert c is not None
assert c.name == "Test"
assert c.status == "active"
# ===================================================================
# CardOps CRUD
# ===================================================================
class TestCardOps:
def test_create_card(self, card_ops):
card = Card(id="c1", name="动量策略v1", card_type="strategy",
stages_json=json.dumps([
{"id": "research", "name": "因子研究"},
{"id": "coding", "name": "策略编码"},
]))
result = card_ops.create_card(card)
assert result.id == "c1"
assert result.name == "动量策略v1"
def test_get_card(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
c = card_ops.get_card("c1")
assert c is not None
assert c.name == "Test"
def test_get_nonexistent_card(self, card_ops):
assert card_ops.get_card("nope") is None
def test_list_cards(self, card_ops):
card_ops.create_card(Card(id="c1", name="A"))
card_ops.create_card(Card(id="c2", name="B"))
cards = card_ops.list_cards()
assert len(cards) == 3 # default + c1 + c2
def test_list_cards_by_status(self, card_ops):
card_ops.create_card(Card(id="c1", name="A"))
card_ops.create_card(Card(id="c2", name="B"))
card_ops.update_card("c2", status="archived")
active = card_ops.list_cards(status="active")
assert len(active) == 2 # default + c1
def test_update_card(self, card_ops):
card_ops.create_card(Card(id="c1", name="Old"))
assert card_ops.update_card("c1", name="New", description="Updated")
c = card_ops.get_card("c1")
assert c.name == "New"
assert c.description == "Updated"
def test_update_card_stages(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
new_stages = json.dumps([{"id": "s1", "name": "Stage 1"}])
card_ops.update_card("c1", stages_json=new_stages)
c = card_ops.get_card("c1")
assert json.loads(c.stages_json) == [{"id": "s1", "name": "Stage 1"}]
def test_archive_card(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
card_ops.update_card("c1", status="archived")
c = card_ops.get_card("c1")
assert c.status == "archived"
def test_delete_card(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
# 只能删除 archived/deleted 状态的
assert not card_ops.delete_card("c1")
card_ops.update_card("c1", status="archived")
assert card_ops.delete_card("c1")
assert card_ops.get_card("c1") is None
def test_fork_card(self, card_ops):
card_ops.create_card(Card(
id="c1", name="Strategy v1", card_type="strategy",
stages_json=json.dumps([{"id": "s1", "name": "Research"}]),
))
forked = card_ops.fork_card("c1", "c2", "Strategy v2")
assert forked is not None
assert forked.id == "c2"
assert forked.name == "Strategy v2"
assert forked.card_type == "strategy"
assert json.loads(forked.stages_json) == [{"id": "s1", "name": "Research"}]
def test_fork_nonexistent(self, card_ops):
assert card_ops.fork_card("nope", "c2", "New") is None
# ===================================================================
# Card 状态聚合
# ===================================================================
class TestCardStatusAggregation:
def test_empty_card_is_active(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Empty"))
status = card_ops.compute_card_status("c1")
assert status == "active"
def test_all_pending(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.create_task(Task(id="t2", title="T2", card_id="c1"))
status = card_ops.compute_card_status("c1")
assert status == "pending"
def test_has_working(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "working"
def test_has_review(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
bb.update_task_status("t1", "review", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "review"
def test_all_done(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
bb.update_task_status("t1", "review", agent="test")
bb.update_task_status("t1", "done", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "done"
def test_cancelled_not_in_aggregation(self, card_ops, bb):
"""cancelled Task 不参与聚合"""
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "cancelled", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "active" # 空(cancelled 排除)→ active
def test_manual_status_not_overridden(self, card_ops, bb):
"""手动状态(archived)不参与聚合"""
card_ops.create_card(Card(id="c1", name="Test"))
card_ops.update_card("c1", status="archived")
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
status = card_ops.compute_card_status("c1")
assert status == "archived"
def test_refresh_card_status(self, card_ops, bb):
card_ops.create_card(Card(id="c1", name="Test"))
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
new_status = card_ops.refresh_card_status("c1")
assert new_status == "working"
c = card_ops.get_card("c1")
assert c.status == "working"
# ===================================================================
# Card 进度
# ===================================================================
class TestCardProgress:
def test_empty_card_progress(self, card_ops):
card_ops.create_card(Card(id="c1", name="Test"))
progress = card_ops.card_progress("c1")
assert progress["total_tasks"] == 0
assert progress["progress_pct"] == 0
def test_card_progress_with_stages(self, card_ops, bb):
stages = [
{"id": "research", "name": "因子研究"},
{"id": "coding", "name": "策略编码"},
]
card_ops.create_card(Card(
id="c1", name="Test",
stages_json=json.dumps(stages),
))
# 2 个 research task1 done
bb.create_task(Task(id="t1", title="T1", card_id="c1", stage="research"))
bb.create_task(Task(id="t2", title="T2", card_id="c1", stage="research"))
bb.update_task_status("t1", "claimed", agent="test")
bb.update_task_status("t1", "working", agent="test")
bb.update_task_status("t1", "review", agent="test")
bb.update_task_status("t1", "done", agent="test")
progress = card_ops.card_progress("c1")
assert progress["total_tasks"] == 2
assert progress["done_tasks"] == 1
assert progress["progress_pct"] == 50
# Stage progress
stages_prog = progress["stages"]
research_stage = next(s for s in stages_prog if s["id"] == "research")
assert research_stage["total"] == 2
assert research_stage["done"] == 1
# ===================================================================
# Task card_id / stage
# ===================================================================
class TestTaskCardId:
def test_task_default_card_id(self, bb):
bb.create_task(Task(id="t1", title="T1"))
t = bb.get_task("t1")
# 默认 card_id 由 DB DEFAULT 设为 'default'
# 但 Task model 初始为 Nonefrom_row 读 DB 的值
assert t.card_id == "default"
def test_task_with_card_id(self, bb):
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
t = bb.get_task("t1")
assert t.card_id == "c1"
def test_task_with_stage(self, bb):
bb.create_task(Task(id="t1", title="T1", card_id="c1", stage="research"))
t = bb.get_task("t1")
assert t.stage == "research"
def test_list_tasks_by_card(self, bb):
bb.create_task(Task(id="t1", title="T1", card_id="c1"))
bb.create_task(Task(id="t2", title="T2", card_id="c2"))
bb.create_task(Task(id="t3", title="T3", card_id="c1"))
tasks = bb.list_tasks(card_id="c1")
assert len(tasks) == 2
assert all(t.card_id == "c1" for t in tasks)
# ===================================================================
# Registry 改造
# ===================================================================
class TestRegistryV2:
def test_create_project_sqlite(self, registry):
info = registry.create_project("p1", "Project 1",
agents=["a1", "a2"])
assert info["name"] == "Project 1"
assert info["agents"] == ["a1", "a2"]
assert info["status"] == "active"
def test_registry_persists(self, tmp_path):
r1 = ProjectRegistry(tmp_path)
r1.create_project("p1", "P1")
r2 = ProjectRegistry(tmp_path)
assert r2.get_project("p1") is not None
def test_auto_discover(self, tmp_path):
# 创建含 blackboard.db 的目录
(tmp_path / "discovered-proj").mkdir()
init_db(tmp_path / "discovered-proj" / "blackboard.db")
registry = ProjectRegistry(tmp_path)
found = registry.discover_projects()
assert "discovered-proj" in found
info = registry.get_project("discovered-proj")
assert info["source"] == "auto_discovered"
def test_auto_discover_skips_no_db(self, tmp_path):
(tmp_path / "empty-dir").mkdir()
registry = ProjectRegistry(tmp_path)
found = registry.discover_projects()
assert "empty-dir" not in found
def test_auto_discover_skips_special(self, tmp_path):
(tmp_path / "_internal").mkdir()
(tmp_path / "_internal" / "blackboard.db").touch()
registry = ProjectRegistry(tmp_path)
found = registry.discover_projects()
assert "_internal" not in found
def test_archive_no_move(self, tmp_path):
registry = ProjectRegistry(tmp_path)
registry.create_project("p1", "P1")
# 创建目录和文件
(tmp_path / "p1" / "test.txt").write_text("data")
registry.archive_project("p1")
# 目录仍然存在
assert (tmp_path / "p1").exists()
assert (tmp_path / "p1" / "test.txt").exists()
# 状态改为 archived
info = registry.get_project("p1")
assert info["status"] == "archived"
def test_migrate_from_yaml(self, tmp_path):
import yaml
yaml_path = tmp_path / "_registry.yaml"
with open(yaml_path, "w") as f:
yaml.dump({
"projects": {
"yaml-proj": {
"name": "YAML Project",
"description": "From YAML",
"status": "active",
"agents": ["a1"],
}
}
}, f)
registry = ProjectRegistry(tmp_path)
count = registry.migrate_from_yaml(yaml_path)
assert count == 1
info = registry.get_project("yaml-proj")
assert info is not None
assert info["name"] == "YAML Project"
# ===================================================================
# v2.7 迁移
# ===================================================================
class TestV27Migration:
def test_migrate_adds_card_id(self, tmp_path):
"""旧 DB 迁移后所有 task 有 card_id"""
db_path = tmp_path / "old.db"
conn = get_connection(db_path)
try:
# 先 init(创建旧版表)
from src.blackboard.db import _SCHEMA_STATEMENTS
for stmt in _SCHEMA_STATEMENTS:
try:
conn.execute(stmt)
except Exception:
pass
conn.commit()
# 插入旧版 task(无 card_id
conn.execute(
"INSERT INTO tasks (id, title, status) VALUES ('t1', 'Old Task', 'pending')"
)
conn.commit()
finally:
conn.close()
# init_db 触发迁移
init_db(db_path)
# 验证 card_id 已回填
conn = get_connection(db_path)
try:
row = conn.execute("SELECT card_id FROM tasks WHERE id='t1'").fetchone()
assert row["card_id"] == "default"
# 验证 default Card 存在
card = conn.execute("SELECT * FROM cards WHERE id='default'").fetchone()
assert card is not None
assert card["name"] == "Default"
finally:
conn.close()
def test_new_db_has_card_id(self, tmp_path):
"""新 DB 直接包含 card_id 和 stage"""
db_path = tmp_path / "new.db"
init_db(db_path)
conn = get_connection(db_path)
try:
# tasks 表有 card_id 和 stage
cols = [r[1] for r in conn.execute("PRAGMA table_info(tasks)").fetchall()]
assert "card_id" in cols
assert "stage" in cols
# cards 表存在
cols = [r[1] for r in conn.execute("PRAGMA table_info(cards)").fetchall()]
assert "id" in cols
assert "name" in cols
assert "status" in cols
assert "stages_json" in cols
finally:
conn.close()
# ===================================================================
# Mail API
# ===================================================================
class TestMailRoutes:
"""Mail Tab API 测试(用 Blackboard 直接操作 _mail DB"""
def test_mail_create_and_list(self, tmp_path):
from src.blackboard.operations import Blackboard
from src.blackboard.db import init_db
mail_db = tmp_path / "_mail" / "blackboard.db"
mail_db.parent.mkdir(parents=True)
init_db(mail_db)
bb = Blackboard(mail_db)
meta = {"from": "pangtong", "type": "review_request", "is_read": False}
bb.create_task(Task(
id="mail-001", title="请评审v2.7代码",
description="代码已提交,请评审",
assignee="simayi-challenger",
assigned_by="pangtong-fujunshi",
must_haves=json.dumps(meta),
card_id="default",
task_type="mail",
))
tasks = bb.list_tasks()
assert len(tasks) == 1
assert tasks[0].title == "请评审v2.7代码"
assert tasks[0].assignee == "simayi-challenger"
# 检查 must_haves 元数据
t = bb.get_task("mail-001")
m = json.loads(t.must_haves)
assert m["from"] == "pangtong"
assert m["is_read"] is False
def test_mail_mark_read(self, tmp_path):
from src.blackboard.operations import Blackboard
from src.blackboard.db import init_db
mail_db = tmp_path / "_mail" / "blackboard.db"
mail_db.parent.mkdir(parents=True)
init_db(mail_db)
bb = Blackboard(mail_db)
meta = {"from": "zhangfei", "is_read": False}
bb.create_task(Task(
id="mail-002", title="测试",
must_haves=json.dumps(meta), card_id="default",
))
# 更新 is_read
conn = bb._conn()
try:
meta["is_read"] = True
conn.execute("UPDATE tasks SET must_haves=? WHERE id=?",
(json.dumps(meta), "mail-002"))
conn.commit()
finally:
conn.close()
t = bb.get_task("mail-002")
assert json.loads(t.must_haves)["is_read"] is True
def test_mail_filter_by_to(self, tmp_path):
from src.blackboard.operations import Blackboard
from src.blackboard.db import init_db
mail_db = tmp_path / "_mail" / "blackboard.db"
mail_db.parent.mkdir(parents=True)
init_db(mail_db)
bb = Blackboard(mail_db)
bb.create_task(Task(id="m1", title="A", assignee="simayi", card_id="default"))
bb.create_task(Task(id="m2", title="B", assignee="zhangfei", card_id="default"))
bb.create_task(Task(id="m3", title="C", assignee="simayi", card_id="default"))
tasks = bb.list_tasks(assignee="simayi")
assert len(tasks) == 2