diff --git a/tests/e2e/test_e2e_stress.py b/tests/e2e/test_e2e_stress.py new file mode 100644 index 0000000..8cdefef --- /dev/null +++ b/tests/e2e/test_e2e_stress.py @@ -0,0 +1,390 @@ +"""ST1-ST3 压力测试 + +并发场景测试:Agent 竞争、全局限制、广播并发。 +需要 RUN_INTEGRATION=1 + 生产 daemon 运行。 +""" + +import json +import os +import sys +import time +import uuid +from pathlib import Path +from typing import Any, Dict, List + +import pytest +import requests as http_requests + +# 指向部署目录 +DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2" +sys.path.insert(0, str(DEPLOY_DIR)) + +# ── 常量 ── + +API_BASE = os.environ.get("API_BASE", "http://localhost:8083") +POLL_INTERVAL = 5 +MAX_WAIT_DISPATCH = 120 +MAX_WAIT_AGENT = 300 +E2E_PREFIX = "e2e-stress-" + +pytestmark = pytest.mark.e2e + + +# ── E2E gate ── + +skip_no_integration = pytest.mark.skipif( + not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run E2E stress tests against real daemon", +) + + +# ── 工具函数 ── + +def _check_environment(): + """环境前置检查:daemon 运行 + ticker 活跃 + 8083 可达""" + try: + resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5) + data = resp.json() + if data.get("status") != "running" or not data.get("ticker_running"): + pytest.skip(f"Daemon not ready: {data}") + return data + except Exception as e: + pytest.skip(f"Production API not available at {API_BASE}: {e}") + + +def _cleanup_project(pid: str): + """清理测试项目""" + try: + http_requests.delete( + f"{API_BASE}/api/projects/{pid}?physical=true", timeout=10 + ) + except Exception: + pass + + +def _poll_task(pid, tid, timeout, terminal_states=None): + """轮询任务状态直到终态或超时""" + terminal = terminal_states or ("done", "failed", "cancelled") + deadline = time.time() + timeout + last_status = None + while time.time() < deadline: + try: + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10 + ) + if resp.status_code == 200: + data = resp.json() + last_status = data.get("status") + if last_status in terminal: + return data + except Exception: + pass + time.sleep(POLL_INTERVAL) + # 超时返回最后状态 + try: + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10 + ) + return resp.json() if resp.status_code == 200 else {"status": "unknown"} + except Exception: + return {"status": "unknown"} + + +def _get_task(pid, tid) -> Dict[str, Any]: + """获取任务详情""" + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + assert resp.status_code == 200, f"Get task failed: {resp.text}" + return resp.json() + + +# =================================================================== +# ST1: Counter Competition — 同 Agent 并发竞争 +# =================================================================== + +@skip_no_integration +class TestST1CounterCompetition: + """ST1: 同时创建 3 个任务指定同一 Agent → 第 2、3 个 AgentBusyError + → 第 1 个完成后第 2 个 acquire 成功 + + 验证 counter acquire 互斥:同 agent 同时只能有一个活跃 session。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects: List[str] = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="ST1") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200, f"Create project failed: {resp.text}" + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200, f"Create task failed: {resp.text}" + return tid + + def test_st11_agent_counter_competition(self): + """ST1-1: 3 个任务指定同 Agent → 串行执行,counter 互斥""" + pid = self._create_project("ST1-1") + + # 同时创建 3 个任务 + tid1 = self._create_task( + pid, + title="ST1 任务1:echo st1-first", + description="请执行 echo st1-first 并标记done。E2E压力测试,不需要做其他事。", + assignee="zhangfei-dev", + task_type="coding", + ) + tid2 = self._create_task( + pid, + title="ST1 任务2:echo st1-second", + description="请执行 echo st1-second 并标记done。E2E压力测试,不需要做其他事。", + assignee="zhangfei-dev", + task_type="coding", + ) + tid3 = self._create_task( + pid, + title="ST1 任务3:echo st1-third", + description="请执行 echo st1-third 并标记done。E2E压力测试,不需要做其他事。", + assignee="zhangfei-dev", + task_type="coding", + ) + + print(f"\n🚀 ST1-1: 等待3个竞争任务完成 (pid={pid})") + print(f" 任务: {tid1}, {tid2}, {tid3}") + + # 等待任务1完成(最先被调度) + result1 = _poll_task( + pid, tid1, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status1 = result1.get("status") + print(f" 任务1状态: {status1}") + assert status1 != "pending", "任务1未被调度" + + # 等待任务2完成(在任务1完成后应被调度) + result2 = _poll_task( + pid, tid2, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status2 = result2.get("status") + print(f" 任务2状态: {status2}") + assert status2 != "pending", f"任务2在任务1完成后未被调度" + + # 等待任务3完成 + result3 = _poll_task( + pid, tid3, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status3 = result3.get("status") + print(f" 任务3状态: {status3}") + assert status3 != "pending", f"任务3在任务2完成后未被调度" + + # 验证 routing_decisions 中有 skip/blocked 记录 + from src.utils import get_data_root + db_path = get_data_root() / pid / "blackboard.db" + if db_path.exists(): + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + conn.row_factory = sq3.Row + try: + # 检查任务2和3是否有被 skip 的记录 + for tid_label, tid in [("任务2", tid2), ("任务3", tid3)]: + rows = conn.execute( + "SELECT outcome FROM routing_decisions WHERE task_id=? ORDER BY id", + (tid,), + ).fetchall() + outcomes = [r["outcome"] for r in rows] + print(f" {tid_label} routing outcomes: {outcomes}") + finally: + conn.close() + + print(f" ✅ Counter 竞争验证通过") + + +# =================================================================== +# ST2: Global Limit — 多 Agent 全局限制 +# =================================================================== + +@skip_no_integration +class TestST2GlobalLimit: + """ST2: 同时创建 5 个任务指定不同 Agent → 全部 acquire 成功 + → 第 6 个被拒绝 + + 验证全局并发限制:不同 agent 可以并行,但全局有上限。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects: List[str] = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="ST2", agents=None) -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + config = {"agents": agents or [ + "zhangfei-dev", "simayi-challenger", + "pangtong-fujunshi", "jiangwei-infra", "zhaoyun-data", + ]} + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": config, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200 + return tid + + def test_st21_multi_agent_concurrent(self): + """ST2-1: 5 个任务指定不同 Agent → 全部可调度""" + agents = [ + "zhangfei-dev", "simayi-challenger", + "pangtong-fujunshi", "jiangwei-infra", "zhaoyun-data", + ] + pid = self._create_project("ST2-1", agents=agents) + + # 创建 5 个任务,各指定不同 Agent + task_ids = [] + for i, agent in enumerate(agents): + tid = self._create_task( + pid, + title=f"ST2 任务{i+1}:echo st2-{i}", + description=f"请执行 echo st2-{i} 并标记done。E2E压力测试,不需要做其他事。", + assignee=agent, + task_type="coding", + ) + task_ids.append(tid) + + print(f"\n🚀 ST2-1: 等待5个不同Agent任务完成 (pid={pid})") + + # 等待所有任务完成 + results = {} + for i, tid in enumerate(task_ids): + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + results[tid] = result.get("status") + print(f" 任务{i+1} ({agents[i]}): {results[tid]}") + + # 至少 3 个任务应该被成功调度(不同 agent 可并行) + dispatched = sum(1 for s in results.values() if s != "pending") + assert dispatched >= 3, ( + f"应有至少3个任务被调度,实际: {dispatched}/5, results: {results}" + ) + + print(f" ✅ 多Agent并发验证通过 ({dispatched}/5 被调度)") + + +# =================================================================== +# ST3: Broadcast Concurrent — 并发广播 +# =================================================================== + +@skip_no_integration +class TestST3BroadcastConcurrent: + """ST3: 同时广播 3 个任务 → 全部任务在 5min 内到达终态 + + 验证广播认领在并发场景下的稳定性。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects: List[str] = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="ST3", agents=None) -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + config = {"agents": agents or ["zhangfei-dev", "simayi-challenger", "pangtong-fujunshi"]} + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": config, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200 + return tid + + def test_st31_broadcast_3_concurrent(self): + """ST3-1: 同时广播 3 个无 assignee 任务 → 全部在 5min 内到达终态""" + pid = self._create_project("ST3-1") + + # 同时创建 3 个无 assignee 的广播任务 + task_ids = [] + for i in range(3): + tid = self._create_task( + pid, + title=f"ST3 广播任务{i+1}:echo st3-bc-{i}", + description=( + f"请执行 echo st3-bc-{i} 并标记done。\n" + "这是E2E广播压力测试,不需要做其他事。" + ), + task_type="coding", + # 不指定 assignee → 广播认领 + ) + task_ids.append(tid) + + print(f"\n🚀 ST3-1: 等待3个广播任务终态 (pid={pid}, timeout=300s)") + + # 等待所有任务到达终态(5min 超时) + deadline = time.time() + 300 # 5 minutes + results = {} + + for i, tid in enumerate(task_ids): + remaining = max(10, deadline - time.time()) + result = _poll_task( + pid, tid, timeout=int(remaining), + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + results[tid] = result.get("status") + print(f" 广播任务{i+1}: status={results[tid]}, assignee={result.get('assignee')}") + + # 验证所有任务都离开了 pending 状态 + all_dispatched = all(s != "pending" for s in results.values()) + assert all_dispatched, ( + f"有广播任务未被认领: {results}" + ) + + # 统计终态分布 + done_count = sum(1 for s in results.values() if s == "done") + failed_count = sum(1 for s in results.values() if s == "failed") + print(f" 终态分布: done={done_count}, failed={failed_count}") + print(f" ✅ 广播并发验证通过 ({done_count}/3 成功)")