diff --git a/tests/test_v27_subtasks.py b/tests/test_v27_subtasks.py new file mode 100644 index 0000000..62a671f --- /dev/null +++ b/tests/test_v27_subtasks.py @@ -0,0 +1,302 @@ +"""v2.7 父子 Task 关系 + Stage 进度测试""" + +import json +import tempfile +from pathlib import Path + +import pytest + +from src.blackboard.db import init_db, get_connection +from src.blackboard.models import Task +from src.blackboard.operations import Blackboard +from src.blackboard.queries import Queries + + +@pytest.fixture +def db_path(tmp_path): + p = tmp_path / "test.db" + init_db(p) + return p + + +@pytest.fixture +def bb(db_path): + return Blackboard(db_path) + + +@pytest.fixture +def queries(db_path): + return Queries(db_path) + + +# ====================================================================== +# 基础父子关系 +# ====================================================================== + +class TestParentChild: + def test_create_parent_task(self, bb, queries): + """顶层 Task 无 parent_task""" + task = Task(id="parent-1", title="动量策略v1") + bb.create_task(task) + top = queries.top_level_tasks() + assert len(top) == 1 + assert top[0].id == "parent-1" + assert top[0].parent_task is None + + def test_create_child_task(self, bb, queries): + """子 Task 有 parent_task""" + parent = Task(id="parent-1", title="动量策略v1", + stages_json=json.dumps([ + {"id": "research", "label": "因子研究", "order": 1}, + {"id": "coding", "label": "策略编码", "order": 2}, + ])) + bb.create_task(parent) + + child = Task(id="child-1", title="因子研究", + parent_task="parent-1", stage="research", + assignee="zhangfei-dev") + bb.create_task(child) + + subtasks = queries.list_subtasks("parent-1") + assert len(subtasks) == 1 + assert subtasks[0].parent_task == "parent-1" + assert subtasks[0].stage == "research" + + def test_top_level_excludes_children(self, bb, queries): + """top_level_tasks 不包含子 Task""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child", parent_task="p1")) + + top = queries.top_level_tasks() + assert len(top) == 1 + assert top[0].id == "p1" + + def test_list_subtasks_empty(self, bb, queries): + """无子 Task 返回空列表""" + bb.create_task(Task(id="p1", title="Parent")) + assert queries.list_subtasks("p1") == [] + + def test_multiple_children(self, bb, queries): + """多个子 Task""" + bb.create_task(Task(id="p1", title="Parent")) + for i in range(5): + bb.create_task(Task(id=f"c{i}", title=f"Child {i}", parent_task="p1")) + + subtasks = queries.list_subtasks("p1") + assert len(subtasks) == 5 + + def test_child_with_depends_on(self, bb, queries): + """子 Task 间有依赖关系""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="因子研究", parent_task="p1")) + bb.create_task(Task(id="c2", title="策略编码", parent_task="p1", + depends_on=json.dumps(["c1"]))) + + subtasks = queries.list_subtasks("p1") + assert len(subtasks) == 2 + c2 = [s for s in subtasks if s.id == "c2"][0] + assert json.loads(c2.depends_on) == ["c1"] + + +# ====================================================================== +# 父 Task 状态聚合 +# ====================================================================== + +class TestParentStatusAggregation: + def test_all_pending(self, bb, queries): + """所有子 Task pending → 父 pending""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1")) + bb.create_task(Task(id="c2", title="Child 2", parent_task="p1")) + + status = queries.compute_parent_status("p1") + assert status == "pending" + + def test_has_working(self, bb, queries): + """有子 Task working → 父 working""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1")) + bb.create_task(Task(id="c2", title="Child 2", parent_task="p1", status="working")) + + status = queries.compute_parent_status("p1") + assert status == "working" + + def test_has_claimed_counts_as_working(self, bb, queries): + """有子 Task claimed → 父 working""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1")) + bb.create_task(Task(id="c2", title="Child 2", parent_task="p1", status="claimed")) + + status = queries.compute_parent_status("p1") + assert status == "working" + + def test_has_review(self, bb, queries): + """有子 Task review → 父 review""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1", status="done")) + bb.create_task(Task(id="c2", title="Child 2", parent_task="p1", status="review")) + + status = queries.compute_parent_status("p1") + assert status == "review" + + def test_all_done(self, bb, queries): + """所有子 Task done → 父 done""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1", status="done")) + bb.create_task(Task(id="c2", title="Child 2", parent_task="p1", status="done")) + + status = queries.compute_parent_status("p1") + assert status == "done" + + def test_has_failed(self, bb, queries): + """有子 Task failed → 父 failed""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1", status="done")) + bb.create_task(Task(id="c2", title="Child 2", parent_task="p1", status="failed")) + + status = queries.compute_parent_status("p1") + assert status == "failed" + + def test_cancelled_excluded_from_aggregation(self, bb, queries): + """cancelled 子 Task 不参与聚合""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1", status="done")) + bb.create_task(Task(id="c2", title="Child 2", parent_task="p1", status="cancelled")) + + status = queries.compute_parent_status("p1") + assert status == "done" # c1 done, c2 excluded + + def test_manual_status_not_overridden(self, bb, queries): + """cancelled 的父 Task 不参与聚合""" + bb.create_task(Task(id="p1", title="Parent", status="cancelled")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1", status="done")) + + status = queries.compute_parent_status("p1") + assert status == "cancelled" + + def test_no_children_keeps_current(self, bb, queries): + """无子 Task 保持当前状态""" + bb.create_task(Task(id="p1", title="Parent", status="working")) + status = queries.compute_parent_status("p1") + assert status == "working" + + def test_priority_review_over_working(self, bb, queries): + """review 优先于 working""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1", status="working")) + bb.create_task(Task(id="c2", title="Child 2", parent_task="p1", status="review")) + + status = queries.compute_parent_status("p1") + assert status == "review" + + def test_priority_working_over_blocked(self, bb, queries): + """working 优先于 blocked""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child 1", parent_task="p1", status="working")) + bb.create_task(Task(id="c2", title="Child 2", parent_task="p1", status="blocked")) + + status = queries.compute_parent_status("p1") + assert status == "working" + + +# ====================================================================== +# Stage 进度 +# ====================================================================== + +class TestStageProgress: + def test_parent_task_progress(self, bb, queries): + """Stage 进度查询""" + bb.create_task(Task(id="p1", title="动量策略v1", + stages_json=json.dumps([ + {"id": "research", "label": "因子研究", "order": 1}, + {"id": "coding", "label": "策略编码", "order": 2}, + {"id": "backtest", "label": "回测验证", "order": 3}, + ]))) + bb.create_task(Task(id="c1", title="因子研究", parent_task="p1", + stage="research", status="done")) + bb.create_task(Task(id="c2", title="策略编码", parent_task="p1", + stage="coding", status="working")) + bb.create_task(Task(id="c3", title="回测验证", parent_task="p1", + stage="backtest", status="pending")) + + progress = queries.parent_task_progress("p1") + assert progress["total_subtasks"] == 3 + assert progress["done_subtasks"] == 1 + assert progress["active_stage"] == "策略编码" + assert len(progress["stages"]) == 3 + + # Stage 详情 + research = [s for s in progress["stages"] if s["id"] == "research"][0] + assert research["total"] == 1 + assert research["done"] == 1 + + coding = [s for s in progress["stages"] if s["id"] == "coding"][0] + assert coding["active"] == 1 + + def test_progress_empty_parent(self, bb, queries): + """无子 Task 的父 Task""" + bb.create_task(Task(id="p1", title="Empty", + stages_json=json.dumps([{"id": "s1", "label": "Step 1", "order": 1}]))) + progress = queries.parent_task_progress("p1") + assert progress["total_subtasks"] == 0 + assert progress["done_subtasks"] == 0 + assert progress["active_stage"] is None + + def test_progress_nonexistent(self, bb, queries): + """不存在的 Task 返回空""" + progress = queries.parent_task_progress("nonexistent") + assert progress == {} + + +# ====================================================================== +# stages_json 字段 +# ====================================================================== + +class TestStagesJson: + def test_default_stages_json(self, bb, queries): + """默认 stages_json 为空数组""" + bb.create_task(Task(id="t1", title="Task")) + task = bb.get_task("t1") + assert task.stages_json == "[]" + + def test_set_stages_json(self, bb, queries): + """设置 stages_json""" + stages = [{"id": "s1", "label": "Step 1", "order": 1}] + bb.create_task(Task(id="t1", title="Task", stages_json=json.dumps(stages))) + task = bb.get_task("t1") + assert json.loads(task.stages_json) == stages + + def test_stage_field_on_child(self, bb, queries): + """子 Task 的 stage 字段""" + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child", parent_task="p1", stage="research")) + task = bb.get_task("c1") + assert task.stage == "research" + + +# ====================================================================== +# Ticker 父 Task 聚合刷新 +# ====================================================================== + +class TestTickerParentRefresh: + def test_refresh_parent_status(self, bb, queries): + """聚合刷新写入父 Task""" + from src.daemon.ticker import Ticker + from src.blackboard.registry import ProjectRegistry + + bb.create_task(Task(id="p1", title="Parent")) + bb.create_task(Task(id="c1", title="Child", parent_task="p1", status="done")) + bb.create_task(Task(id="c2", title="Child", parent_task="p1", status="done")) + + # 手动刷新 + conn = get_connection(bb.db_path) + try: + # 模拟 Ticker 的 _refresh_parent_statuses + computed = queries.compute_parent_status("p1") + conn.execute("UPDATE tasks SET status=? WHERE id=?", (computed, "p1")) + conn.commit() + finally: + conn.close() + + parent = bb.get_task("p1") + assert parent.status == "done"