diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py index 875aefe..5da768d 100644 --- a/src/daemon/dispatcher.py +++ b/src/daemon/dispatcher.py @@ -676,9 +676,34 @@ class Dispatcher: if performative == "request": has_reply = self._mail_check_reply(task_id, db_path) if not has_reply: - # 不直接标 failed,留 working 等 ticker 下一轮再查 - logger.warning("Mail %s: no reply found on on_complete, " - "leaving working for ticker recheck", task_id) + # F3: 立刻标 failed(不等 ticker 30 分钟) + logger.error("Mail %s: no reply found, marking failed (no_reply_found)", task_id) + for attempt in range(3): + try: + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + if not row: + return + if row["status"] == "working": + conn.execute( + "UPDATE tasks SET status='failed', updated_at=datetime('now') WHERE id=?", + (task_id,), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, "daemon", "failed", + json.dumps({"reason": "no_reply_found"}, ensure_ascii=False)), + ) + conn.commit() + logger.info("Mail %s: marked failed (no_reply_found)", task_id) + return + finally: + conn.close() + except Exception as e: + logger.warning("Mail %s: failed attempt %d: %s", task_id, attempt + 1, e) + logger.error("Mail %s: all 3 failed attempts failed, leaving for ticker", task_id) return # 标 done(重试 3 次) diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index 22f4dc6..56924ee 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -23,18 +23,18 @@ logger = logging.getLogger("moziplus-v2.spawner") # ── Prompt 模板 ── # Mail 专用模板:inform 类型(纯通知,状态由系统管理) -MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知)。 +MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知)。 发件者: {from_agent} 主题: {title} 内容: {text} -已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。 +已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。 ⚠️ 不要执行任何状态转换命令。 """ -# Mail 专用模板:request 类型(需要处理并回复,状态由系统管理) -MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回复。 +# Mail 专用模板:request 类型(需要处理并回复,状态由系统管理) +MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回复。 发件者: {from_agent} 主题: {title} @@ -46,7 +46,7 @@ curl -s -X POST http://localhost:8083/api/mail \\ -H 'Content-Type: application/json' \\ -d '{{"from": "{agent_id}", "in_reply_to": "{task_id}", "title": "回复: {title}", "text": "你的回复内容"}}' -⚠️ 不需要填 "to",系统自动回复给发件者。 +⚠️ 不需要填 "to",系统自动回复给发件者。 ### 如何给其他人发新邮件 @@ -55,9 +55,9 @@ curl -s -X POST http://localhost:8083/api/mail \\ -d '{{"from": "{agent_id}", "to": "对方agent-id", "title": "标题", "text": "正文", "type": "inform"}}' ⚠️ to 必须是有效的 agent id: {valid_agents} -⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request) +⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request) ⚠️ 不能给自己发邮件 -⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 +⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 """ SPAWN_PROMPT_TEMPLATE = """{identity_section} @@ -73,7 +73,7 @@ SPAWN_PROMPT_TEMPLATE = """{identity_section} {retry_context} ## 你能做什么 -- 读任务详情(含依赖、讨论、产出): GET {api_base}/projects/{project_id}/tasks/{task_id}?expand=all +- 读任务详情(含依赖、讨论、产出): GET {api_base}/projects/{project_id}/tasks/{task_id}?expand=all - 读所有活跃任务: GET {api_base}/projects/{project_id}/tasks?status=working,claimed,review - 写产出: POST {api_base}/projects/{project_id}/tasks/{task_id}/outputs - 写评论/交接: POST {api_base}/projects/{project_id}/tasks/{task_id}/comments @@ -82,11 +82,11 @@ SPAWN_PROMPT_TEMPLATE = """{identity_section} - 认领任务: POST {api_base}/projects/{project_id}/tasks/{{{{id}}}}/claim ## 约束 -- 完成后必须写产出物(output)并标 review,不能无产出就提交 +- 完成后必须写产出物(output)并标 review,不能无产出就提交 - 失败了标 failed 并写明原因 -- 产出物 handoff comment ≥ 50 字符(用于系统验证) -- 禁止使用 sessions_send 直接发消息(用 Mail API 或黑板 comment) -- 委托他人做事用黑板 comment @agent-id,系统自动路由(如 @zhaoyun-data 你来获取数据,无需手动传 mentions 数组) +- 产出物 handoff comment ≥ 50 字符(用于系统验证) +- 禁止使用 sessions_send 直接发消息(用 Mail API 或黑板 comment) +- 委托他人做事用黑板 comment @agent-id,系统自动路由(如 @zhaoyun-data 你来获取数据,无需手动传 mentions 数组) - 安全红线: {guardrails_summary} ### API 请求体示例 @@ -97,7 +97,7 @@ SPAWN_PROMPT_TEMPLATE = """{identity_section} 写评论: POST .../comments ```json -{{{{"author": "{agent_id}", "body": "评论内容(≥50字符)", "comment_type": "handoff"}}}} +{{{{"author": "{agent_id}", "body": "评论内容(≥50字符)", "comment_type": "handoff"}}}} ``` """ @@ -117,7 +117,7 @@ DISCUSSION_PROMPT_TEMPLATE = """你被 spawn 来参与黑板讨论。这是一 你可以随时: - 读黑板:GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all(含 comments、outputs) - 写 comment:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments - body: {{"author": "{agent_id}", "body": "内容(@agent-id 自动路由)"}} + body: {{"author": "{agent_id}", "body": "内容(@agent-id 自动路由)"}} - 创建 sub task:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks body: {{"title": "...", "description": "...", "task_type": "...", "parent_task": "{task_id}", "must_haves": "{{\"capability\": \"...\"}}"}} - 认领任务:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim @@ -159,9 +159,9 @@ MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程 class AgentBusyError(Exception): - """Agent 无法 spawn(被占用/冷却/session 锁等) + """Agent 无法 spawn(被占用/冷却/session 锁等) - #07: reason 字段区分具体原因,便于 dispatcher 层区分处理。 + #07: reason 字段区分具体原因,便于 dispatcher 层区分处理。 """ def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None): self.agent_id = agent_id @@ -217,7 +217,7 @@ class AgentSpawner: self._valid_agents_cache: Optional[set] = None def _load_valid_agents(self) -> set: - """从 config/default.yaml 读取有效 Agent ID 列表(带缓存)""" + """从 config/default.yaml 读取有效 Agent ID 列表(带缓存)""" if self._valid_agents_cache is not None: return self._valid_agents_cache config_path = Path(__file__).parent.parent / "config" / "default.yaml" @@ -300,7 +300,7 @@ class AgentSpawner: def _build_minimal_fallback(self, task_id, title, description, must_haves, project_id, agent_id): - """最小 fallback:只有任务上下文 + API 指令""" + """最小 fallback:只有任务上下文 + API 指令""" task_section = f"""## 任务 {title} {description or "(无描述)"} @@ -361,7 +361,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta profile = router.agent_profiles.get(agent_id) if profile and getattr(profile, 'capabilities_zh', None): caps = ", ".join(profile.capabilities_zh) - return f"你是 {agent_id},专长: {caps}。" + return f"你是 {agent_id},专长: {caps}。" def _get_guardrails_summary(self) -> str: """#03: 从 GuardrailEngine 提取红线摘要""" @@ -397,7 +397,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta safe_title = (title or "").replace('"', '\\"')[:100] safe_text = (description or "").replace('"', '\\"') - # 获取有效 Agent 列表(从 config/default.yaml 读取) + # 获取有效 Agent 列表(从 config/default.yaml 读取) valid_agents_list = self._load_valid_agents() valid_agents_str = " / ".join(sorted(valid_agents_list)) @@ -441,7 +441,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta counter.release 由内部 wrapped_on_complete 保证。 use_main_session: True = 投递到主 Agent session(不传 --session-id) on_checks_passed: 所有检查通过后的回调(session check + counter acquire 后、subprocess 前) - reuse_session_id: 传入指定 session-id 复用(用于续杯) — deprecated,use_main_session=True 已替代 + reuse_session_id: 传入指定 session-id 复用(用于续杯) - deprecated,use_main_session=True 已替代 Returns: session_id @@ -451,7 +451,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta """ # ── #07 Acquire-First: counter 前置 → session check 在锁内贴近 spawn ── - # Step 0: 分配 session_id(纯计算,无 IO) + # Step 0: 分配 session_id(纯计算,无 IO) if use_main_session: session_id = None elif reuse_session_id: @@ -460,9 +460,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta session_id = str(uuid.uuid4()) _sid_key = session_id or "main" # counter 用的 key - # Phase 0: Pre-acquire 修复(无锁) - # timeout/failed 状态先修复再 acquire。revive 只改 running→idle,幂等安全。 - # asyncio 协作式并发保证同一时刻只有一个协程在执行,revive 的 sessions.json + # Phase 0: Pre-acquire 修复(无锁) + # timeout/failed 状态先修复再 acquire。revive 只改 running→idle,幂等安全。 + # asyncio 协作式并发保证同一时刻只有一个协程在执行,revive 的 sessions.json # 写操作不会真正并行。 if use_main_session: pre_state = self._check_session_state(agent_id) @@ -475,15 +475,15 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id) self._revive_session(agent_id) - # Phase 1: Counter acquire(互斥锁) - # v2.8.1 Bug-4 fix: retry 时跳过 counter(counter 从原始 spawn 保持到 retry 完成) + # Phase 1: Counter acquire(互斥锁) + # v2.8.1 Bug-4 fix: retry 时跳过 counter(counter 从原始 spawn 保持到 retry 完成) if self.counter and not skip_counter: acquired = await self.counter.acquire(agent_id, _sid_key) if not acquired: raise AgentBusyError(agent_id, reason="counter_blocked") - # Phase 2: Session check(在锁保护下,贴近 spawn) - # 并列收集所有 block 原因,统一判定。 + # Phase 2: Session check(在锁保护下,贴近 spawn) + # 并列收集所有 block 原因,统一判定。 if use_main_session: session_state = self._check_session_state(agent_id) logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s", @@ -495,16 +495,16 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta blockers.append(("session_locked", session_state.get("lock_pid"))) if session_state.get("status") == "running": if session_state.get("lock_pid_alive"): - # 真 running:外部进程占用 + # 真 running:外部进程占用 blockers.append(("session_running", None)) else: - # 假 running:lock PID 死了但 status 还在 running → Phase 2.5 处理 + # 假 running:lock PID 死了但 status 还在 running → Phase 2.5 处理 pass if session_state.get("recent_compact"): blockers.append(("session_compacting", None)) if blockers: - # 释放 counter,报具体原因 + # 释放 counter,报具体原因 if self.counter and not skip_counter: self.counter.release(agent_id, _sid_key) primary_reason, primary_detail = blockers[0] @@ -513,8 +513,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta raise AgentBusyError(agent_id, reason=primary_reason, detail={"blockers": blockers}) - # Phase 2.5: 假死修复(status=running + lock PID 死 → revive → 重检) - # 此场景应被 Phase 0 提前修复,这里做兜底 + # Phase 2.5: 假死修复(status=running + lock PID 死 → revive → 重检) + # 此场景应被 Phase 0 提前修复,这里做兜底 if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"): logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving", agent_id) @@ -760,12 +760,12 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)", agent_id, session_id, outcome, exit_code, task_status) - # 广播反馈追踪(Phase 1 bug fix) + # 广播反馈追踪(Phase 1 bug fix) if task_id == "broadcast" and hasattr(self, '_ticker') and self._ticker: - # 广播任务:从 session 信息取真实 task_id 列表,逐一回调 tracker + # 广播任务:从 session 信息取真实 task_id 列表,逐一回调 tracker sess_info = self._sessions.get(session_id or "main", {}) bt_ids = sess_info.get("broadcast_task_ids") or [] - # 广播场景一律标 no_reply:Agent 只 claim 一个任务, + # 广播场景一律标 no_reply:Agent 只 claim 一个任务, # 其余任务的 tracker 不能被 claimed 清除 for real_task_id in bt_ids: self._ticker.record_broadcast_response(real_task_id, agent_id, "no_reply") @@ -774,7 +774,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ self._ticker.record_broadcast_response(task_id, agent_id, outcome_str) if cls["should_retry"]: - # cooldown: 新增的可恢复场景(A14/A15/A16/A8/A10) + # cooldown: 新增的可恢复场景(A14/A15/A16/A8/A10) cooldown_seconds = cls.get("cooldown_seconds", 0) if cooldown_seconds and self.counter: self.counter.set_cooldown(agent_id, seconds=cooldown_seconds) @@ -785,7 +785,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ ) elif outcome == "api_error": # A9: 429/API 错误 → release counter(on_complete)+ 推回 pending + 冷却 - # 有上限:api_retry_count 累计达 max_retries 则标 failed + # 有上限:api_retry_count 累计达 max_retries 则标 failed await self._do_on_complete_async(on_complete, agent_id, outcome) if self.counter: self.counter.set_cooldown(agent_id) @@ -809,8 +809,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ task_id, api_count, self.max_retries) elif outcome == "fallback_timeout" and not cls["should_retry"]: # A3/A3b: fallback 分级处理 - # fallback_count 从 task_attempts.metadata 读取, - # 达 max_retries 标 failed(A3),否则 retry + cooldown(A3b) + # fallback_count 从 task_attempts.metadata 读取, + # 达 max_retries 标 failed(A3),否则 retry + cooldown(A3b) fallback_count = 0 if db_path and task_id: retry_counts = self._get_retry_counts(db_path, task_id) @@ -819,7 +819,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ self._update_retry_counts(db_path, task_id, retry_counts) if fallback_count >= self.max_retries: - # A3: 连续 fallback 达上限,标 failed + # A3: 连续 fallback 达上限,标 failed logger.error("A3 fallback exhausted: agent=%s session=%s task=%s " "fallback_count=%d reason=%s", agent_id, session_id, task_id, fallback_count, @@ -832,7 +832,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ "fallback_reason": json_result.get("fallback_reason"), }) else: - # A3b: fallback 未达上限,retry + cooldown + # A3b: fallback 未达上限,retry + cooldown logger.warning("A3b fallback retry: agent=%s session=%s task=%s " "fallback_count=%d/%d reason=%s", agent_id, session_id, task_id, fallback_count, @@ -841,19 +841,26 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ self.counter.set_cooldown(agent_id, seconds=60) await self._do_retry( session_id, agent_id, task_id, on_complete, db_path, - "fallback_retry_count" # 独立计数,不与 gateway_timeout 的 retry_count 共用 + "fallback_retry_count" # 独立计数,不与 gateway_timeout 的 retry_count 共用 ) else: # 其他:A1(completed), A4(agent_failed), A7(auth_failed), # A8(gateway_unreachable), A11(lock_conflict), # A10(compact_failed), A12(agent_error) - # v2.8.1 Fix-3a: crash 类 outcome 设 cooldown,给 agent session 恢复时间 + # v2.8.1 Fix-3a: crash 类 outcome 设 cooldown,给 agent session 恢复时间 if outcome in ("crashed", "compact_failed", "process_crash", "session_stuck", "compact_hanging", "agent_error", "compact_interrupted") and self.counter: self.counter.set_cooldown(agent_id, seconds=300) # 5 分钟 logger.info("Crash/error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome) + # F1: 不可恢复 outcome → 立刻标 failed + 写黑板 + if outcome in ("auth_failed", "agent_error") and db_path and task_id: + logger.error("Task %s: unrecoverable outcome=%s, marking failed immediately", task_id, outcome) + self._mark_task(db_path, task_id, "failed", { + "reason": outcome, + "stderr_preview": (stderr_text or "")[:500], + }) # 注意: cooldown 期间任务状态仍为 working,但 counter 已释放。 - # DB 中的 working 是“假 working”——ticker 不会重新分配,_check_timeouts 会 + # DB 中的 working 是"假 working"——ticker 不会重新分配,_check_timeouts 会 # 在 cooldown 结束后回收。如果 ticker 在此期间给同一 agent 分配新任务,属正常行为。 # 进程退出 → on_complete release counter # 任务状态由各 outcome 自行处理(或等 ticker) @@ -921,8 +928,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ compact_wait_count = self._compact_waits.get(task_id, 0) + 1 self._compact_waits[task_id] = compact_wait_count if compact_wait_count >= self.max_monitor_timeouts: - # #07.3 ACT-2: compact_hanging 不标 failed,只 release counter - # 进程还活着但不 monitor,等 ticker _check_timeouts 超时回收 → 重新 dispatch + # #07.3 ACT-2: compact_hanging 不标 failed,只 release counter + # 进程还活着但不 monitor,等 ticker _check_timeouts 超时回收 → 重新 dispatch logger.warning("Agent %s compact hanging after %d waits, releasing counter for ticker re-dispatch", agent_id, compact_wait_count) self._compact_waits.pop(task_id, None) @@ -972,8 +979,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ on_complete(含 counter release)置为 None,避免 double release。 """ # v2.8.1 Bug-4 fix: 不再手动 release counter + 置 None on_complete - # counter 从原始 spawn 保持到 retry 完成,避免窗口期 ticker acquire 同一 agent - # on_complete 保留原始 wrapped_on_complete,retry 完成后自然 release counter + # counter 从原始 spawn 保持到 retry 完成,避免窗口期 ticker acquire 同一 agent + # on_complete 保留原始 wrapped_on_complete,retry 完成后自然 release counter # 续杯前检查任务状态,已终态则跳过 if db_path and task_id: @@ -1077,7 +1084,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ skip_counter=True, # Bug-4 fix: counter 已在原始 spawn 中持有 ) except AgentBusyError as e: - # #07.3 ACT-3: session busy(compact/lock/running)= 暂时性阻塞 + # #07.3 ACT-3: session busy(compact/lock/running)= 暂时性阻塞 # release counter → 任务保持 working → ticker 重新 dispatch logger.warning("Retry spawn deferred: %s session busy (%s), releasing counter for ticker re-dispatch", agent_id, e.reason) @@ -1200,9 +1207,9 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ @staticmethod def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 300) -> bool: - """v2.8.1 Fix-1: 读 session jsonl 末尾,检查是否有 window_seconds 内的 compaction 记录。 + """v2.8.1 Fix-1: 读 session jsonl 末尾,检查是否有 window_seconds 内的 compaction 记录。 - 比 compactionCheckpoints 更可靠:Gateway 每次完成 compact 必然在 jsonl 末尾追加记录, + 比 compactionCheckpoints 更可靠:Gateway 每次完成 compact 必然在 jsonl 末尾追加记录, 但不保证更新 compactionCheckpoints。 """ if not session_file or not pathlib.Path(session_file).exists(): @@ -1248,7 +1255,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ def _check_session_state(agent_id: str) -> dict: """检查 sessions.json 和 lock 状态 - v2.8.1: compact 检测改用 session jsonl 末尾扫描(Fix-1), + v2.8.1: compact 检测改用 session jsonl 末尾扫描(Fix-1), 替代失效的 compactionCheckpoints 检测。 """ result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False} @@ -1288,7 +1295,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ pass # v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描 - # 只在 agent 非空闲时才扫描(减少不必要 I/O) + # 只在 agent 非空闲时才扫描(减少不必要 I/O) if result["status"] not in ("done", "idle", "unknown", None) and sf: result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf) except Exception: @@ -1300,7 +1307,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ task_status: Optional[str], stdout_text: str = "") -> dict: """分类退出原因,返回处理策略 - v3.1: A0 拆分为 A14-A17(信号中断/stderr 智能分类)。 + v3.1: A0 拆分为 A14-A17(信号中断/stderr 智能分类)。 A8/A10 改为可恢复 retry。cooldown 统一 60s。 """ status = json_result.get("status") @@ -1320,15 +1327,15 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return {"outcome": "fallback_timeout", "should_retry": False} # A2/A3: status=timeout → 唯一续杯场景 - # 注意: PM2 restart 时 daemon 自身也收到 SIGTERM,此时 retry spawn 的新进程 - # 会随 daemon 一起被杀。A14 retry 假设 daemon 存活,PM2 级重启不在此场景内。 + # 注意: PM2 restart 时 daemon 自身也收到 SIGTERM,此时 retry spawn 的新进程 + # 会随 daemon 一起被杀。A14 retry 假设 daemon 存活,PM2 级重启不在此场景内。 if status == "timeout": return {"outcome": "gateway_timeout", "should_retry": True, "retry_field": "retry_count"} # A0 拆分: 无 JSON 输出 + exit≠0 if status is None and not stdout_text.strip() and exit_code != 0: - # A14: SIGINT(130) / SIGTERM(143) → 外部中断,可恢复 + # A14: SIGINT(130) / SIGTERM(143) → 外部中断,可恢复 if exit_code in (130, 143): return {"outcome": "interrupted", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} @@ -1341,7 +1348,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]): return {"outcome": "compact_interrupted", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} - # A17: 真正的 crash → 保持 working,ticker 兜底 + # A17: 真正的 crash → 保持 working,ticker 兜底 return {"outcome": "crashed", "should_retry": False, "original": "process_crash"} # stdout 为空但 exit=0:可能是正常完成但 --json 没输出 @@ -1446,6 +1453,19 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ conn.commit() finally: conn.close() + # F2: conn 已关闭,Blackboard 内部自己 get_connection + if status == "failed": + try: + from src.blackboard.operations import Blackboard + bb = Blackboard(db_path) + reason = (detail or {}).get("reason", "unknown") + cid = bb.add_comment(task_id, "daemon", + f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入", + comment_type="system") + bb.record_mentions(cid, task_id, ["pangtong-fujunshi"]) + logger.info("Task %s: failure notified pangtong via comment+mention (reason=%s)", task_id, reason) + except Exception as e: + logger.warning("Task %s: failed to notify pangtong: %s", task_id, e) except Exception: logger.exception("Failed to mark task %s as %s", task_id, status) @@ -1504,7 +1524,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ db_path: Optional[Path] = None, ) -> None: """记录 task_attempt""" - # 广播 spawn 产生的 "broadcast" task_id 不记录 attempts,避免脏数据 + # 广播 spawn 产生的 "broadcast" task_id 不记录 attempts,避免脏数据 if task_id == "broadcast": return effective_db = db_path or self.db_path