391 lines
14 KiB
Python
391 lines
14 KiB
Python
"""ST1-ST3 压力测试
|
||
|
||
并发场景测试:Agent 竞争、全局限制、广播并发。
|
||
需要 RUN_INTEGRATION=1 + 生产 daemon 运行。
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import sys
|
||
import time
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List
|
||
|
||
import pytest
|
||
import requests as http_requests
|
||
|
||
# Put the deployment directory first on sys.path so project modules
# (ST1 imports ``src.utils``) resolve from the deployed code.
DEPLOY_DIR = Path.home().joinpath(".sanguo_projects", "sanguo_moziplus_v2")
sys.path.insert(0, str(DEPLOY_DIR))

# ── Constants ──

# Root URL of the daemon HTTP API; overridable via the API_BASE env var.
API_BASE = os.environ.get("API_BASE", "http://localhost:8083")
POLL_INTERVAL = 5           # seconds slept between task-status polls
MAX_WAIT_DISPATCH = 120     # presumably seconds; not referenced in this section
MAX_WAIT_AGENT = 300        # per-task polling timeout, in seconds
E2E_PREFIX = "e2e-stress-"  # id prefix for throwaway test projects

# Tag every test in this module with the ``e2e`` marker.
pytestmark = pytest.mark.e2e

# ── E2E gate ──
# Opt-in switch: decorated test classes are skipped unless RUN_INTEGRATION
# is set to a non-empty value.
skip_no_integration = pytest.mark.skipif(
    os.environ.get("RUN_INTEGRATION", "") == "",
    reason="Set RUN_INTEGRATION=1 to run E2E stress tests against real daemon",
)
|
||
|
||
|
||
# ── 工具函数 ──
|
||
|
||
def _check_environment():
    """Precondition check: daemon running, ticker active, API reachable.

    Skips the calling test via ``pytest.skip`` when the status endpoint cannot
    be reached or decoded, or when it does not report ``status == "running"``
    together with a truthy ``ticker_running``.

    Returns:
        The decoded JSON payload of ``GET {API_BASE}/api/daemon/status``.
    """
    try:
        status_url = f"{API_BASE}/api/daemon/status"
        payload = http_requests.get(status_url, timeout=5).json()
        healthy = payload.get("status") == "running" and bool(payload.get("ticker_running"))
    except Exception as err:
        # Any failure (connection refused, timeout, non-JSON body, ...) means
        # the production API is unusable for this run.
        pytest.skip(f"Production API not available at {API_BASE}: {err}")
    if not healthy:
        pytest.skip(f"Daemon not ready: {payload}")
    return payload
|
||
|
||
|
||
def _cleanup_project(pid: str):
    """Best-effort physical deletion of a test project.

    Sends ``DELETE {API_BASE}/api/projects/{pid}?physical=true``. This is
    called from fixture teardown, so it never raises (an exception there would
    mask the real test outcome). Failures are printed rather than silently
    ignored, so leftover test projects on the daemon can be noticed.

    Args:
        pid: Id of the project to delete.
    """
    try:
        resp = http_requests.delete(
            f"{API_BASE}/api/projects/{pid}?physical=true", timeout=10
        )
    except Exception as exc:
        print(f"⚠️ cleanup of project {pid} failed: {exc}")
        return
    if resp.status_code >= 400:
        # Truncate the body: error pages can be large and only a hint is needed.
        print(
            f"⚠️ cleanup of project {pid} returned HTTP {resp.status_code}: "
            f"{resp.text[:200]}"
        )
|
||
|
||
|
||
def _fetch_task_or_none(pid, tid):
    """Read one task's JSON; return None on error, non-200, or non-dict body."""
    try:
        resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10
        )
        if resp.status_code != 200:
            return None
        data = resp.json()
    except Exception:
        # Network/decode errors are treated as "no observation this round".
        return None
    return data if isinstance(data, dict) else None


def _poll_task(pid, tid, timeout, terminal_states=None):
    """Poll a task until it reaches a terminal status or ``timeout`` elapses.

    Args:
        pid: Project id.
        tid: Task id.
        timeout: Maximum number of seconds to keep polling.
        terminal_states: Statuses that end polling; a falsy value means
            ``("done", "failed", "cancelled")``.

    Returns:
        The task JSON dict once its ``status`` is terminal. On timeout, the
        last successfully read task dict (whatever its status), or
        ``{"status": "unknown"}`` if the task could not be read at all.
    """
    terminal = terminal_states or ("done", "failed", "cancelled")
    # Monotonic clock: immune to wall-clock adjustments during long waits.
    deadline = time.monotonic() + timeout
    while True:
        data = _fetch_task_or_none(pid, tid)
        if data is not None and data.get("status") in terminal:
            return data
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return data if data is not None else {"status": "unknown"}
        # Never sleep past the deadline.
        time.sleep(min(POLL_INTERVAL, remaining))
|
||
|
||
|
||
def _get_task(pid, tid) -> Dict[str, Any]:
    """Fetch one task's JSON, failing the test unless the API answers 200."""
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    response = http_requests.get(task_url, timeout=10)
    assert response.status_code == 200, f"Get task failed: {response.text}"
    return response.json()
