auto-sync: 2026-06-05 23:25:51

This commit is contained in:
cfdaily
2026-06-05 23:25:51 +08:00
parent 83e1d3b252
commit df54cf9f2b
+390
View File
@@ -0,0 +1,390 @@
"""ST1-ST3 压力测试
并发场景测试:Agent 竞争、全局限制、广播并发。
需要 RUN_INTEGRATION=1 + 生产 daemon 运行。
"""
import json
import os
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List
import pytest
import requests as http_requests
# 指向部署目录
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
# ── 常量 ──
API_BASE = os.environ.get("API_BASE", "http://localhost:8083")
POLL_INTERVAL = 5
MAX_WAIT_DISPATCH = 120
MAX_WAIT_AGENT = 300
E2E_PREFIX = "e2e-stress-"
pytestmark = pytest.mark.e2e
# ── E2E gate ──
skip_no_integration = pytest.mark.skipif(
not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run E2E stress tests against real daemon",
)
# ── 工具函数 ──
def _check_environment():
"""环境前置检查:daemon 运行 + ticker 活跃 + 8083 可达"""
try:
resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5)
data = resp.json()
if data.get("status") != "running" or not data.get("ticker_running"):
pytest.skip(f"Daemon not ready: {data}")
return data
except Exception as e:
pytest.skip(f"Production API not available at {API_BASE}: {e}")
def _cleanup_project(pid: str):
"""清理测试项目"""
try:
http_requests.delete(
f"{API_BASE}/api/projects/{pid}?physical=true", timeout=10
)
except Exception:
pass
def _poll_task(pid, tid, timeout, terminal_states=None):
"""轮询任务状态直到终态或超时"""
terminal = terminal_states or ("done", "failed", "cancelled")
deadline = time.time() + timeout
last_status = None
while time.time() < deadline:
try:
resp = http_requests.get(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10
)
if resp.status_code == 200:
data = resp.json()
last_status = data.get("status")
if last_status in terminal:
return data
except Exception:
pass
time.sleep(POLL_INTERVAL)
# 超时返回最后状态
try:
resp = http_requests.get(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10
)
return resp.json() if resp.status_code == 200 else {"status": "unknown"}
except Exception:
return {"status": "unknown"}
def _get_task(pid, tid) -> Dict[str, Any]:
"""获取任务详情"""
resp = http_requests.get(
f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
)
assert resp.status_code == 200, f"Get task failed: {resp.text}"
return resp.json()
# ===================================================================
# ST1: Counter Competition — 同 Agent 并发竞争
# ===================================================================
@skip_no_integration
class TestST1CounterCompetition:
"""ST1: 同时创建 3 个任务指定同一 Agent → 第 2、3 个 AgentBusyError
→ 第 1 个完成后第 2 个 acquire 成功
验证 counter acquire 互斥:同 agent 同时只能有一个活跃 session。
"""
@pytest.fixture(autouse=True)
def setup_env(self):
_check_environment()
self._projects: List[str] = []
yield
for pid in self._projects:
_cleanup_project(pid)
def _create_project(self, name_prefix="ST1") -> str:
pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
resp = http_requests.post(f"{API_BASE}/api/projects", json={
"id": pid,
"name": f"{name_prefix}-{pid}",
"config": {"agents": ["zhangfei-dev"]},
}, timeout=10)
assert resp.status_code == 200, f"Create project failed: {resp.text}"
self._projects.append(pid)
return pid
def _create_task(self, pid, **kwargs) -> str:
tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
resp = http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
)
assert resp.status_code == 200, f"Create task failed: {resp.text}"
return tid
def test_st11_agent_counter_competition(self):
"""ST1-1: 3 个任务指定同 Agent → 串行执行,counter 互斥"""
pid = self._create_project("ST1-1")
# 同时创建 3 个任务
tid1 = self._create_task(
pid,
title="ST1 任务1echo st1-first",
description="请执行 echo st1-first 并标记done。E2E压力测试,不需要做其他事。",
assignee="zhangfei-dev",
task_type="coding",
)
tid2 = self._create_task(
pid,
title="ST1 任务2echo st1-second",
description="请执行 echo st1-second 并标记done。E2E压力测试,不需要做其他事。",
assignee="zhangfei-dev",
task_type="coding",
)
tid3 = self._create_task(
pid,
title="ST1 任务3echo st1-third",
description="请执行 echo st1-third 并标记done。E2E压力测试,不需要做其他事。",
assignee="zhangfei-dev",
task_type="coding",
)
print(f"\n🚀 ST1-1: 等待3个竞争任务完成 (pid={pid})")
print(f" 任务: {tid1}, {tid2}, {tid3}")
# 等待任务1完成(最先被调度)
result1 = _poll_task(
pid, tid1, timeout=MAX_WAIT_AGENT,
terminal_states=("done", "failed", "cancelled", "blocked"),
)
status1 = result1.get("status")
print(f" 任务1状态: {status1}")
assert status1 != "pending", "任务1未被调度"
# 等待任务2完成(在任务1完成后应被调度)
result2 = _poll_task(
pid, tid2, timeout=MAX_WAIT_AGENT,
terminal_states=("done", "failed", "cancelled", "blocked"),
)
status2 = result2.get("status")
print(f" 任务2状态: {status2}")
assert status2 != "pending", f"任务2在任务1完成后未被调度"
# 等待任务3完成
result3 = _poll_task(
pid, tid3, timeout=MAX_WAIT_AGENT,
terminal_states=("done", "failed", "cancelled", "blocked"),
)
status3 = result3.get("status")
print(f" 任务3状态: {status3}")
assert status3 != "pending", f"任务3在任务2完成后未被调度"
# 验证 routing_decisions 中有 skip/blocked 记录
from src.utils import get_data_root
db_path = get_data_root() / pid / "blackboard.db"
if db_path.exists():
import sqlite3 as sq3
conn = sq3.connect(str(db_path))
conn.row_factory = sq3.Row
try:
# 检查任务2和3是否有被 skip 的记录
for tid_label, tid in [("任务2", tid2), ("任务3", tid3)]:
rows = conn.execute(
"SELECT outcome FROM routing_decisions WHERE task_id=? ORDER BY id",
(tid,),
).fetchall()
outcomes = [r["outcome"] for r in rows]
print(f" {tid_label} routing outcomes: {outcomes}")
finally:
conn.close()
print(f" ✅ Counter 竞争验证通过")
# ===================================================================
# ST2: Global Limit — 多 Agent 全局限制
# ===================================================================
@skip_no_integration
class TestST2GlobalLimit:
"""ST2: 同时创建 5 个任务指定不同 Agent → 全部 acquire 成功
→ 第 6 个被拒绝
验证全局并发限制:不同 agent 可以并行,但全局有上限。
"""
@pytest.fixture(autouse=True)
def setup_env(self):
_check_environment()
self._projects: List[str] = []
yield
for pid in self._projects:
_cleanup_project(pid)
def _create_project(self, name_prefix="ST2", agents=None) -> str:
pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
config = {"agents": agents or [
"zhangfei-dev", "simayi-challenger",
"pangtong-fujunshi", "jiangwei-infra", "zhaoyun-data",
]}
resp = http_requests.post(f"{API_BASE}/api/projects", json={
"id": pid,
"name": f"{name_prefix}-{pid}",
"config": config,
}, timeout=10)
assert resp.status_code == 200
self._projects.append(pid)
return pid
def _create_task(self, pid, **kwargs) -> str:
tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
resp = http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
)
assert resp.status_code == 200
return tid
def test_st21_multi_agent_concurrent(self):
"""ST2-1: 5 个任务指定不同 Agent → 全部可调度"""
agents = [
"zhangfei-dev", "simayi-challenger",
"pangtong-fujunshi", "jiangwei-infra", "zhaoyun-data",
]
pid = self._create_project("ST2-1", agents=agents)
# 创建 5 个任务,各指定不同 Agent
task_ids = []
for i, agent in enumerate(agents):
tid = self._create_task(
pid,
title=f"ST2 任务{i+1}echo st2-{i}",
description=f"请执行 echo st2-{i} 并标记done。E2E压力测试,不需要做其他事。",
assignee=agent,
task_type="coding",
)
task_ids.append(tid)
print(f"\n🚀 ST2-1: 等待5个不同Agent任务完成 (pid={pid})")
# 等待所有任务完成
results = {}
for i, tid in enumerate(task_ids):
result = _poll_task(
pid, tid, timeout=MAX_WAIT_AGENT,
terminal_states=("done", "failed", "cancelled", "blocked"),
)
results[tid] = result.get("status")
print(f" 任务{i+1} ({agents[i]}): {results[tid]}")
# 至少 3 个任务应该被成功调度(不同 agent 可并行)
dispatched = sum(1 for s in results.values() if s != "pending")
assert dispatched >= 3, (
f"应有至少3个任务被调度,实际: {dispatched}/5, results: {results}"
)
print(f" ✅ 多Agent并发验证通过 ({dispatched}/5 被调度)")
# ===================================================================
# ST3: Broadcast Concurrent — 并发广播
# ===================================================================
@skip_no_integration
class TestST3BroadcastConcurrent:
"""ST3: 同时广播 3 个任务 → 全部任务在 5min 内到达终态
验证广播认领在并发场景下的稳定性。
"""
@pytest.fixture(autouse=True)
def setup_env(self):
_check_environment()
self._projects: List[str] = []
yield
for pid in self._projects:
_cleanup_project(pid)
def _create_project(self, name_prefix="ST3", agents=None) -> str:
pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
config = {"agents": agents or ["zhangfei-dev", "simayi-challenger", "pangtong-fujunshi"]}
resp = http_requests.post(f"{API_BASE}/api/projects", json={
"id": pid,
"name": f"{name_prefix}-{pid}",
"config": config,
}, timeout=10)
assert resp.status_code == 200
self._projects.append(pid)
return pid
def _create_task(self, pid, **kwargs) -> str:
tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
resp = http_requests.post(
f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
)
assert resp.status_code == 200
return tid
def test_st31_broadcast_3_concurrent(self):
"""ST3-1: 同时广播 3 个无 assignee 任务 → 全部在 5min 内到达终态"""
pid = self._create_project("ST3-1")
# 同时创建 3 个无 assignee 的广播任务
task_ids = []
for i in range(3):
tid = self._create_task(
pid,
title=f"ST3 广播任务{i+1}echo st3-bc-{i}",
description=(
f"请执行 echo st3-bc-{i} 并标记done。\n"
"这是E2E广播压力测试,不需要做其他事。"
),
task_type="coding",
# 不指定 assignee → 广播认领
)
task_ids.append(tid)
print(f"\n🚀 ST3-1: 等待3个广播任务终态 (pid={pid}, timeout=300s)")
# 等待所有任务到达终态(5min 超时)
deadline = time.time() + 300 # 5 minutes
results = {}
for i, tid in enumerate(task_ids):
remaining = max(10, deadline - time.time())
result = _poll_task(
pid, tid, timeout=int(remaining),
terminal_states=("done", "failed", "cancelled", "blocked"),
)
results[tid] = result.get("status")
print(f" 广播任务{i+1}: status={results[tid]}, assignee={result.get('assignee')}")
# 验证所有任务都离开了 pending 状态
all_dispatched = all(s != "pending" for s in results.values())
assert all_dispatched, (
f"有广播任务未被认领: {results}"
)
# 统计终态分布
done_count = sum(1 for s in results.values() if s == "done")
failed_count = sum(1 for s in results.values() if s == "failed")
print(f" 终态分布: done={done_count}, failed={failed_count}")
print(f" ✅ 广播并发验证通过 ({done_count}/3 成功)")