From d58e38d58fa6d65726634f549a7fb321ab925fcb Mon Sep 17 00:00:00 2001 From: cfdaily Date: Tue, 9 Jun 2026 23:53:29 +0800 Subject: [PATCH] =?UTF-8?q?fix(lint):=20=E4=BF=AE=E5=A4=8D=20PR=20#14=20?= =?UTF-8?q?=E5=BC=95=E5=85=A5=E7=9A=84=20lint=20=E5=9B=9E=E9=80=80=20(119?= =?UTF-8?q?=E2=86=920)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #14 从旧分支复制文件导致回退了 PR #10 的 lint 修复。 修复内容: - autoflake 移除未使用导入/变量 - autopep8 修复缩进/空格 - 手动修复 F821(pathlib→Path), F541(f-string), F841(未使用变量) - 所有修复均通过 flake8 --max-line-length=120 --extend-ignore=E501 检查 (0 errors) --- src/api/blackboard_routes.py | 63 +++++--- src/api/checkpoint_routes.py | 28 +++- src/api/mail_routes.py | 27 +++- src/api/project_routes.py | 32 ++-- src/api/toolchain_routes.py | 65 ++++++-- src/blackboard/db.py | 34 ++-- src/blackboard/models.py | 2 +- src/blackboard/operations.py | 20 ++- src/blackboard/queries.py | 12 +- src/blackboard/registry.py | 10 +- src/cli/blackboard.py | 20 ++- src/daemon/bootstrap.py | 20 +-- src/daemon/counter.py | 14 +- src/daemon/dispatcher.py | 233 ++++++++++++++++++--------- src/daemon/experience.py | 6 +- src/daemon/guardrails.py | 22 ++- src/daemon/health.py | 17 +- src/daemon/inbox.py | 11 +- src/daemon/mail_notify.py | 19 ++- src/daemon/review.py | 26 +-- src/daemon/router.py | 16 +- src/daemon/skill_system.py | 3 +- src/daemon/spawner.py | 243 ++++++++++++++++++++--------- src/daemon/sse.py | 8 +- src/daemon/ticker.py | 295 +++++++++++++++++++++++------------ src/main.py | 33 ++-- src/utils.py | 1 - 27 files changed, 863 insertions(+), 417 deletions(-) diff --git a/src/api/blackboard_routes.py b/src/api/blackboard_routes.py index 8bf30d3..89aecc6 100644 --- a/src/api/blackboard_routes.py +++ b/src/api/blackboard_routes.py @@ -5,14 +5,14 @@ from __future__ import annotations import json import os from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional from fastapi import APIRouter, HTTPException, Query from src.blackboard.operations import Blackboard from src.blackboard.models import Task, Review from src.blackboard.queries import Queries -from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS, COMMENT_TYPES, OUTPUT_TYPES +from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES from src.blackboard.registry import ProjectRegistry from src.utils import get_data_root @@ -59,7 +59,10 @@ async def list_tasks(project_id: str, assignee: Optional[str] = None, parent_task: Optional[str] = None): bb = _bb(project_id) - tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task) + tasks = bb.list_tasks( + status=status, + assignee=assignee, + parent_task=parent_task) return {"tasks": [_task_to_dict(t) for t in tasks]} @@ -79,10 +82,12 @@ async def get_task(project_id: str, task_id: str, result["outputs_count"] = detail.get("outputs_count", 0) result["review_status"] = detail.get("review_status") result["latest_event_detail"] = detail.get("latest_event_detail") - result["comments"] = [dict(c.__dict__) for c in bb.get_comments(task_id)] + result["comments"] = [dict(c.__dict__) + for c in bb.get_comments(task_id)] result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)] result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)] - result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)] + result["decisions"] = [dict(d.__dict__) + for d in bb.get_decisions(task_id)] result["events"] = q.task_events(task_id) result["experiences"] = q.task_experiences(task_id) return result @@ -134,7 +139,8 @@ async def create_task(project_id: str, body: Dict[str, Any]): priority=body.get("priority", 5), assignee=assignee, assigned_by=body.get("assigned_by", "user"), - depends_on=json.dumps(body["depends_on"]) if "depends_on" in body else None, + depends_on=json.dumps( + body["depends_on"]) if "depends_on" in body else None, parent_task=body.get("parent_task"), risk_level=body.get("risk_level", "standard"), stage=body.get("stage"), @@ -175,7 +181,8 @@ async def _generate_title(description: str) -> str | None: resp = client.chat.completions.create( model=model, messages=[ - {"role": "system", "content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"}, + {"role": "system", + "content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"}, {"role": "user", "content": description[:500]}, ], max_tokens=50, @@ -187,7 +194,8 @@ async def _generate_title(description: str) -> str | None: return title except Exception as e: import logging - logging.getLogger("moziplus-v2").warning(f"Title generation failed: {e}") + logging.getLogger( + "moziplus-v2").warning(f"Title generation failed: {e}") return None @@ -205,7 +213,8 @@ async def task_progress(project_id: str, task_id: str): async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) if not bb.claim_task(task_id, body["agent"]): - raise HTTPException(409, "Claim failed (already claimed or wrong assignee)") + raise HTTPException( + 409, "Claim failed (already claimed or wrong assignee)") return {"ok": True} @@ -240,7 +249,7 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]): }) if not bb.update_task_status(task_id, new_status, - agent=body.get("agent")): + agent=body.get("agent")): raise HTTPException(409, { "error": "transition_failed", "detail": f"Status update failed for {task_id}", @@ -265,6 +274,7 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]): # --- @mention 自动提取(#04) --- _KNOWN_AGENT_IDS: list = [] + def _init_agent_ids(): """从配置文件加载 Agent ID 列表""" global _KNOWN_AGENT_IDS @@ -272,18 +282,32 @@ def _init_agent_ids(): return try: import yaml - cfg_path = os.path.join(os.path.dirname(__file__), "..", "..", "config", "default.yaml") + cfg_path = os.path.join( + os.path.dirname(__file__), + "..", + "..", + "config", + "default.yaml") with open(cfg_path) as f: cfg = yaml.safe_load(f) - _KNOWN_AGENT_IDS = list(cfg.get("daemon", {}).get("agent_profiles", {}).keys()) + _KNOWN_AGENT_IDS = list( + cfg.get( + "daemon", + {}).get( + "agent_profiles", + {}).keys()) except Exception: _KNOWN_AGENT_IDS = [] + def _extract_mentions(text: str) -> list: """从文本中自动提取 @agent-id 格式的 mention""" import re _init_agent_ids() - candidates = set(re.findall(r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', text)) + candidates = set( + re.findall( + r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', + text)) return [a for a in candidates if a in _KNOWN_AGENT_IDS] @@ -317,8 +341,8 @@ async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]): merged_mentions = list(set(explicit_mentions + auto_mentions)) cid = bb.add_comment(task_id, body["author"], comment_body, - comment_type=body.get("comment_type", "general"), - mentions=merged_mentions) + comment_type=body.get("comment_type", "general"), + mentions=merged_mentions) if merged_mentions: bb.record_mentions(cid, task_id, merged_mentions) # #10: SSE 通知前端黑板有新 comment @@ -395,7 +419,8 @@ async def write_output(project_id: str, task_id: str, body: Dict[str, Any]): ) os.makedirs(artifacts_dir, exist_ok=True) # 安全文件名 - safe_name = "".join(c if c.isalnum() or c in "._-" else "_" for c in title) + safe_name = "".join( + c if c.isalnum() or c in "._-" else "_" for c in title) if not safe_name: safe_name = "output" file_path = os.path.join(artifacts_dir, safe_name) @@ -424,8 +449,8 @@ async def get_decisions(project_id: str, task_id: str): async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) did = bb.add_decision(task_id, body["decider"], body["decision"], - body["rationale"], - alternatives=body.get("alternatives")) + body["rationale"], + alternatives=body.get("alternatives")) return {"ok": True, "decision_id": did} @@ -435,7 +460,7 @@ async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) oid = bb.add_observation(task_id, body["observer"], body["body"], - severity=body.get("severity", "info")) + severity=body.get("severity", "info")) return {"ok": True, "observation_id": oid} diff --git a/src/api/checkpoint_routes.py b/src/api/checkpoint_routes.py index c713067..4461fc6 100644 --- a/src/api/checkpoint_routes.py +++ b/src/api/checkpoint_routes.py @@ -12,7 +12,9 @@ from typing import Optional from src.blackboard.operations import Blackboard from src.utils import get_data_root -router = APIRouter(prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints", tags=["checkpoints"]) +router = APIRouter( + prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints", + tags=["checkpoints"]) # ── 请求模型 ── @@ -50,10 +52,12 @@ def list_checkpoints(project_id: str, task_id: str): @router.post("") -def create_checkpoint(project_id: str, task_id: str, req: CreateCheckpointRequest): +def create_checkpoint(project_id: str, task_id: str, + req: CreateCheckpointRequest): """Agent 创建 checkpoint""" if req.type not in ("verify", "decision", "action"): - raise HTTPException(status_code=400, detail=f"Invalid checkpoint type: {req.type}") + raise HTTPException(status_code=400, + detail=f"Invalid checkpoint type: {req.type}") bb = _bb(project_id) # 验证 task 存在 @@ -73,10 +77,15 @@ def create_checkpoint(project_id: str, task_id: str, req: CreateCheckpointReques @router.post("/{checkpoint_id}/approve") -def approve_checkpoint(project_id: str, task_id: str, checkpoint_id: str, req: ResolveCheckpointRequest): +def approve_checkpoint(project_id: str, task_id: str, + checkpoint_id: str, req: ResolveCheckpointRequest): """用户通过 checkpoint → 自动推进 task 状态""" bb = _bb(project_id) - result = bb.resolve_checkpoint(checkpoint_id, "approve", req.resolved_by, req.note) + result = bb.resolve_checkpoint( + checkpoint_id, + "approve", + req.resolved_by, + req.note) if result is None: raise HTTPException(status_code=404, detail="Checkpoint not found") if "error" in result: @@ -97,10 +106,15 @@ def approve_checkpoint(project_id: str, task_id: str, checkpoint_id: str, req: R @router.post("/{checkpoint_id}/reject") -def reject_checkpoint(project_id: str, task_id: str, checkpoint_id: str, req: ResolveCheckpointRequest): +def reject_checkpoint(project_id: str, task_id: str, + checkpoint_id: str, req: ResolveCheckpointRequest): """用户驳回 checkpoint → task 回到 working""" bb = _bb(project_id) - result = bb.resolve_checkpoint(checkpoint_id, "reject", req.resolved_by, req.note) + result = bb.resolve_checkpoint( + checkpoint_id, + "reject", + req.resolved_by, + req.note) if result is None: raise HTTPException(status_code=404, detail="Checkpoint not found") if "error" in result: diff --git a/src/api/mail_routes.py b/src/api/mail_routes.py index ef83690..cd79175 100644 --- a/src/api/mail_routes.py +++ b/src/api/mail_routes.py @@ -9,7 +9,7 @@ from __future__ import annotations import json from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional from fastapi import APIRouter, HTTPException, Query @@ -34,7 +34,9 @@ def _get_valid_agents() -> set: except Exception: pass # fallback:硬编码 - return {"zhangfei-dev", "guanyu-dev", "zhaoyun-data", "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"} + return {"zhangfei-dev", "guanyu-dev", "zhaoyun-data", + "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"} + router = APIRouter(prefix="/api/mail", tags=["mail"]) @@ -97,7 +99,10 @@ async def list_mail( ): """Mail 列表(按时间倒序)""" bb = _bb() - tasks = bb.list_tasks(status=status, assignee=to_agent, assigned_by=from_agent) + tasks = bb.list_tasks( + status=status, + assignee=to_agent, + assigned_by=from_agent) mails = [] for t in tasks: @@ -222,13 +227,16 @@ async def send_mail(body: Dict[str, Any]): # A8: 只有原邮件的双方能回复(严格 1 对 1) if from_agent not in (orig_from, orig_to): - raise HTTPException(400, f"只有邮件的发送者或接收者可以回复") + raise HTTPException(400, "只有邮件的发送者或接收者可以回复") # A6/A7: 自动纠正 to → 原邮件发件者 to_agent = body.get("to", "").strip() corrected_to = orig_from # 回复方向固定: reply → original sender if to_agent and to_agent != corrected_to: - auto_corrected = {"field": "to", "original": to_agent, "corrected": corrected_to} + auto_corrected = { + "field": "to", + "original": to_agent, + "corrected": corrected_to} to_agent = corrected_to else: # --- A2: to 必填(非回复场景) --- @@ -255,7 +263,8 @@ async def send_mail(body: Dict[str, Any]): conversation_id = body.get("conversation_id") if not conversation_id and original: try: - orig_meta = json.loads(original.must_haves) if original.must_haves else {} + orig_meta = json.loads( + original.must_haves) if original.must_haves else {} conversation_id = orig_meta.get("conversation_id") except Exception: pass @@ -310,10 +319,12 @@ async def delete_mail(prefix: Optional[str] = Query(None)): for t in tasks: if t.title and t.title.startswith(prefix): if t.status not in ("cancelled",): - bb.update_task_status(t.id, "cancelled", agent="mail-cleanup-api") + bb.update_task_status( + t.id, "cancelled", agent="mail-cleanup-api") deleted_ids.append(t.id) - return {"ok": True, "deleted_count": len(deleted_ids), "deleted_ids": deleted_ids} + return {"ok": True, "deleted_count": len( + deleted_ids), "deleted_ids": deleted_ids} @router.patch("/{mail_id}") diff --git a/src/api/project_routes.py b/src/api/project_routes.py index ff9d1f4..bede2a0 100644 --- a/src/api/project_routes.py +++ b/src/api/project_routes.py @@ -3,7 +3,7 @@ from __future__ import annotations from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict from fastapi import APIRouter, HTTPException, Query @@ -31,8 +31,10 @@ async def list_projects(): if db_path.exists(): try: conn = sqlite3.connect(str(db_path), timeout=5) - total = conn.execute("SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0] - active = conn.execute("SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0] + total = conn.execute( + "SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0] + active = conn.execute( + "SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0] archived = total - active conn.close() info['task_count'] = active @@ -45,8 +47,10 @@ async def list_projects(): if general_db.exists() and "_general" not in projects: try: conn = sqlite3.connect(str(general_db), timeout=5) - total = conn.execute("SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0] - active = conn.execute("SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0] + total = conn.execute( + "SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0] + active = conn.execute( + "SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0] conn.close() projects["_general"] = { "id": "_general", "name": "一般任务", "description": "无项目归属的通用任务", @@ -60,8 +64,10 @@ async def list_projects(): if general_db_check.exists(): try: conn = sqlite3.connect(str(general_db_check), timeout=5) - total = conn.execute("SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0] - active = conn.execute("SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0] + total = conn.execute( + "SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0] + active = conn.execute( + "SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0] conn.close() projects["_general"]["task_count"] = active projects["_general"]["task_count_total"] = total @@ -76,7 +82,7 @@ async def list_projects(): async def create_project(body: Dict[str, Any]): reg = _registry() try: - info = reg.create_project( + reg.create_project( body["id"], body["name"], agents=body.get("agents", []), description=body.get("description", ""), @@ -173,7 +179,10 @@ async def move_task(project_id: str, task_id: str, body: Dict[str, Any]): depends_on=child.depends_on, must_haves=child.must_haves, ) tgt_bb.create_task(moved_child) - src_bb.update_task_status(child.id, "cancelled", detail=f"Moved to {target_project}") + src_bb.update_task_status( + child.id, + "cancelled", + detail=f"Moved to {target_project}") moved_ids.append(child.id) # 移动主任务 @@ -186,7 +195,10 @@ async def move_task(project_id: str, task_id: str, body: Dict[str, Any]): depends_on=task.depends_on, must_haves=task.must_haves, ) tgt_bb.create_task(moved_task) - src_bb.update_task_status(task_id, "cancelled", detail=f"Moved to {target_project}") + src_bb.update_task_status( + task_id, + "cancelled", + detail=f"Moved to {target_project}") moved_ids.insert(0, task_id) return {"ok": True, "moved_to": target_project, "moved_ids": moved_ids} diff --git a/src/api/toolchain_routes.py b/src/api/toolchain_routes.py index 666708a..3855077 100644 --- a/src/api/toolchain_routes.py +++ b/src/api/toolchain_routes.py @@ -46,7 +46,8 @@ _TTL_SECONDS = 7 * 24 * 3600 _idempotency_lock = asyncio.Lock() -def _is_duplicate(event: str, delivery: str, payload: Optional[Dict[str, Any]] = None) -> bool: +def _is_duplicate(event: str, delivery: str, + payload: Optional[Dict[str, Any]] = None) -> bool: """检查 Webhook 是否重复投递,自动清理过期条目。 双重去重策略: @@ -56,7 +57,8 @@ def _is_duplicate(event: str, delivery: str, payload: Optional[Dict[str, Any]] = """ now = time.time() # 清理过期条目 - while _delivery_timestamps and (now - _delivery_timestamps[0][0]) > _TTL_SECONDS: + while _delivery_timestamps and ( + now - _delivery_timestamps[0][0]) > _TTL_SECONDS: _, key = _delivery_timestamps.pop(0) _delivery_cache.discard(key) @@ -77,7 +79,11 @@ def _is_duplicate(event: str, delivery: str, payload: Optional[Dict[str, Any]] = content_hash = hashlib.sha256(content.encode()).hexdigest()[:16] content_key = f"content:{event}:{pr_num}:{sender}:{content_hash}" if content_key in _delivery_cache: - logger.info("Content-based duplicate detected: %s PR#%s by %s", event, pr_num, sender) + logger.info( + "Content-based duplicate detected: %s PR#%s by %s", + event, + pr_num, + sender) return True _delivery_cache.add(content_key) _delivery_timestamps.append((now, content_key)) @@ -137,8 +143,16 @@ async def _fetch_pr_files(repo: str, pr_number: int) -> Tuple[List[str], str]: last_error = str(e) if attempt < 2: await asyncio.sleep(0.5 * (attempt + 1)) - logger.warning("Retry %d/3 fetching PR files: %s/pulls/%d", attempt + 1, repo, pr_number) - logger.warning("Failed to fetch PR files after 3 retries: %s/pulls/%d - %s", repo, pr_number, last_error) + logger.warning( + "Retry %d/3 fetching PR files: %s/pulls/%d", + attempt + 1, + repo, + pr_number) + logger.warning( + "Failed to fetch PR files after 3 retries: %s/pulls/%d - %s", + repo, + pr_number, + last_error) return [], f"获取文件列表失败(重试3次): {last_error}" @@ -166,7 +180,6 @@ def _calc_risk_level(changed_files: List[str]) -> str: # --------------------------------------------------------------------------- - MAIL_PROJECT_ID = "_mail" @@ -252,7 +265,8 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None: pr = payload.get("pull_request") if not pr or not isinstance(pr, dict): - logger.warning("pull_request event missing pull_request field, skipping") + logger.warning( + "pull_request event missing pull_request field, skipping") return repo = _repo_fullname(payload) pr_number = pr.get("number", 0) @@ -266,7 +280,8 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None: if fetch_error: file_list = f"⚠️ {fetch_error}" else: - file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无文件变更)" + file_list = "\n".join( + f"- {f}" for f in changed_files) if changed_files else "(无文件变更)" text = render_template("review_request", { "repo": repo, @@ -291,11 +306,13 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None: """ review = payload.get("review") if not review or not isinstance(review, dict): - logger.warning("pull_request_review event missing review field, skipping") + logger.warning( + "pull_request_review event missing review field, skipping") return pr = payload.get("pull_request") if not pr or not isinstance(pr, dict): - logger.warning("pull_request_review event missing pull_request field, skipping") + logger.warning( + "pull_request_review event missing pull_request field, skipping") return # 兼容两种 payload 格式提取 state @@ -319,7 +336,15 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None: pr_title = pr.get("title", "") pr_author = pr.get("user", {}).get("login", "unknown") # 兼容:org webhook 的 review 没有 user,从 sender 取 - reviewer = review.get("user", {}).get("login", "") or payload.get("sender", {}).get("login", "unknown") + reviewer = review.get( + "user", + {}).get( + "login", + "") or payload.get( + "sender", + {}).get( + "login", + "unknown") review_body = review.get("body", "") or review.get("content", "(无评论)") result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"} @@ -366,7 +391,8 @@ async def _handle_issues(payload: Dict[str, Any]) -> None: logger.debug("Issue assigned but no assignee found, skipping") return - labels_list = [lbl.get("name", "") for lbl in (issue.get("labels") or [])] + labels_list = [lbl.get("name", "") + for lbl in (issue.get("labels") or [])] labels = ", ".join(labels_list) if labels_list else "(无标签)" issue_body = issue.get("body", "(无描述)") brief = issue_title[:20].replace(" ", "-").lower() @@ -417,7 +443,9 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None: # 已关闭的 Issue/PR 不再发送 CI 失败通知 if issue.get("state") == "closed": - logger.debug("Skipping CI failure notification for closed issue #%s", issue.get("number")) + logger.debug( + "Skipping CI failure notification for closed issue #%s", + issue.get("number")) return repo = _repo_fullname(payload) @@ -485,7 +513,8 @@ async def gitea_webhook( # 1. 签名验证 if not _verify_signature(body, x_gitea_signature): logger.warning("Webhook signature verification failed") - return Response(status_code=403, content="signature verification failed") + return Response(status_code=403, + content="signature verification failed") # 3. 解析 payload(提前解析,用于幂等检查) try: @@ -498,14 +527,18 @@ async def gitea_webhook( if x_gitea_event and x_gitea_delivery: async with _idempotency_lock: if _is_duplicate(x_gitea_event, x_gitea_delivery, payload): - logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery) + logger.debug( + "Duplicate webhook: %s/%s", + x_gitea_event, + x_gitea_delivery) return Response(status_code=200, content="duplicate") # 4. 查找 handler handler = _EVENT_HANDLERS.get(x_gitea_event or "") if not handler: logger.debug("Unhandled event type: %s", x_gitea_event) - return Response(status_code=200, content=f"unhandled event: {x_gitea_event}") + return Response(status_code=200, + content=f"unhandled event: {x_gitea_event}") # 5. 执行 handler try: diff --git a/src/blackboard/db.py b/src/blackboard/db.py index f94c88c..4a0c2a5 100644 --- a/src/blackboard/db.py +++ b/src/blackboard/db.py @@ -4,7 +4,6 @@ from __future__ import annotations import sqlite3 from pathlib import Path -from typing import Optional def init_db(db_path: Path) -> None: @@ -133,8 +132,10 @@ def _migrate_v28(conn: sqlite3.Connection) -> None: resolved_by TEXT, resolve_note TEXT )""") - conn.execute("CREATE INDEX IF NOT EXISTS idx_checkpoints_task ON checkpoints(task_id)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_checkpoints_status ON checkpoints(status)") + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_checkpoints_task ON checkpoints(task_id)") + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_checkpoints_status ON checkpoints(status)") # 4. outputs 扩展字段(M3 成果物) _safe_add_column(conn, "outputs", "file_name", "TEXT") @@ -189,18 +190,20 @@ TERMINAL_STATUSES = frozenset() # v3.1: 无终态,全靠 VALID_TRANSITIONS MANUAL_STATUSES = frozenset({"cancelled", "paused", "reviewing"}) VALID_TRANSITIONS = { - "pending": {"claimed", "paused", "blocked", "cancelled"}, - "claimed": {"working", "paused", "pending", "cancelled"}, - "working": {"review", "done", "blocked", "failed", "paused", "escalated", "waiting_human", "cancelled", "pending"}, # pending: Mail spawn 失败回退 - "paused": {"working", "claimed", "review", "escalated", "waiting_human", "cancelled"}, # 恢复到 resumed_from 记录的状态 - "review": {"done", "pending", "failed", "paused", "escalated", "waiting_human", "cancelled"}, - "blocked": {"pending", "escalated", "cancelled"}, - "failed": {"pending", "escalated", "cancelled"}, - "escalated": {"working", "pending", "paused", "cancelled"}, + "pending": {"claimed", "paused", "blocked", "cancelled"}, + "claimed": {"working", "paused", "pending", "cancelled"}, + # pending: Mail spawn 失败回退 + "working": {"review", "done", "blocked", "failed", "paused", "escalated", "waiting_human", "cancelled", "pending"}, + # 恢复到 resumed_from 记录的状态 + "paused": {"working", "claimed", "review", "escalated", "waiting_human", "cancelled"}, + "review": {"done", "pending", "failed", "paused", "escalated", "waiting_human", "cancelled"}, + "blocked": {"pending", "escalated", "cancelled"}, + "failed": {"pending", "escalated", "cancelled"}, + "escalated": {"working", "pending", "paused", "cancelled"}, "waiting_human": {"working", "done", "paused", "cancelled"}, - "done": {"cancelled", "reviewing"}, - "reviewing": {"done", "working", "cancelled"}, - "cancelled": {"pending"}, + "done": {"cancelled", "reviewing"}, + "reviewing": {"done", "working", "cancelled"}, + "cancelled": {"pending"}, } COMMENT_TYPES = frozenset({ @@ -224,7 +227,8 @@ EVENT_TYPES = frozenset({ OUTPUT_TYPES = frozenset({"code", "document", "data", "config", "other"}) -REVIEW_TYPES = frozenset({"plan_review", "output_review", "guardrail", "final_review"}) +REVIEW_TYPES = frozenset( + {"plan_review", "output_review", "guardrail", "final_review"}) VERDICT_TYPES = frozenset({"approved", "rejected", "needs_revision"}) EXPERIENCE_SOURCES = frozenset({ diff --git a/src/blackboard/models.py b/src/blackboard/models.py index 617588a..b6a2dbc 100644 --- a/src/blackboard/models.py +++ b/src/blackboard/models.py @@ -3,7 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional +from typing import Any, List, Optional @dataclass diff --git a/src/blackboard/operations.py b/src/blackboard/operations.py index 2d75f3e..0e67692 100644 --- a/src/blackboard/operations.py +++ b/src/blackboard/operations.py @@ -11,7 +11,6 @@ from typing import Any, Dict, List, Optional from .db import ( VALID_TRANSITIONS, - VALID_STATUSES, COMMENT_TYPES, EVENT_TYPES, OUTPUT_TYPES, @@ -84,7 +83,8 @@ class Blackboard: """获取单个任务""" conn = self._conn() try: - row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone() + row = conn.execute( + "SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone() return Task.from_row(row) if row else None finally: conn.close() @@ -129,7 +129,8 @@ class Blackboard: updates["completed_at"] = now # paused 也记录时间用于恢复 updates["resumed_from"] = old_status # 记录暂停前状态 elif new_status == "pending": - # 所有 →pending 转换都清空 assignee(与 ticker._transition_status L414 对齐) + # 所有 →pending 转换都清空 assignee(与 ticker._transition_status L414 + # 对齐) updates["assignee"] = None updates["claimed_at"] = None updates["current_agent"] = None @@ -693,7 +694,6 @@ class Blackboard: finally: conn.close() - # ── Checkpoint CRUD(M3) ── def create_checkpoint( @@ -709,7 +709,8 @@ class Blackboard: import uuid # BUG-33: 校验 payload 结构必须含 version 字段 if not isinstance(payload, dict) or "version" not in payload: - raise ValueError("payload must be a dict containing 'version' field") + raise ValueError( + "payload must be a dict containing 'version' field") cp_id = checkpoint_id or f"cp-{uuid.uuid4().hex[:8]}" conn = self._conn() try: @@ -966,7 +967,8 @@ class Blackboard: finally: conn.close() - def get_pending_mentions(self, max_retries: int = 5) -> List[Dict[str, Any]]: + def get_pending_mentions( + self, max_retries: int = 5) -> List[Dict[str, Any]]: """获取所有 pending 且未超过重试上限的 mentions""" conn = self._conn() try: @@ -1001,7 +1003,8 @@ class Blackboard: conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") - conn.execute("UPDATE mention_queue SET retry_count=retry_count+1 WHERE id=?", (mention_id,)) + conn.execute( + "UPDATE mention_queue SET retry_count=retry_count+1 WHERE id=?", (mention_id,)) conn.commit() return True finally: @@ -1012,7 +1015,8 @@ class Blackboard: conn = self._conn() try: conn.execute("BEGIN IMMEDIATE") - conn.execute("UPDATE mention_queue SET status='failed' WHERE id=?", (mention_id,)) + conn.execute( + "UPDATE mention_queue SET status='failed' WHERE id=?", (mention_id,)) conn.commit() return True finally: diff --git a/src/blackboard/queries.py b/src/blackboard/queries.py index 27364af..c6ccd65 100644 --- a/src/blackboard/queries.py +++ b/src/blackboard/queries.py @@ -132,7 +132,8 @@ class Queries: """任务详情聚合(含关联数据)""" conn = self._conn() try: - row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone() + row = conn.execute( + "SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: return None task = dict(row) @@ -159,7 +160,8 @@ class Queries: finally: conn.close() - def task_events(self, task_id: str, limit: int = 50) -> List[Dict[str, Any]]: + def task_events(self, task_id: str, + limit: int = 50) -> List[Dict[str, Any]]: """任务事件列表""" conn = self._conn() try: @@ -265,7 +267,8 @@ class Queries: return "review" # 有 working/claimed → working - if status_counts.get("working", 0) > 0 or status_counts.get("claimed", 0) > 0: + if status_counts.get("working", 0) > 0 or status_counts.get( + "claimed", 0) > 0: return "working" # 有 pending → pending @@ -337,7 +340,8 @@ class Queries: # 当前活跃 stage active_stage = None for sp in stage_progress: - if sp["active"] > 0 or (sp["total"] > 0 and sp["done"] < sp["total"]): + if sp["active"] > 0 or ( + sp["total"] > 0 and sp["done"] < sp["total"]): if not active_stage and sp["done"] < sp["total"]: active_stage = sp["label"] diff --git a/src/blackboard/registry.py b/src/blackboard/registry.py index af1fafd..9cba663 100644 --- a/src/blackboard/registry.py +++ b/src/blackboard/registry.py @@ -119,7 +119,8 @@ class ProjectRegistry: finally: conn.close() - def list_projects(self, status: Optional[str] = None) -> Dict[str, Dict[str, Any]]: + def list_projects( + self, status: Optional[str] = None) -> Dict[str, Dict[str, Any]]: """列出项目""" conn = self._connect() try: @@ -178,7 +179,8 @@ class ProjectRegistry: status="deleted", ) - def physical_delete_project(self, project_id: str) -> Optional[Dict[str, Any]]: + def physical_delete_project( + self, project_id: str) -> Optional[Dict[str, Any]]: """物理删除项目(删目录 + 删 registry 条目)""" import shutil @@ -260,7 +262,8 @@ class ProjectRegistry: # 迁移(从 _registry.yaml) # =================================================================== - def discover_sanguo_projects(self, scan_dir: Optional[Path] = None) -> List[str]: + def discover_sanguo_projects( + self, scan_dir: Optional[Path] = None) -> List[str]: """扫描 sanguo_projects 开发目录,自动注册正式项目""" scan_dir = scan_dir or Path(os.environ.get( "SANGUO_PROJECTS_DIR", @@ -355,4 +358,3 @@ class ProjectRegistry: def reload(self) -> None: """兼容旧接口(SQLite 不需要 reload cache)""" - pass diff --git a/src/cli/blackboard.py b/src/cli/blackboard.py index 853332a..b604fc0 100644 --- a/src/cli/blackboard.py +++ b/src/cli/blackboard.py @@ -10,7 +10,7 @@ from typing import List, Optional from src.blackboard.operations import Blackboard from src.utils import get_data_root -from src.blackboard.models import Task, Comment, Output, Decision, Observation, Review, Experience +from src.blackboard.models import Task, Review from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry @@ -35,7 +35,9 @@ def _get_queries(project_id: str) -> Queries: def build_blackboard_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(prog="blackboard", description="Agent blackboard operations") + parser = argparse.ArgumentParser( + prog="blackboard", + description="Agent blackboard operations") sub = parser.add_subparsers(dest="command") # read @@ -206,7 +208,11 @@ def _cmd_comment(opts) -> int: def _cmd_decide(opts) -> int: bb = _get_bb(opts.project) - did = bb.add_decision(opts.task_id, opts.decider, opts.decision, opts.rationale) + did = bb.add_decision( + opts.task_id, + opts.decider, + opts.decision, + opts.rationale) print(f"Decision recorded: {did}") return 0 @@ -251,7 +257,8 @@ def _print_tasks(tasks, as_json: bool): def build_admin_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(prog="admin", description="Admin operations") + parser = argparse.ArgumentParser( + prog="admin", description="Admin operations") sub = parser.add_subparsers(dest="command") # project create @@ -262,7 +269,7 @@ def build_admin_parser() -> argparse.ArgumentParser: p_pc.add_argument("--description", default="") # project list - p_pl = sub.add_parser("project-list", help="List projects") + sub.add_parser("project-list", help="List projects") # project archive p_pa = sub.add_parser("project-archive", help="Archive project") @@ -300,7 +307,8 @@ def run_admin_cli(args: Optional[List[str]] = None) -> int: for pid, info in projects.items(): status = info.get("status", "?") agents = ",".join(info.get("agents", [])) - print(f" {pid} [{status}] {info.get('name', '')} agents: {agents}") + print( + f" {pid} [{status}] {info.get('name', '')} agents: {agents}") return 0 elif opts.command == "project-archive": diff --git a/src/daemon/bootstrap.py b/src/daemon/bootstrap.py index e5d5aca..ee2498c 100644 --- a/src/daemon/bootstrap.py +++ b/src/daemon/bootstrap.py @@ -11,8 +11,7 @@ A 类 Skill 由引擎确定性注入全文,不靠 Description 触发。 import logging import os -from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, List logger = logging.getLogger("moziplus-v2.bootstrap") @@ -28,12 +27,12 @@ class BootstrapBuilder: """L2 引擎注入层构建器(v2.1 四段式)""" ROLE_SKILL_MAP = { - "executor": "blackboard-executor", - "reviewer": "blackboard-reviewer", - "reviewer-simayi": "blackboard-reviewer-simayi", + "executor": "blackboard-executor", + "reviewer": "blackboard-reviewer", + "reviewer-simayi": "blackboard-reviewer-simayi", "reviewer-pangtong": "blackboard-reviewer-pangtong", - "planner": "blackboard-planner", - "claim": "blackboard-claim", + "planner": "blackboard-planner", + "claim": "blackboard-claim", } # 默认从环境变量或配置读取,fallback 到默认路径 @@ -62,7 +61,9 @@ class BootstrapBuilder: # 段 2: 前序产出(有依赖时注入) if task.get("depends_on_outputs"): - sections.append(self._format_prior_outputs(task["depends_on_outputs"])) + sections.append( + self._format_prior_outputs( + task["depends_on_outputs"])) # 段 3: 角色操作规范全文(通过 ROLE_SKILL_MAP 从 Skill 文件读取) skill_name = self.ROLE_SKILL_MAP.get(role) @@ -134,7 +135,8 @@ class BootstrapBuilder: """格式化前序产出摘要(段 2)""" parts = ["## 前序产出"] for out in outputs: - parts.append(f"- [{out.get('task_id', '?')}] {out.get('summary', '无摘要')}") + parts.append( + f"- [{out.get('task_id', '?')}] {out.get('summary', '无摘要')}") return "\n".join(parts) def _format_constraints(self, role: str) -> str: diff --git a/src/daemon/counter.py b/src/daemon/counter.py index b70c209..783ab2e 100644 --- a/src/daemon/counter.py +++ b/src/daemon/counter.py @@ -68,20 +68,23 @@ class ActiveAgentCounter: self._cooldown_until.pop(agent_id, None) return False - def set_cooldown(self, agent_id: str, seconds: Optional[float] = None) -> None: + def set_cooldown(self, agent_id: str, + seconds: Optional[float] = None) -> None: """设置冷却期(默认 120 秒)""" cd = seconds if seconds is not None else self._default_cooldown_seconds self._cooldown_until[agent_id] = time.time() + cd logger.info("Cooldown set for %s: %.0fs (until %.0f)", - agent_id, cd, self._cooldown_until[agent_id]) + agent_id, cd, self._cooldown_until[agent_id]) - async def can_acquire(self, agent_id: str, session_id: str = "main") -> bool: + async def can_acquire(self, agent_id: str, + session_id: str = "main") -> bool: """三层检查:cooldown → global → per agent → per session key""" if self.is_cooling_down(agent_id): return False if self._global_active >= self._max_global: return False - if self._agent_active.get(agent_id, 0) >= self._max_concurrent_sessions: + if self._agent_active.get( + agent_id, 0) >= self._max_concurrent_sessions: return False key = self._make_key(agent_id, session_id) if self._active_keys.get(key, 0) >= self._max_per_session: @@ -122,7 +125,8 @@ class ActiveAgentCounter: del self._active_keys[key] if agent_id in self._agent_active: - self._agent_active[agent_id] = max(0, self._agent_active[agent_id] - 1) + self._agent_active[agent_id] = max( + 0, self._agent_active[agent_id] - 1) if self._agent_active[agent_id] == 0: del self._agent_active[agent_id] diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py index 4f9fa2b..077a8d2 100644 --- a/src/daemon/dispatcher.py +++ b/src/daemon/dispatcher.py @@ -14,7 +14,6 @@ from __future__ import annotations import json import logging import sqlite3 -from datetime import datetime from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional @@ -22,7 +21,7 @@ from typing import Any, Dict, List, Optional from src.blackboard.models import Task from src.blackboard.db import get_connection from src.daemon.spawner import AgentBusyError -from src.daemon.router import AgentRouter, RouteDecision +from src.daemon.router import AgentRouter logger = logging.getLogger("moziplus-v2.dispatcher") @@ -64,7 +63,8 @@ class Dispatcher: if self._legacy_mode: self.registered_agents = set(registered_agents or []) self.capability_map = capability_map or {} - logger.warning("Dispatcher running in legacy mode (no AgentRouter)") + logger.warning( + "Dispatcher running in legacy mode (no AgentRouter)") def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]: """调度决策(委托给 Router) @@ -124,16 +124,21 @@ class Dispatcher: """ # 安全红线检查(调度前拦截) # Mail 是 Agent 间通信,不做 guardrail 检查 - is_mail = project_config.get("project_id") == "_mail" if project_config else False + is_mail = project_config.get( + "project_id") == "_mail" if project_config else False if self.guardrails and not is_mail: violations = self.guardrails.check_task(task) - critical = [v for v in violations if v.action in ("block_and_notify", "terminate_and_escalate")] + critical = [ + v for v in violations if v.action in ( + "block_and_notify", + "terminate_and_escalate")] if critical: v = critical[0] logger.warning("Task '%s' blocked by guardrail: %s - %s", task.title, v.rule_id, v.message) # 写入黑板事件 - _routing_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else self.db_path + _routing_db = Path( + project_config["db_path"]) if project_config and "db_path" in project_config else self.db_path if _routing_db: self._record_routing(task, {"level": DispatchLevel.BLOCKED, "agent_id": "none", "reason": v.message}, "blocked", v.message, _routing_db) @@ -152,7 +157,8 @@ class Dispatcher: decision = self.decide(task, action_type) level = decision["level"] # 从 project_config 获取项目级 DB 路径(路由审计日志写入项目 DB) - _routing_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None + _routing_db = Path( + project_config["db_path"]) if project_config and "db_path" in project_config else None agent_id = decision["agent_id"] # v2.7.2: counter 检查移到 spawn_full_agent 内部 @@ -160,7 +166,8 @@ class Dispatcher: # 本地执行 if level == DispatchLevel.LOCAL: - self._record_routing(task, decision, "dispatched", None, _routing_db) + self._record_routing( + task, decision, "dispatched", None, _routing_db) return { "level": level.value, "agent_id": "daemon", @@ -172,7 +179,8 @@ class Dispatcher: # Full Agent / Escalate spawn if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE): if not self.spawner: - self._record_routing(task, decision, "error", "No spawner", _routing_db) + self._record_routing( + task, decision, "error", "No spawner", _routing_db) return { "level": level.value, "agent_id": agent_id, @@ -183,9 +191,11 @@ class Dispatcher: try: # [v2.7.1] Mail: 标 working 移到 spawn_full_agent 内部(check 通过后、subprocess 前) - is_mail = project_config.get("project_id") == "_mail" if project_config else False + is_mail = project_config.get( + "project_id") == "_mail" if project_config else False if is_mail: - db_path = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None + db_path = Path( + project_config["db_path"]) if project_config and "db_path" in project_config else None # on_checks_passed: 所有检查通过后才标 working,检查失败不标 on_checks_passed = None @@ -194,6 +204,7 @@ class Dispatcher: _task_id = task.id _mail_db = db_path _disp = self + def _mail_on_checks_passed(): nonlocal _mail_marked_working if not _disp._mail_auto_working(_task_id, _mail_db): @@ -203,8 +214,9 @@ class Dispatcher: # 构建 spawn message message = self._build_spawn_message(task, agent_id, project_config, - mode=decision.get("mode", ""), - spawn_type=action_type or "executor") + mode=decision.get( + "mode", ""), + spawn_type=action_type or "executor") # v2.7.2: on_complete 只含业务逻辑,不含 counter.release # counter.release 由 spawn_full_agent 内部的 wrapped_on_complete 保证 @@ -218,14 +230,17 @@ class Dispatcher: def _mail_on_complete(aid, outcome): # 幻觉门控:检查是否有回复,自动标 done/failed try: - _dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves, outcome=outcome) + _dispatcher._mail_auto_complete( + _task_id, aid, _mail_db, _must_haves, outcome=outcome) except Exception as e: - logger.error("Mail %s: on_complete error: %s", _task_id, e) + logger.error( + "Mail %s: on_complete error: %s", _task_id, e) on_complete = _mail_on_complete else: # #02: Task 路径也加 on_complete(幻觉门控) _task_id = task.id - _task_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None + _task_db = Path( + project_config["db_path"]) if project_config and "db_path" in project_config else None _dispatcher = self _is_review = action_type == "review" @@ -239,10 +254,12 @@ class Dispatcher: try: # #07.2: 统一 crash 回退——executor 和 review 都回退 current_agent if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES and _task_db: - _dispatcher._rollback_current_agent(_task_db, _task_id, aid) + _dispatcher._rollback_current_agent( + _task_db, _task_id, aid) if _is_review: - if _task_db and outcome in ("completed", "session_revived"): + if _task_db and outcome in ( + "completed", "session_revived"): # #09: 读 verdict 决定后续动作 conn = get_connection(_task_db) try: @@ -254,14 +271,18 @@ class Dispatcher: conn.close() if review and review["verdict"] == "approved": - _dispatcher._mark_task_status(_task_db, _task_id, "done") - logger.info("Task %s: review approved, marking done", _task_id) + _dispatcher._mark_task_status( + _task_db, _task_id, "done") + logger.info( + "Task %s: review approved, marking done", _task_id) else: - # 非 approved → @mention 被审 agent(assignee,非 current_agent) + # 非 approved → @mention 被审 + # agent(assignee,非 current_agent) verdict_str = review["verdict"] if review else "未知" conn2 = get_connection(_task_db) try: - task_row = conn2.execute("SELECT assignee FROM tasks WHERE id=?", (_task_id,)).fetchone() + task_row = conn2.execute( + "SELECT assignee FROM tasks WHERE id=?", (_task_id,)).fetchone() finally: conn2.close() @@ -269,18 +290,21 @@ class Dispatcher: from src.blackboard.blackboard import Blackboard bb = Blackboard(_task_db) bb.add_comment(_task_id, "daemon", - f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", - comment_type="review") + f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", + comment_type="review") logger.info("Task %s: review verdict=%s, notified assignee=%s", _task_id, verdict_str, task_row["assignee"] if task_row else "?") # 不标 done,保持 review 状态 else: - logger.warning("Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome) + logger.warning( + "Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome) else: # executor: 三信号验证 → 标 review - _dispatcher._task_auto_complete(_task_id, _task_db) + _dispatcher._task_auto_complete( + _task_id, _task_db) except Exception as e: - logger.error("Task %s: on_complete error: %s", _task_id, e) + logger.error( + "Task %s: on_complete error: %s", _task_id, e) on_complete = _task_on_complete session_id = await self.spawner.spawn_full_agent( @@ -289,7 +313,8 @@ class Dispatcher: task_id=task.id, on_complete=on_complete, use_main_session=True, # #02: 统一投递到 main session - task_db_path=Path(project_config["db_path"]) if project_config and "db_path" in project_config else None, + task_db_path=Path( + project_config["db_path"]) if project_config and "db_path" in project_config else None, on_checks_passed=on_checks_passed, ) @@ -312,9 +337,14 @@ class Dispatcher: else: log_level = logger.debug detail_msg = f"Agent busy: {reason}" - log_level("Dispatch skipped %s for task %s: %s", agent_id, task.id, detail_msg) + log_level( + "Dispatch skipped %s for task %s: %s", + agent_id, + task.id, + detail_msg) # on_checks_passed 未执行(check 失败在它之前),working 未标,无需回退 - self._record_routing(task, decision, "skipped", detail_msg, _routing_db) + self._record_routing( + task, decision, "skipped", detail_msg, _routing_db) return { "level": level.value, "agent_id": agent_id, @@ -326,7 +356,8 @@ class Dispatcher: # on_checks_passed 已执行但 subprocess 失败 → 回退 working → pending if _mail_marked_working: self._mail_revert_to_pending(task.id, db_path) - self._record_routing(task, decision, "error", str(e), _routing_db) + self._record_routing( + task, decision, "error", str(e), _routing_db) return { "level": level.value, "agent_id": agent_id, @@ -385,9 +416,16 @@ class Dispatcher: def _build_delegate_prompt(self, task: Task, project_config: Optional[Dict]) -> str: """构建 delegate 模式的 prompt(协调员分配任务)""" - api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1' - api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083 - project_id = project_config.get("project_id", "") if project_config else "" + api_host = getattr( + self.spawner, + 'api_host', + '127.0.0.1') if self.spawner else '127.0.0.1' + api_port = getattr( + self.spawner, + 'api_port', + 8083) if self.spawner else 8083 + project_id = project_config.get( + "project_id", "") if project_config else "" return f"""你是任务协调员。请分析以下任务,决定最合适的执行者并分配。 @@ -478,7 +516,8 @@ class Dispatcher: # ── Legacy 兼容(deprecated) ── - def _legacy_decide(self, task: Task, action_type: str = "") -> Dict[str, Any]: + def _legacy_decide( + self, task: Task, action_type: str = "") -> Dict[str, Any]: """旧版三级决策树(兼容过渡用)""" LOCAL_ACTIONS = frozenset({ "L1_guardrail", "format_check", @@ -518,7 +557,8 @@ class Dispatcher: return registered[0] return "pangtong-fujunshi" - async def _legacy_dispatch(self, task, action_type="", project_config=None): + async def _legacy_dispatch( + self, task, action_type="", project_config=None): """旧版 dispatch(兼容过渡用) v2.7.2: counter acquire/release 移到 spawn_full_agent 内部。 @@ -541,15 +581,19 @@ class Dispatcher: # NOTE: _legacy_dispatch 仅在 router=None 时触发,当前配置不会进入。 # Mail 永远走 dispatch() 主路径(on_checks_passed 方案),不走此路径。 # 如果未来 legacy 路径被启用,需同步 on_checks_passed 逻辑。 - is_mail_legacy = project_config.get("project_id") == "_mail" if project_config else False + is_mail_legacy = project_config.get( + "project_id") == "_mail" if project_config else False if is_mail_legacy: - db_path_legacy = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None - if not db_path_legacy or not self._mail_auto_working(task.id, db_path_legacy): + db_path_legacy = Path( + project_config["db_path"]) if project_config and "db_path" in project_config else None + if not db_path_legacy or not self._mail_auto_working( + task.id, db_path_legacy): return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": "mail_auto_working_failed"} - if hasattr(self.spawner, 'build_spawn_message') and project_config: + if hasattr(self.spawner, + 'build_spawn_message') and project_config: retry_ctx = self._build_retry_context(task) message = self.spawner.build_spawn_message( task_id=task.id, title=task.title, @@ -576,9 +620,11 @@ class Dispatcher: def _mail_oc_legacy(aid, outcome): try: - _disp._mail_auto_complete(_t_id, aid, _m_db, _m_mh, outcome=outcome) + _disp._mail_auto_complete( + _t_id, aid, _m_db, _m_mh, outcome=outcome) except Exception as e: - logger.error("Mail %s: legacy on_complete error: %s", _t_id, e) + logger.error( + "Mail %s: legacy on_complete error: %s", _t_id, e) on_complete_legacy = _mail_oc_legacy session_id = await self.spawner.spawn_full_agent( @@ -586,14 +632,16 @@ class Dispatcher: task_id=task.id, on_complete=on_complete_legacy, use_main_session=True, # #02: 统一投递到 main session - task_db_path=Path(project_config["db_path"]) if project_config and "db_path" in project_config else None, + task_db_path=Path( + project_config["db_path"]) if project_config and "db_path" in project_config else None, ) return {"level": level.value, "agent_id": agent_id, "session_id": session_id, "status": "dispatched", "reason": decision["reason"]} except AgentBusyError as e: reason = getattr(e, 'reason', 'busy') - detail_msg = f"Session busy: {reason}" if reason.startswith("session_") else f"Agent busy: {reason}" + detail_msg = f"Session busy: {reason}" if reason.startswith( + "session_") else f"Agent busy: {reason}" return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "skipped", "reason": detail_msg} @@ -618,9 +666,11 @@ class Dispatcher: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") - row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: - logger.warning("Mail %s: cannot mark working (task not found)", task_id) + logger.warning( + "Mail %s: cannot mark working (task not found)", task_id) return False if row["status"] not in ("pending", "claimed"): logger.warning("Mail %s: cannot mark working (status=%s, expected pending/claimed)", @@ -631,7 +681,10 @@ class Dispatcher: (task_id,), ) conn.commit() - logger.info("Mail %s: auto-marked working (system, was %s)", task_id, row["status"]) + logger.info( + "Mail %s: auto-marked working (system, was %s)", + task_id, + row["status"]) return True finally: conn.close() @@ -645,30 +698,40 @@ class Dispatcher: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") - row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() if row and row["status"] == "working": conn.execute( "UPDATE tasks SET status='pending', updated_at=datetime('now') WHERE id=?", (task_id,), ) conn.commit() - logger.info("Mail %s: reverted working → pending (spawn failed)", task_id) + logger.info( + "Mail %s: reverted working → pending (spawn failed)", task_id) else: - logger.debug("Mail %s: skip revert (status=%s, expected working)", task_id, row["status"] if row else "not_found") + logger.debug( + "Mail %s: skip revert (status=%s, expected working)", + task_id, + row["status"] if row else "not_found") finally: conn.close() except Exception as e: - logger.error("Mail %s: failed to revert to pending: %s", task_id, e) + logger.error( + "Mail %s: failed to revert to pending: %s", + task_id, + e) def _mail_auto_complete(self, task_id: str, agent_id: str, - db_path: Path, must_haves: str, outcome=None) -> None: + db_path: Path, must_haves: str, outcome=None) -> None: """Mail 任务:on_complete 后自动标 done/failed(含幻觉门控)""" try: # 解析 performative performative = "request" try: meta = json.loads(must_haves) if must_haves else {} - performative = meta.get("performative", meta.get("type", "request")) + performative = meta.get( + "performative", meta.get( + "type", "request")) except Exception: pass @@ -677,13 +740,15 @@ class Dispatcher: has_reply = self._mail_check_reply(task_id, db_path) if not has_reply: # F3: 立刻标 failed(不等 ticker 30 分钟) - logger.error("Mail %s: no reply found, marking failed (no_reply_found)", task_id) + logger.error( + "Mail %s: no reply found, marking failed (no_reply_found)", task_id) for attempt in range(3): try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") - row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: return if row["status"] == "working": @@ -697,19 +762,24 @@ class Dispatcher: json.dumps({"reason": "no_reply_found"}, ensure_ascii=False)), ) conn.commit() - logger.info("Mail %s: marked failed (no_reply_found)", task_id) + logger.info( + "Mail %s: marked failed (no_reply_found)", task_id) # Mail 失败通知:通知发件人 try: from src.daemon.mail_notify import notify_mail_failed - notify_mail_failed(db_path, task_id, "no_reply_found") + notify_mail_failed( + db_path, task_id, "no_reply_found") except Exception as ne: - logger.warning("Mail %s: failed to send no_reply_found notification: %s", task_id, ne) + logger.warning( + "Mail %s: failed to send no_reply_found notification: %s", task_id, ne) return finally: conn.close() except Exception as e: - logger.warning("Mail %s: failed attempt %d: %s", task_id, attempt + 1, e) - logger.error("Mail %s: all 3 failed attempts failed, leaving for ticker", task_id) + logger.warning( + "Mail %s: failed attempt %d: %s", task_id, attempt + 1, e) + logger.error( + "Mail %s: all 3 failed attempts failed, leaving for ticker", task_id) return # inform 类型:只对成功 outcome 标 done,失败 outcome 留 working 等 ticker 重投 @@ -717,7 +787,10 @@ class Dispatcher: if performative == "inform": INFORM_DONE_OUTCOMES = {"completed", "claimed", "no_reply"} if outcome not in INFORM_DONE_OUTCOMES: - logger.info("Mail %s: inform outcome=%s, skip auto-done", task_id, outcome) + logger.info( + "Mail %s: inform outcome=%s, skip auto-done", + task_id, + outcome) return # 标 done(重试 3 次) @@ -726,7 +799,8 @@ class Dispatcher: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") - row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: return if row["status"] == "working": @@ -741,9 +815,15 @@ class Dispatcher: finally: conn.close() except Exception as e: - logger.warning("Mail %s: done attempt %d failed: %s", task_id, attempt + 1, e) + logger.warning( + "Mail %s: done attempt %d failed: %s", + task_id, + attempt + 1, + e) # 3 次都失败,留 working 等 ticker 超时兜底 - logger.error("Mail %s: all 3 done attempts failed, leaving for ticker", task_id) + logger.error( + "Mail %s: all 3 done attempts failed, leaving for ticker", + task_id) except Exception as e: logger.error("Mail %s: auto-complete error: %s", task_id, e) @@ -788,7 +868,9 @@ class Dispatcher: logger.info("Task %s: verify passed, marking review", task_id) self._mark_task_status(db_path, task_id, "review") else: - logger.info("Task %s: verify not passed (no signal), leaving working", task_id) + logger.info( + "Task %s: verify not passed (no signal), leaving working", + task_id) except Exception as e: logger.error("Task %s: auto-complete error: %s", task_id, e) @@ -823,7 +905,8 @@ class Dispatcher: logger.error("Task %s: verify error: %s", task_id, e) return True - def _rollback_current_agent(self, db_path: Path, task_id: str, agent_id: str) -> None: + def _rollback_current_agent( + self, db_path: Path, task_id: str, agent_id: str) -> None: """#07.2: crash 后回退 current_agent 到 assignee,避免 exclude_current 卡死""" try: conn = get_connection(db_path) @@ -837,11 +920,18 @@ class Dispatcher: conn.commit() finally: conn.close() - logger.info("Task %s: rolled back current_agent from %s to assignee", task_id, agent_id) + logger.info( + "Task %s: rolled back current_agent from %s to assignee", + task_id, + agent_id) except Exception as e: - logger.warning("Task %s: failed to rollback current_agent: %s", task_id, e) + logger.warning( + "Task %s: failed to rollback current_agent: %s", + task_id, + e) - def _mark_task_status(self, db_path: Path, task_id: str, status: str) -> None: + def _mark_task_status(self, db_path: Path, + task_id: str, status: str) -> None: """更新任务状态 + 写审计事件""" try: conn = get_connection(db_path) @@ -857,7 +947,8 @@ class Dispatcher: ) conn.execute( "INSERT INTO events (task_id, agent, event_type, payload) VALUES (?, 'dispatcher', 'status_change', ?)", - (task_id, f'{{"from": "{old_status}", "to": "{status}", "source": "auto_complete"}}'), + (task_id, + f'{{"from": "{old_status}", "to": "{status}", "source": "auto_complete"}}'), ) conn.commit() finally: @@ -866,7 +957,7 @@ class Dispatcher: logger.error("Task %s: mark status error: %s", task_id, e) @staticmethod - def _check_crash_limit(task_id: str, db_path: pathlib.Path, limit: int = 3, + def _check_crash_limit(task_id: str, db_path: Path, limit: int = 3, window_minutes: int = 30) -> bool: """v2.8.1 Fix-3c: 检查 task 最近 window_minutes 内的 crash 次数是否超限。 diff --git a/src/daemon/experience.py b/src/daemon/experience.py index 663ef74..1745ded 100644 --- a/src/daemon/experience.py +++ b/src/daemon/experience.py @@ -14,7 +14,7 @@ import logging import re from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional logger = logging.getLogger("moziplus-v2.experience") @@ -68,7 +68,7 @@ class Experience: @classmethod def from_dict(cls, data: Dict[str, Any]) -> Experience: return cls(**{k: v for k, v in data.items() if k != "id"}, - experience_id=data.get("id")) + experience_id=data.get("id")) class ExperienceStore: @@ -284,7 +284,7 @@ class ExperienceDistiller: all_tags.append(task_type) results = self.store.search(tags=all_tags if all_tags else None, - query=query, limit=limit) + query=query, limit=limit) # 按置信度排序 results.sort(key=lambda e: e.confidence, reverse=True) diff --git a/src/daemon/guardrails.py b/src/daemon/guardrails.py index 8412b58..a0c1465 100644 --- a/src/daemon/guardrails.py +++ b/src/daemon/guardrails.py @@ -4,7 +4,7 @@ from __future__ import annotations import logging import re -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional @@ -38,7 +38,9 @@ class GuardrailEngine: data = yaml.safe_load(f) self.rules = data.get("rules", []) self.settings = data.get("settings", {"enabled": True}) - logger.info("Loaded %d guardrail rules from %s", len(self.rules), config_path) + logger.info( + "Loaded %d guardrail rules from %s", len( + self.rules), config_path) def check_task(self, task: Any) -> List[GuardrailViolation]: """检查 Task 是否触犯安全红线(调度前调用)""" @@ -95,7 +97,8 @@ class GuardrailEngine: return violations - def check_token_usage(self, token_count: int) -> Optional[GuardrailViolation]: + def check_token_usage( + self, token_count: int) -> Optional[GuardrailViolation]: """检查 Token 消耗是否超标""" if not self.settings.get("enabled", True): return None @@ -103,7 +106,10 @@ class GuardrailEngine: for rule in self.rules: if rule["id"] != "high_token_usage": continue - threshold = rule.get("triggers", [{}])[0].get("token_threshold", 100000) + threshold = rule.get( + "triggers", [ + {}])[0].get( + "token_threshold", 100000) if token_count > threshold: return GuardrailViolation( rule_id=rule["id"], @@ -114,7 +120,8 @@ class GuardrailEngine: ) return None - def check_consecutive_failure(self, failure_count: int) -> Optional[GuardrailViolation]: + def check_consecutive_failure( + self, failure_count: int) -> Optional[GuardrailViolation]: """检查连续失败次数""" if not self.settings.get("enabled", True): return None @@ -122,7 +129,10 @@ class GuardrailEngine: for rule in self.rules: if rule["id"] != "consecutive_failure": continue - threshold = rule.get("triggers", [{}])[0].get("consecutive_failures", 3) + threshold = rule.get( + "triggers", [ + {}])[0].get( + "consecutive_failures", 3) if failure_count >= threshold: return GuardrailViolation( rule_id=rule["id"], diff --git a/src/daemon/health.py b/src/daemon/health.py index 50ca567..ae5b2c2 100644 --- a/src/daemon/health.py +++ b/src/daemon/health.py @@ -9,9 +9,9 @@ from __future__ import annotations import json import logging from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict -from src.blackboard.db import get_connection, init_db +from src.blackboard.db import get_connection from src.blackboard.queries import Queries logger = logging.getLogger("moziplus-v2.health") @@ -41,7 +41,7 @@ class HealthChecker: {"healthy": bool, "zombie": bool, "stale_ticks": int, "alert_written": bool, "resolved": bool} """ - db_key = str(db_path) + str(db_path) result: Dict[str, Any] = { "healthy": True, "zombie": False, @@ -58,7 +58,8 @@ class HealthChecker: # 用 event count 变化判断是否有真实变更 conn = queries._conn() try: - total_events = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] + conn.execute( + "SELECT COUNT(*) FROM events").fetchone()[0] non_tick_events = conn.execute( "SELECT COUNT(*) FROM events WHERE event_type != 'daemon_tick' " "AND event_type != 'agent_zombie_detected'" @@ -85,7 +86,8 @@ class HealthChecker: self._stale_ticks[project_id] = stale result["stale_ticks"] = stale - if stale >= self.zombie_threshold and not self._alerted.get(project_id): + if stale >= self.zombie_threshold and not self._alerted.get( + project_id): # 写告警 self._write_alert(db_path, project_id, tick_num, stale) self._alerted[project_id] = True @@ -126,7 +128,10 @@ class HealthChecker: conn.commit() finally: conn.close() - logger.warning("Zombie detected: %s (stale=%d)", project_id, stale_ticks) + logger.warning( + "Zombie detected: %s (stale=%d)", + project_id, + stale_ticks) def _write_resolution(self, db_path: Path, project_id: str, tick_num: int) -> None: diff --git a/src/daemon/inbox.py b/src/daemon/inbox.py index f76d9ca..89268cf 100644 --- a/src/daemon/inbox.py +++ b/src/daemon/inbox.py @@ -15,7 +15,6 @@ from __future__ import annotations import asyncio import json import logging -import os from pathlib import Path from typing import Any, Callable, Coroutine, Dict, List, Optional @@ -28,7 +27,8 @@ class InboxWatcher: def __init__( self, inbox_path: Path, - process_callback: Optional[Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]] = None, + process_callback: Optional[Callable[[ + Dict[str, Any]], Coroutine[Any, Any, None]]] = None, watch_interval: float = 1.0, ): """ @@ -57,7 +57,7 @@ class InboxWatcher: self._running = True self._task = asyncio.create_task(self._loop()) logger.info("Inbox watcher started (path=%s, interval=%.1fs)", - self.inbox_path, self.watch_interval) + self.inbox_path, self.watch_interval) async def stop(self) -> None: """停止监听""" @@ -69,7 +69,7 @@ class InboxWatcher: except asyncio.CancelledError: pass logger.info("Inbox watcher stopped (processed=%d, errors=%d)", - self._total_processed, self._total_errors) + self._total_processed, self._total_errors) @property def is_running(self) -> bool: @@ -160,7 +160,8 @@ class InboxWatcher: line_no, type(event).__name__) self._total_errors += 1 except json.JSONDecodeError: - logger.warning("Inbox line %d: invalid JSON, skipping", line_no) + logger.warning( + "Inbox line %d: invalid JSON, skipping", line_no) self._total_errors += 1 return events diff --git a/src/daemon/mail_notify.py b/src/daemon/mail_notify.py index 020415e..77cc8a2 100644 --- a/src/daemon/mail_notify.py +++ b/src/daemon/mail_notify.py @@ -50,7 +50,9 @@ def notify_mail_failed(db_path: Path, original_mail_id: str, bb = Blackboard(db_path) original = bb.get_task(original_mail_id) if not original: - logger.warning("notify_mail_failed: original mail %s not found", original_mail_id) + logger.warning( + "notify_mail_failed: original mail %s not found", + original_mail_id) return # 解析原邮件元数据 @@ -58,7 +60,9 @@ def notify_mail_failed(db_path: Path, original_mail_id: str, # 防递归:系统通知邮件失败不再发通知 if meta.get("system_notify"): - logger.info("Mail %s: system notify mail failed, skipping recursive notification", original_mail_id) + logger.info( + "Mail %s: system notify mail failed, skipping recursive notification", + original_mail_id) return # 获取发件人(优先 assigned_by,fallback must_haves.from) @@ -67,7 +71,9 @@ def notify_mail_failed(db_path: Path, original_mail_id: str, title = original.title or "" if not from_agent: - logger.warning("notify_mail_failed: cannot determine sender for mail %s", original_mail_id) + logger.warning( + "notify_mail_failed: cannot determine sender for mail %s", + original_mail_id) return # 发件人不是有效 Agent(如 system)→ 通知庞统代处理,不触发广播 @@ -108,7 +114,10 @@ def notify_mail_failed(db_path: Path, original_mail_id: str, ) bb.create_task(notify_task) logger.info("Mail %s: sent failure notification to %s (original_sender=%s, reason=%s, notify_id=%s)", - original_mail_id, target_agent, from_agent, reason, notify_id) + original_mail_id, target_agent, from_agent, reason, notify_id) except Exception as e: - logger.warning("notify_mail_failed: failed to send notification for mail %s: %s", original_mail_id, e) + logger.warning( + "notify_mail_failed: failed to send notification for mail %s: %s", + original_mail_id, + e) diff --git a/src/daemon/review.py b/src/daemon/review.py index 667923b..2a92fb0 100644 --- a/src/daemon/review.py +++ b/src/daemon/review.py @@ -8,15 +8,12 @@ from __future__ import annotations import json import logging -import re -from datetime import datetime from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional from src.blackboard.models import Task from src.blackboard.operations import Blackboard -from src.blackboard.queries import Queries logger = logging.getLogger("moziplus-v2.review") @@ -151,12 +148,14 @@ class ReviewPipeline: ) -> ReviewResult: """Step 2: 格式合规""" if not outputs: - return ReviewResult("format", ReviewVerdict.FAIL, 0.0, "No outputs") + return ReviewResult( + "format", ReviewVerdict.FAIL, 0.0, "No outputs") issues = [] for out in outputs: # output.md 必须存在且非空 - if out.get("type") == "markdown" or out.get("path", "").endswith(".md"): + if out.get("type") == "markdown" or out.get( + "path", "").endswith(".md"): content = out.get("content", "") if not content and out.get("path"): try: @@ -167,7 +166,8 @@ class ReviewPipeline: issues.append(f"Output too short: {out.get('path', '?')}") # 结论 JSON 必须有效 - if out.get("type") == "json" or out.get("path", "").endswith(".json"): + if out.get("type") == "json" or out.get( + "path", "").endswith(".json"): content = out.get("content", "") if not content and out.get("path"): try: @@ -177,7 +177,8 @@ class ReviewPipeline: try: data = json.loads(content) if not isinstance(data, dict): - issues.append(f"JSON not a dict: {out.get('path', '?')}") + issues.append( + f"JSON not a dict: {out.get('path', '?')}") except (json.JSONDecodeError, TypeError): issues.append(f"Invalid JSON: {out.get('path', '?')}") @@ -194,7 +195,8 @@ class ReviewPipeline: ) -> ReviewResult: """Step 3: 内容质量(自定义检查)""" if not outputs: - return ReviewResult("quality", ReviewVerdict.FAIL, 0.0, "No outputs") + return ReviewResult( + "quality", ReviewVerdict.FAIL, 0.0, "No outputs") suggestions = [] total_score = 0.0 @@ -215,7 +217,8 @@ class ReviewPipeline: avg = 1.0 # 无自定义检查默认通过 verdict = ReviewVerdict.PASS if avg >= 0.6 else ReviewVerdict.FAIL - return ReviewResult("quality", verdict, round(avg, 2), suggestions=suggestions) + return ReviewResult("quality", verdict, round( + avg, 2), suggestions=suggestions) def _determine_gate( self, task: Task, results: List[ReviewResult] @@ -329,6 +332,7 @@ class RebuttalManager: return 0 try: observations = self.bb.get_observations(task_id=task_id) - return sum(1 for o in observations if "Rebuttal round" in (o.body or "")) + return sum( + 1 for o in observations if "Rebuttal round" in (o.body or "")) except Exception: return 0 diff --git a/src/daemon/router.py b/src/daemon/router.py index 6df6849..8a19941 100644 --- a/src/daemon/router.py +++ b/src/daemon/router.py @@ -107,7 +107,8 @@ class AgentRouter: # ── 快速路径 2: retry → 原执行者 ── if action_type == "retry": - current = task_info.get("current_agent") or task_info.get("assignee") + current = task_info.get( + "current_agent") or task_info.get("assignee") if current and current in self.agent_profiles: return RouteDecision( agent_id=current, @@ -119,7 +120,8 @@ class AgentRouter: # ── Mode B: Agent 声明式交接 ── next_cap = task_info.get("next_capability") if next_cap and self._validate_capability(next_cap): - current = task_info.get("current_agent") or task_info.get("assignee") + current = task_info.get( + "current_agent") or task_info.get("assignee") exclude = {current} if current else set() matched = self._match_capability(next_cap, exclude) if matched: @@ -129,7 +131,9 @@ class AgentRouter: mode="agent_handoff", latency_ms=int((time.monotonic() - start) * 1000), ) - logger.info("next_capability '%s' no match, delegate to coordinator", next_cap) + logger.info( + "next_capability '%s' no match, delegate to coordinator", + next_cap) # ── 快速路径 3: 生命周期流转查表 ── lifecycle = self.LIFECYCLE_CAPABILITY.get(action_type) @@ -140,7 +144,8 @@ class AgentRouter: exclude_current = lifecycle.get("exclude_current", False) exclude = set() if exclude_current: - current = task_info.get("current_agent") or task_info.get("assignee") + current = task_info.get( + "current_agent") or task_info.get("assignee") if current: exclude.add(current) matched = self._match_capability(cap, exclude) @@ -154,7 +159,8 @@ class AgentRouter: # ── 快速路径 4: 有 assignee 且非生命周期流转 ── assignee = task_info.get("assignee") - if assignee and assignee in self.agent_profiles and action_type not in ("review", "escalation"): + if assignee and assignee in self.agent_profiles and action_type not in ( + "review", "escalation"): return RouteDecision( agent_id=assignee, reason=f"Direct assignee: {assignee}", diff --git a/src/daemon/skill_system.py b/src/daemon/skill_system.py index 7774763..a54afb4 100644 --- a/src/daemon/skill_system.py +++ b/src/daemon/skill_system.py @@ -10,12 +10,11 @@ from __future__ import annotations import json import logging -import re from dataclasses import dataclass, field from datetime import datetime from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple logger = logging.getLogger("moziplus-v2.skill") diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index c53a48e..915ef07 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -15,7 +15,7 @@ from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional -from src.blackboard.db import get_connection, init_db +from src.blackboard.db import get_connection logger = logging.getLogger("moziplus-v2.spawner") @@ -163,9 +163,12 @@ class AgentBusyError(Exception): #07: reason 字段区分具体原因,便于 dispatcher 层区分处理。 """ - def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None): + + def __init__(self, agent_id: str, reason: str = "busy", + detail: Optional[dict] = None): self.agent_id = agent_id - self.reason = reason # counter_blocked / session_locked / session_running / session_compacting / session_stuck + # counter_blocked / session_locked / session_running / session_compacting / session_stuck + self.reason = reason self.detail = detail or {} super().__init__(f"{agent_id}: {reason}") @@ -277,11 +280,15 @@ class AgentSpawner: # mail 任务用精简模板 if project_id == "_mail": - return self._build_mail_prompt(task_id, title, description, must_haves, agent_id) + return self._build_mail_prompt( + task_id, title, description, must_haves, agent_id) # 走 BootstrapBuilder 新路径 if self.bootstrap_builder and task is not None: - role_map = {"executor": "executor", "review": "reviewer", "discussion": "planner"} + role_map = { + "executor": "executor", + "review": "reviewer", + "discussion": "planner"} role = role_map.get(spawn_type, "executor") bootstrap_prompt = self.bootstrap_builder.build_for_task( task=task, @@ -293,13 +300,14 @@ class AgentSpawner: # 无 BootstrapBuilder 或无 task 对象 → 最小 fallback # 只保留任务上下文 + API 操作指令 - logger.warning("No BootstrapBuilder or task object, using minimal fallback") + logger.warning( + "No BootstrapBuilder or task object, using minimal fallback") return self._build_minimal_fallback( task_id, title, description, must_haves, project_id, agent_id) def _build_minimal_fallback(self, task_id, title, description, must_haves, - project_id, agent_id): + project_id, agent_id): """最小 fallback:只有任务上下文 + API 指令""" task_section = f"""## 任务 {title} @@ -311,7 +319,7 @@ class AgentSpawner: return task_section + "\n\n---\n\n" + api_section def _build_api_section(self, project_id: str, task_id: str, - agent_id: str) -> str: + agent_id: str) -> str: """构建 API 回写操作指令(BootstrapBuilder 模式下补充)""" # mail 任务直接 done,不走 review success_status = '"done"' if project_id == "_mail" else '"review"' @@ -337,8 +345,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta """ def _build_discussion_prompt(self, task_id: str, title: str, - description: str, must_haves: str, - project_id: str, agent_id: str) -> str: + description: str, must_haves: str, + project_id: str, agent_id: str) -> str: """构建讨论类 spawn prompt(§3.3 框架 + Boids)""" goal_snapshot = description or title constraints = must_haves or "(无特殊约束)" @@ -368,7 +376,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta if not self.guardrails: return "无特殊限制" try: - return "、".join(r.get("name", r.get("rule_id", "")) for r in self.guardrails.rules[:6]) + return "、".join(r.get("name", r.get("rule_id", "")) + for r in self.guardrails.rules[:6]) except Exception: return "无特殊限制" @@ -379,9 +388,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta return router.agent_profiles.get(agent_id) return None - def _build_mail_prompt(self, task_id: str, title: str, description: str, - must_haves: str, agent_id: str) -> str: + must_haves: str, agent_id: str) -> str: """构建 Mail 专用精简模板""" # 解析 must_haves 获取 from 和 performative from_agent = agent_id @@ -389,7 +397,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta try: meta = json.loads(must_haves) if must_haves else {} from_agent = meta.get("from", agent_id) - performative = meta.get("performative", meta.get("type", "request")) + performative = meta.get( + "performative", meta.get( + "type", "request")) except Exception: pass @@ -472,7 +482,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta self._revive_session(agent_id) elif pre_state.get("status") == "running" and not pre_state.get("lock_pid_alive"): # status=running 但 lock PID 已死 → 假死,revive - logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id) + logger.warning( + "Phase 0: %s status=running but lock PID dead, reviving", + agent_id) self._revive_session(agent_id) # Phase 1: Counter acquire(互斥锁) @@ -487,12 +499,15 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta if use_main_session: session_state = self._check_session_state(agent_id) logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s", - agent_id, session_state.get('status'), session_state.get('lock_pid'), + agent_id, session_state.get( + 'status'), session_state.get('lock_pid'), session_state.get('lock_pid_alive'), session_state.get('recent_compact')) blockers = [] - if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"): - blockers.append(("session_locked", session_state.get("lock_pid"))) + if session_state.get( + "lock_pid_alive") and not session_state.get("lock_expired"): + blockers.append( + ("session_locked", session_state.get("lock_pid"))) if session_state.get("status") == "running": if session_state.get("lock_pid_alive"): # 真 running:外部进程占用 @@ -515,7 +530,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta # Phase 2.5: 假死修复(status=running + lock PID 死 → revive → 重检) # 此场景应被 Phase 0 提前修复,这里做兜底 - if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"): + if session_state.get("status") == "running" and not session_state.get( + "lock_pid_alive"): logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving", agent_id) self._revive_session(agent_id) @@ -538,7 +554,10 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta raise if self.dry_run: - logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, _sid_key) + logger.info( + "[DRY RUN] Would spawn agent %s (session=%s)", + agent_id, + _sid_key) self._register_session(_sid_key, agent_id, task_id, pid=None) return _sid_key @@ -554,7 +573,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta if asyncio.iscoroutine(result): await result except Exception: - logger.warning("Business on_complete failed for %s", aid, exc_info=True) + logger.warning( + "Business on_complete failed for %s", aid, exc_info=True) cmd = [ "openclaw", "agent", @@ -575,7 +595,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta stderr=asyncio.subprocess.PIPE, ) self._register_session(session_id, agent_id, task_id, proc.pid, - broadcast_task_ids=broadcast_task_ids) + broadcast_task_ids=broadcast_task_ids) logger.info("Spawned agent %s (session=%s, pid=%d)", agent_id, session_id, proc.pid) @@ -593,7 +613,11 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta if self.counter: self.counter.release(agent_id, _sid_key) logger.exception("Failed to spawn agent %s", agent_id) - self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e)) + self._record_attempt( + task_id, + agent_id, + "spawn_failed", + error=str(e)) raise async def spawn_subagent( @@ -609,7 +633,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta session_id = str(uuid.uuid4()) if self.dry_run: - logger.info("[DRY RUN] Would spawn subagent (session=%s)", session_id) + logger.info( + "[DRY RUN] Would spawn subagent (session=%s)", + session_id) self._register_session(session_id, "subagent", task_id, pid=None) return session_id @@ -729,10 +755,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ agent_id, session_id, json_result) # 查任务实际状态 - task_status = self._get_task_status(db_path, task_id) if task_id else None + task_status = self._get_task_status( + db_path, task_id) if task_id else None # 分类 - cls = self._classify_outcome(exit_code, json_result, stderr_text, task_status, stdout_text) + cls = self._classify_outcome( + exit_code, + json_result, + stderr_text, + task_status, + stdout_text) outcome = cls["outcome"] # 更新 session 状态 @@ -761,17 +793,21 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ agent_id, session_id, outcome, exit_code, task_status) # 广播反馈追踪(Phase 1 bug fix) - if task_id == "broadcast" and hasattr(self, '_ticker') and self._ticker: + if task_id == "broadcast" and hasattr( + self, '_ticker') and self._ticker: # 广播任务:从 session 信息取真实 task_id 列表,逐一回调 tracker sess_info = self._sessions.get(session_id or "main", {}) bt_ids = sess_info.get("broadcast_task_ids") or [] # 广播场景一律标 no_reply:Agent 只 claim 一个任务, # 其余任务的 tracker 不能被 claimed 清除 for real_task_id in bt_ids: - self._ticker.record_broadcast_response(real_task_id, agent_id, "no_reply") + self._ticker.record_broadcast_response( + real_task_id, agent_id, "no_reply") elif task_id and hasattr(self, '_ticker') and self._ticker: - outcome_str = "claimed" if cls.get("status") == "ok" else "no_reply" - self._ticker.record_broadcast_response(task_id, agent_id, outcome_str) + outcome_str = "claimed" if cls.get( + "status") == "ok" else "no_reply" + self._ticker.record_broadcast_response( + task_id, agent_id, outcome_str) if cls["should_retry"]: # cooldown: 新增的可恢复场景(A14/A15/A16/A8/A10) @@ -850,14 +886,24 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ # v2.8.1 Fix-3a: crash 类 outcome 设 cooldown,给 agent session 恢复时间 if outcome == "crashed" and self.counter: self.counter.set_cooldown(agent_id, seconds=60) - logger.info("Crash cooldown set for %s: 60s (outcome=%s)", agent_id, outcome) + logger.info( + "Crash cooldown set for %s: 60s (outcome=%s)", + agent_id, + outcome) elif outcome in ("compact_failed", "process_crash", "session_stuck", - "compact_hanging", "agent_error", "compact_interrupted") and self.counter: + "compact_hanging", "agent_error", "compact_interrupted") and self.counter: self.counter.set_cooldown(agent_id, seconds=300) # 5 分钟 - logger.info("Error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome) + logger.info( + "Error cooldown set for %s: 300s (outcome=%s)", + agent_id, + outcome) # F1: 不可恢复 outcome → 立刻标 failed + 写黑板 - if outcome in ("auth_failed", "agent_error") and db_path and task_id: - logger.error("Task %s: unrecoverable outcome=%s, marking failed immediately", task_id, outcome) + if outcome in ("auth_failed", + "agent_error") and db_path and task_id: + logger.error( + "Task %s: unrecoverable outcome=%s, marking failed immediately", + task_id, + outcome) self._mark_task(db_path, task_id, "failed", { "reason": outcome, "stderr_preview": (stderr_text or "")[:500], @@ -881,13 +927,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ except Exception: pass - stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") + # stderr collected but not used in this handler + # (kept for potential future diagnostics) + b"".join(stderr_chunks).decode("utf-8", errors="replace") # 检查 session 状态 state = self._check_session_state(agent_id) # B1: 假死 - 先复活,连续假死 ≥2 次再 failed - if state.get("status") == "running" and not state.get("lock_pid_alive", True): + if state.get("status") == "running" and not state.get( + "lock_pid_alive", True): # 假死计数 stuck_count = self._stuck_counts.get(task_id, 0) + 1 self._stuck_counts[task_id] = stuck_count @@ -913,7 +962,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ await self._do_on_complete_async(on_complete, agent_id, "session_revived") else: # 复活失败 → 标 failed - logger.error("Agent %s revive failed, marking failed", agent_id) + logger.error( + "Agent %s revive failed, marking failed", agent_id) self._mark_task(db_path, task_id, "failed", {"reason": "revive_failed", "stuck_count": stuck_count, "diagnostics": state}) @@ -994,7 +1044,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ "SELECT status FROM tasks WHERE id=?", (task_id,) ).fetchone() # Bug-6 fix: pending 不是终态 - if row and row["status"] in ("done", "failed", "cancelled", "review"): + if row and row["status"] in ( + "done", "failed", "cancelled", "review"): logger.info("Retry skip: task %s already %s (agent=%s)", task_id, row["status"], agent_id) # on_complete = wrapped_on_complete,会 release counter @@ -1003,7 +1054,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ finally: conn.close() except Exception: - logger.warning("Retry status check failed for %s, proceeding", task_id) + logger.warning( + "Retry status check failed for %s, proceeding", task_id) # 直接读写 tasks 表的 retry_count if retry_field == "retry_count" and db_path and task_id: @@ -1023,7 +1075,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ finally: conn.close() except Exception: - logger.exception("Failed to update retry_count for task %s", task_id) + logger.exception( + "Failed to update retry_count for task %s", task_id) count = 1 else: retry_counts = self._get_retry_counts(db_path, task_id) @@ -1107,7 +1160,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ """ text = stdout_text.strip() if not text: - return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []} + return {"status": None, "summary": None, "fallback_used": False, + "fallback_reason": None, "payloads": []} try: data = json.loads(text) except json.JSONDecodeError: @@ -1119,7 +1173,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ except json.JSONDecodeError: continue else: - return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []} + return {"status": None, "summary": None, "fallback_used": False, + "fallback_reason": None, "payloads": []} # 从 data.result.meta.executionTrace 取 fallback 信息 result = data.get("result", {}) @@ -1135,7 +1190,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ } @staticmethod - def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]: + def _get_task_status( + db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]: """查任务实际 API 状态""" if not db_path or not task_id: return None @@ -1152,7 +1208,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return None @staticmethod - def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]: + def _get_task_info(db_path: Optional[Path], + task_id: Optional[str]) -> Optional[dict]: """查任务基本信息""" if not db_path or not task_id: return None @@ -1160,7 +1217,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ conn = get_connection(db_path) try: row = conn.execute( - "SELECT id, title, status FROM tasks WHERE id=?", (task_id,) + "SELECT id, title, status FROM tasks WHERE id=?", ( + task_id,) ).fetchone() if not row: return None @@ -1192,7 +1250,9 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ sessions[main_key] = main_session with open(sessions_path, "w") as f: json.dump(sessions, f, indent=2) - logger.info("Revived %s: sessions.json status changed running→idle", agent_id) + logger.info( + "Revived %s: sessions.json status changed running→idle", + agent_id) # #07 O4: 同时清理残留 lock 文件 sf = main_session.get("sessionFile", "") if sf: @@ -1200,7 +1260,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ if lock_path.exists(): try: lock_path.unlink() - logger.info("Cleaned stale lock for %s: %s", agent_id, lock_path.name) + logger.info( + "Cleaned stale lock for %s: %s", + agent_id, + lock_path.name) except Exception: pass return True @@ -1209,7 +1272,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return False @staticmethod - def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 900) -> bool: + def _check_recent_compaction_jsonl( + session_file: str, window_seconds: int = 900) -> bool: """v2.8.2 Fix-2: 读 session jsonl 末尾,检查是否有 window_seconds 内的 compaction 记录。 比 compactionCheckpoints 更可靠:Gateway 每次完成 compact 必然在 jsonl 末尾追加记录, @@ -1219,7 +1283,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ 实测 50KB 在长对话中不够(compact 记录被推出窗口导致漏检)。 正常扫描量不变:从尾部往前扫,遇到超过 15min 的 timestamp 即 break。 """ - if not session_file or not pathlib.Path(session_file).exists(): + if not session_file or not Path(session_file).exists(): return False try: from datetime import datetime, timezone @@ -1241,7 +1305,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ ts = obj.get("timestamp", "") if ts: try: - ct = datetime.fromisoformat(ts.replace("Z", "+00:00")) + ct = datetime.fromisoformat( + ts.replace("Z", "+00:00")) if (now - ct).total_seconds() < window_seconds: return True except (ValueError, TypeError): @@ -1265,7 +1330,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ v2.8.1: compact 检测改用 session jsonl 末尾扫描(Fix-1), 替代失效的 compactionCheckpoints 检测。 """ - result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False} + result = { + "status": "unknown", + "lock_pid": None, + "lock_pid_alive": False, + "recent_compact": False} sessions_path = Path(os.environ.get( "OPENCLAW_HOME", str(Path.home() / ".openclaw") )) / "agents" / agent_id / "sessions" / "sessions.json" @@ -1304,8 +1373,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ created_at_str = lock_data.get("createdAt", "") if created_at_str: from datetime import datetime as _dt, timezone as _tz - created_dt = _dt.fromisoformat(created_at_str.replace("Z", "+00:00")) - elapsed = (_dt.now(_tz.utc) - created_dt).total_seconds() + created_dt = _dt.fromisoformat( + created_at_str.replace("Z", "+00:00")) + elapsed = (_dt.now(_tz.utc) - + created_dt).total_seconds() if elapsed > 1800: # 30 minutes result["lock_pid_alive"] = False result["lock_expired"] = True @@ -1318,8 +1389,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ # v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描 # 只在 agent 非空闲时才扫描(减少不必要 I/O) - if result["status"] not in ("done", "idle", "unknown", None) and sf: - result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf) + if result["status"] not in ( + "done", "idle", "unknown", None) and sf: + result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl( + sf) except Exception: pass return result @@ -1364,14 +1437,17 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ # A15/A16: stderr 含 network/compact 关键字 → 可恢复 if stderr_text: stderr_lower = stderr_text.lower() - if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]): + if any(kw in stderr_lower for kw in [ + "econnrefused", "etimedout", "gateway closed", "econnreset"]): return {"outcome": "gateway_unreachable", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} - if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]): + if any(kw in stderr_lower for kw in [ + "compaction-diag", "context-overflow"]): return {"outcome": "compact_interrupted", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} # A17: 真正的 crash → 保持 working,ticker 兜底 - return {"outcome": "crashed", "should_retry": False, "original": "process_crash"} + return {"outcome": "crashed", "should_retry": False, + "original": "process_crash"} # A13 revised: stdout 为空但 exit=0 → 信任进程退出码,视为正常完成 # 实测发现 openclaw session=None + exit=0 是正常场景(inform 通知等) @@ -1382,25 +1458,32 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ # A7-A12: status=error → 不续杯,stderr 辅助分类 if status == "error": stderr_lower = stderr_text.lower() - if any(kw in stderr_lower for kw in ["401", "403", "unauthorized", "auth"]): + if any(kw in stderr_lower for kw in [ + "401", "403", "unauthorized", "auth"]): return {"outcome": "auth_failed", "should_retry": False} - if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]): + if any(kw in stderr_lower for kw in [ + "econnrefused", "etimedout", "gateway closed", "econnreset"]): return {"outcome": "gateway_unreachable", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} - if any(kw in stderr_lower for kw in ["rate_limit", "500", "503", "api error"]): + if any(kw in stderr_lower for kw in [ + "rate_limit", "500", "503", "api error"]): return {"outcome": "api_error", "should_retry": False} - if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]): + if any(kw in stderr_lower for kw in [ + "compaction-diag", "context-overflow"]): return {"outcome": "compact_failed", "should_retry": False} - if any(kw in stderr_lower for kw in ["lock", "busy", "concurrent", "lane task error"]): + if any(kw in stderr_lower for kw in [ + "lock", "busy", "concurrent", "lane task error"]): return {"outcome": "lock_conflict", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} return {"outcome": "agent_error", "should_retry": False} # 兜底:status 未知值 - return {"outcome": "agent_error", "should_retry": False, "original": "unknown_status"} + return {"outcome": "agent_error", + "should_retry": False, "original": "unknown_status"} @staticmethod - def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict: + def _get_retry_counts( + db_path: Optional[Path], task_id: Optional[str]) -> dict: """从最新 task_attempt 的 metadata 读计数器""" defaults = {"retry_count": 0, "connect_retry_count": 0, "api_retry_count": 0, "lock_retry_count": 0, @@ -1426,7 +1509,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return defaults def _update_retry_counts(self, db_path: Optional[Path], - task_id: Optional[str], counts: dict): + task_id: Optional[str], counts: dict): """将 retry counts 写回最新 task_attempt 的 metadata""" if not db_path or not task_id: return @@ -1440,7 +1523,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ (task_id,) ).fetchone() if row: - meta = json.loads(row["metadata"]) if row["metadata"] else {} + meta = json.loads( + row["metadata"]) if row["metadata"] else {} meta.update(counts) conn.execute( "UPDATE task_attempts SET metadata=? WHERE rowid=?", @@ -1450,7 +1534,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ finally: conn.close() except Exception: - logger.exception("Failed to update retry counts for task %s", task_id) + logger.exception( + "Failed to update retry counts for task %s", task_id) def _mark_task(self, db_path: Optional[Path], task_id: Optional[str], status: str, detail: Optional[dict] = None): @@ -1468,7 +1553,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ if detail: conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", - (task_id, "daemon", status, json.dumps(detail, ensure_ascii=False)) + (task_id, "daemon", status, json.dumps( + detail, ensure_ascii=False)) ) conn.commit() finally: @@ -1486,10 +1572,13 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ from src.blackboard.operations import Blackboard bb = Blackboard(db_path) cid = bb.add_comment(task_id, "daemon", - f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入", - comment_type="system") + f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入", + comment_type="system") bb.record_mentions(cid, task_id, ["pangtong-fujunshi"]) - logger.info("Task %s: failure notified pangtong via comment+mention (reason=%s)", task_id, reason) + logger.info( + "Task %s: failure notified pangtong via comment+mention (reason=%s)", + task_id, + reason) except Exception as e: logger.warning("Task %s: failed to notify: %s", task_id, e) except Exception: @@ -1518,7 +1607,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ if asyncio.iscoroutine(result): await result except Exception: - logger.warning("on_complete callback failed for %s", agent_id, exc_info=True) + logger.warning( + "on_complete callback failed for %s", + agent_id, + exc_info=True) def _register_session( self, @@ -1596,7 +1688,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: """v2.7.2: 根据 agent_id 获取活跃 session 信息(用于进程存活性检查)""" for sid, info in self._sessions.items(): - if info.get("agent_id") == agent_id and info.get("status") == "running": + if info.get("agent_id") == agent_id and info.get( + "status") == "running": return info return None diff --git a/src/daemon/sse.py b/src/daemon/sse.py index d3f960b..2e53e83 100644 --- a/src/daemon/sse.py +++ b/src/daemon/sse.py @@ -9,14 +9,11 @@ from __future__ import annotations import asyncio import json import logging -import subprocess import uuid from datetime import datetime from enum import Enum -from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional -from src.blackboard.models import Event logger = logging.getLogger("moziplus-v2.sse") @@ -52,7 +49,8 @@ class SSEEvent: """格式化为 SSE 协议文本""" lines = [f"id: {self.id}"] lines.append(f"event: {self.event_type}") - lines.append(f"data: {json.dumps(self.data, ensure_ascii=False, default=str)}") + lines.append( + f"data: {json.dumps(self.data, ensure_ascii=False, default=str)}") return "\n".join(lines) + "\n\n" diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index 6a75264..7796bd6 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -21,7 +21,6 @@ from dataclasses import dataclass, field as dc_field from src.blackboard.operations import Blackboard from src.blackboard.db import get_connection -from src.blackboard.models import Task from src.daemon.spawner import AgentBusyError from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry @@ -32,9 +31,11 @@ class BroadcastRound: """追踪单个任务的广播状态""" task_id: str notified_agents: set = dc_field(default_factory=set) # 已 spawn 过的 Agent - responded_agents: set = dc_field(default_factory=set) # 已返回反馈的 Agent(含 NO_REPLY) + responded_agents: set = dc_field( + default_factory=set) # 已返回反馈的 Agent(含 NO_REPLY) round_number: int = 0 # 当前第几轮(0=未开始,1=第1轮) + logger = logging.getLogger("moziplus-v2.ticker") @@ -46,7 +47,8 @@ class Ticker: registry: ProjectRegistry, tick_interval: float = 30.0, max_ticks: Optional[int] = None, - on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None, + on_tick_complete: Optional[Callable[[], + Coroutine[Any, Any, None]]] = None, dispatcher: Optional[Any] = None, spawner: Optional[Any] = None, max_dispatch_per_tick: int = 3, @@ -194,7 +196,10 @@ class Ticker: pr = await self._tick_project(project_id, project_info) results["projects"][project_id] = pr except Exception as e: - logger.exception("Tick %d project %s error", tick_num, project_id) + logger.exception( + "Tick %d project %s error", + tick_num, + project_id) results["projects"][project_id] = {"error": str(e)} # 虚拟项目 _general:不在 registry 但需要调度 @@ -223,7 +228,10 @@ class Ticker: logger.exception("Tick %d _mail error", tick_num) results["projects"]["_mail"] = {"error": str(e)} - logger.debug("Tick %d complete: %d projects", tick_num, len(active_projects)) + logger.debug( + "Tick %d complete: %d projects", + tick_num, + len(active_projects)) if self.on_tick_complete: try: @@ -314,7 +322,8 @@ class Ticker: # 8. 健康检查(僵尸检测) if self.health_checker: try: - self.health_checker.check(project_id, db_path, self._tick_count) + self.health_checker.check( + project_id, db_path, self._tick_count) except Exception as e: logger.warning("HealthChecker error for %s: %s", project_id, e) @@ -335,7 +344,8 @@ class Ticker: task_id=t.id, task_title=t.title, task_type=t.task_type ) except Exception as e: - logger.warning("ExperienceDistiller error for %s: %s", project_id, e) + logger.warning( + "ExperienceDistiller error for %s: %s", project_id, e) # 10. 扫描后状态 result["summary_after"] = queries.task_summary() @@ -375,7 +385,8 @@ class Ticker: (computed, pid), ) refreshed.append(pid) - logger.info("Parent %s status aggregated: → %s", pid, computed) + logger.info( + "Parent %s status aggregated: → %s", pid, computed) if refreshed: conn.commit() @@ -391,7 +402,7 @@ class Ticker: MAX_ROUNDS = 5 # §4.5 防无限循环 async def _check_round_complete(self, db_path: Path, - project_id: str) -> List[str]: + project_id: str) -> List[str]: """检测 parent task 下所有 sub task 终态 → spawn 庞统 review 流程(§4.4): @@ -462,7 +473,7 @@ class Ticker: "Round %d review spawned for parent %s (subs: %s)", new_round, parent_id, summary ) - except Exception as e: + except Exception: logger.exception("Round check error for parent %s", parent_id) return reviewed @@ -531,9 +542,9 @@ Parent Task ID: {parent_task.id} """ async def _spawn_pangtong_review(self, parent_task, - review_prompt: str, - project_id: str, - new_round: int = 0) -> bool: + review_prompt: str, + project_id: str, + new_round: int = 0) -> bool: """Spawn 庞统进行 review 流程: @@ -543,7 +554,7 @@ Parent Task ID: {parent_task.id} """ try: agent_id = "pangtong-fujunshi" - session_id = f"review-{parent_task.id}-r{new_round}" + f"review-{parent_task.id}-r{new_round}" # 构造 on_complete 回调:解析庞统结论,更新 parent 状态 async def _on_review_complete(aid: str, outcome: str): @@ -555,7 +566,8 @@ Parent Task ID: {parent_task.id} latest_meta = None latest_time = "" for sid, sess in self.spawner._sessions.items(): - if sess.get("agent_id") == agent_id and sess.get("meta"): + if sess.get( + "agent_id") == agent_id and sess.get("meta"): t = sess.get("completed_at", "") if t > latest_time: latest_time = t @@ -586,8 +598,10 @@ Parent Task ID: {parent_task.id} self._set_parent_reviewing(parent_task.id, project_id) return True return False - except Exception as e: - logger.exception("Failed to spawn pangtong review for %s", parent_task.id) + except Exception: + logger.exception( + "Failed to spawn pangtong review for %s", + parent_task.id) return False def _set_parent_reviewing(self, parent_id: str, project_id: str): @@ -603,14 +617,14 @@ Parent Task ID: {parent_task.id} (parent_id,)) conn.commit() logger.info("Parent %s → reviewing (round review in progress)", - parent_id) + parent_id) finally: conn.close() except Exception: logger.exception("Failed to set parent %s to reviewing", parent_id) def _handle_review_conclusion(self, parent_id: str, project_id: str, - review_text: str, round_num: int): + review_text: str, round_num: int): """解析庞统 review 结论,更新 parent 状态 review_text 是庞统回复的文本(从 spawner session meta payloads 拼接)。 @@ -619,7 +633,8 @@ Parent Task ID: {parent_task.id} conn = get_connection(db_path) try: # 解析 GOAL_ACHIEVED - is_achieved = bool(review_text and "GOAL_ACHIEVED" in review_text.upper()) + is_achieved = bool( + review_text and "GOAL_ACHIEVED" in review_text.upper()) if is_achieved: # Goal 达成 → parent 最终完成 @@ -649,7 +664,9 @@ Parent Task ID: {parent_task.id} "(round %d, subs=%d)", parent_id, round_num, sub_count) except Exception: - logger.exception("Failed to handle review conclusion for %s", parent_id) + logger.exception( + "Failed to handle review conclusion for %s", + parent_id) # 安全恢复:reviewing → working try: conn.execute("BEGIN IMMEDIATE") @@ -675,7 +692,7 @@ Parent Task ID: {parent_task.id} MENTION_MAX_RETRIES = 5 async def _process_mentions(self, db_path: Path, - project_id: str) -> List[str]: + project_id: str) -> List[str]: """扫描 pending mentions → spawn 被 @ 的 Agent 流程(§3.4): @@ -687,7 +704,8 @@ Parent Task ID: {parent_task.id} return [] bb = Blackboard(db_path) - mentions = bb.get_pending_mentions(max_retries=self.MENTION_MAX_RETRIES) + mentions = bb.get_pending_mentions( + max_retries=self.MENTION_MAX_RETRIES) if not mentions: return [] @@ -751,27 +769,32 @@ Parent Task ID: {parent_task.id} if new_review and new_review["verdict"] == "approved": _ticker._transition_status( - get_connection(rdb_path), _t_id, "done", + get_connection( + rdb_path), _t_id, "done", agent="daemon", detail={"reason": "rebuttal_approved"}) - logger.info("Rebuttal: task %s approved after rebuttal", _t_id) + logger.info( + "Rebuttal: task %s approved after rebuttal", _t_id) else: # 仍非 approved → @mention assignee verdict_str = new_review["verdict"] if new_review else "未知" rconn2 = get_connection(rdb_path) try: - t_row = rconn2.execute("SELECT assignee FROM tasks WHERE id=?", (_t_id,)).fetchone() + t_row = rconn2.execute( + "SELECT assignee FROM tasks WHERE id=?", (_t_id,)).fetchone() finally: rconn2.close() if t_row and t_row["assignee"]: from src.blackboard.blackboard import Blackboard bb2 = Blackboard(rdb_path) bb2.add_comment(_t_id, "daemon", - f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", - comment_type="review") - logger.info("Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str) + f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", + comment_type="review") + logger.info( + "Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str) except Exception: - logger.exception("Rebuttal on_complete failed for task %s", _t_id) + logger.exception( + "Rebuttal on_complete failed for task %s", _t_id) result = await self.spawner.spawn_full_agent( agent_id=agent_id, @@ -794,22 +817,30 @@ Parent Task ID: {parent_task.id} for item in items: bb.mark_mention_notified(item["id"]) processed.append(agent_id) - logger.info("Mention spawn success: %s (%d mentions)", agent_id, len(items)) + logger.info( + "Mention spawn success: %s (%d mentions)", + agent_id, + len(items)) else: # spawn 返回 None(其他原因)→ 递增 retry_count for item in items: bb.mark_mention_retry(item["id"]) - logger.warning("Mention spawn failed: %s, retrying next tick", agent_id) + logger.warning( + "Mention spawn failed: %s, retrying next tick", agent_id) except AgentBusyError: # Agent 忙,不递增 retry_count,等下次 tick 自然重试 - logger.info("Mention spawn skipped: %s busy, will retry next tick", agent_id) + logger.info( + "Mention spawn skipped: %s busy, will retry next tick", + agent_id) - except Exception as e: - logger.exception("Mention processing error for agent %s", agent_id) + except Exception: + logger.exception( + "Mention processing error for agent %s", agent_id) for item in items: try: - if item.get("retry_count", 0) >= self.MENTION_MAX_RETRIES - 1: + if item.get("retry_count", + 0) >= self.MENTION_MAX_RETRIES - 1: bb.mark_mention_failed(item["id"]) else: bb.mark_mention_retry(item["id"]) @@ -822,8 +853,14 @@ Parent Task ID: {parent_task.id} mention_lines: List[str], project_id: str) -> str: """#03: @mention prompt(身份注入)""" - api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1' - api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083 + api_host = getattr( + self.spawner, + 'api_host', + '127.0.0.1') if self.spawner else '127.0.0.1' + api_port = getattr( + self.spawner, + 'api_port', + 8083) if self.spawner else 8083 api_base = f"http://{api_host}:{api_port}/api" # 获取 Agent 专长 @@ -899,7 +936,8 @@ Parent Task ID: {parent_task.id} from datetime import datetime conn.execute("BEGIN IMMEDIATE") - row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: return False old_status = row["status"] @@ -938,7 +976,8 @@ Parent Task ID: {parent_task.id} event_type = "daemon_tick" conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", - (task_id, agent, event_type, json.dumps({"from": old_status, "to": new_status, **(detail or {})})), + (task_id, agent, event_type, json.dumps( + {"from": old_status, "to": new_status, **(detail or {})})), ) conn.commit() return True @@ -948,7 +987,7 @@ Parent Task ID: {parent_task.id} # ------------------------------------------------------------------ async def _dispatch_pending(self, db_path: Path, - project_id: str) -> List[str]: + project_id: str) -> List[str]: """扫描 pending 任务并调度 v3.0: 两条路径 @@ -978,9 +1017,12 @@ Parent Task ID: {parent_task.id} try: result = await self.dispatcher.dispatch( task, - project_config={"project_id": project_id, "db_path": db_path}, + project_config={ + "project_id": project_id, + "db_path": db_path}, ) - if result["status"] == "dispatched" and result["level"] in ("full", "escalate"): + if result["status"] == "dispatched" and result["level"] in ( + "full", "escalate"): conn = get_connection(db_path) try: # [v2.7.1] Mail 已在 dispatcher 中标 working,跳过 claimed @@ -1073,7 +1115,8 @@ Parent Task ID: {parent_task.id} detail={"reason": "no_taker_after_3_broadcasts", "round_number": self._broadcast_tracker.get(t.id).round_number if self._broadcast_tracker.get(t.id) else 0}, ) - logger.warning("Escalated %s: no taker after 3 broadcast rounds", t.id) + logger.warning( + "Escalated %s: no taker after 3 broadcast rounds", t.id) self._broadcast_tracker.pop(t.id, None) finally: conn.close() @@ -1083,7 +1126,8 @@ Parent Task ID: {parent_task.id} idle_agents = self._get_idle_agents() if not idle_agents: - logger.warning("No idle agents for broadcast, skipping (capacity issue)") + logger.warning( + "No idle agents for broadcast, skipping (capacity issue)") return [] task_ids = [t.id for t in broadcastable] @@ -1114,7 +1158,8 @@ Parent Task ID: {parent_task.id} spawned = [] for agent_id in idle_agents: - prompt = self._build_claim_prompt(agent_id, broadcastable, project_id) + prompt = self._build_claim_prompt( + agent_id, broadcastable, project_id) try: session_id = await self.spawner.spawn_full_agent( agent_id=agent_id, @@ -1128,7 +1173,8 @@ Parent Task ID: {parent_task.id} spawned.append(session_id) # 记录已通知的 Agent for t in broadcastable: - self._broadcast_tracker[t.id].notified_agents.add(agent_id) + self._broadcast_tracker[t.id].notified_agents.add( + agent_id) except AgentBusyError: logger.debug("Broadcast skip %s: busy", agent_id) except Exception: @@ -1139,8 +1185,14 @@ Parent Task ID: {parent_task.id} def _build_claim_prompt(self, agent_id: str, tasks: list, project_id: str) -> str: """#03: 广播认领 prompt(身份+专长注入)""" - api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1' - api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083 + api_host = getattr( + self.spawner, + 'api_host', + '127.0.0.1') if self.spawner else '127.0.0.1' + api_port = getattr( + self.spawner, + 'api_port', + 8083) if self.spawner else 8083 api_base = f"http://{api_host}:{api_port}/api" # 获取 Agent 专长 @@ -1195,7 +1247,8 @@ Parent Task ID: {parent_task.id} @property def counter(self): """从 Dispatcher 获取 counter""" - return getattr(self.dispatcher, 'counter', None) if self.dispatcher else None + return getattr(self.dispatcher, 'counter', + None) if self.dispatcher else None @staticmethod def _is_pid_alive(pid: int) -> bool: @@ -1207,7 +1260,8 @@ Parent Task ID: {parent_task.id} except (ProcessLookupError, PermissionError): return False - def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str): + def record_broadcast_response( + self, task_id: str, agent_id: str, outcome: str): """记录 Agent 对广播任务的反馈(Spawner 调用的公共 API)""" tracker = self._broadcast_tracker.get(task_id) if not tracker: @@ -1228,7 +1282,8 @@ Parent Task ID: {parent_task.id} def _get_all_agent_ids(self) -> List[str]: """获取所有配置的 Agent ID""" - if self.dispatcher and hasattr(self.dispatcher, 'router') and self.dispatcher.router: + if self.dispatcher and hasattr( + self.dispatcher, 'router') and self.dispatcher.router: return list(self.dispatcher.router.agent_profiles.keys()) return [] @@ -1237,12 +1292,13 @@ Parent Task ID: {parent_task.id} if not self.counter: return [] # agent_profiles 在 Router 初始化时从 config 填充,是完整 Agent 列表 - all_agents = list(self.dispatcher.router.agent_profiles.keys()) if self.dispatcher else [] + all_agents = list( + self.dispatcher.router.agent_profiles.keys()) if self.dispatcher else [] active = self.counter.active_agents return [aid for aid in all_agents if active.get(aid, 0) == 0] async def _dispatch_reviews(self, db_path: Path, - project_id: str) -> List[str]: + project_id: str) -> List[str]: """扫描 review 状态任务,检查是否有产出,调度审查 Agent""" # mail 任务不走 review 流程,直接跳过 if project_id == "_mail": @@ -1291,7 +1347,9 @@ Parent Task ID: {parent_task.id} result = await self.dispatcher.dispatch( task, action_type="review", - project_config={"project_id": project_id, "db_path": db_path}, + project_config={ + "project_id": project_id, + "db_path": db_path}, ) if result["status"] == "dispatched": dispatched.append(task.id) @@ -1344,7 +1402,7 @@ Parent Task ID: {parent_task.id} ) reclaimed.append(task.id) logger.warning("Escalated %s: no taker after %d broadcasts", - task.id, retry_count) + task.id, retry_count) finally: conn.close() else: @@ -1375,8 +1433,10 @@ Parent Task ID: {parent_task.id} working = queries.tasks_by_status("working") for task in working: # #07.2: crash_limit 统一检查(比超时更严重的信号) - if self.dispatcher and hasattr(self.dispatcher, '_check_crash_limit'): - if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30): + if self.dispatcher and hasattr( + self.dispatcher, '_check_crash_limit'): + if self.dispatcher._check_crash_limit( + task.id, db_path, limit=3, window_minutes=30): conn = get_connection(db_path) try: self._transition_status( @@ -1388,7 +1448,8 @@ Parent Task ID: {parent_task.id} finally: conn.close() reclaimed.append(task.id) - logger.error("Task %s: executor crash limit (3/30m), marking failed", task.id) + logger.error( + "Task %s: executor crash limit (3/30m), marking failed", task.id) continue # #07.3 ACT-1: updated_at fallback 覆盖 mail auto-working(无 started_at/claimed_at) @@ -1400,7 +1461,8 @@ Parent Task ID: {parent_task.id} # per-task timeout: deadline 优先,否则用默认值 if task.deadline: deadline_time = datetime.fromisoformat(task.deadline) - timeout_minutes = (deadline_time - start_time).total_seconds() / 60.0 + timeout_minutes = ( + deadline_time - start_time).total_seconds() / 60.0 if timeout_minutes < 1: timeout_minutes = self.default_task_timeout_minutes else: @@ -1423,7 +1485,7 @@ Parent Task ID: {parent_task.id} if ok: reclaimed.append(task.id) logger.info("Mail %s: ticker recheck found reply, marked done (%.1fm)", - task.id, elapsed) + task.id, elapsed) finally: conn.close() continue @@ -1440,15 +1502,17 @@ Parent Task ID: {parent_task.id} if ok: reclaimed.append(task.id) logger.warning("Task %s timed out (working %.1fm > %.1fm)", - task.id, elapsed, timeout_minutes) + task.id, elapsed, timeout_minutes) finally: conn.close() except (ValueError, TypeError): pass # v2.7.2: 进程存活性检查 — counter 占用但进程已死的兜底 - if self.spawner and self.counter and hasattr(self.counter, "active_agents"): - for agent_id in list(self.counter.active_agents.keys()) if hasattr(self.counter, "active_agents") else []: + if self.spawner and self.counter and hasattr( + self.counter, "active_agents"): + for agent_id in list(self.counter.active_agents.keys()) if hasattr( + self.counter, "active_agents") else []: session_info = self.spawner.get_session_by_agent(agent_id) if not session_info: continue @@ -1465,20 +1529,24 @@ Parent Task ID: {parent_task.id} conn = get_connection(db_path) try: current_row = conn.execute( - "SELECT status FROM tasks WHERE id=?", (task_id_check,) + "SELECT status FROM tasks WHERE id=?", ( + task_id_check,) ).fetchone() if current_row and current_row["status"] == "review": - logger.info("Task %s in review, keeping status (process dead)", task_id_check) + logger.info( + "Task %s in review, keeping status (process dead)", task_id_check) else: self._transition_status( conn, task_id_check, "pending", agent="daemon", - detail={"reason": "process_dead", "pid": pid}, + detail={ + "reason": "process_dead", "pid": pid}, ) finally: conn.close() except Exception: - logger.exception("Failed to handle process dead for task %s", task_id_check) + logger.exception( + "Failed to handle process dead for task %s", task_id_check) # #07.2: Fix-3b 已删除。review 超时/crash 统一由 process_dead + _check_timeouts 处理 @@ -1497,16 +1565,20 @@ Parent Task ID: {parent_task.id} finally: conn.close() except Exception as e: - logger.error("Mail %s: ticker reply check error: %s", original_task_id, e) + logger.error( + "Mail %s: ticker reply check error: %s", + original_task_id, + e) return True # 保守:查询失败假设有回复 def _check_recent_routing(self, db_path: Path, task_id: str, - action_type: str) -> bool: + action_type: str) -> bool: """检查最近 5 分钟内是否已 dispatch 过指定类型的路由(防重复)""" try: conn = get_connection(db_path) try: - # 检查是否有 from_status=review 的 dispatched 记录(防止重复 review dispatch) + # 检查是否有 from_status=review 的 dispatched 记录(防止重复 review + # dispatch) if action_type == "review": row = conn.execute( "SELECT COUNT(*) as cnt FROM routing_decisions " @@ -1537,17 +1609,22 @@ Parent Task ID: {parent_task.id} NON_TERMINAL = {"claimed", "working", "review", "reviewing"} projects = self.registry.list_projects() - recovery_report = {"projects": {}, "total_recovered": 0, "total_noop": 0} + recovery_report = { + "projects": {}, + "total_recovered": 0, + "total_noop": 0} # 收集所有需要扫描的项目(registry + 虚拟项目) project_dirs = {} for project_id, project_info in projects.items(): if project_info.get("status") == "active": - project_dirs[project_id] = self.registry.root / project_id / "blackboard.db" + project_dirs[project_id] = self.registry.root / \ + project_id / "blackboard.db" # 虚拟项目 for virtual_id in ("_general", "_mail"): - virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db" + virtual_db = Path(self.registry.root) / \ + virtual_id / "blackboard.db" if virtual_db.exists() and virtual_id not in project_dirs: project_dirs[virtual_id] = virtual_db @@ -1567,25 +1644,28 @@ Parent Task ID: {parent_task.id} old_pid = self._current_project_id self._current_project_id = project_id try: - recovered, noop_count = self._recover_project(db_path, NON_TERMINAL) + recovered, noop_count = self._recover_project( + db_path, NON_TERMINAL) if recovered: recovery_report["projects"][project_id] = recovered recovery_report["total_recovered"] += len(recovered) recovery_report["total_noop"] += noop_count except Exception: - logger.exception("Startup recovery failed for project %s", project_id) + logger.exception( + "Startup recovery failed for project %s", project_id) finally: self._current_project_id = old_pid if recovery_report["total_recovered"] > 0: logger.info("Startup recovery: %d tasks recovered across %d projects", - recovery_report["total_recovered"], - len(recovery_report["projects"])) + recovery_report["total_recovered"], + len(recovery_report["projects"])) elif recovery_report["total_noop"] > 0: logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)", - recovery_report["total_noop"]) + recovery_report["total_noop"]) else: - logger.info("Startup recovery: no non-terminal tasks found, clean start") + logger.info( + "Startup recovery: no non-terminal tasks found, clean start") return recovery_report @@ -1608,10 +1688,13 @@ Parent Task ID: {parent_task.id} for task in rows: try: - action = self._determine_recovery_action(conn, task, status, db_path) + action = self._determine_recovery_action( + conn, task, status, db_path) if action: - self._execute_recovery(conn, task["id"], action, db_path) - recovered.append({"task_id": task["id"], "from": status, "action": action}) + self._execute_recovery( + conn, task["id"], action, db_path) + recovered.append( + {"task_id": task["id"], "from": status, "action": action}) else: # 审计:保持原状的任务也记录事件 noop_count += 1 @@ -1622,14 +1705,15 @@ Parent Task ID: {parent_task.id} ) conn.commit() except Exception: - logger.exception("Startup recovery failed for task %s", task["id"]) + logger.exception( + "Startup recovery failed for task %s", task["id"]) finally: conn.close() return recovered, noop_count def _determine_recovery_action(self, conn, task, status: str, - db_path: Path) -> Optional[str]: + db_path: Path) -> Optional[str]: """根据黑板线索决定恢复动作,返回 None 表示不需要干预""" task_id = task["id"] @@ -1700,7 +1784,8 @@ Parent Task ID: {parent_task.id} # 无审查结论 → 保持 review,ticker 自然会 dispatch reviewer return None - def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path): + def _execute_recovery(self, conn, task_id: str, + action: str, db_path: Path): """执行恢复动作""" # 获取原始状态(用于审计) orig_row = conn.execute( @@ -1712,17 +1797,22 @@ Parent Task ID: {parent_task.id} self._transition_status( conn, task_id, "pending", agent="daemon", - detail={"reason": "startup_recovery", "original_status": orig_status}, + detail={ + "reason": "startup_recovery", + "original_status": orig_status}, ) # 清空 current_agent(常规推 pending,无特定 agent 接手) - conn.execute("UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,)) + conn.execute( + "UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,)) conn.commit() elif action == "push_to_pending_keep_agent": self._transition_status( conn, task_id, "pending", agent="daemon", - detail={"reason": "startup_recovery", "original_status": orig_status}, + detail={ + "reason": "startup_recovery", + "original_status": orig_status}, ) # 保留 current_agent,让同一 agent 重新接手 conn.commit() @@ -1731,7 +1821,9 @@ Parent Task ID: {parent_task.id} self._transition_status( conn, task_id, "review", agent="daemon", - detail={"reason": "startup_recovery", "original_status": "working"}, + detail={ + "reason": "startup_recovery", + "original_status": "working"}, ) conn.commit() @@ -1739,7 +1831,9 @@ Parent Task ID: {parent_task.id} self._transition_status( conn, task_id, "done", agent="daemon", - detail={"reason": "startup_recovery", "original_status": orig_status}, + detail={ + "reason": "startup_recovery", + "original_status": orig_status}, ) conn.commit() @@ -1747,22 +1841,30 @@ Parent Task ID: {parent_task.id} self._transition_status( conn, task_id, "failed", agent="daemon", - detail={"reason": "startup_recovery", "original_status": orig_status}, + detail={ + "reason": "startup_recovery", + "original_status": orig_status}, ) conn.commit() # 记录恢复审计事件 conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)", - (task_id, "daemon", "startup_recovery", json.dumps({"action": action})) + (task_id, "daemon", "startup_recovery", + json.dumps({"action": action})) ) conn.commit() - logger.info("Recovery: task %s → %s (action=%s)", task_id, action, action) + logger.info( + "Recovery: task %s → %s (action=%s)", + task_id, + action, + action) def _find_pre_reviewing_status(self, conn, task_id: str) -> str: """查 events 表找到 reviewing 之前的状态(done 或 failed)""" - # _transition_status 写入 event_type=f"task_{new_status}",detail 用 from/to + # _transition_status 写入 event_type=f"task_{new_status}",detail 用 + # from/to rows = conn.execute( """SELECT detail FROM events WHERE task_id=? AND event_type='task_reviewing' @@ -1773,7 +1875,8 @@ Parent Task ID: {parent_task.id} for event in rows: try: detail = json.loads(event["detail"]) - # _transition_status detail 格式: {"from": old_status, "to": new_status, ...} + # _transition_status detail 格式: {"from": old_status, "to": + # new_status, ...} prev = detail.get("from") or detail.get("old_status") if prev in ("done", "failed"): return prev diff --git a/src/main.py b/src/main.py index 5754acc..5f5b63a 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,13 @@ """v2.6 主入口 - FastAPI + Daemon ticker 共享 asyncio event loop""" from __future__ import annotations +from src.api.toolchain_routes import router as toolchain_router +from src.api.mail_routes import router as mail_router +from src.api.sse_routes import router as sse_router +from src.api.project_routes import router as project_router +from src.api.daemon_routes import router as daemon_router +from src.api.checkpoint_routes import router as checkpoint_router +from src.api.blackboard_routes import router as blackboard_router import logging from contextlib import asynccontextmanager @@ -131,7 +138,8 @@ async def lifespan(app: FastAPI): counter = ActiveAgentCounter( max_global=daemon_config.get("max_global_agents", 5), max_per_session=daemon_config.get("max_per_session", 1), - max_concurrent_sessions=daemon_config.get("max_concurrent_sessions", 3), + max_concurrent_sessions=daemon_config.get( + "max_concurrent_sessions", 3), default_cooldown_seconds=daemon_config.get("cooldown_seconds", 120), ) # BootstrapBuilder(L2 四段式引擎注入层,v2.1) @@ -181,7 +189,10 @@ async def lifespan(app: FastAPI): spawner=spawner, counter=counter, db_path=default_db_path, - guardrails=GuardrailEngine(config_path=Path(__file__).parent.parent / "config" / "guardrails.yaml"), + guardrails=GuardrailEngine( + config_path=Path(__file__).parent.parent / + "config" / + "guardrails.yaml"), ) # ── 集成模块 ── @@ -191,7 +202,7 @@ async def lifespan(app: FastAPI): ) # ExperienceDistiller(经验自动蒸馏) - experience_config = config.get("experience", {}) + config.get("experience", {}) experience_distiller = ExperienceDistiller( store=ExperienceStore(store_path=DATA_ROOT / "experiences.jsonl"), ) @@ -252,13 +263,6 @@ app.add_middleware( # API 路由注册 # --------------------------------------------------------------------------- -from src.api.blackboard_routes import router as blackboard_router -from src.api.checkpoint_routes import router as checkpoint_router -from src.api.daemon_routes import router as daemon_router -from src.api.project_routes import router as project_router -from src.api.sse_routes import router as sse_router -from src.api.mail_routes import router as mail_router -from src.api.toolchain_routes import router as toolchain_router app.include_router(blackboard_router) app.include_router(checkpoint_router) @@ -300,16 +304,17 @@ async def list_projects_compat(): DIST_DIR = Path(__file__).parent / "frontend" / "dist" if DIST_DIR.exists(): # v3.1: 缓存策略 - HTML 不缓存(确保新版本生效),JS/CSS 长缓存(Vite content hash 已处理) - import mimetypes _static_app = StaticFiles(directory=str(DIST_DIR), html=True) - + class CachedStaticFiles: """包装 StaticFiles,添加 Cache-Control 头""" + def __init__(self, app): self._app = app - + async def __call__(self, scope, receive, send): original_send = send + async def patched_send(message): if message.get("type") == "http.response.start": headers = dict(message.get("headers", [])) @@ -321,5 +326,5 @@ if DIST_DIR.exists(): message["headers"] = list(headers.items()) await original_send(message) await self._app(scope, receive, patched_send) - + app.mount("/", CachedStaticFiles(_static_app), name="frontend") diff --git a/src/utils.py b/src/utils.py index 9c3dac7..cf0d20e 100644 --- a/src/utils.py +++ b/src/utils.py @@ -10,7 +10,6 @@ from __future__ import annotations import os from pathlib import Path -from typing import Optional def get_data_root() -> Path: