import pytest pytestmark = pytest.mark.e2e skip_no_integration = pytest.mark.skipif( not __import__("os").environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon", ) """v3.1 端到端测试 — 新增场景覆盖 覆盖 v3.1 新增功能: E9-4 广播认领:无 assignee → 广播 → Agent 认领 → done E9-5 状态机:暂停 → 恢复 (resumed_from) E9-6 状态机:cancelled → 重新启动 → done E9-7 超时处理:claimed 超时 → pending (assignee 清空) E9-8 缓存头:HTML no-cache + JS/CSS immutable E10c 失败重试链:failed → pending → 广播 → done E10d 完整生命周期:pending → claimed → working → review → done 需要 RUN_INTEGRATION=1 + 生产 daemon 运行。 """ import json import os import sqlite3 import sys import time import uuid from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, Optional import pytest import requests as http_requests # ── 路径设置 ── DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2" sys.path.insert(0, str(DEPLOY_DIR)) from src.utils import get_data_root # ── 常量 ── API_BASE = "http://localhost:8083" POLL_INTERVAL = 5 # 轮询间隔秒 MAX_WAIT_DISPATCH = 120 # 等待调度超时(~4个tick,给 tick 时序留余量) MAX_WAIT_AGENT = 300 # 等待 Agent 完成超时 E2E_PREFIX = "e2e-v31-" DATA_ROOT = get_data_root() # ── 工具函数 ── def _check_environment(): """环境前置检查:daemon 运行 + ticker 活跃 + 8083 可达""" try: resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5) data = resp.json() if data.get("status") != "running" or not data.get("ticker_running"): pytest.skip(f"Daemon not ready: {data}") return data except Exception as e: pytest.skip(f"Production API not available at {API_BASE}: {e}") def _cleanup_project(pid: str): """清理测试项目""" try: http_requests.post(f"{API_BASE}/api/projects/{pid}/archive", timeout=5) except Exception: pass def _create_project(project_list: list, name_prefix: str = "E9", agents: list = None) -> str: """创建测试项目,自动注册到 project_list 用于 teardown""" pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]} resp = http_requests.post(f"{API_BASE}/api/projects", json={ "id": pid, "name": f"{name_prefix}-{pid}", "config": config, }, timeout=10) assert resp.status_code == 200, f"Create project failed: {resp.text}" project_list.append(pid) return pid def _create_task(pid: str, **kwargs) -> str: """创建测试任务""" tid = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}" body = {"id": tid, "status": "pending", "priority": 5, **kwargs} resp = http_requests.post( f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, ) assert resp.status_code == 200, f"Create task failed: {resp.text}" return tid def _get_task(pid: str, tid: str) -> Dict[str, Any]: """获取任务详情""" resp = http_requests.get( f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, ) assert resp.status_code == 200, f"Get task failed: {resp.text}" return resp.json() def _update_status(pid: str, tid: str, status: str, agent: str = "test") -> Dict: """手动更新任务状态""" resp = http_requests.post( f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", json={"status": status, "agent": agent}, timeout=10, ) return resp.json() def _poll_task(pid: str, tid: str, timeout: int, terminal_states: tuple = None) -> Dict[str, Any]: """轮询任务状态直到终态或超时""" terminal = terminal_states or ("done", "failed", "cancelled") deadline = time.time() + timeout last_status = None while time.time() < deadline: try: resp = http_requests.get( f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, ) if resp.status_code == 200: data = resp.json() last_status = data.get("status") if last_status in terminal: return data except Exception: pass time.sleep(POLL_INTERVAL) # 超时返回最后状态 try: resp = http_requests.get( f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, ) return resp.json() if resp.status_code == 200 else {"status": "unknown"} except Exception: return {"status": "unknown"} def _get_db_path(pid: str) -> Path: """获取项目的 blackboard.db 路径""" return DATA_ROOT / pid / "blackboard.db" def _patch_db_claimed_at(pid: str, tid: str, claimed_at: str): """直接操作 DB 设置 claimed_at 时间戳(模拟超时)""" db_path = _get_db_path(pid) assert db_path.exists(), f"DB not found: {db_path}" conn = sqlite3.connect(str(db_path)) try: conn.execute( "UPDATE tasks SET claimed_at=? WHERE id=?", (claimed_at, tid), ) conn.commit() finally: conn.close() # =================================================================== # E9-4: 广播认领 # =================================================================== @pytest.mark.integration @pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run real agent tests") @skip_no_integration class TestE94BroadcastClaim: """E9-4: 无 assignee 任务 → 广播认领 → Agent 执行 → done""" @pytest.fixture(autouse=True) def setup_env(self): _check_environment() self._projects = [] yield for pid in self._projects: _cleanup_project(pid) def test_broadcast_claim(self): """创建不指定 assignee 的任务,等待广播认领并执行完成""" pid = _create_project(self._projects, "E9-4", agents=["zhangfei-dev", "simayi-challenger"]) tid = _create_task( pid, title="E2E广播认领任务:echo broadcast", description=( "这是一个E2E测试的广播认领任务。\n" "请执行 echo broadcast 并标记done。\n" "这是E2E自动化测试,不需要做其他事。" ), task_type="coding", # 不指定 assignee → 触发广播认领 ) print(f"\n🚀 E9-4: 等待广播认领 (pid={pid}, tid={tid})") result = _poll_task( pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=("done", "failed", "cancelled", "blocked"), ) status = result.get("status") print(f" 最终状态: {status}") # 必须被认领(不是 pending) assert status != "pending", ( f"广播认领未生效!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。" f"\n请检查:1) Ticker广播 2) Agent spawn 3) _get_idle_agents()" ) # 不能被拦截 assert status != "blocked", f"广播任务被错误拦截: {result}" # 验证 assignee 已设置 assignee = result.get("assignee") print(f" 认领Agent: {assignee}") assert assignee, f"任务已离开pending但assignee为空: {result}" if status == "done": print(f" ✅ 广播认领执行成功") else: print(f" ⚠️ 广播认领后状态: {status}") # =================================================================== # E9-5: 暂停→恢复 (resumed_from) # =================================================================== @pytest.mark.integration @pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run real agent tests") @skip_no_integration class TestE95PauseResume: """E9-5: 手动推状态到 working → paused → 恢复 → 验证 resumed_from""" @pytest.fixture(autouse=True) def setup_env(self): _check_environment() self._projects = [] yield for pid in self._projects: _cleanup_project(pid) def test_pause_resume_resumed_from(self): """working → paused → 恢复 working,验证 resumed_from 字段""" pid = _create_project(self._projects, "E9-5") tid = _create_task( pid, title="E2E暂停恢复测试", description="测试暂停恢复功能", assignee="zhangfei-dev", ) # 手动推到 claimed → working r1 = _update_status(pid, tid, "claimed", agent="zhangfei-dev") assert r1.get("ok"), f"claimed失败: {r1}" r2 = _update_status(pid, tid, "working", agent="zhangfei-dev") assert r2.get("ok"), f"working失败: {r2}" # 暂停 r3 = _update_status(pid, tid, "paused", agent="test") assert r3.get("ok"), f"paused失败: {r3}" # 验证 resumed_from == "working" task = _get_task(pid, tid) resumed_from = task.get("resumed_from") print(f"\n🚀 E9-5: 暂停后 resumed_from={resumed_from}") assert resumed_from == "working", ( f"resumed_from 应为 'working',实际: {resumed_from}" ) assert task.get("status") == "paused" # 恢复到 working r4 = _update_status(pid, tid, "working", agent="zhangfei-dev") assert r4.get("ok"), f"恢复working失败: {r4}" # 验证恢复后状态 task2 = _get_task(pid, tid) print(f" 恢复后 status={task2.get('status')}") assert task2.get("status") == "working", ( f"恢复后状态应为 working,实际: {task2.get('status')}" ) print(f" ✅ 暂停恢复流程正确") def test_review_pause_resume(self): """review → paused → 恢复 review""" pid = _create_project(self._projects, "E9-5b") tid = _create_task( pid, title="E2E Review暂停恢复", assignee="simayi-challenger", ) _update_status(pid, tid, "claimed", agent="simayi-challenger") _update_status(pid, tid, "working", agent="simayi-challenger") _update_status(pid, tid, "review", agent="simayi-challenger") # 暂停 r = _update_status(pid, tid, "paused", agent="test") assert r.get("ok"), f"paused失败: {r}" task = _get_task(pid, tid) assert task.get("resumed_from") == "review", ( f"resumed_from 应为 'review',实际: {task.get('resumed_from')}" ) # 恢复到 review r2 = _update_status(pid, tid, "review", agent="simayi-challenger") assert r2.get("ok"), f"恢复review失败: {r2}" task2 = _get_task(pid, tid) assert task2.get("status") == "review" print(f"\n ✅ Review暂停恢复流程正确 (resumed_from=review)") # =================================================================== # E9-6: cancelled → 重新启动 # =================================================================== @pytest.mark.integration @pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run real agent tests") @skip_no_integration class TestE96CancelledRestart: """E9-6: cancelled → pending(重新启动)→ Agent 执行 → done""" @pytest.fixture(autouse=True) def setup_env(self): _check_environment() self._projects = [] yield for pid in self._projects: _cleanup_project(pid) def test_cancelled_to_pending_restart(self): """cancelled → pending → 等待调度执行""" pid = _create_project(self._projects, "E9-6") tid = _create_task( pid, title="E2E取消重启任务:echo restart", description=( "请执行 echo restart 并标记done。" "这是E2E测试,不需要做其他事。" ), assignee="zhangfei-dev", ) # 手动推到 cancelled r1 = _update_status(pid, tid, "cancelled") assert r1.get("ok"), f"cancelled失败: {r1}" task = _get_task(pid, tid) assert task.get("status") == "cancelled" # 重新启动 → pending r2 = _update_status(pid, tid, "pending") assert r2.get("ok"), f"pending重启失败: {r2}" task2 = _get_task(pid, tid) assert task2.get("status") == "pending" # assignee 应被清空(v3.1: pending时清空assignee) assert task2.get("assignee") is None or task2.get("assignee") == "", ( f"重新启动后assignee应清空,实际: {task2.get('assignee')}" ) print(f"\n🚀 E9-6: 等待重新调度执行 (pid={pid}, tid={tid})") # 等待调度执行 result = _poll_task( pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=("done", "failed", "cancelled", "blocked"), ) status = result.get("status") print(f" 重启后最终状态: {status}") assert status != "pending", ( f"重启后未被调度!{MAX_WAIT_AGENT}s后仍为pending" ) if status == "done": print(f" ✅ 取消重启流程正确") else: print(f" ⚠️ 重启后状态: {status}") # =================================================================== # E9-7: claimed 超时 → pending (assignee 清空) # =================================================================== @pytest.mark.integration @pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run real agent tests") @skip_no_integration class TestE97ClaimedTimeout: """E9-7: claimed 超时 → pending (assignee 清空)""" @pytest.fixture(autouse=True) def setup_env(self): _check_environment() self._projects = [] yield for pid in self._projects: _cleanup_project(pid) def test_claimed_timeout_to_pending(self): """claimed 任务超时 → ticker 重置为 pending → assignee 清空""" pid = _create_project(self._projects, "E9-7") tid = _create_task( pid, title="E2E超时测试任务", description="测试claimed超时处理", assignee="zhangfei-dev", ) # 手动推到 claimed r1 = _update_status(pid, tid, "claimed", agent="zhangfei-dev") assert r1.get("ok"), f"claimed失败: {r1}" # 验证 claimed task = _get_task(pid, tid) assert task.get("status") == "claimed" # 直接操作 DB:把 claimed_at 设为 2 小时前(模拟超时) two_hours_ago = (datetime.utcnow() - timedelta(hours=2)).isoformat() _patch_db_claimed_at(pid, tid, two_hours_ago) print(f"\n🚀 E9-7: 已设claimed_at为2小时前,等待ticker处理 (pid={pid}, tid={tid})") # 等待 ticker 处理(1-2 个 tick) # poll 直到状态不是 claimed(变为 pending 或 escalated) result = _poll_task( pid, tid, timeout=MAX_WAIT_DISPATCH, terminal_states=("pending", "escalated"), ) status = result.get("status") print(f" 超时后状态: {status}") # 应该回到 pending(或 escalated 如果 retry_count >= 3) assert status != "claimed", ( f"超时处理未生效!任务 {tid} 在 {MAX_WAIT_DISPATCH}s 后仍为 claimed" ) # assignee 应被清空 assignee = result.get("assignee") print(f" assignee: {assignee}") assert assignee is None or assignee == "", ( f"超时重置后assignee应清空,实际: {assignee}" ) print(f" ✅ claimed超时处理正确 (status={status}, assignee cleared)") # =================================================================== # E9-8: 缓存头验证 # =================================================================== @pytest.mark.integration @pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run real agent tests") @skip_no_integration class TestE98CacheHeaders: """E9-8: 验证 CachedStaticFiles 缓存头""" @pytest.fixture(autouse=True) def setup_env(self): _check_environment() def test_html_no_cache(self): """HTML 页面应为 no-cache""" resp = http_requests.get(f"{API_BASE}/", timeout=10) if resp.status_code != 200: pytest.skip(f"Frontend not served at {API_BASE}/: {resp.status_code}") cache_control = resp.headers.get("cache-control", "") print(f"\n🚀 E9-8a: HTML Cache-Control: {cache_control}") assert "no-cache" in cache_control or "no-store" in cache_control, ( f"HTML 应为 no-cache/no-store,实际: {cache_control}" ) def test_js_immutable(self): """JS 文件应为 immutable + 长缓存""" # 先获取 HTML 找到 JS 文件路径 html_resp = http_requests.get(f"{API_BASE}/", timeout=10) if html_resp.status_code != 200: pytest.skip(f"Frontend not available") import re js_matches = re.findall(r'src="(/assets/[^"]+\.js)"', html_resp.text) if not js_matches: pytest.skip("No JS files found in HTML") js_path = js_matches[0] resp = http_requests.get(f"{API_BASE}{js_path}", timeout=10) cache_control = resp.headers.get("cache-control", "") print(f" E9-8b: JS ({js_path}) Cache-Control: {cache_control}") assert "immutable" in cache_control, ( f"JS 应含 immutable,实际: {cache_control}" ) assert "31536000" in cache_control, ( f"JS max-age 应为 31536000,实际: {cache_control}" ) def test_css_immutable(self): """CSS 文件应为 immutable + 长缓存""" html_resp = http_requests.get(f"{API_BASE}/", timeout=10) if html_resp.status_code != 200: pytest.skip("Frontend not available") import re css_matches = re.findall(r'href="(/assets/[^"]+\.css)"', html_resp.text) if not css_matches: pytest.skip("No CSS files found in HTML") css_path = css_matches[0] resp = http_requests.get(f"{API_BASE}{css_path}", timeout=10) cache_control = resp.headers.get("cache-control", "") print(f" E9-8c: CSS ({css_path}) Cache-Control: {cache_control}") assert "immutable" in cache_control, ( f"CSS 应含 immutable,实际: {cache_control}" ) print(f" ✅ 缓存头验证通过") # =================================================================== # E10c: 失败重试链 # =================================================================== @pytest.mark.integration @pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run real agent tests") @skip_no_integration class TestE10cRetryChain: """E10c: failed → pending(手动重试)→ 广播 → 认领 → done""" @pytest.fixture(autouse=True) def setup_env(self): _check_environment() self._projects = [] yield for pid in self._projects: _cleanup_project(pid) def test_failed_to_pending_retry(self): """手动模拟失败 → 重试 → 等待调度完成""" pid = _create_project(self._projects, "E10c", agents=["zhangfei-dev", "simayi-challenger"]) tid = _create_task( pid, title="E2E重试任务:echo retry", description=( "请执行 echo retry 并标记done。" "这是E2E测试,不需要做其他事。" ), assignee="zhangfei-dev", ) # 手动推到 failed(模拟 Agent 执行失败) _update_status(pid, tid, "claimed", agent="zhangfei-dev") _update_status(pid, tid, "working", agent="zhangfei-dev") r_fail = _update_status(pid, tid, "failed", agent="zhangfei-dev") assert r_fail.get("ok"), f"failed失败: {r_fail}" task = _get_task(pid, tid) assert task.get("status") == "failed" print(f"\n🚀 E10c: 任务已标记failed,准备重试") # 手动重试 → pending r_retry = _update_status(pid, tid, "pending") assert r_retry.get("ok"), f"重试pending失败: {r_retry}" task2 = _get_task(pid, tid) assert task2.get("status") == "pending" # assignee 应被清空 assert task2.get("assignee") is None or task2.get("assignee") == "", ( f"重试后assignee应清空,实际: {task2.get('assignee')}" ) # retry_count 应递增 retry_count = task2.get("retry_count", 0) or 0 print(f" retry_count: {retry_count}") # 等待重新调度执行 result = _poll_task( pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=("done", "failed", "cancelled", "blocked"), ) status = result.get("status") print(f" 重试后最终状态: {status}") assert status != "pending", ( f"重试后未被调度!{MAX_WAIT_AGENT}s后仍为pending" ) if status == "done": print(f" ✅ 失败重试链正确") else: print(f" ⚠️ 重试后状态: {status}") # =================================================================== # E10d: 完整生命周期(广播认领版) # =================================================================== @pytest.mark.integration @pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run real agent tests") @skip_no_integration class TestE10dFullLifecycle: """E10d: 无 assignee → 广播认领 → claimed → working → review → done 验证完整状态转换链 + events 记录。 """ @pytest.fixture(autouse=True) def setup_env(self): _check_environment() self._projects = [] yield for pid in self._projects: _cleanup_project(pid) def test_full_lifecycle_with_review(self): """完整生命周期:创建 → 广播 → 认领 → 执行 → review → done""" pid = _create_project(self._projects, "E10d", agents=["zhangfei-dev", "simayi-challenger"]) # 第一步:编码任务(张飞执行) code_tid = _create_task( pid, title="E2E完整链路:编码任务", description=( "请执行 echo lifecycle 并标记done。" "这是E2E完整生命周期测试,不需要做其他事。" ), task_type="coding", # 不指定 assignee → 广播认领 ) print(f"\n🚀 E10d: 等待编码任务广播认领 (pid={pid}, tid={code_tid})") result = _poll_task( pid, code_tid, timeout=MAX_WAIT_AGENT, terminal_states=("done", "failed", "cancelled", "blocked"), ) code_status = result.get("status") print(f" 编码任务最终状态: {code_status}") assert code_status != "pending", "编码任务未被认领" if code_status == "done": # 验证 events 记录存在 events_resp = http_requests.get( f"{API_BASE}/api/projects/{pid}/tasks/{code_tid}/events", timeout=10, ) if events_resp.status_code == 200: events = events_resp.json() event_types = [e.get("event_type") for e in events.get("events", [])] print(f" Events: {event_types}") # 应该有状态变化事件 assert any("claimed" in str(e) or "started" in str(e) for e in event_types), ( f"缺少状态变化事件: {event_types}" ) # 第二步:review 任务(不依赖 Agent 执行,手动推) review_tid = _create_task( pid, title="E2E完整链路:review任务", description="测试review状态", assignee="simayi-challenger", ) # 手动推完整生命周期 transitions = ["claimed", "working", "review", "done"] for s in transitions: r = _update_status(pid, review_tid, s, agent="simayi-challenger") assert r.get("ok"), f"{s}失败: {r}" task = _get_task(pid, review_tid) assert task.get("status") == "done" print(f" Review任务手动生命周期: ✅") # 第三步:验证 done → cancelled(取消已完成任务) r_cancel = _update_status(pid, review_tid, "cancelled") assert r_cancel.get("ok"), f"done→cancelled失败: {r_cancel}" task3 = _get_task(pid, review_tid) assert task3.get("status") == "cancelled" print(f" done→cancelled: ✅") print(f" ✅ E10d 完整生命周期测试通过") # =================================================================== # E15: Prompt v3.0 广播三级响应 E2E # =================================================================== @pytest.mark.integration @pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run real agent tests") @skip_no_integration class TestE15PromptV3Broadcast: """E15: Prompt v3.0 广播认领三级响应 E2E 创建一个 assignee 不匹配任何已注册 Agent 的任务, 验证广播后 Agent 写了 observation comment(而非静默 NO_REPLY)。 """ @pytest.fixture(autouse=True) def setup_env(self): _check_environment() self._projects = [] yield for pid in self._projects: _cleanup_project(pid) def test_broadcast_observation_comment(self): """广播任务 → Agent 写 observation comment Prompt v3.0 的 _build_claim_prompt 三级响应: - 匹配 → claim - 不匹配但能帮忙 → observation comment - 不匹配且帮不上 → NO_REPLY(静默) 创建一个 assignee=simayi-challenger 但 task_type=coding 的任务, 司马懿收到后应写 observation comment(挑战者视角),而不是执行。 """ pid = _create_project(self._projects, "E15", agents=["simayi-challenger"]) tid = _create_task( pid, title="E2E Prompt v3.0:观察型任务", description=( "这是一个编码任务,但 assignee 是司马懿。\n" "按照 Prompt v3.0 三级响应:\n" "- 如果你认为应该由其他人执行,请写 observation comment\n" "- 不需要实际执行编码\n" "- 标记 done 即可\n" "这是E2E测试,验证广播三级响应。" ), assignee="simayi-challenger", task_type="coding", ) print(f"\n🚀 E15: 等待广播认领+Agent响应 (pid={pid}, tid={tid})") result = _poll_task( pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=("done", "failed", "cancelled", "blocked"), ) status = result.get("status") print(f" 最终状态: {status}") assert status != "pending", "任务未被调度" # 检查是否有 comment(Agent 响应的证据) db_path = _get_db_path(pid) if db_path.exists(): import sqlite3 as sq3 conn = sq3.connect(str(db_path)) try: comments = conn.execute( "SELECT author, comment_type, body FROM comments " "WHERE task_id=? ORDER BY id DESC LIMIT 5", (tid,), ).fetchall() print(f" Comments ({len(comments)}):") for c in comments: print(f" [{c[0]}] {c[1]}: {c[2][:80]}...") # 应该有至少一个 comment(Agent 的响应) assert len(comments) > 0, ( f"Agent 未写任何 comment,Prompt v3.0 三级响应可能未生效" ) finally: conn.close() print(f" ✅ Prompt v3.0 广播响应验证完成") def test_broadcast_claim_by_matching_agent(self): """广播任务 → 匹配 Agent 执行 claim → done 对比测试:正确 assignee 的任务应被认领并执行。 """ pid = _create_project(self._projects, "E15b", agents=["zhangfei-dev"]) tid = _create_task( pid, title="E2E Prompt v3.0:认领型任务", description=( "请执行 echo claim-test 并标记done。\n" "这是E2E测试,验证正确 assignee 的任务被认领执行。\n" "不需要做其他事。" ), assignee="zhangfei-dev", task_type="coding", ) print(f"\n🚀 E15b: 等待正确Agent认领 (pid={pid}, tid={tid})") result = _poll_task( pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=("done", "failed", "cancelled", "blocked"), ) status = result.get("status") print(f" 最终状态: {status}") assert status != "pending", "任务未被认领" assert status != "blocked", "任务被错误拦截" print(f" ✅ 正确 assignee 认领执行验证通过")