Files
2026-06-05 23:25:51 +08:00

391 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ST1-ST3 压力测试
并发场景测试:Agent 竞争、全局限制、广播并发。
需要 RUN_INTEGRATION=1 + 生产 daemon 运行。
"""
import json
import os
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List
import pytest
import requests as http_requests
# Point at the deployment directory so that `from src.utils import ...`
# inside the tests resolves against the deployed code.
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))

# ── Constants ──
API_BASE = os.environ.get("API_BASE", "http://localhost:8083")
POLL_INTERVAL = 5  # seconds between task-status polls (passed to time.sleep)
MAX_WAIT_DISPATCH = 120  # presumably seconds; NOTE(review): unused in the tests shown here
MAX_WAIT_AGENT = 300  # seconds to wait for a single agent task to settle
E2E_PREFIX = "e2e-stress-"  # prefix of throwaway project ids

pytestmark = pytest.mark.e2e

# ── E2E gate ──
# The suite drives a real (production) daemon, so values that explicitly say
# "off" must disable it. Previously any non-empty value -- including "0" or
# "false" -- enabled it. Any other non-empty value still enables it.
_INTEGRATION_OFF_VALUES = frozenset({"", "0", "false", "no", "off"})
_RUN_INTEGRATION = os.environ.get("RUN_INTEGRATION", "").strip().lower()

skip_no_integration = pytest.mark.skipif(
    _RUN_INTEGRATION in _INTEGRATION_OFF_VALUES,
    reason="Set RUN_INTEGRATION=1 to run E2E stress tests against real daemon",
)
# ── 工具函数 ──
def _check_environment():
    """Skip the calling test unless the daemon at ``API_BASE`` is ready.

    "Ready" means ``/api/daemon/status`` is reachable, returns a JSON object
    whose ``status`` is ``"running"``, and reports a truthy ``ticker_running``.

    Returns:
        The decoded status payload when the daemon is ready. In every other
        case ``pytest.skip`` is called, which raises and does not return.
    """
    # Keep the try minimal: only transport and JSON-decoding failures mean
    # "API not available"; anything else should surface as a real error.
    try:
        resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5)
        data = resp.json()
    except (http_requests.RequestException, ValueError) as e:
        pytest.skip(f"Production API not available at {API_BASE}: {e}")
    # A non-object body (list, string, ...) cannot describe a ready daemon.
    if (
        not isinstance(data, dict)
        or data.get("status") != "running"
        or not data.get("ticker_running")
    ):
        pytest.skip(f"Daemon not ready: {data}")
    return data
def _cleanup_project(pid: str):
    """Best-effort physical deletion of a test project.

    Never raises, so a failed cleanup cannot mask the test's own outcome.
    It does print a warning, so leaked test projects on the daemon are
    visible instead of being silently ignored.

    Args:
        pid: Id of the project to delete.
    """
    try:
        resp = http_requests.delete(
            f"{API_BASE}/api/projects/{pid}?physical=true", timeout=10
        )
    except Exception as e:  # deliberate best effort: report, don't fail teardown
        print(f"⚠️ cleanup of project {pid} failed: {e}")
        return
    if resp.status_code >= 400:
        print(
            f"⚠️ cleanup of project {pid} returned HTTP "
            f"{resp.status_code}: {resp.text}"
        )
def _poll_task(pid, tid, timeout, terminal_states=None):
    """Poll a task until its status is terminal or ``timeout`` expires.

    Args:
        pid: Project id.
        tid: Task id.
        timeout: Polling budget in seconds.
        terminal_states: Statuses that stop polling. Defaults to
            ``("done", "failed", "cancelled")``.

    Returns:
        The task payload (a dict) as soon as its ``status`` is terminal.
        On timeout, one final fetch is attempted. If that fails, the last
        payload observed while polling is returned. ``{"status": "unknown"}``
        is returned only if the task was never observed at all.
    """
    terminal = terminal_states or ("done", "failed", "cancelled")
    url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"

    def fetch():
        # Transport errors, non-200 responses, undecodable JSON and non-dict
        # bodies all count as "no observation" so that polling keeps going.
        try:
            resp = http_requests.get(url, timeout=10)
            if resp.status_code != 200:
                return None
            payload = resp.json()
        except Exception:
            return None
        return payload if isinstance(payload, dict) else None

    # monotonic() is unaffected by system clock adjustments during long waits.
    deadline = time.monotonic() + timeout
    last_seen = None
    while time.monotonic() < deadline:
        data = fetch()
        if data is not None:
            last_seen = data
            if data.get("status") in terminal:
                return data
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(POLL_INTERVAL, deadline - time.monotonic())))

    data = fetch()
    if data is not None:
        return data
    # Fall back to the last observation so that a task seen as "pending"
    # is not reported as "unknown" just because the final request failed.
    return last_seen if last_seen is not None else {"status": "unknown"}
def _get_task(pid, tid) -> Dict[str, Any]:
    """Return the decoded detail payload of task ``tid`` in project ``pid``.

    Fails the calling test (AssertionError) on any non-200 response.
    """
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    response = http_requests.get(task_url, timeout=10)
    assert response.status_code == 200, f"Get task failed: {response.text}"
    return response.json()
# ===================================================================
# ST1: Counter Competition — 同 Agent 并发竞争
# ===================================================================
@skip_no_integration
class TestST1CounterCompetition:
    """ST1: three tasks assigned to the same agent compete for its counter.

    Intended scenario: while task 1 holds the agent, tasks 2 and 3 hit
    AgentBusyError, and task 2 acquires the agent once task 1 finishes.
    In other words, an agent has at most one active session at a time.

    NOTE(review): the assertions only check that every task eventually left
    ``pending``. Mutual exclusion itself is not asserted, and the
    ``routing_decisions`` rows are printed for manual inspection only.
    """

    # Statuses meaning "never observed as scheduled": still pending, the
    # "unknown" fallback returned by _poll_task, or a payload without status.
    _UNSCHEDULED = ("pending", "unknown", None)
    # Statuses at which polling stops.
    _TERMINAL = ("done", "failed", "cancelled", "blocked")

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; delete created projects afterwards."""
        _check_environment()
        self._projects: List[str] = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="ST1") -> str:
        """Create a throwaway project whose only agent is ``zhangfei-dev``.

        Returns:
            The new project id, also recorded for teardown cleanup.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a task (default: pending, priority 5) and return its id.

        Keyword arguments are sent as task fields and may override ``status``
        or ``priority``. ``id`` is popped first so that a falsy ``id`` (e.g.
        ``None``) cannot overwrite the generated id in the request body.
        """
        tid = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_st11_agent_counter_competition(self):
        """ST1-1: three tasks on one agent must all get scheduled eventually."""
        pid = self._create_project("ST1-1")

        # Create the three tasks back to back so they contend for the agent.
        task_ids = [
            self._create_task(
                pid,
                title=f"ST1 任务{n}echo st1-{tag}",
                description=f"请执行 echo st1-{tag} 并标记done。E2E压力测试,不需要做其他事。",
                assignee="zhangfei-dev",
                task_type="coding",
            )
            for n, tag in enumerate(("first", "second", "third"), start=1)
        ]
        tid1, tid2, tid3 = task_ids
        print(f"\n🚀 ST1-1: 等待3个竞争任务完成 (pid={pid})")
        print(f" 任务: {tid1}, {tid2}, {tid3}")

        # Poll in creation order: task N+1 is expected to be scheduled only
        # after task N releases the agent.
        failure_msgs = (
            "任务1未被调度",
            "任务2在任务1完成后未被调度",
            "任务3在任务2完成后未被调度",
        )
        for n, (tid, msg) in enumerate(zip(task_ids, failure_msgs), start=1):
            result = _poll_task(
                pid, tid, timeout=MAX_WAIT_AGENT,
                terminal_states=self._TERMINAL,
            )
            status = result.get("status")
            print(f" 任务{n}状态: {status}")
            assert status not in self._UNSCHEDULED, f"{msg} (status={status})"

        # Dump routing_decisions for tasks 2 and 3. NOTE(review): the outcome
        # vocabulary is not visible here, so no skip/blocked assertion is made.
        from src.utils import get_data_root
        db_path = get_data_root() / pid / "blackboard.db"
        if db_path.exists():
            import sqlite3 as sq3
            conn = sq3.connect(str(db_path))
            conn.row_factory = sq3.Row
            try:
                for tid_label, tid in (("任务2", tid2), ("任务3", tid3)):
                    rows = conn.execute(
                        "SELECT outcome FROM routing_decisions WHERE task_id=? ORDER BY id",
                        (tid,),
                    ).fetchall()
                    outcomes = [r["outcome"] for r in rows]
                    print(f" {tid_label} routing outcomes: {outcomes}")
            finally:
                conn.close()
        print(" ✅ Counter 竞争验证通过")
