diff --git a/src/api/blackboard_routes.py b/src/api/blackboard_routes.py index 088d898..edd7e26 100644 --- a/src/api/blackboard_routes.py +++ b/src/api/blackboard_routes.py @@ -130,7 +130,7 @@ async def create_task(project_id: str, body: Dict[str, Any]): task = Task( id=task_id, title=title, description=body.get("description"), - task_type=body.get("task_type", "coding"), + task_type=body.get("task_type", None), priority=body.get("priority", 5), assignee=assignee, assigned_by=body.get("assigned_by", "user"), diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index 60c71ae..6c1828a 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -274,50 +274,40 @@ class AgentSpawner: task_id, title, description, must_haves, project_id, agent_id) - # 尝试 BootstrapBuilder - if self.bootstrap_builder and task is not None: - try: - # v3.1: spawn_type 映射到角色 (executor→executor, review→reviewer, discussion→planner) - role_map = {"executor": "executor", "review": "reviewer", "discussion": "planner"} - role = role_map.get(spawn_type, "executor") - bootstrap_prompt = self.bootstrap_builder.build_for_task( - task=task, - role=role, - project_config=project_config, - ) - # mail 任务用精简模板,不走 BootstrapBuilder - if project_id == "_mail": - return self._build_mail_prompt(task_id, title, description, must_haves, agent_id) - api_section = self._build_api_section( - project_id, task_id, agent_id) - return bootstrap_prompt + "\n\n---\n\n" + api_section - except Exception: - logger.exception("BootstrapBuilder failed, falling back to template") - # mail 任务用精简模板 if project_id == "_mail": return self._build_mail_prompt(task_id, title, description, must_haves, agent_id) - # Fallback: 使用硬编码模板 - # mail 任务直接 done,不走 review - completion_status = "done" if project_id == "_mail" else "review" - identity_section = self._inject_agent_identity(agent_id) - guardrails_summary = self._get_guardrails_summary() - return SPAWN_PROMPT_TEMPLATE.format( - identity_section=identity_section, - project_id=project_id, - task_id=task_id, - title=title, - description=description or "(无描述)", - task_type=task_type or "general", - priority=priority, - must_haves=must_haves or "(无)", - agent_id=agent_id, - api_base=f"http://{self.api_host}:{self.api_port}/api", - retry_context=retry_context or "", - completion_status=completion_status, - guardrails_summary=guardrails_summary, - ) + # 走 BootstrapBuilder 新路径 + if self.bootstrap_builder and task is not None: + role_map = {"executor": "executor", "review": "reviewer", "discussion": "planner"} + role = role_map.get(spawn_type, "executor") + bootstrap_prompt = self.bootstrap_builder.build_for_task( + task=task, + role=role, + ) + api_section = self._build_api_section( + project_id, task_id, agent_id) + return bootstrap_prompt + "\n\n---\n\n" + api_section + + # 无 BootstrapBuilder 或无 task 对象 → 最小 fallback + # 只保留任务上下文 + API 操作指令 + logger.warning("No BootstrapBuilder or task object, using minimal fallback") + return self._build_minimal_fallback( + task_id, title, description, must_haves, + project_id, agent_id) + + def _build_minimal_fallback(self, task_id, title, description, must_haves, + project_id, agent_id): + """最小 fallback:只有任务上下文 + API 指令""" + task_section = f"""## 任务 +{title} +{description or "(无描述)"} + +项目: {project_id} | ID: {task_id} +验收标准: {must_haves or "(无)"}""" + api_section = self._build_api_section(project_id, task_id, agent_id) + return task_section + "\n\n---\n\n" + api_section def _build_api_section(self, project_id: str, task_id: str, agent_id: str) -> str: @@ -767,6 +757,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)", agent_id, session_id, outcome, exit_code, task_status) + # 广播反馈追踪(Phase 1 bug fix) + if task_id and task_id != "broadcast" and hasattr(self, '_ticker') and self._ticker: + outcome_str = "claimed" if cls.get("status") == "ok" else "no_reply" + self._ticker.record_broadcast_response(task_id, agent_id, outcome_str) + if cls["should_retry"]: # cooldown: 新增的可恢复场景(A14/A15/A16/A8/A10) cooldown_seconds = cls.get("cooldown_seconds", 0) diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index b239bd9..f7289af 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -17,6 +17,8 @@ from datetime import datetime from pathlib import Path from typing import Any, Callable, Coroutine, Dict, List, Optional +from dataclasses import dataclass, field as dc_field + from src.blackboard.operations import Blackboard from src.blackboard.db import get_connection from src.blackboard.models import Task @@ -24,6 +26,15 @@ from src.daemon.spawner import AgentBusyError from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry + +@dataclass +class BroadcastRound: + """追踪单个任务的广播状态""" + task_id: str + notified_agents: set = dc_field(default_factory=set) # 已 spawn 过的 Agent + responded_agents: set = dc_field(default_factory=set) # 已返回反馈的 Agent(含 NO_REPLY) + round_number: int = 1 # 当前第几轮 + logger = logging.getLogger("moziplus-v2.ticker") @@ -80,6 +91,9 @@ class Ticker: # 每个项目上次 tick 的 event count(用于僵尸检测 F8) self._last_event_counts: Dict[str, int] = {} + # 广播轮次追踪(Phase 1 bug fix) + self._broadcast_tracker: Dict[str, BroadcastRound] = {} + # 当前项目 ID(_dispatch_pending 需要) self._current_project_id: Optional[str] = None @@ -1039,12 +1053,12 @@ Parent Task ID: {parent_task.id} self.counter.global_active, self.counter.max_global) return [] - # 过滤掉已广播太多次的任务(retry_count >= 3 → 不广播,等庞统) + # 分离:需要升级的 vs 可广播的 broadcastable = [] escalated = [] for t in tasks: - rc = getattr(t, 'retry_count', 0) or 0 - if rc >= 3: + tracker = self._broadcast_tracker.get(t.id) + if tracker and tracker.round_number >= 3: escalated.append(t) else: broadcastable.append(t) @@ -1057,20 +1071,18 @@ Parent Task ID: {parent_task.id} conn, t.id, "escalated", agent="daemon", detail={"reason": "no_taker_after_3_broadcasts", - "retry_count": getattr(t, 'retry_count', 0)}, + "round_number": self._broadcast_tracker.get(t.id).round_number if self._broadcast_tracker.get(t.id) else 0}, ) - logger.warning("Escalated %s: no taker after %d broadcasts", - t.id, getattr(t, 'retry_count', 0)) + logger.warning("Escalated %s: no taker after 3 broadcast rounds", t.id) + self._broadcast_tracker.pop(t.id, None) finally: conn.close() if not broadcastable: return [] - # 获取空闲 Agent idle_agents = self._get_idle_agents() if not idle_agents: - # 无空闲 Agent → 系统容量问题,不递增 retry_count logger.warning("No idle agents for broadcast, skipping (capacity issue)") return [] @@ -1078,7 +1090,7 @@ Parent Task ID: {parent_task.id} logger.info("Broadcasting %d tasks to %d idle agents: %s", len(broadcastable), len(idle_agents), task_ids) - # 广播前为每个任务写审计事件 + # 审计事件 try: conn = get_connection(db_path) try: @@ -1093,28 +1105,48 @@ Parent Task ID: {parent_task.id} finally: conn.close() except Exception: - pass # 审计记录失败不影响广播 + pass + + # 为每个任务获取/创建 tracker + for t in broadcastable: + if t.id not in self._broadcast_tracker: + self._broadcast_tracker[t.id] = BroadcastRound(task_id=t.id) spawned = [] for agent_id in idle_agents: - # v2.7.2: counter 检查在 spawn_full_agent 内部 prompt = self._build_claim_prompt(agent_id, broadcastable, project_id) try: session_id = await self.spawner.spawn_full_agent( agent_id=agent_id, message=prompt, - task_id=broadcastable[0].id if broadcastable else None, + task_id="broadcast", task_db_path=db_path, - use_main_session=True, # #02: 投递到 main session + use_main_session=True, ) - spawned.append(agent_id) - logger.info("Broadcast spawned %s (session=%s)", agent_id, session_id) + if session_id is not None: + spawned.append(session_id) + # 记录已通知的 Agent + for t in broadcastable: + self._broadcast_tracker[t.id].notified_agents.add(agent_id) except AgentBusyError: logger.debug("Broadcast skip %s: busy", agent_id) except Exception: logger.exception("Broadcast spawn failed for %s", agent_id) - return task_ids if spawned else [] + # 检查是否可以结束轮次:所有 Agent 都已通知且全部反馈 + all_agent_ids = set(self._get_all_agent_ids()) + for t in broadcastable: + tracker = self._broadcast_tracker[t.id] + unnotified = all_agent_ids - tracker.notified_agents + if not unnotified and tracker.responded_agents >= tracker.notified_agents: + # 一轮结束 + tracker.round_number += 1 + tracker.notified_agents.clear() + tracker.responded_agents.clear() + logger.info("Broadcast round %d completed for task %s (no taker)", + tracker.round_number, t.id) + + return spawned def _build_claim_prompt(self, agent_id: str, tasks: list, project_id: str) -> str: @@ -1187,6 +1219,23 @@ Parent Task ID: {parent_task.id} except (ProcessLookupError, PermissionError): return False + def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str): + """记录 Agent 对广播任务的反馈(Spawner 调用的公共 API)""" + tracker = self._broadcast_tracker.get(task_id) + if not tracker: + return + if outcome == "claimed": + # Agent 认领了,清理 tracker + self._broadcast_tracker.pop(task_id, None) + else: + tracker.responded_agents.add(agent_id) + + def _get_all_agent_ids(self) -> List[str]: + """获取所有配置的 Agent ID""" + if self.dispatcher and hasattr(self.dispatcher, 'router') and self.dispatcher.router: + return list(self.dispatcher.router.agent_profiles.keys()) + return [] + def _get_idle_agents(self) -> List[str]: """获取当前空闲的 Agent 列表(从 config agents 取,而非 counter._per_agent)""" if not self.counter: diff --git a/src/main.py b/src/main.py index e14aa36..36476c3 100644 --- a/src/main.py +++ b/src/main.py @@ -134,14 +134,8 @@ async def lifespan(app: FastAPI): max_concurrent_sessions=daemon_config.get("max_concurrent_sessions", 3), default_cooldown_seconds=daemon_config.get("cooldown_seconds", 120), ) - # BootstrapBuilder(L2 引擎注入层) - template_dir = DATA_ROOT.parent / "prompt_templates" - if not template_dir.exists(): - template_dir = Path("prompt_templates") - bootstrap_builder = BootstrapBuilder( - template_dir=template_dir, - max_tokens=4096, - ) + # BootstrapBuilder(L2 四段式引擎注入层,v2.1) + bootstrap_builder = BootstrapBuilder(max_tokens=4096) spawner = AgentSpawner( dry_run=False, @@ -221,6 +215,8 @@ async def lifespan(app: FastAPI): experience_distiller=experience_distiller, inbox_watcher=inbox_watcher, ) + # Phase 1 bug fix: spawner 引用 ticker 用于广播反馈追踪 + spawner._ticker = ticker await ticker.start() agent_ids = list(agent_profiles.keys()) logger.info("Ticker started (interval=%ss, dispatch=%d/tick, agents=%s)",