auto-sync: 2026-06-07 01:18:36
Deploy / ci (push) Waiting to run
Deploy / deploy (push) Blocked by required conditions
Deploy / notify-deploy-failure (push) Blocked by required conditions

This commit is contained in:
cfdaily
2026-06-07 01:18:36 +08:00
parent 6465a6c43f
commit 36c730d15a
2 changed files with 109 additions and 64 deletions
+28 -3
View File
@@ -676,9 +676,34 @@ class Dispatcher:
if performative == "request":
has_reply = self._mail_check_reply(task_id, db_path)
if not has_reply:
# 不直接标 failed,留 working 等 ticker 下一轮再查
logger.warning("Mail %s: no reply found on on_complete, "
"leaving working for ticker recheck", task_id)
# F3: 立刻标 failed(不等 ticker 30 分钟)
logger.error("Mail %s: no reply found, marking failed (no_reply_found)", task_id)
for attempt in range(3):
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return
if row["status"] == "working":
conn.execute(
"UPDATE tasks SET status='failed', updated_at=datetime('now') WHERE id=?",
(task_id,),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, "daemon", "failed",
json.dumps({"reason": "no_reply_found"}, ensure_ascii=False)),
)
conn.commit()
logger.info("Mail %s: marked failed (no_reply_found)", task_id)
return
finally:
conn.close()
except Exception as e:
logger.warning("Mail %s: failed attempt %d: %s", task_id, attempt + 1, e)
logger.error("Mail %s: all 3 failed attempts failed, leaving for ticker", task_id)
return
# 标 done(重试 3 次)
+81 -61
View File
@@ -23,18 +23,18 @@ logger = logging.getLogger("moziplus-v2.spawner")
# ── Prompt 模板 ──
# Mail 专用模板:inform 类型(纯通知,状态由系统管理)
MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书纯通知
MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知)
发件者: {from_agent}
主题: {title}
内容: {text}
已阅即可。如需回复用 in_reply_to 回复发件者不需要填 to
已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)
⚠️ 不要执行任何状态转换命令。
"""
# Mail 专用模板request 类型需要处理并回复状态由系统管理
MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书需要你处理并回复。
# Mail 专用模板:request 类型(需要处理并回复,状态由系统管理)
MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回复。
发件者: {from_agent}
主题: {title}
@@ -46,7 +46,7 @@ curl -s -X POST http://localhost:8083/api/mail \\
-H 'Content-Type: application/json' \\
-d '{{"from": "{agent_id}", "in_reply_to": "{task_id}", "title": "回复: {title}", "text": "你的回复内容"}}'
⚠️ 不需要填 "to"系统自动回复给发件者。
⚠️ 不需要填 "to",系统自动回复给发件者。
### 如何给其他人发新邮件
@@ -55,9 +55,9 @@ curl -s -X POST http://localhost:8083/api/mail \\
-d '{{"from": "{agent_id}", "to": "对方agent-id", "title": "标题", "text": "正文", "type": "inform"}}'
⚠️ to 必须是有效的 agent id: {valid_agents}
⚠️ 纯通知用 type=inform需要对方回复不填 type默认 request
⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)
⚠️ 不能给自己发邮件
⚠️ 不要执行任何状态转换命令标 working/done/review/failed 等),系统会自动处理。
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
"""
SPAWN_PROMPT_TEMPLATE = """{identity_section}
@@ -73,7 +73,7 @@ SPAWN_PROMPT_TEMPLATE = """{identity_section}
{retry_context}
## 你能做什么
- 读任务详情含依赖、讨论、产出: GET {api_base}/projects/{project_id}/tasks/{task_id}?expand=all
- 读任务详情(含依赖、讨论、产出): GET {api_base}/projects/{project_id}/tasks/{task_id}?expand=all
- 读所有活跃任务: GET {api_base}/projects/{project_id}/tasks?status=working,claimed,review
- 写产出: POST {api_base}/projects/{project_id}/tasks/{task_id}/outputs
- 写评论/交接: POST {api_base}/projects/{project_id}/tasks/{task_id}/comments
@@ -82,11 +82,11 @@ SPAWN_PROMPT_TEMPLATE = """{identity_section}
- 认领任务: POST {api_base}/projects/{project_id}/tasks/{{{{id}}}}/claim
## 约束
- 完成后必须写产出物output并标 review不能无产出就提交
- 完成后必须写产出物(output)并标 review,不能无产出就提交
- 失败了标 failed 并写明原因
- 产出物 handoff comment ≥ 50 字符用于系统验证
- 禁止使用 sessions_send 直接发消息用 Mail API 或黑板 comment
- 委托他人做事用黑板 comment @agent-id系统自动路由如 @zhaoyun-data 你来获取数据无需手动传 mentions 数组
- 产出物 handoff comment ≥ 50 字符(用于系统验证)
- 禁止使用 sessions_send 直接发消息(用 Mail API 或黑板 comment)
- 委托他人做事用黑板 comment @agent-id,系统自动路由(如 @zhaoyun-data 你来获取数据,无需手动传 mentions 数组)
- 安全红线: {guardrails_summary}
### API 请求体示例
@@ -97,7 +97,7 @@ SPAWN_PROMPT_TEMPLATE = """{identity_section}
写评论: POST .../comments
```json
{{{{"author": "{agent_id}", "body": "评论内容≥50字符", "comment_type": "handoff"}}}}
{{{{"author": "{agent_id}", "body": "评论内容(≥50字符)", "comment_type": "handoff"}}}}
```
"""
@@ -117,7 +117,7 @@ DISCUSSION_PROMPT_TEMPLATE = """你被 spawn 来参与黑板讨论。这是一
你可以随时:
- 读黑板:GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all(含 comments、outputs)
- 写 comment:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments
body: {{"author": "{agent_id}", "body": "内容@agent-id 自动路由"}}
body: {{"author": "{agent_id}", "body": "内容(@agent-id 自动路由)"}}
- 创建 sub task:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks
body: {{"title": "...", "description": "...", "task_type": "...", "parent_task": "{task_id}", "must_haves": "{{\"capability\": \"...\"}}"}}
- 认领任务:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim
@@ -159,9 +159,9 @@ MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程
class AgentBusyError(Exception):
"""Agent 无法 spawn被占用/冷却/session 锁等
"""Agent 无法 spawn(被占用/冷却/session 锁等)
#07: reason 字段区分具体原因便于 dispatcher 层区分处理。
#07: reason 字段区分具体原因,便于 dispatcher 层区分处理。
"""
def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None):
self.agent_id = agent_id
@@ -217,7 +217,7 @@ class AgentSpawner:
self._valid_agents_cache: Optional[set] = None
def _load_valid_agents(self) -> set:
"""从 config/default.yaml 读取有效 Agent ID 列表带缓存"""
"""从 config/default.yaml 读取有效 Agent ID 列表(带缓存)"""
if self._valid_agents_cache is not None:
return self._valid_agents_cache
config_path = Path(__file__).parent.parent / "config" / "default.yaml"
@@ -300,7 +300,7 @@ class AgentSpawner:
def _build_minimal_fallback(self, task_id, title, description, must_haves,
project_id, agent_id):
"""最小 fallback只有任务上下文 + API 指令"""
"""最小 fallback:只有任务上下文 + API 指令"""
task_section = f"""## 任务
{title}
{description or "(无描述)"}
@@ -361,7 +361,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
profile = router.agent_profiles.get(agent_id)
if profile and getattr(profile, 'capabilities_zh', None):
caps = ", ".join(profile.capabilities_zh)
return f"你是 {agent_id}专长: {caps}"
return f"你是 {agent_id},专长: {caps}"
def _get_guardrails_summary(self) -> str:
"""#03: 从 GuardrailEngine 提取红线摘要"""
@@ -397,7 +397,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
safe_title = (title or "").replace('"', '\\"')[:100]
safe_text = (description or "").replace('"', '\\"')
# 获取有效 Agent 列表从 config/default.yaml 读取
# 获取有效 Agent 列表(从 config/default.yaml 读取)
valid_agents_list = self._load_valid_agents()
valid_agents_str = " / ".join(sorted(valid_agents_list))
@@ -441,7 +441,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
counter.release 由内部 wrapped_on_complete 保证。
use_main_session: True = 投递到主 Agent session(不传 --session-id)
on_checks_passed: 所有检查通过后的回调(session check + counter acquire 后、subprocess 前)
reuse_session_id: 传入指定 session-id 复用(用于续杯) deprecateduse_main_session=True 已替代
reuse_session_id: 传入指定 session-id 复用(用于续杯) - deprecated,use_main_session=True 已替代
Returns:
session_id
@@ -451,7 +451,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
"""
# ── #07 Acquire-First: counter 前置 → session check 在锁内贴近 spawn ──
# Step 0: 分配 session_id纯计算,无 IO
# Step 0: 分配 session_id(纯计算,无 IO)
if use_main_session:
session_id = None
elif reuse_session_id:
@@ -460,9 +460,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
session_id = str(uuid.uuid4())
_sid_key = session_id or "main" # counter 用的 key
# Phase 0: Pre-acquire 修复无锁
# timeout/failed 状态先修复再 acquire。revive 只改 running→idle幂等安全。
# asyncio 协作式并发保证同一时刻只有一个协程在执行revive 的 sessions.json
# Phase 0: Pre-acquire 修复(无锁)
# timeout/failed 状态先修复再 acquire。revive 只改 running→idle,幂等安全。
# asyncio 协作式并发保证同一时刻只有一个协程在执行,revive 的 sessions.json
# 写操作不会真正并行。
if use_main_session:
pre_state = self._check_session_state(agent_id)
@@ -475,15 +475,15 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id)
self._revive_session(agent_id)
# Phase 1: Counter acquire互斥锁
# v2.8.1 Bug-4 fix: retry 时跳过 countercounter 从原始 spawn 保持到 retry 完成
# Phase 1: Counter acquire(互斥锁)
# v2.8.1 Bug-4 fix: retry 时跳过 counter(counter 从原始 spawn 保持到 retry 完成)
if self.counter and not skip_counter:
acquired = await self.counter.acquire(agent_id, _sid_key)
if not acquired:
raise AgentBusyError(agent_id, reason="counter_blocked")
# Phase 2: Session check在锁保护下贴近 spawn
# 并列收集所有 block 原因统一判定。
# Phase 2: Session check(在锁保护下,贴近 spawn)
# 并列收集所有 block 原因,统一判定。
if use_main_session:
session_state = self._check_session_state(agent_id)
logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s",
@@ -495,16 +495,16 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
blockers.append(("session_locked", session_state.get("lock_pid")))
if session_state.get("status") == "running":
if session_state.get("lock_pid_alive"):
# 真 running外部进程占用
# 真 running:外部进程占用
blockers.append(("session_running", None))
else:
# 假 runninglock PID 死了但 status 还在 running → Phase 2.5 处理
# 假 running:lock PID 死了但 status 还在 running → Phase 2.5 处理
pass
if session_state.get("recent_compact"):
blockers.append(("session_compacting", None))
if blockers:
# 释放 counter报具体原因
# 释放 counter,报具体原因
if self.counter and not skip_counter:
self.counter.release(agent_id, _sid_key)
primary_reason, primary_detail = blockers[0]
@@ -513,8 +513,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
raise AgentBusyError(agent_id, reason=primary_reason,
detail={"blockers": blockers})
# Phase 2.5: 假死修复status=running + lock PID 死 → revive → 重检
# 此场景应被 Phase 0 提前修复这里做兜底
# Phase 2.5: 假死修复(status=running + lock PID 死 → revive → 重检)
# 此场景应被 Phase 0 提前修复,这里做兜底
if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"):
logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving",
agent_id)
@@ -760,12 +760,12 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)",
agent_id, session_id, outcome, exit_code, task_status)
# 广播反馈追踪Phase 1 bug fix
# 广播反馈追踪(Phase 1 bug fix)
if task_id == "broadcast" and hasattr(self, '_ticker') and self._ticker:
# 广播任务从 session 信息取真实 task_id 列表逐一回调 tracker
# 广播任务:从 session 信息取真实 task_id 列表,逐一回调 tracker
sess_info = self._sessions.get(session_id or "main", {})
bt_ids = sess_info.get("broadcast_task_ids") or []
# 广播场景一律标 no_replyAgent 只 claim 一个任务
# 广播场景一律标 no_reply:Agent 只 claim 一个任务,
# 其余任务的 tracker 不能被 claimed 清除
for real_task_id in bt_ids:
self._ticker.record_broadcast_response(real_task_id, agent_id, "no_reply")
@@ -774,7 +774,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
self._ticker.record_broadcast_response(task_id, agent_id, outcome_str)
if cls["should_retry"]:
# cooldown: 新增的可恢复场景A14/A15/A16/A8/A10
# cooldown: 新增的可恢复场景(A14/A15/A16/A8/A10)
cooldown_seconds = cls.get("cooldown_seconds", 0)
if cooldown_seconds and self.counter:
self.counter.set_cooldown(agent_id, seconds=cooldown_seconds)
@@ -785,7 +785,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
)
elif outcome == "api_error":
# A9: 429/API 错误 → release counter(on_complete)+ 推回 pending + 冷却
# 有上限api_retry_count 累计达 max_retries 则标 failed
# 有上限:api_retry_count 累计达 max_retries 则标 failed
await self._do_on_complete_async(on_complete, agent_id, outcome)
if self.counter:
self.counter.set_cooldown(agent_id)
@@ -809,8 +809,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
task_id, api_count, self.max_retries)
elif outcome == "fallback_timeout" and not cls["should_retry"]:
# A3/A3b: fallback 分级处理
# fallback_count 从 task_attempts.metadata 读取
# 达 max_retries 标 failed(A3)否则 retry + cooldown(A3b)
# fallback_count 从 task_attempts.metadata 读取,
# 达 max_retries 标 failed(A3),否则 retry + cooldown(A3b)
fallback_count = 0
if db_path and task_id:
retry_counts = self._get_retry_counts(db_path, task_id)
@@ -819,7 +819,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
self._update_retry_counts(db_path, task_id, retry_counts)
if fallback_count >= self.max_retries:
# A3: 连续 fallback 达上限标 failed
# A3: 连续 fallback 达上限,标 failed
logger.error("A3 fallback exhausted: agent=%s session=%s task=%s "
"fallback_count=%d reason=%s",
agent_id, session_id, task_id, fallback_count,
@@ -832,7 +832,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"fallback_reason": json_result.get("fallback_reason"),
})
else:
# A3b: fallback 未达上限retry + cooldown
# A3b: fallback 未达上限,retry + cooldown
logger.warning("A3b fallback retry: agent=%s session=%s task=%s "
"fallback_count=%d/%d reason=%s",
agent_id, session_id, task_id, fallback_count,
@@ -841,19 +841,26 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
self.counter.set_cooldown(agent_id, seconds=60)
await self._do_retry(
session_id, agent_id, task_id, on_complete, db_path,
"fallback_retry_count" # 独立计数不与 gateway_timeout 的 retry_count 共用
"fallback_retry_count" # 独立计数,不与 gateway_timeout 的 retry_count 共用
)
else:
# 其他:A1(completed), A4(agent_failed), A7(auth_failed),
# A8(gateway_unreachable), A11(lock_conflict),
# A10(compact_failed), A12(agent_error)
# v2.8.1 Fix-3a: crash 类 outcome 设 cooldown给 agent session 恢复时间
# v2.8.1 Fix-3a: crash 类 outcome 设 cooldown,给 agent session 恢复时间
if outcome in ("crashed", "compact_failed", "process_crash", "session_stuck",
"compact_hanging", "agent_error", "compact_interrupted") and self.counter:
self.counter.set_cooldown(agent_id, seconds=300) # 5 分钟
logger.info("Crash/error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome)
# F1: 不可恢复 outcome → 立刻标 failed + 写黑板
if outcome in ("auth_failed", "agent_error") and db_path and task_id:
logger.error("Task %s: unrecoverable outcome=%s, marking failed immediately", task_id, outcome)
self._mark_task(db_path, task_id, "failed", {
"reason": outcome,
"stderr_preview": (stderr_text or "")[:500],
})
# 注意: cooldown 期间任务状态仍为 working,但 counter 已释放。
# DB 中的 working 是假 working——ticker 不会重新分配,_check_timeouts 会
# DB 中的 working 是"假 working"——ticker 不会重新分配,_check_timeouts 会
# 在 cooldown 结束后回收。如果 ticker 在此期间给同一 agent 分配新任务,属正常行为。
# 进程退出 → on_complete release counter
# 任务状态由各 outcome 自行处理(或等 ticker)
@@ -921,8 +928,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
compact_wait_count = self._compact_waits.get(task_id, 0) + 1
self._compact_waits[task_id] = compact_wait_count
if compact_wait_count >= self.max_monitor_timeouts:
# #07.3 ACT-2: compact_hanging 不标 failed只 release counter
# 进程还活着但不 monitor等 ticker _check_timeouts 超时回收 → 重新 dispatch
# #07.3 ACT-2: compact_hanging 不标 failed,只 release counter
# 进程还活着但不 monitor,等 ticker _check_timeouts 超时回收 → 重新 dispatch
logger.warning("Agent %s compact hanging after %d waits, releasing counter for ticker re-dispatch",
agent_id, compact_wait_count)
self._compact_waits.pop(task_id, None)
@@ -972,8 +979,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
on_complete(含 counter release)置为 None,避免 double release。
"""
# v2.8.1 Bug-4 fix: 不再手动 release counter + 置 None on_complete
# counter 从原始 spawn 保持到 retry 完成避免窗口期 ticker acquire 同一 agent
# on_complete 保留原始 wrapped_on_completeretry 完成后自然 release counter
# counter 从原始 spawn 保持到 retry 完成,避免窗口期 ticker acquire 同一 agent
# on_complete 保留原始 wrapped_on_complete,retry 完成后自然 release counter
# 续杯前检查任务状态,已终态则跳过
if db_path and task_id:
@@ -1077,7 +1084,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
skip_counter=True, # Bug-4 fix: counter 已在原始 spawn 中持有
)
except AgentBusyError as e:
# #07.3 ACT-3: session busycompact/lock/running= 暂时性阻塞
# #07.3 ACT-3: session busy(compact/lock/running)= 暂时性阻塞
# release counter → 任务保持 working → ticker 重新 dispatch
logger.warning("Retry spawn deferred: %s session busy (%s), releasing counter for ticker re-dispatch",
agent_id, e.reason)
@@ -1200,9 +1207,9 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
@staticmethod
def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 300) -> bool:
"""v2.8.1 Fix-1: 读 session jsonl 末尾检查是否有 window_seconds 内的 compaction 记录。
"""v2.8.1 Fix-1: 读 session jsonl 末尾,检查是否有 window_seconds 内的 compaction 记录。
比 compactionCheckpoints 更可靠Gateway 每次完成 compact 必然在 jsonl 末尾追加记录
比 compactionCheckpoints 更可靠:Gateway 每次完成 compact 必然在 jsonl 末尾追加记录,
但不保证更新 compactionCheckpoints。
"""
if not session_file or not pathlib.Path(session_file).exists():
@@ -1248,7 +1255,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
def _check_session_state(agent_id: str) -> dict:
"""检查 sessions.json 和 lock 状态
v2.8.1: compact 检测改用 session jsonl 末尾扫描Fix-1),
v2.8.1: compact 检测改用 session jsonl 末尾扫描(Fix-1),
替代失效的 compactionCheckpoints 检测。
"""
result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False}
@@ -1288,7 +1295,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
pass
# v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描
# 只在 agent 非空闲时才扫描减少不必要 I/O
# 只在 agent 非空闲时才扫描(减少不必要 I/O)
if result["status"] not in ("done", "idle", "unknown", None) and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
except Exception:
@@ -1300,7 +1307,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
task_status: Optional[str], stdout_text: str = "") -> dict:
"""分类退出原因,返回处理策略
v3.1: A0 拆分为 A14-A17信号中断/stderr 智能分类
v3.1: A0 拆分为 A14-A17(信号中断/stderr 智能分类)
A8/A10 改为可恢复 retry。cooldown 统一 60s。
"""
status = json_result.get("status")
@@ -1320,15 +1327,15 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return {"outcome": "fallback_timeout", "should_retry": False}
# A2/A3: status=timeout → 唯一续杯场景
# 注意: PM2 restart 时 daemon 自身也收到 SIGTERM此时 retry spawn 的新进程
# 会随 daemon 一起被杀。A14 retry 假设 daemon 存活PM2 级重启不在此场景内。
# 注意: PM2 restart 时 daemon 自身也收到 SIGTERM,此时 retry spawn 的新进程
# 会随 daemon 一起被杀。A14 retry 假设 daemon 存活,PM2 级重启不在此场景内。
if status == "timeout":
return {"outcome": "gateway_timeout", "should_retry": True,
"retry_field": "retry_count"}
# A0 拆分: 无 JSON 输出 + exit≠0
if status is None and not stdout_text.strip() and exit_code != 0:
# A14: SIGINT(130) / SIGTERM(143) → 外部中断可恢复
# A14: SIGINT(130) / SIGTERM(143) → 外部中断,可恢复
if exit_code in (130, 143):
return {"outcome": "interrupted", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
@@ -1341,7 +1348,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]):
return {"outcome": "compact_interrupted", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
# A17: 真正的 crash → 保持 workingticker 兜底
# A17: 真正的 crash → 保持 working,ticker 兜底
return {"outcome": "crashed", "should_retry": False, "original": "process_crash"}
# stdout 为空但 exit=0:可能是正常完成但 --json 没输出
@@ -1446,6 +1453,19 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
conn.commit()
finally:
conn.close()
# F2: conn 已关闭,Blackboard 内部自己 get_connection
if status == "failed":
try:
from src.blackboard.operations import Blackboard
bb = Blackboard(db_path)
reason = (detail or {}).get("reason", "unknown")
cid = bb.add_comment(task_id, "daemon",
f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入",
comment_type="system")
bb.record_mentions(cid, task_id, ["pangtong-fujunshi"])
logger.info("Task %s: failure notified pangtong via comment+mention (reason=%s)", task_id, reason)
except Exception as e:
logger.warning("Task %s: failed to notify pangtong: %s", task_id, e)
except Exception:
logger.exception("Failed to mark task %s as %s", task_id, status)
@@ -1504,7 +1524,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
db_path: Optional[Path] = None,
) -> None:
"""记录 task_attempt"""
# 广播 spawn 产生的 "broadcast" task_id 不记录 attempts避免脏数据
# 广播 spawn 产生的 "broadcast" task_id 不记录 attempts,避免脏数据
if task_id == "broadcast":
return
effective_db = db_path or self.db_path