7fb4d988ec
- mail_notify: f-string 反斜杠修复、行过长修复、unused import - test_classify_outcome: api_error should_retry 改 True
231 lines
7.5 KiB
Python
231 lines
7.5 KiB
Python
import pytest
|
|
from typing import Optional
|
|
|
|
pytestmark = pytest.mark.unit
|
|
|
|
from src.daemon.spawner import AgentSpawner as Spawner
|
|
|
|
|
|
class TestClassifyCompleted:
|
|
"""A1: 正常完成"""
|
|
|
|
def test_ok_completed(self):
|
|
result = Spawner._classify_outcome(
|
|
0, {"status": "ok", "summary": "completed"}, "", None
|
|
)
|
|
assert result == {"outcome": "completed", "should_retry": False}
|
|
|
|
def test_ok_completed_with_fallback(self):
|
|
"""A5/A6: status=ok + fallback_used"""
|
|
result = Spawner._classify_outcome(
|
|
0,
|
|
{"status": "ok", "summary": "completed", "fallback_used": True},
|
|
"",
|
|
None,
|
|
)
|
|
assert result["outcome"] == "fallback_timeout"
|
|
assert result["should_retry"] is False
|
|
|
|
|
|
class TestClassifyAgentFailed:
|
|
"""A4: Agent 自标 failed"""
|
|
|
|
def test_task_status_failed(self):
|
|
result = Spawner._classify_outcome(
|
|
0, {"status": "ok"}, "", "failed"
|
|
)
|
|
assert result["outcome"] == "agent_failed"
|
|
assert result["should_retry"] is False
|
|
|
|
|
|
class TestClassifyTimeout:
|
|
"""A2/A3: gateway timeout"""
|
|
|
|
def test_timeout_should_retry(self):
|
|
result = Spawner._classify_outcome(0, {"status": "timeout"}, "", None)
|
|
assert result["outcome"] == "gateway_timeout"
|
|
assert result["should_retry"] is True
|
|
assert result["retry_field"] == "retry_count"
|
|
|
|
|
|
class TestClassifyInterrupted:
|
|
"""A14: SIGINT / SIGTERM 外部中断"""
|
|
|
|
def test_sigint_exit_130(self):
|
|
result = Spawner._classify_outcome(130, {}, "", None, "")
|
|
assert result["outcome"] == "interrupted"
|
|
assert result["should_retry"] is True
|
|
assert result["retry_field"] == "retry_count"
|
|
assert result["cooldown_seconds"] == 60
|
|
|
|
def test_sigterm_exit_143(self):
|
|
result = Spawner._classify_outcome(143, {}, "", None, "")
|
|
assert result["outcome"] == "interrupted"
|
|
assert result["should_retry"] is True
|
|
assert result["retry_field"] == "retry_count"
|
|
assert result["cooldown_seconds"] == 60
|
|
|
|
|
|
class TestClassifyGatewayUnreachableNoJson:
|
|
"""A15: 无 JSON + exit≠0 + stderr 含 network 关键字"""
|
|
|
|
def test_stderr_econnrefused(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {}, "Error: ECONNREFUSED connection refused", None, ""
|
|
)
|
|
assert result["outcome"] == "gateway_unreachable"
|
|
assert result["should_retry"] is True
|
|
assert result["cooldown_seconds"] == 60
|
|
|
|
def test_stderr_gateway_closed(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {}, "gateway closed unexpectedly", None, ""
|
|
)
|
|
assert result["outcome"] == "gateway_unreachable"
|
|
assert result["should_retry"] is True
|
|
assert result["cooldown_seconds"] == 60
|
|
|
|
|
|
class TestClassifyCompactInterruptedNoJson:
|
|
"""A16: 无 JSON + exit≠0 + stderr 含 compact 关键字"""
|
|
|
|
def test_stderr_compaction_diag(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {}, "compaction-diag: overflow detected", None, ""
|
|
)
|
|
assert result["outcome"] == "compact_interrupted"
|
|
assert result["should_retry"] is True
|
|
assert result["cooldown_seconds"] == 60
|
|
|
|
|
|
class TestClassifyCrashed:
|
|
"""A17: 无 JSON + exit≠0 + 无匹配 stderr → crash"""
|
|
|
|
def test_crash_no_stderr(self):
|
|
result = Spawner._classify_outcome(1, {}, "", None, "")
|
|
assert result["outcome"] == "crashed"
|
|
assert result["should_retry"] is False
|
|
assert result["original"] == "process_crash"
|
|
|
|
|
|
class TestClassifyNoJsonExit0:
|
|
"""exit=0 + 空 stdout → 根据 task_status 判定"""
|
|
|
|
def test_task_status_done(self):
|
|
result = Spawner._classify_outcome(0, {}, "", "done", "")
|
|
assert result["outcome"] == "completed"
|
|
assert result["should_retry"] is False
|
|
|
|
def test_task_status_review(self):
|
|
result = Spawner._classify_outcome(0, {}, "", "review", "")
|
|
assert result["outcome"] == "completed"
|
|
assert result["should_retry"] is False
|
|
|
|
def test_task_status_pending(self):
|
|
result = Spawner._classify_outcome(0, {}, "", "pending", "")
|
|
assert result["outcome"] == "completed"
|
|
assert result["should_retry"] is False
|
|
|
|
|
|
class TestClassifyErrorAuth:
|
|
"""A7: status=error + stderr 含 auth 关键字"""
|
|
|
|
def test_stderr_401(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {"status": "error"}, "HTTP 401 Unauthorized", None
|
|
)
|
|
assert result["outcome"] == "auth_failed"
|
|
assert result["should_retry"] is False
|
|
|
|
def test_stderr_unauthorized(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {"status": "error"}, "unauthorized access", None
|
|
)
|
|
assert result["outcome"] == "auth_failed"
|
|
assert result["should_retry"] is False
|
|
|
|
|
|
class TestClassifyErrorGateway:
|
|
"""A8: status=error + stderr 含 network 关键字"""
|
|
|
|
def test_stderr_econnrefused(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {"status": "error"}, "ECONNREFUSED 127.0.0.1:8083", None
|
|
)
|
|
assert result["outcome"] == "gateway_unreachable"
|
|
assert result["should_retry"] is True
|
|
assert result["cooldown_seconds"] == 60
|
|
|
|
|
|
class TestClassifyErrorApi:
|
|
"""A9: status=error + stderr 含 API 错误关键字"""
|
|
|
|
def test_stderr_rate_limit(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {"status": "error"}, "rate_limit exceeded", None
|
|
)
|
|
assert result["outcome"] == "api_error"
|
|
assert result["should_retry"] is True
|
|
assert result["cooldown_seconds"] == 60
|
|
|
|
def test_stderr_500(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {"status": "error"}, "HTTP 500 Internal Server Error", None
|
|
)
|
|
assert result["outcome"] == "api_error"
|
|
assert result["should_retry"] is True
|
|
assert result["cooldown_seconds"] == 60
|
|
|
|
|
|
class TestClassifyErrorCompact:
|
|
"""A10: status=error + stderr 含 compact 关键字"""
|
|
|
|
def test_stderr_compaction_diag(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {"status": "error"}, "compaction-diag report", None
|
|
)
|
|
assert result["outcome"] == "compact_failed"
|
|
assert result["should_retry"] is False
|
|
|
|
|
|
class TestClassifyErrorLock:
|
|
"""A11: status=error + stderr 含 lock 关键字"""
|
|
|
|
def test_stderr_lock(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {"status": "error"}, "lock acquisition failed", None
|
|
)
|
|
assert result["outcome"] == "lock_conflict"
|
|
assert result["should_retry"] is True
|
|
assert result["cooldown_seconds"] == 60
|
|
|
|
def test_stderr_busy(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {"status": "error"}, "resource is busy", None
|
|
)
|
|
assert result["outcome"] == "lock_conflict"
|
|
assert result["should_retry"] is True
|
|
|
|
|
|
class TestClassifyErrorFallback:
|
|
"""A12: status=error + stderr 无匹配"""
|
|
|
|
def test_no_matching_stderr(self):
|
|
result = Spawner._classify_outcome(
|
|
1, {"status": "error"}, "some unknown error", None
|
|
)
|
|
assert result["outcome"] == "agent_error"
|
|
assert result["should_retry"] is False
|
|
|
|
|
|
class TestClassifyUnknownStatus:
|
|
"""兜底: status 为未知值"""
|
|
|
|
def test_unknown_status_value(self):
|
|
result = Spawner._classify_outcome(
|
|
0, {"status": "weird_value"}, "", None
|
|
)
|
|
assert result["outcome"] == "agent_error"
|
|
assert result["should_retry"] is False
|
|
assert result["original"] == "unknown_status"
|