From 655aecab40388d64897dea204ad953c592e9ba9e Mon Sep 17 00:00:00 2001 From: cfdaily Date: Mon, 1 Jun 2026 23:35:25 +0800 Subject: [PATCH] auto-sync: 2026-06-01 23:35:25 --- tests/test_e2e_v27.py | 491 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 491 insertions(+) diff --git a/tests/test_e2e_v27.py b/tests/test_e2e_v27.py index 843f47a..58804d6 100644 --- a/tests/test_e2e_v27.py +++ b/tests/test_e2e_v27.py @@ -1097,3 +1097,494 @@ class TestE10FullChain: print(f" Stage进度: step1.done={stages.get('step1', {}).get('done', '?')}") print(f" ✅ E10b 真实Agent全链路完成") + + +# =================================================================== +# E11: Acquire-First 真实 Agent E2E(#07.1) +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE11AcquireFirstE2E: + """E11: #07.1 Acquire-First Phase 1-4 真实 Agent E2E + + 验证真实 daemon 调度路径中 counter acquire + session check + spawn 的完整流程。 + 通过检查 routing_decisions 表和任务状态来验证。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="E11") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200 + return tid + + def test_e11a_dispatch_with_routing_audit(self): + """E11a: 任务调度 → 检查 routing_decisions 记录了 dispatch + selected_agent + + 验证 Phase 1-4 正常完成:counter acquire → session check → spawn。 + routing_decisions 应记录 dispatched outcome + selected_agent。 + """ + pid = self._create_project("E11a") + tid = self._create_task( + pid, + title="E2E Acquire-First:echo hello", + description=( + "请执行 echo hello-world 并标记done。\n" + "这是E2E测试 #07.1 Acquire-First,不需要做其他事。" + ), + assignee="zhangfei-dev", + task_type="coding", + ) + + print(f"\n🚀 E11a: 等待调度 + Agent执行 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + # 验证任务被调度(不是 pending) + assert status != "pending", ( + f"任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending,调度未生效" + ) + + # 验证 routing_decisions 有记录 + db_path = DATA_ROOT / pid / "blackboard.db" + if db_path.exists(): + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + conn.row_factory = sq3.Row + try: + rows = conn.execute( + "SELECT * FROM routing_decisions WHERE task_id=? ORDER BY id DESC LIMIT 1", + (tid,), + ).fetchall() + assert len(rows) > 0, f"routing_decisions 无记录 for {tid}" + row = rows[0] + print(f" routing: mode={row['mode']} agent={row['selected_agent']} outcome={row['outcome']}") + assert row["selected_agent"] == "zhangfei-dev" + assert row["outcome"] == "dispatched" + finally: + conn.close() + + print(f" ✅ Acquire-First 调度审计验证通过") + + def test_e11b_concurrent_dispatch_counter_block(self): + """E11b: 同 agent 连续两个任务 → 第二个应被 skip(counter full 或 session busy) + + #07.1 Phase 1 counter acquire 互斥:同 agent 同时只能有一个活跃 session。 + 第二个任务在第一个完成前应被 skip。 + """ + pid = self._create_project("E11b") + + # 创建两个任务,同 assignee + tid1 = self._create_task( + pid, + title="E2E Counter-Block 任务1:echo first", + description="请执行 echo first 并标记done。E2E测试,不需要做其他事。", + assignee="zhangfei-dev", + task_type="coding", + ) + tid2 = self._create_task( + pid, + title="E2E Counter-Block 任务2:echo second", + description="请执行 echo second 并标记done。E2E测试,不需要做其他事。", + assignee="zhangfei-dev", + task_type="coding", + ) + + print(f"\n🚀 E11b: 等待两个任务调度 (pid={pid})") + + # 等第一个完成或至少开始执行 + result1 = _poll_task( + pid, tid1, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status1 = result1.get("status") + print(f" 任务1状态: {status1}") + assert status1 != "pending", "任务1未被调度" + + # 等第二个(可能被 skip 后在第一个完成后重新调度) + result2 = _poll_task( + pid, tid2, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status2 = result2.get("status") + print(f" 任务2状态: {status2}") + + # 验证 routing_decisions 中有 skipped 记录(第二个任务在某次 tick 被 skip) + db_path = DATA_ROOT / pid / "blackboard.db" + if db_path.exists(): + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + conn.row_factory = sq3.Row + try: + rows = conn.execute( + "SELECT outcome, detail FROM routing_decisions WHERE task_id=? ORDER BY id", + (tid2,), + ).fetchall() + outcomes = [r["outcome"] for r in rows] + print(f" 任务2 routing outcomes: {outcomes}") + # 应该有至少一个 skipped(counter blocked 或 session busy) + # 或者最终 dispatched(在第一个任务完成后重新调度) + assert len(rows) > 0, "任务2无 routing 记录" + finally: + conn.close() + + print(f" ✅ Counter block 并发调度验证完成") + + +# =================================================================== +# E12: _check_timeouts 统一 + crash_limit E2E(#07.2) +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE12TimeoutsUnifiedE2E: + """E12: #07.2 _check_timeouts 统一超时 + crash_limit + updated_at fallback + + 通过 DB 操作模拟超时场景,等待 ticker 真实处理。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="E12") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def test_e12a_crash_limit_marks_failed(self): + """E12a: 3 次 crash → _check_timeouts 标 failed + + 手动写 3 条 crashed attempt(30min 内),等 ticker → 验证 failed。 + """ + pid = self._create_project("E12a") + + # 创建 working 任务 + tid = f"e2e-task-{uuid.uuid4().hex[:8]}" + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", + json={"id": tid, "title": "Crash Limit 测试", "status": "pending", + "assignee": "zhangfei-dev"}, + timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + + # 写 3 条 crash attempt + db_path = DATA_ROOT / pid / "blackboard.db" + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + try: + for i in range(3): + conn.execute( + "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) " + "VALUES (?, ?, ?, ?, datetime('now', ?))", + (tid, i + 1, "zhangfei-dev", "crashed", f"-{(25 - i * 5)} minutes"), + ) + conn.commit() + finally: + conn.close() + + print(f"\n🚀 E12a: 3次crash已写入,等待ticker处理 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_DISPATCH, + terminal_states=("failed", "done", "cancelled"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status == "failed", ( + f"3次crash后任务应为failed,实际: {status}" + ) + print(f" ✅ crash_limit 统一检查验证通过") + + def test_e12b_updated_at_fallback_reclaim(self): + """E12b: working 任务无 started_at → updated_at fallback → 超时回收 + + #07.3 ACT-1: PM2 重启后 mail 孤儿任务只有 updated_at, + _check_timeouts 用 updated_at 作为 fallback。 + """ + pid = self._create_project("E12b") + + tid = f"e2e-task-{uuid.uuid4().hex[:8]}" + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", + json={"id": tid, "title": "updated_at Fallback 测试", "status": "pending", + "assignee": "zhangfei-dev"}, + timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + + # 清空 started_at/claimed_at,设 updated_at 为 60 分钟前 + db_path = DATA_ROOT / pid / "blackboard.db" + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + try: + conn.execute( + "UPDATE tasks SET started_at=NULL, claimed_at=NULL, " + "updated_at=datetime('now','-60 minutes') WHERE id=?", + (tid,), + ) + conn.commit() + finally: + conn.close() + + print(f"\n🚀 E12b: updated_at 设为60min前,等待ticker回收 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_DISPATCH, + terminal_states=("failed", "done", "cancelled"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status != "working", ( + f"超时任务应被回收,实际仍为 working" + ) + print(f" ✅ updated_at fallback 回收验证通过") + + +# =================================================================== +# E13: Compact Hanging 不标 failed E2E(#07.3 ACT-2) +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE13CompactHangingE2E: + """E13: compact_hanging outcome → 任务保持 working(不标 failed) + + 通过 DB 写入 compact_hanging attempt,验证 ticker 不将其标为 failed。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="E13") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def test_e13a_compact_hanging_keeps_working(self): + """E13a: compact_hanging attempt → ticker 超时检查不应立即标 failed + + 写入 compact_hanging attempt(非 crash),验证任务保持 working + 直到真正超时才被回收。 + """ + pid = self._create_project("E13a") + + tid = f"e2e-task-{uuid.uuid4().hex[:8]}" + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", + json={"id": tid, "title": "Compact Hanging 测试", "status": "pending", + "assignee": "zhangfei-dev"}, + timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + + # 写入 compact_hanging attempt(最近的时间) + db_path = DATA_ROOT / pid / "blackboard.db" + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + try: + conn.execute( + "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) " + "VALUES (?, ?, ?, ?, datetime('now'))", + (tid, 1, "zhangfei-dev", "compact_hanging"), + ) + # 设 started_at 为最近时间(不应超时) + conn.execute( + "UPDATE tasks SET started_at=datetime('now','-5 minutes') WHERE id=?", + (tid,), + ) + conn.commit() + finally: + conn.close() + + print(f"\n🚀 E13a: compact_hanging attempt 已写入,等一个 tick 验证不被标 failed") + # 等一个 tick(30s + buffer) + time.sleep(45) + + # 验证任务仍为 working + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + assert resp.status_code == 200 + status = resp.json().get("status") + print(f" 一个 tick 后状态: {status}") + + assert status == "working", ( + f"compact_hanging 后任务应保持 working,实际: {status}" + ) + print(f" ✅ compact_hanging 不标 failed 验证通过") + + +# =================================================================== +# E14: _rollback_current_agent E2E(#07.2) +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE14RollbackE2E: + """E14: crash 后 current_agent 回退验证 + + 通过 DB 操作模拟 crash,验证 current_agent 被回退。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="E14") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def test_e14a_rollback_on_crash(self): + """E14a: 3次crash → failed + current_agent 回退到 assignee + + #07.2: crash 回退由 _rollback_current_agent 执行。 + """ + pid = self._create_project("E14a") + + tid = f"e2e-task-{uuid.uuid4().hex[:8]}" + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", + json={"id": tid, "title": "Rollback 测试", "status": "pending", + "assignee": "zhangfei-dev"}, + timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + + # 设置 current_agent 并写 3 次 crash attempt + db_path = DATA_ROOT / pid / "blackboard.db" + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + try: + conn.execute( + "UPDATE tasks SET current_agent='zhangfei-dev' WHERE id=?", + (tid,), + ) + for i in range(3): + conn.execute( + "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) " + "VALUES (?, ?, ?, ?, datetime('now', ?))", + (tid, i + 1, "zhangfei-dev", "crashed", f"-{(25 - i * 5)} minutes"), + ) + conn.commit() + finally: + conn.close() + + print(f"\n🚀 E14a: 3次crash + current_agent 已设置,等待ticker (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_DISPATCH, + terminal_states=("failed", "done", "cancelled"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status == "failed", f"应为 failed,实际: {status}" + + # 验证 current_agent 已回退(等于 assignee 或 None) + conn2 = sq3.connect(str(db_path)) + conn2.row_factory = sq3.Row + try: + row = conn2.execute( + "SELECT current_agent, assignee FROM tasks WHERE id=?", (tid,) + ).fetchone() + print(f" current_agent={row['current_agent']} assignee={row['assignee']}") + # _rollback_current_agent 应将 current_agent 回退为 assignee + assert row["current_agent"] == row["assignee"], ( + f"crash后 current_agent 应回退为 assignee," + f"实际 current_agent={row['current_agent']} assignee={row['assignee']}" + ) + finally: + conn2.close() + + print(f" ✅ crash rollback current_agent 验证通过")