auto-sync: 2026-06-01 23:35:25

This commit is contained in:
cfdaily
2026-06-01 23:35:25 +08:00
parent 3f371a0040
commit 655aecab40
+491
View File
@@ -1097,3 +1097,494 @@ class TestE10FullChain:
print(f" Stage进度: step1.done={stages.get('step1', {}).get('done', '?')}")
print(f" ✅ E10b 真实Agent全链路完成")
# ===================================================================
# E11: Acquire-First 真实 Agent E2E#07.1
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE11AcquireFirstE2E:
"""E11: #07.1 Acquire-First Phase 1-4 真实 Agent E2E
验证真实 daemon 调度路径中 counter acquire + session check + spawn 的完整流程。
通过检查 routing_decisions 表和任务状态来验证。
"""
@pytest.fixture(autouse=True)
def setup_env(self):
_check_environment()
self._projects = []
yield
for pid in self._projects:
_cleanup_project(pid)
def _create_project(self, name_prefix="E11") -> str:
pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
resp = http_requests.post(f"{API_BASE}/api/projects", json={
"id": pid,
"name": f"{name_prefix}-{pid}",
"config": {"agents": ["zhangfei-dev"]},
}, timeout=10)
assert resp.status_code == 200
self._projects.append(pid)
return pid
def _create_task(self, pid, **kwargs) -> str:
tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
resp = http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
)
assert resp.status_code == 200
return tid
def test_e11a_dispatch_with_routing_audit(self):
"""E11a: 任务调度 → 检查 routing_decisions 记录了 dispatch + selected_agent
验证 Phase 1-4 正常完成:counter acquire → session check → spawn。
routing_decisions 应记录 dispatched outcome + selected_agent。
"""
pid = self._create_project("E11a")
tid = self._create_task(
pid,
title="E2E Acquire-Firstecho hello",
description=(
"请执行 echo hello-world 并标记done。\n"
"这是E2E测试 #07.1 Acquire-First,不需要做其他事。"
),
assignee="zhangfei-dev",
task_type="coding",
)
print(f"\n🚀 E11a: 等待调度 + Agent执行 (pid={pid}, tid={tid})")
result = _poll_task(
pid, tid, timeout=MAX_WAIT_AGENT,
terminal_states=("done", "failed", "cancelled", "blocked"),
)
status = result.get("status")
print(f" 最终状态: {status}")
# 验证任务被调度(不是 pending)
assert status != "pending", (
f"任务 {tid}{MAX_WAIT_AGENT}s 后仍为 pending,调度未生效"
)
# 验证 routing_decisions 有记录
db_path = DATA_ROOT / pid / "blackboard.db"
if db_path.exists():
import sqlite3 as sq3
conn = sq3.connect(str(db_path))
conn.row_factory = sq3.Row
try:
rows = conn.execute(
"SELECT * FROM routing_decisions WHERE task_id=? ORDER BY id DESC LIMIT 1",
(tid,),
).fetchall()
assert len(rows) > 0, f"routing_decisions 无记录 for {tid}"
row = rows[0]
print(f" routing: mode={row['mode']} agent={row['selected_agent']} outcome={row['outcome']}")
assert row["selected_agent"] == "zhangfei-dev"
assert row["outcome"] == "dispatched"
finally:
conn.close()
print(f" ✅ Acquire-First 调度审计验证通过")
def test_e11b_concurrent_dispatch_counter_block(self):
"""E11b: 同 agent 连续两个任务 → 第二个应被 skipcounter full 或 session busy
#07.1 Phase 1 counter acquire 互斥:同 agent 同时只能有一个活跃 session。
第二个任务在第一个完成前应被 skip。
"""
pid = self._create_project("E11b")
# 创建两个任务,同 assignee
tid1 = self._create_task(
pid,
title="E2E Counter-Block 任务1echo first",
description="请执行 echo first 并标记done。E2E测试,不需要做其他事。",
assignee="zhangfei-dev",
task_type="coding",
)
tid2 = self._create_task(
pid,
title="E2E Counter-Block 任务2echo second",
description="请执行 echo second 并标记done。E2E测试,不需要做其他事。",
assignee="zhangfei-dev",
task_type="coding",
)
print(f"\n🚀 E11b: 等待两个任务调度 (pid={pid})")
# 等第一个完成或至少开始执行
result1 = _poll_task(
pid, tid1, timeout=MAX_WAIT_AGENT,
terminal_states=("done", "failed", "cancelled", "blocked"),
)
status1 = result1.get("status")
print(f" 任务1状态: {status1}")
assert status1 != "pending", "任务1未被调度"
# 等第二个(可能被 skip 后在第一个完成后重新调度)
result2 = _poll_task(
pid, tid2, timeout=MAX_WAIT_AGENT,
terminal_states=("done", "failed", "cancelled", "blocked"),
)
status2 = result2.get("status")
print(f" 任务2状态: {status2}")
# 验证 routing_decisions 中有 skipped 记录(第二个任务在某次 tick 被 skip)
db_path = DATA_ROOT / pid / "blackboard.db"
if db_path.exists():
import sqlite3 as sq3
conn = sq3.connect(str(db_path))
conn.row_factory = sq3.Row
try:
rows = conn.execute(
"SELECT outcome, detail FROM routing_decisions WHERE task_id=? ORDER BY id",
(tid2,),
).fetchall()
outcomes = [r["outcome"] for r in rows]
print(f" 任务2 routing outcomes: {outcomes}")
# 应该有至少一个 skippedcounter blocked 或 session busy
# 或者最终 dispatched(在第一个任务完成后重新调度)
assert len(rows) > 0, "任务2无 routing 记录"
finally:
conn.close()
print(f" ✅ Counter block 并发调度验证完成")
# ===================================================================
# E12: _check_timeouts 统一 + crash_limit E2E#07.2
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE12TimeoutsUnifiedE2E:
"""E12: #07.2 _check_timeouts 统一超时 + crash_limit + updated_at fallback
通过 DB 操作模拟超时场景,等待 ticker 真实处理。
"""
@pytest.fixture(autouse=True)
def setup_env(self):
_check_environment()
self._projects = []
yield
for pid in self._projects:
_cleanup_project(pid)
def _create_project(self, name_prefix="E12") -> str:
pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
resp = http_requests.post(f"{API_BASE}/api/projects", json={
"id": pid,
"name": f"{name_prefix}-{pid}",
"config": {"agents": ["zhangfei-dev"]},
}, timeout=10)
assert resp.status_code == 200
self._projects.append(pid)
return pid
def test_e12a_crash_limit_marks_failed(self):
"""E12a: 3 次 crash → _check_timeouts 标 failed
手动写 3 条 crashed attempt30min 内),等 ticker → 验证 failed。
"""
pid = self._create_project("E12a")
# 创建 working 任务
tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks",
json={"id": tid, "title": "Crash Limit 测试", "status": "pending",
"assignee": "zhangfei-dev"},
timeout=10,
)
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10,
)
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
json={"status": "working", "agent": "zhangfei-dev"}, timeout=10,
)
# 写 3 条 crash attempt
db_path = DATA_ROOT / pid / "blackboard.db"
import sqlite3 as sq3
conn = sq3.connect(str(db_path))
try:
for i in range(3):
conn.execute(
"INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
"VALUES (?, ?, ?, ?, datetime('now', ?))",
(tid, i + 1, "zhangfei-dev", "crashed", f"-{(25 - i * 5)} minutes"),
)
conn.commit()
finally:
conn.close()
print(f"\n🚀 E12a: 3次crash已写入,等待ticker处理 (pid={pid}, tid={tid})")
result = _poll_task(
pid, tid, timeout=MAX_WAIT_DISPATCH,
terminal_states=("failed", "done", "cancelled"),
)
status = result.get("status")
print(f" 最终状态: {status}")
assert status == "failed", (
f"3次crash后任务应为failed,实际: {status}"
)
print(f" ✅ crash_limit 统一检查验证通过")
def test_e12b_updated_at_fallback_reclaim(self):
"""E12b: working 任务无 started_at → updated_at fallback → 超时回收
#07.3 ACT-1: PM2 重启后 mail 孤儿任务只有 updated_at
_check_timeouts 用 updated_at 作为 fallback。
"""
pid = self._create_project("E12b")
tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks",
json={"id": tid, "title": "updated_at Fallback 测试", "status": "pending",
"assignee": "zhangfei-dev"},
timeout=10,
)
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10,
)
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
json={"status": "working", "agent": "zhangfei-dev"}, timeout=10,
)
# 清空 started_at/claimed_at,设 updated_at 为 60 分钟前
db_path = DATA_ROOT / pid / "blackboard.db"
import sqlite3 as sq3
conn = sq3.connect(str(db_path))
try:
conn.execute(
"UPDATE tasks SET started_at=NULL, claimed_at=NULL, "
"updated_at=datetime('now','-60 minutes') WHERE id=?",
(tid,),
)
conn.commit()
finally:
conn.close()
print(f"\n🚀 E12b: updated_at 设为60min前,等待ticker回收 (pid={pid}, tid={tid})")
result = _poll_task(
pid, tid, timeout=MAX_WAIT_DISPATCH,
terminal_states=("failed", "done", "cancelled"),
)
status = result.get("status")
print(f" 最终状态: {status}")
assert status != "working", (
f"超时任务应被回收,实际仍为 working"
)
print(f" ✅ updated_at fallback 回收验证通过")
# ===================================================================
# E13: Compact Hanging 不标 failed E2E#07.3 ACT-2
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE13CompactHangingE2E:
"""E13: compact_hanging outcome → 任务保持 working(不标 failed
通过 DB 写入 compact_hanging attempt,验证 ticker 不将其标为 failed。
"""
@pytest.fixture(autouse=True)
def setup_env(self):
_check_environment()
self._projects = []
yield
for pid in self._projects:
_cleanup_project(pid)
def _create_project(self, name_prefix="E13") -> str:
pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
resp = http_requests.post(f"{API_BASE}/api/projects", json={
"id": pid,
"name": f"{name_prefix}-{pid}",
"config": {"agents": ["zhangfei-dev"]},
}, timeout=10)
assert resp.status_code == 200
self._projects.append(pid)
return pid
def test_e13a_compact_hanging_keeps_working(self):
"""E13a: compact_hanging attempt → ticker 超时检查不应立即标 failed
写入 compact_hanging attempt(非 crash),验证任务保持 working
直到真正超时才被回收。
"""
pid = self._create_project("E13a")
tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks",
json={"id": tid, "title": "Compact Hanging 测试", "status": "pending",
"assignee": "zhangfei-dev"},
timeout=10,
)
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10,
)
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
json={"status": "working", "agent": "zhangfei-dev"}, timeout=10,
)
# 写入 compact_hanging attempt(最近的时间)
db_path = DATA_ROOT / pid / "blackboard.db"
import sqlite3 as sq3
conn = sq3.connect(str(db_path))
try:
conn.execute(
"INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
"VALUES (?, ?, ?, ?, datetime('now'))",
(tid, 1, "zhangfei-dev", "compact_hanging"),
)
# 设 started_at 为最近时间(不应超时)
conn.execute(
"UPDATE tasks SET started_at=datetime('now','-5 minutes') WHERE id=?",
(tid,),
)
conn.commit()
finally:
conn.close()
print(f"\n🚀 E13a: compact_hanging attempt 已写入,等一个 tick 验证不被标 failed")
# 等一个 tick30s + buffer
time.sleep(45)
# 验证任务仍为 working
resp = http_requests.get(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
)
assert resp.status_code == 200
status = resp.json().get("status")
print(f" 一个 tick 后状态: {status}")
assert status == "working", (
f"compact_hanging 后任务应保持 working,实际: {status}"
)
print(f" ✅ compact_hanging 不标 failed 验证通过")
# ===================================================================
# E14: _rollback_current_agent E2E#07.2
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE14RollbackE2E:
"""E14: crash 后 current_agent 回退验证
通过 DB 操作模拟 crash,验证 current_agent 被回退。
"""
@pytest.fixture(autouse=True)
def setup_env(self):
_check_environment()
self._projects = []
yield
for pid in self._projects:
_cleanup_project(pid)
def _create_project(self, name_prefix="E14") -> str:
pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
resp = http_requests.post(f"{API_BASE}/api/projects", json={
"id": pid,
"name": f"{name_prefix}-{pid}",
"config": {"agents": ["zhangfei-dev"]},
}, timeout=10)
assert resp.status_code == 200
self._projects.append(pid)
return pid
def test_e14a_rollback_on_crash(self):
"""E14a: 3次crash → failed + current_agent 回退到 assignee
#07.2: crash 回退由 _rollback_current_agent 执行。
"""
pid = self._create_project("E14a")
tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks",
json={"id": tid, "title": "Rollback 测试", "status": "pending",
"assignee": "zhangfei-dev"},
timeout=10,
)
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10,
)
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
json={"status": "working", "agent": "zhangfei-dev"}, timeout=10,
)
# 设置 current_agent 并写 3 次 crash attempt
db_path = DATA_ROOT / pid / "blackboard.db"
import sqlite3 as sq3
conn = sq3.connect(str(db_path))
try:
conn.execute(
"UPDATE tasks SET current_agent='zhangfei-dev' WHERE id=?",
(tid,),
)
for i in range(3):
conn.execute(
"INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
"VALUES (?, ?, ?, ?, datetime('now', ?))",
(tid, i + 1, "zhangfei-dev", "crashed", f"-{(25 - i * 5)} minutes"),
)
conn.commit()
finally:
conn.close()
print(f"\n🚀 E14a: 3次crash + current_agent 已设置,等待ticker (pid={pid}, tid={tid})")
result = _poll_task(
pid, tid, timeout=MAX_WAIT_DISPATCH,
terminal_states=("failed", "done", "cancelled"),
)
status = result.get("status")
print(f" 最终状态: {status}")
assert status == "failed", f"应为 failed,实际: {status}"
# 验证 current_agent 已回退(等于 assignee 或 None
conn2 = sq3.connect(str(db_path))
conn2.row_factory = sq3.Row
try:
row = conn2.execute(
"SELECT current_agent, assignee FROM tasks WHERE id=?", (tid,)
).fetchone()
print(f" current_agent={row['current_agent']} assignee={row['assignee']}")
# _rollback_current_agent 应将 current_agent 回退为 assignee
assert row["current_agent"] == row["assignee"], (
f"crash后 current_agent 应回退为 assignee"
f"实际 current_agent={row['current_agent']} assignee={row['assignee']}"
)
finally:
conn2.close()
print(f" ✅ crash rollback current_agent 验证通过")