auto-sync: 2026-05-31 23:35:05
This commit is contained in:
@@ -229,6 +229,12 @@ class Dispatcher:
|
||||
_dispatcher = self
|
||||
_is_review = action_type == "review"
|
||||
|
||||
# v2.8.1 Fix-2: 明确需要回退 current_agent 的 outcome
|
||||
ROLLBACK_CURRENT_AGENT_OUTCOMES = frozenset({
|
||||
"crashed", "compact_failed", "process_crash",
|
||||
"session_stuck", "compact_hanging",
|
||||
})
|
||||
|
||||
def _task_on_complete(aid, outcome):
|
||||
try:
|
||||
if _is_review:
|
||||
@@ -239,7 +245,26 @@ class Dispatcher:
|
||||
_dispatcher._mark_task_status(_task_db, _task_id, "done")
|
||||
logger.info("Task %s: review complete (%s), marking done", _task_id, outcome)
|
||||
else:
|
||||
logger.warning("Task %s: review agent %s, NOT marking done", _task_id, outcome)
|
||||
logger.warning("Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome)
|
||||
# v2.8.1 Fix-2: crash 后回退 current_agent,避免 exclude_current 卡死
|
||||
if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES:
|
||||
try:
|
||||
conn = get_connection(_task_db)
|
||||
try:
|
||||
conn.execute(
|
||||
"UPDATE tasks SET current_agent = "
|
||||
"(SELECT assignee FROM tasks WHERE id=?) "
|
||||
"WHERE id=? AND current_agent=?",
|
||||
(_task_id, _task_id, aid)
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
logger.info("Task %s: rolled back current_agent from %s to assignee",
|
||||
_task_id, aid)
|
||||
except Exception as e:
|
||||
logger.warning("Task %s: failed to rollback current_agent: %s",
|
||||
_task_id, e)
|
||||
else:
|
||||
_dispatcher._task_auto_complete(_task_id, _task_db)
|
||||
except Exception as e:
|
||||
@@ -759,3 +784,31 @@ class Dispatcher:
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
logger.error("Task %s: mark status error: %s", task_id, e)
|
||||
|
||||
@staticmethod
|
||||
def _check_crash_limit(task_id: str, db_path: pathlib.Path, limit: int = 3,
|
||||
window_minutes: int = 30) -> bool:
|
||||
"""v2.8.1 Fix-3c: 检查 task 最近 window_minutes 内的 crash 次数是否超限。
|
||||
|
||||
基于 task_attempts 表(持久化),PM2 重启不丢失。
|
||||
Returns: True = 已超限,应 escalate。
|
||||
"""
|
||||
try:
|
||||
conn = get_connection(db_path)
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT COUNT(*) as cnt FROM task_attempts "
|
||||
"WHERE task_id=? AND outcome='crashed' "
|
||||
"AND started_at > datetime('now', ?)",
|
||||
(task_id, f'-{window_minutes} minutes')
|
||||
).fetchone()
|
||||
count = row["cnt"] if row else 0
|
||||
if count >= limit:
|
||||
logger.warning("Task %s: crash limit reached (%d/%d in %dm)",
|
||||
task_id, count, limit, window_minutes)
|
||||
return True
|
||||
return False
|
||||
finally:
|
||||
conn.close()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
+60
-8
@@ -757,6 +757,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
# 其他:A1(completed), A4(agent_failed), A7(auth_failed),
|
||||
# A8(gateway_unreachable), A11(lock_conflict),
|
||||
# A10(compact_failed), A12(agent_error)
|
||||
# v2.8.1 Fix-3a: crash 类 outcome 设 cooldown,给 agent session 恢复时间
|
||||
if outcome in ("crashed", "compact_failed", "process_crash", "session_stuck",
|
||||
"compact_hanging", "agent_error") and self.counter:
|
||||
self.counter.set_cooldown(agent_id, seconds=300) # 5 分钟
|
||||
logger.info("Crash/error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome)
|
||||
# 进程退出 → on_complete release counter
|
||||
# 任务状态由各 outcome 自行处理(或等 ticker)
|
||||
await self._do_on_complete_async(on_complete, agent_id, outcome)
|
||||
@@ -1088,9 +1093,59 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
logger.exception("Failed to revive %s", agent_id)
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 300) -> bool:
|
||||
"""v2.8.1 Fix-1: 读 session jsonl 末尾,检查是否有 window_seconds 内的 compaction 记录。
|
||||
|
||||
比 compactionCheckpoints 更可靠:Gateway 每次完成 compact 必然在 jsonl 末尾追加记录,
|
||||
但不保证更新 compactionCheckpoints。
|
||||
"""
|
||||
if not session_file or not pathlib.Path(session_file).exists():
|
||||
return False
|
||||
try:
|
||||
from datetime import datetime, timezone
|
||||
now = datetime.now(timezone.utc)
|
||||
with open(session_file, "rb") as sf:
|
||||
sf.seek(0, 2)
|
||||
size = sf.tell()
|
||||
sf.seek(max(0, size - 51200))
|
||||
tail = sf.read().decode("utf-8", errors="replace")
|
||||
for line in reversed(tail.splitlines()):
|
||||
if not line.strip():
|
||||
continue
|
||||
try:
|
||||
import json as _json
|
||||
obj = _json.loads(line)
|
||||
except (_json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
if obj.get("type") == "compaction":
|
||||
ts = obj.get("timestamp", "")
|
||||
if ts:
|
||||
try:
|
||||
ct = datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
if (now - ct).total_seconds() < window_seconds:
|
||||
return True
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
ts = obj.get("timestamp", "")
|
||||
if ts:
|
||||
try:
|
||||
ct = datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
if (now - ct).total_seconds() >= window_seconds:
|
||||
break
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _check_session_state(agent_id: str) -> dict:
|
||||
"""检查 sessions.json 和 lock 状态"""
|
||||
"""检查 sessions.json 和 lock 状态
|
||||
|
||||
v2.8.1: compact 检测改用 session jsonl 末尾扫描(Fix-1),
|
||||
替代失效的 compactionCheckpoints 检测。
|
||||
"""
|
||||
result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False}
|
||||
sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json"
|
||||
if not sessions_path.exists():
|
||||
@@ -1125,13 +1180,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 最近 5 分钟的 compact
|
||||
import time
|
||||
now_ms = time.time() * 1000
|
||||
for cp in main_session.get("compactionCheckpoints", []):
|
||||
if (now_ms - cp.get("createdAt", 0)) < 300_000:
|
||||
result["recent_compact"] = True
|
||||
break
|
||||
# v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描
|
||||
# 只在 agent 非空闲时才扫描(减少不必要 I/O)
|
||||
if result["status"] not in ("done", "idle", "unknown", None) and sf:
|
||||
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
|
||||
except Exception:
|
||||
pass
|
||||
return result
|
||||
|
||||
@@ -1154,6 +1154,25 @@ Parent Task ID: {parent_task.id}
|
||||
if existing:
|
||||
continue # 已有活跃 review dispatch
|
||||
|
||||
# v2.8.1 Fix-3c: crash 计数上限检查
|
||||
if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30):
|
||||
conn = get_connection(db_path)
|
||||
try:
|
||||
self._transition_status(
|
||||
conn, task.id, "failed",
|
||||
agent="daemon",
|
||||
detail={"reason": "review_crash_limit",
|
||||
"message": "30 分钟内 crash 3 次,自动标 failed"},
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
bb.add_observation(
|
||||
task.id, "daemon",
|
||||
"审查任务连续 crash 3 次(30 分钟内),自动标记为 failed,请人工介入",
|
||||
)
|
||||
logger.error("Task %s: 3 crashes in 30min, marking failed", task.id)
|
||||
continue
|
||||
|
||||
# 检查是否有产出(司马懿建议:无产出直接标 failed)
|
||||
outputs = bb.get_outputs(task.id)
|
||||
if not outputs:
|
||||
@@ -1343,6 +1362,54 @@ Parent Task ID: {parent_task.id}
|
||||
except Exception:
|
||||
logger.exception("Failed to push back task %s", task_id_check)
|
||||
|
||||
# v2.8.1 Fix-3b: review 超时回收
|
||||
# review 状态无超时回收是设计空白:review agent crash 后任务永远卡在 review
|
||||
review_timeout_minutes = getattr(self, 'review_timeout_minutes', 15)
|
||||
review_tasks = queries.tasks_by_status("review")
|
||||
for task in review_tasks:
|
||||
current = getattr(task, 'current_agent', None)
|
||||
# 检查是否有活跃的 review agent 进程
|
||||
if current and self.spawner:
|
||||
session_info = self.spawner.get_session_by_agent(current)
|
||||
if session_info:
|
||||
pid = session_info.get("pid")
|
||||
if pid and self._is_pid_alive(pid):
|
||||
continue # review agent 还在跑,不回收
|
||||
# 无活跃进程 → 检查超时
|
||||
updated = task.updated_at
|
||||
if not updated:
|
||||
continue
|
||||
try:
|
||||
elapsed = (now - datetime.fromisoformat(updated)).total_seconds() / 60.0
|
||||
if elapsed > review_timeout_minutes:
|
||||
try:
|
||||
conn = get_connection(db_path)
|
||||
try:
|
||||
current_row = conn.execute(
|
||||
"SELECT status FROM tasks WHERE id=?", (task.id,)
|
||||
).fetchone()
|
||||
if current_row and current_row["status"] != "review":
|
||||
continue
|
||||
self._transition_status(
|
||||
conn, task.id, "pending",
|
||||
agent="daemon",
|
||||
detail={"reason": "review_timeout",
|
||||
"elapsed_minutes": round(elapsed, 1)},
|
||||
)
|
||||
conn.execute(
|
||||
"UPDATE tasks SET current_agent=NULL WHERE id=?", (task.id,)
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
reclaimed.append(task.id)
|
||||
logger.warning("Review timeout: %s (%.1fm > %dm), pushed back to pending",
|
||||
task.id, elapsed, review_timeout_minutes)
|
||||
except Exception:
|
||||
logger.exception("Failed to reclaim review task %s", task.id)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
return reclaimed
|
||||
|
||||
def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool:
|
||||
|
||||
Reference in New Issue
Block a user