auto-sync: 2026-06-04 22:33:27

This commit is contained in:
cfdaily
2026-06-04 22:33:27 +08:00
parent 0572923815
commit 231094139f
4 changed files with 105 additions and 65 deletions
+1 -1
View File
@@ -130,7 +130,7 @@ async def create_task(project_id: str, body: Dict[str, Any]):
task = Task(
id=task_id, title=title,
description=body.get("description"),
task_type=body.get("task_type", "coding"),
task_type=body.get("task_type", None),
priority=body.get("priority", 5),
assignee=assignee,
assigned_by=body.get("assigned_by", "user"),
+35 -40
View File
@@ -274,50 +274,40 @@ class AgentSpawner:
task_id, title, description, must_haves,
project_id, agent_id)
# 尝试 BootstrapBuilder
if self.bootstrap_builder and task is not None:
try:
# v3.1: spawn_type 映射到角色 (executor→executor, review→reviewer, discussion→planner)
role_map = {"executor": "executor", "review": "reviewer", "discussion": "planner"}
role = role_map.get(spawn_type, "executor")
bootstrap_prompt = self.bootstrap_builder.build_for_task(
task=task,
role=role,
project_config=project_config,
)
# mail 任务用精简模板,不走 BootstrapBuilder
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
api_section = self._build_api_section(
project_id, task_id, agent_id)
return bootstrap_prompt + "\n\n---\n\n" + api_section
except Exception:
logger.exception("BootstrapBuilder failed, falling back to template")
# mail 任务用精简模板
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
# Fallback: 使用硬编码模板
# mail 任务直接 done,不走 review
completion_status = "done" if project_id == "_mail" else "review"
identity_section = self._inject_agent_identity(agent_id)
guardrails_summary = self._get_guardrails_summary()
return SPAWN_PROMPT_TEMPLATE.format(
identity_section=identity_section,
project_id=project_id,
task_id=task_id,
title=title,
description=description or "(无描述)",
task_type=task_type or "general",
priority=priority,
must_haves=must_haves or "(无)",
agent_id=agent_id,
api_base=f"http://{self.api_host}:{self.api_port}/api",
retry_context=retry_context or "",
completion_status=completion_status,
guardrails_summary=guardrails_summary,
)
# 走 BootstrapBuilder 新路径
if self.bootstrap_builder and task is not None:
role_map = {"executor": "executor", "review": "reviewer", "discussion": "planner"}
role = role_map.get(spawn_type, "executor")
bootstrap_prompt = self.bootstrap_builder.build_for_task(
task=task,
role=role,
)
api_section = self._build_api_section(
project_id, task_id, agent_id)
return bootstrap_prompt + "\n\n---\n\n" + api_section
# 无 BootstrapBuilder 或无 task 对象 → 最小 fallback
# 只保留任务上下文 + API 操作指令
logger.warning("No BootstrapBuilder or task object, using minimal fallback")
return self._build_minimal_fallback(
task_id, title, description, must_haves,
project_id, agent_id)
def _build_minimal_fallback(self, task_id, title, description, must_haves,
project_id, agent_id):
"""最小 fallback:只有任务上下文 + API 指令"""
task_section = f"""## 任务
{title}
{description or "(无描述)"}
项目: {project_id} | ID: {task_id}
验收标准: {must_haves or "(无)"}"""
api_section = self._build_api_section(project_id, task_id, agent_id)
return task_section + "\n\n---\n\n" + api_section
def _build_api_section(self, project_id: str, task_id: str,
agent_id: str) -> str:
@@ -767,6 +757,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)",
agent_id, session_id, outcome, exit_code, task_status)
# 广播反馈追踪(Phase 1 bug fix
if task_id and task_id != "broadcast" and hasattr(self, '_ticker') and self._ticker:
outcome_str = "claimed" if cls.get("status") == "ok" else "no_reply"
self._ticker.record_broadcast_response(task_id, agent_id, outcome_str)
if cls["should_retry"]:
# cooldown: 新增的可恢复场景(A14/A15/A16/A8/A10
cooldown_seconds = cls.get("cooldown_seconds", 0)
+65 -16
View File
@@ -17,6 +17,8 @@ from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional
from dataclasses import dataclass, field as dc_field
from src.blackboard.operations import Blackboard
from src.blackboard.db import get_connection
from src.blackboard.models import Task
@@ -24,6 +26,15 @@ from src.daemon.spawner import AgentBusyError
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
@dataclass
class BroadcastRound:
"""追踪单个任务的广播状态"""
task_id: str
notified_agents: set = dc_field(default_factory=set) # 已 spawn 过的 Agent
responded_agents: set = dc_field(default_factory=set) # 已返回反馈的 Agent(含 NO_REPLY
round_number: int = 1 # 当前第几轮
logger = logging.getLogger("moziplus-v2.ticker")
@@ -80,6 +91,9 @@ class Ticker:
# 每个项目上次 tick 的 event count(用于僵尸检测 F8
self._last_event_counts: Dict[str, int] = {}
# 广播轮次追踪(Phase 1 bug fix
self._broadcast_tracker: Dict[str, BroadcastRound] = {}
# 当前项目 ID_dispatch_pending 需要)
self._current_project_id: Optional[str] = None
@@ -1039,12 +1053,12 @@ Parent Task ID: {parent_task.id}
self.counter.global_active, self.counter.max_global)
return []
# 过滤掉已广播太多次的任务(retry_count >= 3 → 不广播,等庞统)
# 分离:需要升级的 vs 可广播的
broadcastable = []
escalated = []
for t in tasks:
rc = getattr(t, 'retry_count', 0) or 0
if rc >= 3:
tracker = self._broadcast_tracker.get(t.id)
if tracker and tracker.round_number >= 3:
escalated.append(t)
else:
broadcastable.append(t)
@@ -1057,20 +1071,18 @@ Parent Task ID: {parent_task.id}
conn, t.id, "escalated",
agent="daemon",
detail={"reason": "no_taker_after_3_broadcasts",
"retry_count": getattr(t, 'retry_count', 0)},
"round_number": self._broadcast_tracker.get(t.id).round_number if self._broadcast_tracker.get(t.id) else 0},
)
logger.warning("Escalated %s: no taker after %d broadcasts",
t.id, getattr(t, 'retry_count', 0))
logger.warning("Escalated %s: no taker after 3 broadcast rounds", t.id)
self._broadcast_tracker.pop(t.id, None)
finally:
conn.close()
if not broadcastable:
return []
# 获取空闲 Agent
idle_agents = self._get_idle_agents()
if not idle_agents:
# 无空闲 Agent → 系统容量问题,不递增 retry_count
logger.warning("No idle agents for broadcast, skipping (capacity issue)")
return []
@@ -1078,7 +1090,7 @@ Parent Task ID: {parent_task.id}
logger.info("Broadcasting %d tasks to %d idle agents: %s",
len(broadcastable), len(idle_agents), task_ids)
# 广播前为每个任务写审计事件
# 审计事件
try:
conn = get_connection(db_path)
try:
@@ -1093,28 +1105,48 @@ Parent Task ID: {parent_task.id}
finally:
conn.close()
except Exception:
pass # 审计记录失败不影响广播
pass
# 为每个任务获取/创建 tracker
for t in broadcastable:
if t.id not in self._broadcast_tracker:
self._broadcast_tracker[t.id] = BroadcastRound(task_id=t.id)
spawned = []
for agent_id in idle_agents:
# v2.7.2: counter 检查在 spawn_full_agent 内部
prompt = self._build_claim_prompt(agent_id, broadcastable, project_id)
try:
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=prompt,
task_id=broadcastable[0].id if broadcastable else None,
task_id="broadcast",
task_db_path=db_path,
use_main_session=True, # #02: 投递到 main session
use_main_session=True,
)
spawned.append(agent_id)
logger.info("Broadcast spawned %s (session=%s)", agent_id, session_id)
if session_id is not None:
spawned.append(session_id)
# 记录已通知的 Agent
for t in broadcastable:
self._broadcast_tracker[t.id].notified_agents.add(agent_id)
except AgentBusyError:
logger.debug("Broadcast skip %s: busy", agent_id)
except Exception:
logger.exception("Broadcast spawn failed for %s", agent_id)
return task_ids if spawned else []
# 检查是否可以结束轮次:所有 Agent 都已通知且全部反馈
all_agent_ids = set(self._get_all_agent_ids())
for t in broadcastable:
tracker = self._broadcast_tracker[t.id]
unnotified = all_agent_ids - tracker.notified_agents
if not unnotified and tracker.responded_agents >= tracker.notified_agents:
# 一轮结束
tracker.round_number += 1
tracker.notified_agents.clear()
tracker.responded_agents.clear()
logger.info("Broadcast round %d completed for task %s (no taker)",
tracker.round_number, t.id)
return spawned
def _build_claim_prompt(self, agent_id: str, tasks: list,
project_id: str) -> str:
@@ -1187,6 +1219,23 @@ Parent Task ID: {parent_task.id}
except (ProcessLookupError, PermissionError):
return False
def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
"""记录 Agent 对广播任务的反馈(Spawner 调用的公共 API"""
tracker = self._broadcast_tracker.get(task_id)
if not tracker:
return
if outcome == "claimed":
# Agent 认领了,清理 tracker
self._broadcast_tracker.pop(task_id, None)
else:
tracker.responded_agents.add(agent_id)
def _get_all_agent_ids(self) -> List[str]:
"""获取所有配置的 Agent ID"""
if self.dispatcher and hasattr(self.dispatcher, 'router') and self.dispatcher.router:
return list(self.dispatcher.router.agent_profiles.keys())
return []
def _get_idle_agents(self) -> List[str]:
"""获取当前空闲的 Agent 列表(从 config agents 取,而非 counter._per_agent"""
if not self.counter:
+4 -8
View File
@@ -134,14 +134,8 @@ async def lifespan(app: FastAPI):
max_concurrent_sessions=daemon_config.get("max_concurrent_sessions", 3),
default_cooldown_seconds=daemon_config.get("cooldown_seconds", 120),
)
# BootstrapBuilderL2 引擎注入层)
template_dir = DATA_ROOT.parent / "prompt_templates"
if not template_dir.exists():
template_dir = Path("prompt_templates")
bootstrap_builder = BootstrapBuilder(
template_dir=template_dir,
max_tokens=4096,
)
# BootstrapBuilderL2 四段式引擎注入层v2.1
bootstrap_builder = BootstrapBuilder(max_tokens=4096)
spawner = AgentSpawner(
dry_run=False,
@@ -221,6 +215,8 @@ async def lifespan(app: FastAPI):
experience_distiller=experience_distiller,
inbox_watcher=inbox_watcher,
)
# Phase 1 bug fix: spawner 引用 ticker 用于广播反馈追踪
spawner._ticker = ticker
await ticker.start()
agent_ids = list(agent_profiles.keys())
logger.info("Ticker started (interval=%ss, dispatch=%d/tick, agents=%s)",