diff --git a/tests/test_e2e_v27.py b/tests/test_e2e_v27.py index 0d88614..5e4e3ed 100644 --- a/tests/test_e2e_v27.py +++ b/tests/test_e2e_v27.py @@ -470,3 +470,428 @@ class TestE6DependencyChain: self.bb.update_task_status("chain-b", s, agent="test") advanced2 = ticker._advance_dependencies(self.db_path) assert "chain-c" in advanced2 + + +# =================================================================== +# E7: 超时回收 +# =================================================================== + +class TestE7Timeout: + """E7: claimed/working 超时回收""" + + @pytest.fixture(autouse=True) + def setup(self, data_root): + self.pid = _pid() + db_path = data_root / self.pid / "blackboard.db" + db_path.parent.mkdir(parents=True, exist_ok=True) + init_db(db_path) + self.db_path = db_path + self.bb = Blackboard(db_path) + + def test_e71_claimed_timeout_to_pending(self): + """claimed 超过 claim_timeout → pending""" + self.bb.create_task(Task(id="to-001", title="超时测试", status="pending")) + self.bb.update_task_status("to-001", "claimed", agent="test-agent") + # 手动把 claimed_at 设为 10 分钟前 + conn = get_connection(self.db_path) + try: + conn.execute( + "UPDATE tasks SET claimed_at=datetime('now','-10 minutes') WHERE id=?", + ("to-001",) + ) + conn.commit() + finally: + conn.close() + + ticker = Ticker.__new__(Ticker) + ticker.claim_timeout_minutes = 5.0 + ticker.default_task_timeout_minutes = 30.0 + reclaimed = ticker._check_timeouts(self.db_path) + assert "to-001" in reclaimed + assert self.bb.get_task("to-001").status == "pending" + + def test_e72_working_timeout_to_failed(self): + """working 超过 task_timeout → failed""" + self.bb.create_task(Task(id="to-002", title="工作超时", status="pending")) + self.bb.update_task_status("to-002", "claimed", agent="test-agent") + self.bb.update_task_status("to-002", "working", agent="test-agent") + # 手动把 started_at 设为 60 分钟前 + conn = get_connection(self.db_path) + try: + conn.execute( + "UPDATE tasks SET started_at=datetime('now','-60 minutes') WHERE id=?", + ("to-002",) + ) + conn.commit() + finally: + conn.close() + + ticker = Ticker.__new__(Ticker) + ticker.claim_timeout_minutes = 5.0 + ticker.default_task_timeout_minutes = 30.0 + reclaimed = ticker._check_timeouts(self.db_path) + assert "to-002" in reclaimed + assert self.bb.get_task("to-002").status == "failed" + + +# =================================================================== +# E8: Mail Tab 6 端点 +# =================================================================== + +class TestE8MailTab: + """E8: Mail 端到端""" + + @pytest.fixture(autouse=True) + def setup(self, client): + self.mail_ids = [] + + def _send_mail(self, client, **kwargs): + resp = client.post("/api/mail", json=kwargs) + assert resp.status_code == 200 + data = resp.json() + assert data["ok"] is True + mid = data["mail_id"] + self.mail_ids.append(mid) + return mid + + def test_e81_send_inform_auto_done(self, client): + """inform 类型自动 done""" + mid = self._send_mail(client, + title="通知测试", + text="这是一条通知", + from_="pangtong-fujunshi", + to="simayi-challenger", + type="inform", + ) + resp = client.get(f"/api/mail/{mid}") + data = resp.json() + assert data["status"] == "done" + + def test_e82_send_task_assign_pending(self, client): + """task-assign 类型保持 pending""" + mid = self._send_mail(client, + title="任务分配", + text="请完成此任务", + from_="pangtong-fujunshi", + to="zhangfei-dev", + type="task-assign", + ) + resp = client.get(f"/api/mail/{mid}") + data = resp.json() + assert data["status"] == "pending" + + def test_e83_list_with_filters(self, client): + """列表 + 筛选""" + self._send_mail(client, + title="筛选测试1", + text="body", + from_="pangtong-fujunshi", + to="simayi-challenger", + type="text", + ) + self._send_mail(client, + title="筛选测试2", + text="body", + from_="zhangfei-dev", + to="simayi-challenger", + type="text", + ) + # 按 from 筛选 + resp = client.get("/api/mail?from_agent=pangtong-fujunshi") + mails = resp.json()["mails"] + assert any(m["title"] == "筛选测试1" for m in mails) + # 按 to 筛选 + resp = client.get("/api/mail?to_agent=simayi-challenger") + mails = resp.json()["mails"] + assert len(mails) >= 2 + + def test_e84_mail_detail_with_comments(self, client): + """详情 + 评论""" + mid = self._send_mail(client, + title="详情测试", + text="正文内容", + from_="pangtong-fujunshi", + to="simayi-challenger", + type="text", + ) + # 添加评论(通过 blackboard 直接写) + from src.api.mail_routes import _db_path + bb = Blackboard(_db_path()) + bb.add_comment(mid, "simayi-challenger", "收到,正在处理", comment_type="reply") + + resp = client.get(f"/api/mail/{mid}") + data = resp.json() + assert data["title"] == "详情测试" + assert len(data["comments"]) == 1 + assert data["comments"][0]["author"] == "simayi-challenger" + + def test_e85_mark_read(self, client): + """标记已读""" + mid = self._send_mail(client, + title="已读测试", + text="body", + from_="pangtong-fujunshi", + to="simayi-challenger", + type="text", + ) + # 确认初始未读 + resp = client.get(f"/api/mail/{mid}") + assert resp.json()["is_read"] is False + # 标记已读 + resp = client.patch(f"/api/mail/{mid}", json={"is_read": True}) + assert resp.status_code == 200 + # 验证 + resp = client.get(f"/api/mail/{mid}") + assert resp.json()["is_read"] is True + + def test_e86_mark_executed(self, client): + """标记已执行""" + mid = self._send_mail(client, + title="执行测试", + text="body", + from_="pangtong-fujunshi", + to="zhangfei-dev", + type="task-assign", + ) + resp = client.patch(f"/api/mail/{mid}", json={"mark_executed": True}) + assert resp.status_code == 200 + resp = client.get(f"/api/mail/{mid}") + data = resp.json() + assert data["is_read"] is True + assert data["status"] == "done" + + def test_e87_summary_and_agents(self, client): + """统计 + Agent 列表""" + self._send_mail(client, + title="统计测试", + text="body", + from_="zhaoyun-data", + to="guanyu-dev", + type="inform", + ) + resp = client.get("/api/mail/summary") + data = resp.json() + assert "total" in data + assert "unread" in data + assert data["total"] > 0 + + resp = client.get("/api/mail/agents/list") + data = resp.json() + assert "agents" in data + assert isinstance(data["agents"], list) + + +# =================================================================== +# E9: 真实 Agent 调度 +# =================================================================== + +class TestE9RealAgentDispatch: + """E9: 创建真实任务 → Ticker 调度 → Agent spawn + 回写 + + 注意:依赖真实 openclaw agent 进程,可能受环境影响。 + """ + + @pytest.fixture(autouse=True) + def setup(self, client, data_root): + self.pid = f"e2e-v27-agent-{uuid.uuid4().hex[:6]}" + client.post("/api/projects", json={ + "id": self.pid, + "name": f"E9-Agent-{self.pid}", + "config": {"agents": ["zhangfei-dev"]}, + }) + self.data_root = data_root + + def test_e91_simple_task_agent_execute(self, client): + """简单任务 → Agent 执行 → 完成""" + tid = _tid() + resp = client.post(f"/api/projects/{self.pid}/tasks", json={ + "id": tid, + "title": "E2E简单任务:echo hello", + "description": "请执行:echo hello,然后把输出写入黑板。这是E2E测试,完成后标记done。", + "status": "pending", + "assignee": "zhangfei-dev", + "task_type": "coding", + "priority": 5, + }) + assert resp.status_code == 200 + + # 手动触发 tick(走调度) + from src.blackboard.registry import ProjectRegistry + registry = ProjectRegistry(self.data_root) + from src.daemon.dispatcher import Dispatcher + dispatcher = Dispatcher(registry) + ticker = Ticker( + registry=registry, + tick_interval=30, + max_dispatch_per_tick=3, + dispatcher=dispatcher, + ) + result = asyncio.get_event_loop().run_until_complete(ticker.tick()) + + # 验证 tick 结果有调度 + assert self.pid in result.get("projects", {}) + + # 等待 Agent 完成(最多 120 秒) + db_path = self.data_root / self.pid / "blackboard.db" + bb = Blackboard(db_path) + for _ in range(24): + time.sleep(5) + t = bb.get_task(tid) + if t and t.status in ("done", "failed", "review"): + break + + t = bb.get_task(tid) + assert t is not None + # Agent 可能完成也可能超时,只要不是 pending 就算调度成功 + assert t.status != "pending", f"Agent 未被调度,状态仍为 pending" + + def test_e92_review_task_dispatch(self, client): + """review 任务 → 调度到 simayi-challenger""" + tid = _tid() + resp = client.post(f"/api/projects/{self.pid}/tasks", json={ + "id": tid, + "title": "E2E Review:检查代码", + "description": "请查看任务描述并回复你的评审意见。这是E2E测试,简单回复即可。", + "status": "pending", + "assignee": "simayi-challenger", + "task_type": "review", + "priority": 5, + }) + assert resp.status_code == 200 + + from src.blackboard.registry import ProjectRegistry + registry = ProjectRegistry(self.data_root) + from src.daemon.dispatcher import Dispatcher + dispatcher = Dispatcher(registry) + ticker = Ticker( + registry=registry, + tick_interval=30, + max_dispatch_per_tick=3, + dispatcher=dispatcher, + ) + result = asyncio.get_event_loop().run_until_complete(ticker.tick()) + + db_path = self.data_root / self.pid / "blackboard.db" + bb = Blackboard(db_path) + for _ in range(24): + time.sleep(5) + t = bb.get_task(tid) + if t and t.status in ("done", "failed", "review", "working", "claimed"): + break + + t = bb.get_task(tid) + assert t is not None + assert t.status != "pending", f"Review Agent 未被调度,状态仍为 pending" + + +# =================================================================== +# E10: 全链路集成 +# =================================================================== + +class TestE10FullChain: + """E10: 项目 → 父子Task → Ticker → 聚合 → 依赖 → Mail → 验证完整链""" + + def test_e10_full_chain(self, client, data_root): + pid = f"e2e-v27-full-{uuid.uuid4().hex[:6]}" + + # 1. 创建项目 + resp = client.post("/api/projects", json={ + "id": pid, "name": f"E10全链路-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }) + assert resp.status_code == 200 + + # 2. 创建父 Task(带 stages) + parent_id = f"{pid}-parent" + resp = client.post(f"/api/projects/{pid}/tasks", json={ + "id": parent_id, + "title": "全链路父任务", + "status": "pending", + "stages_json": json.dumps(["setup", "execute", "verify"]), + }) + assert resp.status_code == 200 + + # 3. 创建 3 个子 Task + 1 个依赖 Task + child_ids = [] + for i, stage in enumerate(["setup", "execute", "verify"]): + cid = f"{pid}-child-{i}" + child_ids.append(cid) + resp = client.post(f"/api/projects/{pid}/tasks", json={ + "id": cid, + "title": f"子任务-{stage}", + "status": "pending", + "parent_task": parent_id, + "stage": stage, + }) + assert resp.status_code == 200 + + # 依赖 Task(blocked) + dep_id = f"{pid}-dep" + resp = client.post(f"/api/projects/{pid}/tasks", json={ + "id": dep_id, + "title": "依赖任务", + "status": "blocked", + "depends_on": child_ids[0], # 依赖 setup 子任务 + }) + assert resp.status_code == 200 + + # 4. 完成 setup 子任务 + db_path = data_root / pid / "blackboard.db" + bb = Blackboard(db_path) + for s in ["claimed", "working", "review", "done"]: + bb.update_task_status(child_ids[0], s, agent="test") + + # 5. 触发 Ticker(依赖推进 + 父状态聚合) + from src.blackboard.registry import ProjectRegistry + registry = ProjectRegistry(data_root) + ticker = Ticker(registry=registry, tick_interval=30) + + # 依赖推进 + advanced = ticker._advance_dependencies(db_path) + assert dep_id in advanced, "依赖任务应被推进为 pending" + + # 父状态聚合 + ticker._refresh_parent_statuses(db_path) + parent = bb.get_task(parent_id) + # 1 done + 2 pending → working (有活跃子任务) 或 pending + assert parent.status in ("pending", "working", "review"), f"父任务状态异常: {parent.status}" + + # 6. 发送 Mail 通知 + resp = client.post("/api/mail", json={ + "title": f"E10全链路完成通知-{pid}", + "text": f"项目 {pid} 的 setup 阶段已完成", + "from": "simayi-challenger", + "to": "pangtong-fujunshi", + "type": "inform", + }) + assert resp.status_code == 200 + + # 7. 验证 Mail 已创建 + resp = client.get("/api/mail?from_agent=simayi-challenger") + mails = resp.json()["mails"] + assert any(m["title"].startswith("E10全链路") for m in mails) + + # 8. 验证 Stage 进度 + q = Queries(db_path) + progress = q.parent_task_progress(parent_id) + stages = {s["stage"]: s for s in progress["stages"]} + assert stages["setup"]["done"] == 1 + assert stages["execute"]["done"] == 0 + + # 9. 完成 execute 子任务 + for s in ["claimed", "working", "review", "done"]: + bb.update_task_status(child_ids[1], s, agent="test") + + # 再次聚合 + ticker._refresh_parent_statuses(db_path) + progress2 = q.parent_task_progress(parent_id) + stages2 = {s["stage"]: s for s in progress2["stages"]} + assert stages2["setup"]["done"] == 1 + assert stages2["execute"]["done"] == 1 + assert stages2["verify"]["done"] == 0 + + print(f"\n✅ E10 全链路测试通过!项目: {pid}") + print(f" 父任务: {parent_id}, 状态: {bb.get_task(parent_id).status}") + print(f" Stage进度: setup=done, execute=done, verify=pending") + print(f" 依赖推进: {dep_id} → pending") + print(f" Mail通知: 已发送")