"""ST1-ST3 stress tests.

Concurrency scenarios: agent contention, global limit, concurrent broadcast.
Requires RUN_INTEGRATION=1 and a running production daemon.
"""

import json
import os
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List

import pytest
import requests as http_requests

# Point at the deployment directory so deployed modules (``src.*``) import.
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))

# ── Constants ──
API_BASE = os.environ.get("API_BASE", "http://localhost:8083")
POLL_INTERVAL = 5
MAX_WAIT_DISPATCH = 120
MAX_WAIT_AGENT = 300
E2E_PREFIX = "e2e-stress-"

# States at which the tests in this module stop polling a task.
WAIT_STATES = ("done", "failed", "cancelled", "blocked")

# Statuses that give no evidence the task was ever picked up. "unknown" is
# the placeholder _poll_task returns when the API cannot be read, and None
# means the payload carried no status at all.
NOT_DISPATCHED_STATUSES = ("pending", "unknown", None)

pytestmark = pytest.mark.e2e

# ── E2E gate ──
skip_no_integration = pytest.mark.skipif(
    not os.environ.get("RUN_INTEGRATION"),
    reason="Set RUN_INTEGRATION=1 to run E2E stress tests against real daemon",
)


# ── Helper functions ──
def _check_environment():
    """Skip the current test unless the daemon status endpoint looks healthy.

    Returns the decoded status payload when ``status`` is ``"running"`` and
    ``ticker_running`` is truthy; otherwise calls ``pytest.skip``.
    """
    try:
        resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5)
        data = resp.json()
        if data.get("status") != "running" or not data.get("ticker_running"):
            # NOTE(review): relies on pytest's skip exception not being an
            # ``Exception`` subclass, so the handler below does not rewrap it.
            pytest.skip(f"Daemon not ready: {data}")
        return data
    except Exception as e:
        pytest.skip(f"Production API not available at {API_BASE}: {e}")


def _cleanup_project(pid: str):
    """Best-effort physical deletion of a test project; never raises."""
    try:
        http_requests.delete(
            f"{API_BASE}/api/projects/{pid}?physical=true", timeout=10
        )
    except Exception as exc:
        # Cleanup must not fail the test, but a leaked project is worth
        # surfacing in the captured output.
        print(f"WARNING: cleanup of project {pid} failed: {exc}")


def _poll_task(pid, tid, timeout, terminal_states=None):
    """Poll a task until it reaches one of ``terminal_states`` or times out.

    Args:
        pid: Project id.
        tid: Task id.
        timeout: Maximum number of seconds to keep polling.
        terminal_states: Statuses that end the wait; defaults to
            ``("done", "failed", "cancelled")``.

    Returns:
        The task payload once its status is terminal. On timeout, the result
        of one last fetch, or ``{"status": "unknown"}`` when that fetch fails
        or does not return HTTP 200.
    """
    terminal = terminal_states or ("done", "failed", "cancelled")
    url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        try:
            resp = http_requests.get(url, timeout=10)
            if resp.status_code == 200:
                data = resp.json()
                if data.get("status") in terminal:
                    return data
        except Exception:
            # Transient network/decoding errors are expected while the daemon
            # is busy; keep polling until the deadline.
            pass
        # Never sleep past the deadline.
        time.sleep(min(POLL_INTERVAL, max(0.0, deadline - time.monotonic())))

    # Timed out: report the last observable state.
    try:
        resp = http_requests.get(url, timeout=10)
        return resp.json() if resp.status_code == 200 else {"status": "unknown"}
    except Exception:
        return {"status": "unknown"}


def _get_task(pid, tid) -> Dict[str, Any]:
    """Fetch task details, asserting that the API answers with HTTP 200."""
    resp = http_requests.get(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}",
        timeout=10,
    )
    assert resp.status_code == 200, f"Get task failed: {resp.text}"
    return resp.json()


def _is_dispatched(status) -> bool:
    """Return True when ``status`` shows the task left the pending queue."""
    return status not in NOT_DISPATCHED_STATUSES