# ===================================================================
# ST2: Global Limit — 多 Agent 全局限制
# ===================================================================
@skip_no_integration
class TestST2GlobalLimit:
    """ST2: five tasks on five different agents should all be schedulable.

    Checks that distinct agents can run in parallel under the global
    concurrency limit. NOTE(review): the cap itself (a sixth task being
    rejected) is not exercised here. The test creates five tasks and
    requires at least three of them to leave ``pending``.
    """

    # Statuses meaning "never observed as scheduled": still pending, the
    # "unknown" fallback returned by _poll_task, or a payload without status.
    _UNSCHEDULED = ("pending", "unknown", None)
    # Statuses at which polling stops.
    _TERMINAL = ("done", "failed", "cancelled", "blocked")
    # Default agent roster: used for the project config and one task each.
    _AGENTS = (
        "zhangfei-dev", "simayi-challenger",
        "pangtong-fujunshi", "jiangwei-infra", "zhaoyun-data",
    )

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; delete created projects afterwards."""
        _check_environment()
        self._projects: List[str] = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="ST2", agents=None) -> str:
        """Create a throwaway project configured with ``agents``.

        Args:
            name_prefix: Prefix of the human-readable project name.
            agents: Agent ids for the project config; a falsy value selects
                the default ``_AGENTS`` roster.

        Returns:
            The new project id, also recorded for teardown cleanup.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": list(agents or self._AGENTS)}
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": config,
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a task (default: pending, priority 5) and return its id.

        Keyword arguments are sent as task fields and may override ``status``
        or ``priority``. ``id`` is popped first so that a falsy ``id`` (e.g.
        ``None``) cannot overwrite the generated id in the request body.
        """
        tid = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_st21_multi_agent_concurrent(self):
        """ST2-1: one task per distinct agent; at least 3 of 5 get scheduled."""
        agents = list(self._AGENTS)
        pid = self._create_project("ST2-1", agents=agents)

        # One task per agent, each explicitly assigned.
        task_ids = [
            self._create_task(
                pid,
                title=f"ST2 任务{i+1}echo st2-{i}",
                description=f"请执行 echo st2-{i} 并标记done。E2E压力测试,不需要做其他事。",
                assignee=agent,
                task_type="coding",
            )
            for i, agent in enumerate(agents)
        ]
        print(f"\n🚀 ST2-1: 等待5个不同Agent任务完成 (pid={pid})")

        results = {}
        for n, (tid, agent) in enumerate(zip(task_ids, agents), start=1):
            result = _poll_task(
                pid, tid, timeout=MAX_WAIT_AGENT,
                terminal_states=self._TERMINAL,
            )
            results[tid] = result.get("status")
            print(f" 任务{n} ({agent}): {results[tid]}")

        # Distinct agents may run in parallel, so at least 3 should have been
        # picked up. "unknown"/None are not evidence of scheduling.
        dispatched = sum(1 for s in results.values() if s not in self._UNSCHEDULED)
        total = len(task_ids)
        assert dispatched >= 3, (
            f"应有至少3个任务被调度,实际: {dispatched}/{total}, results: {results}"
        )
        print(f" ✅ 多Agent并发验证通过 ({dispatched}/{total} 被调度)")
# ===================================================================
# ST3: Broadcast Concurrent — 并发广播
# ===================================================================
@skip_no_integration
class TestST3BroadcastConcurrent:
    """ST3: three broadcast (unassigned) tasks created back to back.

    Exercises broadcast claiming under concurrency. All polling shares one
    5-minute budget. The assertion requires every task to have left
    ``pending`` (i.e. been claimed), not necessarily to have finished.
    """

    # Statuses meaning "never observed as claimed": still pending, the
    # "unknown" fallback returned by _poll_task, or a payload without status.
    _UNSCHEDULED = ("pending", "unknown", None)
    # Statuses at which polling stops.
    _TERMINAL = ("done", "failed", "cancelled", "blocked")
    # Total polling budget in seconds, shared by all broadcast tasks.
    _BROADCAST_TIMEOUT = 300

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; delete created projects afterwards."""
        _check_environment()
        self._projects: List[str] = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="ST3", agents=None) -> str:
        """Create a throwaway project configured with ``agents``.

        Args:
            name_prefix: Prefix of the human-readable project name.
            agents: Agent ids for the project config; a falsy value selects
                three default agents.

        Returns:
            The new project id, also recorded for teardown cleanup.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger", "pangtong-fujunshi"]}
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": config,
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a task (default: pending, priority 5) and return its id.

        Keyword arguments are sent as task fields and may override ``status``
        or ``priority``. ``id`` is popped first so that a falsy ``id`` (e.g.
        ``None``) cannot overwrite the generated id in the request body.
        """
        tid = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_st31_broadcast_3_concurrent(self):
        """ST3-1: three unassigned tasks must all be claimed within the budget."""
        pid = self._create_project("ST3-1")

        # No assignee -> the daemon has to hand the task out via broadcast.
        task_ids = [
            self._create_task(
                pid,
                title=f"ST3 广播任务{i+1}echo st3-bc-{i}",
                description=(
                    f"请执行 echo st3-bc-{i} 并标记done。\n"
                    "这是E2E广播压力测试,不需要做其他事。"
                ),
                task_type="coding",
            )
            for i in range(3)
        ]
        print(f"\n🚀 ST3-1: 等待3个广播任务终态 (pid={pid}, timeout={self._BROADCAST_TIMEOUT}s)")

        # One shared deadline for all tasks; monotonic() is unaffected by
        # system clock adjustments.
        deadline = time.monotonic() + self._BROADCAST_TIMEOUT
        results = {}
        for n, tid in enumerate(task_ids, start=1):
            # Later tasks still get at least 10 s even if the budget is spent.
            remaining = max(10, deadline - time.monotonic())
            result = _poll_task(
                pid, tid, timeout=int(remaining),
                terminal_states=self._TERMINAL,
            )
            results[tid] = result.get("status")
            print(f" 广播任务{n}: status={results[tid]}, assignee={result.get('assignee')}")

        # Every task must have been claimed; "unknown"/None prove nothing.
        all_dispatched = all(s not in self._UNSCHEDULED for s in results.values())
        assert all_dispatched, (
            f"有广播任务未被认领: {results}"
        )

        # Report how the tasks ended up (informational only).
        done_count = sum(1 for s in results.values() if s == "done")
        failed_count = sum(1 for s in results.values() if s == "failed")
        print(f" 终态分布: done={done_count}, failed={failed_count}")
        print(f" ✅ 广播并发验证通过 ({done_count}/{len(task_ids)} 成功)")