|
||
|
||
|
||
# ===================================================================
|
||
# ST1: Counter Competition — 同 Agent 并发竞争
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestST1CounterCompetition:
    """ST1: three tasks pinned to the same agent must run one at a time.

    Scenario: three tasks assigned to ``zhangfei-dev`` are created back to
    back. While task 1 holds the agent, tasks 2 and 3 are expected to be
    rejected (AgentBusyError); once task 1 finishes, task 2 acquires the
    agent, and so on. This targets counter-acquire mutual exclusion: one
    agent may hold only one active session at a time.

    NOTE(review): the assertions only check that every task was observably
    dispatched; ordering/exclusivity itself is not asserted. The
    routing_decisions outcomes are printed for manual inspection.
    """

    # Statuses that do NOT prove a task was dispatched. "unknown" is what
    # _poll_task returns when it could not read the task at all.
    _NOT_DISPATCHED = ("pending", "unknown", None)

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; delete created projects afterwards."""
        _check_environment()
        self._projects: List[str] = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="ST1") -> str:
        """Create a project whose only agent is ``zhangfei-dev``; return its id."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        # Register before asserting so a half-created project is still cleaned up.
        self._projects.append(pid)
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending, priority-5 task (``kwargs`` override fields); return its id."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        # "id" goes last so a falsy kwargs["id"] cannot override the generated id.
        body = {"status": "pending", "priority": 5, **kwargs, "id": tid}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_st11_agent_counter_competition(self):
        """ST1-1: 3 tasks for the same agent -> serial execution, counter mutex."""
        pid = self._create_project("ST1-1")

        # Create the three competing tasks back to back.
        task_ids = [
            self._create_task(
                pid,
                title=f"ST1 任务{n}:echo st1-{word}",
                description=f"请执行 echo st1-{word} 并标记done。E2E压力测试,不需要做其他事。",
                assignee="zhangfei-dev",
                task_type="coding",
            )
            for n, word in enumerate(("first", "second", "third"), start=1)
        ]
        tid1, tid2, tid3 = task_ids

        print(f"\n🚀 ST1-1: 等待3个竞争任务完成 (pid={pid})")
        print(f" 任务: {tid1}, {tid2}, {tid3}")

        # Wait for each task in creation order; task n should only be
        # scheduled after task n-1 has released the agent.
        failure_messages = (
            "任务1未被调度",
            "任务2在任务1完成后未被调度",
            "任务3在任务2完成后未被调度",
        )
        for n, (tid, failure_msg) in enumerate(zip(task_ids, failure_messages), start=1):
            result = _poll_task(
                pid, tid, timeout=MAX_WAIT_AGENT,
                terminal_states=("done", "failed", "cancelled", "blocked"),
            )
            status = result.get("status")
            print(f" 任务{n}状态: {status}")
            assert status not in self._NOT_DISPATCHED, f"{failure_msg} (status={status})"

        # Diagnostic only: print routing_decisions outcomes for tasks 2 and 3
        # (expected to contain skip/blocked entries). Nothing is asserted here.
        import sqlite3 as sq3
        from src.utils import get_data_root

        db_path = get_data_root() / pid / "blackboard.db"
        if db_path.exists():
            conn = sq3.connect(str(db_path))
            conn.row_factory = sq3.Row
            try:
                for tid_label, tid in [("任务2", tid2), ("任务3", tid3)]:
                    rows = conn.execute(
                        "SELECT outcome FROM routing_decisions WHERE task_id=? ORDER BY id",
                        (tid,),
                    ).fetchall()
                    outcomes = [r["outcome"] for r in rows]
                    print(f" {tid_label} routing outcomes: {outcomes}")
            except sq3.Error as exc:
                # A missing table or a locked DB must not fail a diagnostics-only step.
                print(f" routing_decisions 查询失败: {exc}")
            finally:
                conn.close()

        print(" ✅ Counter 竞争验证通过")