def _post_project(registry: List[str], name_prefix: str, agents: List[str]) -> str:
    """Create a project through the API and record its id in ``registry``."""
    pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
    # Register before the request so teardown still attempts deletion when
    # the assertion below fails.
    registry.append(pid)
    resp = http_requests.post(f"{API_BASE}/api/projects", json={
        "id": pid,
        "name": f"{name_prefix}-{pid}",
        "config": {"agents": agents},
    }, timeout=10)
    assert resp.status_code == 200, f"Create project failed: {resp.text}"
    return pid


def _post_task(pid: str, **kwargs) -> str:
    """Create a pending task in project ``pid`` and return its id."""
    tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
    # "id" goes last so a falsy id passed in kwargs cannot overwrite the
    # generated one.
    body = {"status": "pending", "priority": 5, **kwargs, "id": tid}
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks",
        json=body,
        timeout=10,
    )
    assert resp.status_code == 200, f"Create task failed: {resp.text}"
    return tid


# ===================================================================
# ST1: Counter Competition — concurrent contention on one agent
# ===================================================================
@skip_no_integration
class TestST1CounterCompetition:
    """ST1: create 3 tasks for the same agent at once.

    Expected: tasks 2 and 3 hit AgentBusyError, and task 2 acquires once
    task 1 finishes. Intended to verify that counter acquire is mutually
    exclusive: one agent has at most one active session at a time.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        _check_environment()
        self._projects: List[str] = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="ST1") -> str:
        return _post_project(self._projects, name_prefix, ["zhangfei-dev"])

    def _create_task(self, pid, **kwargs) -> str:
        return _post_task(pid, **kwargs)

    def test_st11_agent_counter_competition(self):
        """ST1-1: 3 tasks on the same agent run serially (counter is exclusive)."""
        pid = self._create_project("ST1-1")

        # Create the 3 tasks back to back.
        task_ids = [
            self._create_task(
                pid,
                title=f"ST1 任务{n}:echo st1-{word}",
                description=f"请执行 echo st1-{word} 并标记done。E2E压力测试,不需要做其他事。",
                assignee="zhangfei-dev",
                task_type="coding",
            )
            for n, word in enumerate(("first", "second", "third"), start=1)
        ]
        tid1, tid2, tid3 = task_ids

        print(f"\n🚀 ST1-1: 等待3个竞争任务完成 (pid={pid})")
        print(f" 任务: {tid1}, {tid2}, {tid3}")

        # Wait for each task in creation order.
        failure_messages = (
            "任务1未被调度",
            "任务2在任务1完成后未被调度",
            "任务3在任务2完成后未被调度",
        )
        for n, (tid, message) in enumerate(zip(task_ids, failure_messages), start=1):
            result = _poll_task(
                pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=WAIT_STATES,
            )
            status = result.get("status")
            print(f" 任务{n}状态: {status}")
            # "unknown"/missing means the API could not be read, which is no
            # evidence of dispatch either.
            assert _is_dispatched(status), message

        # Dump the routing decisions recorded for tasks 2 and 3.
        # NOTE(review): outcomes are only printed, not asserted.
        from src.utils import get_data_root

        db_path = get_data_root() / pid / "blackboard.db"
        if db_path.exists():
            import sqlite3 as sq3

            conn = sq3.connect(str(db_path))
            conn.row_factory = sq3.Row
            try:
                for tid_label, tid in [("任务2", tid2), ("任务3", tid3)]:
                    rows = conn.execute(
                        "SELECT outcome FROM routing_decisions WHERE task_id=? ORDER BY id",
                        (tid,),
                    ).fetchall()
                    outcomes = [r["outcome"] for r in rows]
                    print(f" {tid_label} routing outcomes: {outcomes}")
            finally:
                conn.close()

        print(" ✅ Counter 竞争验证通过")


# ===================================================================
# ST2: Global Limit — global limit across multiple agents
# ===================================================================
@skip_no_integration
class TestST2GlobalLimit:
    """ST2: create 5 tasks for different agents at once.

    Expected: all five acquire and a sixth would be rejected. Intended to
    verify the global concurrency limit: different agents may run in
    parallel, up to a global cap.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        _check_environment()
        self._projects: List[str] = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="ST2", agents=None) -> str:
        default_agents = [
            "zhangfei-dev",
            "simayi-challenger",
            "pangtong-fujunshi",
            "jiangwei-infra",
            "zhaoyun-data",
        ]
        return _post_project(self._projects, name_prefix, agents or default_agents)

    def _create_task(self, pid, **kwargs) -> str:
        return _post_task(pid, **kwargs)

    def test_st21_multi_agent_concurrent(self):
        """ST2-1: 5 tasks on 5 different agents can all be dispatched."""
        agents = [
            "zhangfei-dev",
            "simayi-challenger",
            "pangtong-fujunshi",
            "jiangwei-infra",
            "zhaoyun-data",
        ]
        pid = self._create_project("ST2-1", agents=agents)

        # One task per agent.
        task_ids = [
            self._create_task(
                pid,
                title=f"ST2 任务{i+1}:echo st2-{i}",
                description=f"请执行 echo st2-{i} 并标记done。E2E压力测试,不需要做其他事。",
                assignee=agent,
                task_type="coding",
            )
            for i, agent in enumerate(agents)
        ]

        print(f"\n🚀 ST2-1: 等待5个不同Agent任务完成 (pid={pid})")

        # Wait for every task.
        results = {}
        for i, tid in enumerate(task_ids):
            result = _poll_task(
                pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=WAIT_STATES,
            )
            results[tid] = result.get("status")
            print(f" 任务{i+1} ({agents[i]}): {results[tid]}")

        # At least 3 tasks should have been dispatched (different agents may
        # run in parallel). Unreadable statuses do not count.
        dispatched = sum(1 for s in results.values() if _is_dispatched(s))
        assert dispatched >= 3, (
            f"应有至少3个任务被调度,实际: {dispatched}/5, results: {results}"
        )
        print(f" ✅ 多Agent并发验证通过 ({dispatched}/5 被调度)")


