diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index 3537384..edac6e6 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -493,20 +493,59 @@ class Ticker: self.counter.global_active, self.counter._max_global) return [] + # 过滤掉已广播太多次的任务(retry_count >= 3 → 不广播,等庞统) + broadcastable = [] + escalated = [] + for t in tasks: + rc = getattr(t, 'retry_count', 0) or 0 + if rc >= 3: + escalated.append(t) + else: + broadcastable.append(t) + + # 升级庞统 + for t in escalated: + conn = get_connection(db_path) + try: + self._transition_status( + conn, t.id, "escalated", + agent="daemon", + detail={"reason": "no_taker_after_3_broadcasts", + "retry_count": getattr(t, 'retry_count', 0)}, + ) + logger.warning("Escalated %s: no taker after %d broadcasts", + t.id, getattr(t, 'retry_count', 0)) + finally: + conn.close() + + if not broadcastable: + return [] + # 获取空闲 Agent idle_agents = self._get_idle_agents() if not idle_agents: + # 无空闲 Agent → 递增 retry_count(下次广播或升级) + conn = get_connection(db_path) + try: + for t in broadcastable: + conn.execute( + "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?", + (t.id,), + ) + conn.commit() + finally: + conn.close() return [] - task_ids = [t.id for t in tasks] + task_ids = [t.id for t in broadcastable] logger.info("Broadcasting %d tasks to %d idle agents: %s", - len(tasks), len(idle_agents), task_ids) + len(broadcastable), len(idle_agents), task_ids) spawned = [] for agent_id in idle_agents: if not await self.counter.can_acquire(agent_id): continue - prompt = self._build_claim_prompt(agent_id, tasks, project_id) + prompt = self._build_claim_prompt(agent_id, broadcastable, project_id) try: await self.counter.acquire(agent_id) session_id = await self.spawner.spawn_full_agent(