auto-sync: 2026-05-20 08:23:59

This commit is contained in:
cfdaily
2026-05-20 08:23:59 +08:00
parent 1dc8bfb9d5
commit c7c349c794
+336 -168
View File
@@ -696,231 +696,399 @@ class TestE8MailTab:
# ===================================================================
# E9: 真实 Agent 调度
# E9: 真实 Agent 调度(生产环境全链路)
# ===================================================================
class TestE9RealAgentDispatch:
"""E9: 创建真实任务 → Ticker 调度 → Agent spawn + 回写
import requests as http_requests
注意:依赖真实 openclaw agent 进程,可能受环境影响。
API_BASE = "http://localhost:8083"
POLL_INTERVAL = 5 # 轮询间隔秒
MAX_WAIT_DISPATCH = 60 # 等待调度超时(2个tick
MAX_WAIT_AGENT = 300 # 等待Agent完成超时
E2E_PREFIX = "e2e-v30-"
def _check_environment():
    """Skip the running test unless the production daemon is usable.

    Requests ``{API_BASE}/api/daemon/status`` and requires the JSON reply to
    report ``status == "running"`` together with a truthy ``ticker_running``.

    Returns:
        The decoded status payload when the daemon is ready.
    """
    try:
        payload = http_requests.get(
            f"{API_BASE}/api/daemon/status", timeout=5
        ).json()
        daemon_ready = (
            payload.get("status") == "running"
            and bool(payload.get("ticker_running"))
        )
    except Exception as exc:
        # Any failure to reach or decode the endpoint is treated as
        # "production stack unavailable": the test is skipped, not failed.
        pytest.skip(f"Production API not available at {API_BASE}: {exc}")
    if not daemon_ready:
        pytest.skip(f"Daemon not ready: {payload}")
    return payload
def _cleanup_project(pid: str):
    """Archive a test project on a best-effort basis.

    Cleanup must never fail the test that triggered it, so nothing is raised
    from here. Unlike a silent ``pass``, a failed archive is reported on
    stdout so leftover test projects on the target API can be noticed.

    Args:
        pid: Identifier of the project to archive.
    """
    try:
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/archive", timeout=5
        )
        # An HTTP error status also means the project was not archived.
        failure = None if resp.status_code < 400 else f"HTTP {resp.status_code}"
    except Exception as exc:
        failure = str(exc)
    if failure is not None:
        print(f"  ⚠️ Failed to archive test project {pid}: {failure}")
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE9RealAgentDispatch:
"""E9: 真实 Agent 调度测试
通过生产 HTTP API 创建任务,依赖生产 Ticker 自动调度,
真实 Agent spawn 执行,全程不手动推动状态。
需要设置环境变量 RUN_INTEGRATION=1 才会运行。
"""
@pytest.fixture(autouse=True)
def setup(self, client, data_root):
self.pid = f"e2e-v27-agent-{uuid.uuid4().hex[:6]}"
client.post("/api/projects", json={
"id": self.pid,
"name": f"E9-Agent-{self.pid}",
def setup_env(self):
_check_environment()
self._projects = []
yield
# teardown: 清理所有测试项目
for pid in self._projects:
_cleanup_project(pid)
def _create_project(self, name_prefix="E9") -> str:
pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
resp = http_requests.post(f"{API_BASE}/api/projects", json={
"id": pid,
"name": f"{name_prefix}-{pid}",
"config": {"agents": ["zhangfei-dev"]},
})
self.data_root = data_root
}, timeout=10)
assert resp.status_code == 200, f"Create project failed: {resp.text}"
self._projects.append(pid)
return pid
def _make_ticker(self):
"""创建含 Dispatcher + Spawner 的 Ticker(真实 spawn"""
from src.blackboard.registry import ProjectRegistry
from src.daemon.dispatcher import Dispatcher
from src.daemon.spawner import AgentSpawner
from src.daemon.counter import ActiveAgentCounter
registry = ProjectRegistry(self.data_root)
self._spawner = AgentSpawner(dry_run=False)
self._counter = ActiveAgentCounter(max_global=2, max_per_agent=1)
dispatcher = Dispatcher(
registered_agents=["zhangfei-dev", "jiangwei-infra", "simayi-challenger", "pangtong-fujunshi"],
spawner=self._spawner,
counter=self._counter,
def _create_task(self, pid, **kwargs) -> str:
tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
resp = http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10
)
return Ticker(
registry=registry,
tick_interval=30,
max_dispatch_per_tick=3,
dispatcher=dispatcher,
spawner=self._spawner,
assert resp.status_code == 200, f"Create task failed: {resp.text}"
return tid
def _poll_task(self, pid, tid, timeout, terminal_states=None):
    """Poll a task over the HTTP API until it reaches a terminal state.

    Args:
        pid: Project identifier.
        tid: Task identifier.
        timeout: Maximum number of seconds to keep polling.
        terminal_states: Statuses that end the polling; defaults to
            ``("done", "failed", "cancelled")``.

    Returns:
        The task JSON as soon as its ``status`` is one of the terminal
        states. On timeout the task is fetched one last time and returned
        as-is, or ``{"status": "unknown"}`` when that request does not
        answer with HTTP 200.
    """
    terminal = terminal_states or ("done", "failed", "cancelled")
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    # A monotonic clock keeps the deadline correct even if the wall clock
    # is adjusted while a long poll is in progress.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        resp = http_requests.get(task_url, timeout=10)
        if resp.status_code == 200:
            data = resp.json()
            if data.get("status") in terminal:
                return data
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(POLL_INTERVAL, remaining))
    # Timed out: return whatever state the task is in right now.
    resp = http_requests.get(task_url, timeout=10)
    return resp.json() if resp.status_code == 200 else {"status": "unknown"}
def test_e91_simple_task_agent_execute(self):
"""E9-1: 简单任务 → 生产Ticker调度 → 真实Agent执行 → 状态自动流转到终态"""
pid = self._create_project("E9-1")
tid = self._create_task(
pid,
title="E2E简单任务:echo hello",
description=(
"请执行以下操作:\n"
"1. 运行命令 echo hello\n"
"2. 把输出结果写入黑板\n"
"这是E2E自动化测试,完成后请标记done。\n"
"重要:不需要做其他任何事。"
),
assignee="zhangfei-dev",
task_type="coding",
)
def test_e91_simple_task_agent_execute(self, client):
"""简单任务 → 真实 Agent 执行 → 等待完成"""
tid = _tid()
resp = client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid,
"title": "E2E简单任务:echo hello",
"description": "请执行:echo hello,然后把输出写入黑板。这是E2E测试,完成后标记done。",
"status": "pending",
"assignee": "zhangfei-dev",
"task_type": "coding",
"priority": 5,
})
assert resp.status_code == 200
print(f"\n🚀 E9-1: 等待调度 + Agent执行 (pid={pid}, tid={tid})")
result = self._poll_task(
pid, tid, timeout=MAX_WAIT_AGENT,
terminal_states=("done", "failed", "cancelled", "blocked"),
)
status = result.get("status")
print(f" 最终状态: {status}")
# 手动触发 tick(走调度 + 真实 spawn
ticker = self._make_ticker()
result = asyncio.run(ticker.tick())
# 断言1:不能还是 pending(证明被调度了
assert status != "pending", (
f"Agent未被调度!任务 {tid}{MAX_WAIT_AGENT}s 后仍为 pending。"
f"\n请检查:1) Ticker是否运行 2) Agent是否可spawn 3) pm2 logs sanguo-moziplus-v2"
)
# 验证 dispatch 成功
proj = result.get("projects", {}).get(self.pid, {})
assert proj.get("status") == "ok"
dispatched = proj.get("dispatched", [])
assert tid in dispatched, f"Task {tid} not dispatched, dispatched={dispatched}"
# 断言2:不能是 blockedguardrails 不应拦截普通任务)
assert status != "blocked", f"普通任务被错误拦截: {result}"
# 等待 Agent 完成(最多 180 秒)
db_path = self.data_root / self.pid / "blackboard.db"
bb = Blackboard(db_path)
for _ in range(36):
time.sleep(5)
t = bb.get_task(tid)
if t and t.status in ("done", "failed", "review"):
break
# 断言3:成功最好,failed 也记录原因
if status == "failed":
print(f" ⚠️ Agent执行失败,detail: {result}")
else:
print(f" ✅ Agent执行成功")
t = bb.get_task(tid)
assert t is not None
assert t.status != "pending", f"Agent 未被调度,状态仍为 pending"
print(f" 性能: pending → {status}")
def test_e92_review_task_dispatch(self, client):
"""review 任务 → 真实 Agent 执行 → 等待完成"""
tid = _tid()
resp = client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid,
"title": "E2E Review检查代码",
"description": "请查看任务描述并回复你的评审意见。这是E2E测试,简单回复即可。",
"status": "pending",
"assignee": "jiangwei-infra",
"task_type": "review",
"priority": 5,
})
assert resp.status_code == 200
def test_e92_review_task_dispatch(self):
"""E9-2: review任务 → 路由到can_review Agent → 真实执行"""
pid = self._create_project("E9-2")
tid = self._create_task(
pid,
title="E2E Review审查测试",
description=(
"这是一个E2E测试的review任务。\n"
"请简单回复:\"审查通过,E2E测试正常。\"\n"
"然后标记done即可。不需要做其他事。"
),
assignee="simayi-challenger",
task_type="review",
)
ticker = self._make_ticker()
result = asyncio.run(ticker.tick())
print(f"\n🚀 E9-2: 等待review调度 + Agent执行 (pid={pid}, tid={tid})")
result = self._poll_task(
pid, tid, timeout=MAX_WAIT_AGENT,
terminal_states=("done", "failed", "cancelled", "blocked"),
)
status = result.get("status")
print(f" 最终状态: {status}")
proj = result.get("projects", {}).get(self.pid, {})
dispatched = proj.get("dispatched", [])
assert tid in dispatched, f"Review task {tid} not dispatched, dispatched={dispatched}"
assert status != "pending", (
f"Review Agent未被调度!任务 {tid}{MAX_WAIT_AGENT}s 后仍为 pending。"
)
assert status != "blocked", f"Review任务被错误拦截: {result}"
# 等待 Agent 完成(最多 180 秒)
db_path = self.data_root / self.pid / "blackboard.db"
bb = Blackboard(db_path)
for _ in range(36):
time.sleep(5)
t = bb.get_task(tid)
if t and t.status in ("done", "failed", "review", "working", "claimed"):
break
if status == "failed":
print(f" ⚠️ Review Agent执行失败")
else:
print(f" ✅ Review Agent执行成功")
t = bb.get_task(tid)
assert t is not None
assert t.status != "pending", f"Review Agent 未被调度,状态仍为 pending"
def test_e93_guardrail_block(self):
    """E9-3: a live-trading task should be intercepted by Guardrails.

    Creates a task whose title/description ask for a real-money stock
    purchase, waits up to ``MAX_WAIT_DISPATCH`` seconds for the task to leave
    ``pending`` and prints a verdict. The test deliberately does not fail
    when the task is not intercepted (Guardrails may be disabled in the
    target environment); it only records the outcome.
    """
    pid = self._create_project("E9-3")
    tid = self._create_task(
        pid,
        title="执行实盘买入SH600000",
        description="用真金白银买入浦发银行1000股",
        assignee="zhangfei-dev",
        task_type="coding",
    )

    print(f"\n🚀 E9-3: 等待Guardrails拦截 (pid={pid}, tid={tid})")

    # Guardrails are checked when the ticker dispatches, so wait for a tick.
    # "claimed"/"working" are treated as terminal here because reaching them
    # already proves the task was dispatched.
    result = self._poll_task(
        pid, tid, timeout=MAX_WAIT_DISPATCH,
        terminal_states=("done", "failed", "cancelled", "blocked", "claimed", "working"),
    )
    status = result.get("status")
    print(f" 最终状态: {status}")

    if status in ("pending", "blocked"):
        # Intercepted: the task either stayed pending or was blocked.
        print(" ✅ Guardrails拦截生效")
    elif status in ("claimed", "working", "done"):
        # The task was picked up, i.e. it was NOT intercepted. Recorded
        # only, not failed, since Guardrails may simply be switched off.
        print(f" ❌ Guardrails未拦截实盘任务!状态: {status}")
    else:
        # failed / cancelled / unknown say nothing about interception and
        # must not be reported as a successful block.
        print(f" ⚠️ 无法判定Guardrails是否拦截,状态: {status}")
# ===================================================================
# E10: 全链路集成
# E10: 全链路集成(生产环境)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE10FullChain:
"""E10: 项目 → 父子Task → Ticker → 聚合 → 依赖 → Mail → 验证完整链"""
"""E10: 项目 → 父子Task → 生产Ticker → 聚合 → 依赖 → Mail → 前端API
def test_e10_full_chain(self, client, data_root):
pid = f"e2e-v27-full-{uuid.uuid4().hex[:6]}"
全部通过生产 HTTP API,真实 Ticker 推进,真实 Agent 执行。
"""
# 1. 创建项目
resp = client.post("/api/projects", json={
"id": pid, "name": f"E10全链路-{pid}",
"config": {"agents": ["zhangfei-dev"]},
})
@pytest.fixture(autouse=True)
def setup_env(self):
    """Run the environment check before each test and archive its projects after."""
    _check_environment()
    self._projects = []
    yield
    # Teardown: archive every project registered during the test.
    for created_pid in self._projects:
        _cleanup_project(created_pid)
def _create_project(self, name_prefix="E10") -> str:
    """Create a uniquely named test project through the HTTP API.

    The new project id is appended to ``self._projects``, the list this
    class's teardown archives.

    Args:
        name_prefix: Prefix used for the human-readable project name.

    Returns:
        The generated project id (``E2E_PREFIX`` plus six hex characters).
    """
    pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
    resp = http_requests.post(
        f"{API_BASE}/api/projects",
        json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev", "simayi-challenger"]},
        },
        timeout=10,
    )
    # Include the response body so a failed creation is diagnosable.
    assert resp.status_code == 200, f"Create project failed: {resp.text}"
    self._projects.append(pid)
    return pid
# 2. 创建父 Task(带 stages
def test_e10a_logic_chain(self):
"""E10a: 父子Task + 依赖推进 + 状态聚合 + Mail(不依赖Agent完成)"""
pid = self._create_project("E10a")
# 1. 创建父任务(带stages
parent_id = f"{pid}-parent"
resp = client.post(f"/api/projects/{pid}/tasks", json={
http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={
"id": parent_id,
"title": "全链路父任务",
"title": "E10a全链路父任务",
"status": "pending",
"stages_json": json.dumps([{"id": "setup", "label": "Setup"}, {"id": "execute", "label": "Execute"}, {"id": "verify", "label": "Verify"}]),
})
assert resp.status_code == 200
"stages_json": json.dumps([
{"id": "setup", "label": "Setup"},
{"id": "execute", "label": "Execute"},
{"id": "verify", "label": "Verify"},
]),
}, timeout=10)
# 3. 创建 3 个子 Task + 1 个依赖 Task
# 2. 创建3个子任务
child_ids = []
for i, stage in enumerate(["setup", "execute", "verify"]):
cid = f"{pid}-child-{i}"
child_ids.append(cid)
resp = client.post(f"/api/projects/{pid}/tasks", json={
http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={
"id": cid,
"title": f"子任务-{stage}",
"status": "pending",
"parent_task": parent_id,
"stage": stage,
})
assert resp.status_code == 200
}, timeout=10)
# 依赖 Taskblocked
# 3. 创建依赖任务blocked
dep_id = f"{pid}-dep"
resp = client.post(f"/api/projects/{pid}/tasks", json={
http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={
"id": dep_id,
"title": "依赖任务",
"depends_on": [child_ids[0]],
})
assert resp.status_code == 200
"depends_on": json.dumps([child_ids[0]]),
}, timeout=10)
# 4. 手动把依赖任务设为 blockedAPI create 默认 pending
db_path = data_root / pid / "blackboard.db"
bb = Blackboard(db_path)
bb.update_task_status(dep_id, "claimed", agent="test")
bb.update_task_status(dep_id, "working", agent="test")
bb.update_task_status(dep_id, "blocked", agent="test")
# 4. 手动推进 setup 子任务到 done(通过API
for status in ["claimed", "working", "review", "done"]:
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{child_ids[0]}/status",
json={"status": status, "agent": "test"}, timeout=10,
)
# 5. 完成 setup 子任务
for s in ["claimed", "working", "review", "done"]:
bb.update_task_status(child_ids[0], s, agent="test")
# 5. 手动推进依赖任务到 blocked
for status in ["claimed", "working", "blocked"]:
http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks/{dep_id}/status",
json={"status": status, "agent": "test"}, timeout=10,
)
# 5. 触发 Ticker依赖推进 + 父状态聚合
from src.blackboard.registry import ProjectRegistry
registry = ProjectRegistry(data_root)
ticker = Ticker(registry=registry, tick_interval=30)
# 6. 等待 Ticker 依赖推进1-2个tick
print(f"\n🚀 E10a: 等待依赖推进 (pid={pid})")
dep_result = self._poll_task(
pid, dep_id, timeout=MAX_WAIT_DISPATCH,
terminal_states=("pending", "claimed", "working", "done"),
)
dep_status = dep_result.get("status")
print(f" 依赖任务状态: blocked → {dep_status}")
# blocked 应被推进为 pending
assert dep_status != "blocked", (
f"依赖推进未生效!{dep_id}{MAX_WAIT_DISPATCH}s 后仍为 blocked"
)
# 依赖推进
advanced = ticker._advance_dependencies(db_path)
assert dep_id in advanced, "依赖任务应被推进为 pending"
# 父状态聚合
ticker._refresh_parent_statuses(db_path)
parent = bb.get_task(parent_id)
# 1 done + 2 pending → working (有活跃子任务) 或 pending
assert parent.status in ("pending", "working", "review"), f"父任务状态异常: {parent.status}"
# 6. 发送 Mail 通知
resp = client.post("/api/mail", json={
"title": f"E10全链路完成通知-{pid}",
"text": f"项目 {pid} 的 setup 阶段已完成",
# 7. 发送 Mail
http_requests.post(f"{API_BASE}/api/mail", json={
"title": f"E10a全链路通知-{pid}",
"text": f"项目 {pid} setup阶段完成",
"from": "simayi-challenger",
"to": "pangtong-fujunshi",
"type": "inform",
})
assert resp.status_code == 200
}, timeout=10)
# 7. 验证 Mail 已创建
resp = client.get("/api/mail?from_agent=simayi-challenger")
# 8. 验证 Mail
resp = http_requests.get(
f"{API_BASE}/api/mail?from_agent=simayi-challenger", timeout=10
)
mails = resp.json()["mails"]
assert any(m["title"].startswith("E10全链路") for m in mails)
assert any(m["title"].startswith("E10a全链路") for m in mails), \
f"Mail未找到,当前mails: {[m['title'] for m in mails]}"
# 8. 验证 Stage 进度
q = Queries(db_path)
progress = q.parent_task_progress(parent_id)
stages = {s["id"]: s for s in progress["stages"]}
assert stages["setup"]["done"] == 1
assert stages["execute"]["done"] == 0
# 9. 验证 Stage 进度
resp = http_requests.get(
f"{API_BASE}/api/projects/{pid}/tasks/{parent_id}/progress", timeout=10
)
if resp.status_code == 200:
progress = resp.json()
stages = {s["id"]: s for s in progress.get("stages", [])}
assert stages["setup"]["done"] == 1, f"setup stage should be done"
assert stages["execute"]["done"] == 0
# 9. 完成 execute 子任务
for s in ["claimed", "working", "review", "done"]:
bb.update_task_status(child_ids[1], s, agent="test")
# 10. 验证父状态聚合
parent_resp = http_requests.get(
f"{API_BASE}/api/projects/{pid}/tasks/{parent_id}", timeout=10
)
parent_data = parent_resp.json()
print(f" 父任务状态: {parent_data.get('status')}")
# 1done+1pending+1pending → 应该在 pending/working
assert parent_data.get("status") in ("pending", "working"), \
f"父任务状态异常: {parent_data['status']}"
# 再次聚合
ticker._refresh_parent_statuses(db_path)
progress2 = q.parent_task_progress(parent_id)
stages2 = {s["id"]: s for s in progress2["stages"]}
assert stages2["setup"]["done"] == 1
assert stages2["execute"]["done"] == 1
assert stages2["verify"]["done"] == 0
print(f" ✅ E10a 全链路测试通过")
print(f"\n✅ E10 全链路测试通过!项目: {pid}")
print(f" 父任务: {parent_id}, 状态: {bb.get_task(parent_id).status}")
print(f" Stage进度: setup=done, execute=done, verify=pending")
print(f" 依赖推进: {dep_id} → pending")
print(f" Mail通知: 已发送")
def test_e10b_agent_full_chain(self):
    """E10b: real-agent full chain.

    Creates a parent task with two stages and one child task assigned to
    ``zhangfei-dev``, waits for the child to reach a terminal state, then
    prints the parent status and stage progress. The only hard assertions
    are that both tasks were created and that the child left ``pending``;
    parent status and stage progress are printed for inspection.
    """
    pid = self._create_project("E10b")
    tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"

    # 1. Create the parent task.
    parent_id = f"{pid}-parent"
    resp = http_requests.post(tasks_url, json={
        "id": parent_id,
        "title": "E10b Agent全链路父任务",
        "status": "pending",
        "stages_json": json.dumps([
            {"id": "step1", "label": "Step 1"},
            {"id": "step2", "label": "Step 2"},
        ]),
    }, timeout=10)
    # Fail fast: without this a rejected create only shows up as a long
    # polling timeout further down.
    assert resp.status_code == 200, f"Create task failed: {resp.text}"

    # 2. Create the child task that the agent is expected to execute.
    child_id = f"{pid}-child-0"
    resp = http_requests.post(tasks_url, json={
        "id": child_id,
        "title": "E2E子任务:echo test",
        "description": (
            "请执行 echo test 并标记done。"
            "这是E2E测试,不需要做其他事。"
        ),
        "status": "pending",
        "parent_task": parent_id,
        "stage": "step1",
        "assignee": "zhangfei-dev",
        "task_type": "coding",
    }, timeout=10)
    assert resp.status_code == 200, f"Create task failed: {resp.text}"

    # 3. Wait for the agent to finish the child task.
    print(f"\n🚀 E10b: 等待Agent执行子任务 (pid={pid}, tid={child_id})")
    # NOTE(review): no `_poll_task` is visible on this class, so the polling
    # helper is borrowed from TestE9RealAgentDispatch; it does not read any
    # instance state. Consider moving it to a shared module-level helper.
    result = TestE9RealAgentDispatch._poll_task(
        self, pid, child_id, timeout=MAX_WAIT_AGENT,
        terminal_states=("done", "failed", "cancelled", "blocked"),
    )
    child_status = result.get("status")
    print(f" 子任务最终状态: {child_status}")

    assert child_status != "pending", (
        f"子任务未被调度!{MAX_WAIT_AGENT}s后仍为pending"
    )

    # 4. Give the ticker one more tick to aggregate the parent status.
    # NOTE(review): 35 s presumably means "one tick plus margin" - confirm
    # against the production tick interval.
    time.sleep(35)
    parent_resp = http_requests.get(f"{tasks_url}/{parent_id}", timeout=10)
    parent_data = parent_resp.json()
    print(f" 父任务状态: {parent_data.get('status')}")

    # 5. Report stage progress when the progress endpoint answers.
    resp = http_requests.get(f"{tasks_url}/{parent_id}/progress", timeout=10)
    if resp.status_code == 200:
        progress = resp.json()
        stages = {s["id"]: s for s in progress.get("stages", [])}
        print(f" Stage进度: step1.done={stages.get('step1', {}).get('done', '?')}")

    print(" ✅ E10b 真实Agent全链路完成")