|
||
|
||
|
||
# ===================================================================
|
||
# ST2: Global Limit — 多 Agent 全局限制
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestST2GlobalLimit:
    """ST2: five tasks for five different agents should all be schedulable.

    Intended scenario: each of the five tasks acquires its own agent, and a
    sixth would be rejected by the global concurrency limit. This targets
    "different agents may run in parallel, but a global cap applies".

    NOTE(review): the sixth-task rejection is not exercised here, and the
    assertion only requires at least 3 of the 5 tasks to be dispatched.
    """

    # Default agent roster; the test assigns one task to each.
    _AGENTS = (
        "zhangfei-dev", "simayi-challenger",
        "pangtong-fujunshi", "jiangwei-infra", "zhaoyun-data",
    )
    # Statuses that do NOT prove a task was dispatched. "unknown" is what
    # _poll_task returns when it could not read the task at all.
    _NOT_DISPATCHED = ("pending", "unknown", None)

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; delete created projects afterwards."""
        _check_environment()
        self._projects: List[str] = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="ST2", agents=None) -> str:
        """Create a project with ``agents`` (default: ``_AGENTS``); return its id."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": list(agents or self._AGENTS)}
        # Register before asserting so a half-created project is still cleaned up.
        self._projects.append(pid)
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": config,
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending, priority-5 task (``kwargs`` override fields); return its id."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        # "id" goes last so a falsy kwargs["id"] cannot override the generated id.
        body = {"status": "pending", "priority": 5, **kwargs, "id": tid}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_st21_multi_agent_concurrent(self):
        """ST2-1: 5 tasks, each pinned to a different agent -> all schedulable."""
        agents = list(self._AGENTS)
        pid = self._create_project("ST2-1", agents=agents)

        # One task per agent.
        task_ids = [
            self._create_task(
                pid,
                title=f"ST2 任务{i+1}:echo st2-{i}",
                description=f"请执行 echo st2-{i} 并标记done。E2E压力测试,不需要做其他事。",
                assignee=agent,
                task_type="coding",
            )
            for i, agent in enumerate(agents)
        ]

        print(f"\n🚀 ST2-1: 等待5个不同Agent任务完成 (pid={pid})")

        # Wait for every task (each poll has its own MAX_WAIT_AGENT budget).
        results = {}
        for i, (agent, tid) in enumerate(zip(agents, task_ids)):
            result = _poll_task(
                pid, tid, timeout=MAX_WAIT_AGENT,
                terminal_states=("done", "failed", "cancelled", "blocked"),
            )
            results[tid] = result.get("status")
            print(f" 任务{i+1} ({agent}): {results[tid]}")

        # At least 3 tasks must have been dispatched (different agents run in
        # parallel); unreadable tasks do not count.
        dispatched = sum(1 for s in results.values() if s not in self._NOT_DISPATCHED)
        assert dispatched >= 3, (
            f"应有至少3个任务被调度,实际: {dispatched}/5, results: {results}"
        )

        print(f" ✅ 多Agent并发验证通过 ({dispatched}/5 被调度)")
|
||
|
||
|
||
# ===================================================================
|
||
# ST3: Broadcast Concurrent — 并发广播
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestST3BroadcastConcurrent:
    """ST3: three tasks broadcast at once should all be claimed within ~5 min.

    Exercises broadcast claiming (tasks without an assignee) under concurrency.

    NOTE(review): the test asserts that every task was claimed (left
    ``pending`` and was readable), not that it reached a terminal status.
    Each poll also gets at least 10 s, so the total wait can slightly exceed
    300 s.
    """

    # Statuses that do NOT prove a task was claimed. "unknown" is what
    # _poll_task returns when it could not read the task at all.
    _NOT_DISPATCHED = ("pending", "unknown", None)

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; delete created projects afterwards."""
        _check_environment()
        self._projects: List[str] = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="ST3", agents=None) -> str:
        """Create a project with ``agents`` (default: three agents); return its id."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger", "pangtong-fujunshi"]}
        # Register before asserting so a half-created project is still cleaned up.
        self._projects.append(pid)
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": config,
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending, priority-5 task (``kwargs`` override fields); return its id."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        # "id" goes last so a falsy kwargs["id"] cannot override the generated id.
        body = {"status": "pending", "priority": 5, **kwargs, "id": tid}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_st31_broadcast_3_concurrent(self):
        """ST3-1: broadcast 3 tasks with no assignee -> all claimed within ~5 min."""
        pid = self._create_project("ST3-1")

        # Three broadcast tasks: no assignee, so agents must claim them.
        task_ids = [
            self._create_task(
                pid,
                title=f"ST3 广播任务{i+1}:echo st3-bc-{i}",
                description=(
                    f"请执行 echo st3-bc-{i} 并标记done。\n"
                    "这是E2E广播压力测试,不需要做其他事。"
                ),
                task_type="coding",
            )
            for i in range(3)
        ]

        print(f"\n🚀 ST3-1: 等待3个广播任务终态 (pid={pid}, timeout=300s)")

        # Shared 5-minute budget across all polls (monotonic clock), with a
        # 10 s floor per task so late tasks still get one real check.
        deadline = time.monotonic() + 300
        results = {}
        for i, tid in enumerate(task_ids):
            remaining = max(10, deadline - time.monotonic())
            result = _poll_task(
                pid, tid, timeout=int(remaining),
                terminal_states=("done", "failed", "cancelled", "blocked"),
            )
            results[tid] = result.get("status")
            print(f" 广播任务{i+1}: status={results[tid]}, assignee={result.get('assignee')}")

        # Every task must have been claimed; unreadable tasks do not count.
        all_dispatched = all(s not in self._NOT_DISPATCHED for s in results.values())
        assert all_dispatched, (
            f"有广播任务未被认领: {results}"
        )

        # Report the distribution of final statuses (informational).
        done_count = sum(1 for s in results.values() if s == "done")
        failed_count = sum(1 for s in results.values() if s == "failed")
        print(f" 终态分布: done={done_count}, failed={failed_count}")
        print(f" ✅ 广播并发验证通过 ({done_count}/3 成功)")
|