# ===================================================================
# ST3: Broadcast Concurrent — concurrent broadcast
# ===================================================================
@skip_no_integration
class TestST3BroadcastConcurrent:
    """ST3: broadcast 3 tasks at once; all must settle within 5 minutes.

    Intended to verify that broadcast claiming stays stable under concurrency.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        _check_environment()
        self._projects: List[str] = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="ST3", agents=None) -> str:
        default_agents = ["zhangfei-dev", "simayi-challenger", "pangtong-fujunshi"]
        return _post_project(self._projects, name_prefix, agents or default_agents)

    def _create_task(self, pid, **kwargs) -> str:
        return _post_task(pid, **kwargs)

    def test_st31_broadcast_3_concurrent(self):
        """ST3-1: 3 tasks without an assignee all settle within 5 minutes."""
        pid = self._create_project("ST3-1")

        # Create 3 broadcast tasks; omitting the assignee leaves them to be
        # claimed via broadcast.
        task_ids = [
            self._create_task(
                pid,
                title=f"ST3 广播任务{i+1}:echo st3-bc-{i}",
                description=(
                    f"请执行 echo st3-bc-{i} 并标记done。\n"
                    "这是E2E广播压力测试,不需要做其他事。"
                ),
                task_type="coding",
            )
            for i in range(3)
        ]

        print(f"\n🚀 ST3-1: 等待3个广播任务终态 (pid={pid}, timeout=300s)")

        # One shared 5-minute budget for all tasks; each poll still gets at
        # least 10 seconds.
        deadline = time.monotonic() + 300
        results = {}
        for i, tid in enumerate(task_ids):
            remaining = max(10, deadline - time.monotonic())
            result = _poll_task(
                pid, tid, timeout=int(remaining), terminal_states=WAIT_STATES,
            )
            results[tid] = result.get("status")
            print(f" 广播任务{i+1}: status={results[tid]}, assignee={result.get('assignee')}")

        # Every task must have left the pending state; unreadable statuses
        # do not count as claimed.
        all_dispatched = all(_is_dispatched(s) for s in results.values())
        assert all_dispatched, (
            f"有广播任务未被认领: {results}"
        )

        # Summarize the final-state distribution.
        done_count = sum(1 for s in results.values() if s == "done")
        failed_count = sum(1 for s in results.values() if s == "failed")
        print(f" 终态分布: done={done_count}, failed={failed_count}")
        print(f" ✅ 广播并发验证通过 ({done_count}/3 成功)")