auto-sync: 2026-05-17 13:47:08
This commit is contained in:
+208
-2
@@ -34,6 +34,11 @@ class Ticker:
|
||||
tick_interval: float = 30.0,
|
||||
max_ticks: Optional[int] = None,
|
||||
on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None,
|
||||
dispatcher: Optional[Any] = None,
|
||||
spawner: Optional[Any] = None,
|
||||
max_dispatch_per_tick: int = 3,
|
||||
claim_timeout_minutes: float = 5.0,
|
||||
default_task_timeout_minutes: float = 30.0,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
@@ -41,11 +46,21 @@ class Ticker:
|
||||
tick_interval: tick 间隔秒数
|
||||
max_ticks: 测试用,限制最大 tick 次数
|
||||
on_tick_complete: 每次 tick 完成后的回调(用于通知等)
|
||||
dispatcher: Dispatcher 实例(Agent 调度)
|
||||
spawner: AgentSpawner 实例(Agent spawn)
|
||||
max_dispatch_per_tick: 每个 tick 最多调度多少个 pending 任务
|
||||
claim_timeout_minutes: claimed 超时重置为 pending 的分钟数
|
||||
default_task_timeout_minutes: working 超时标记 failed 的分钟数
|
||||
"""
|
||||
self.registry = registry
|
||||
self.tick_interval = tick_interval
|
||||
self.max_ticks = max_ticks
|
||||
self.on_tick_complete = on_tick_complete
|
||||
self.dispatcher = dispatcher
|
||||
self.spawner = spawner
|
||||
self.max_dispatch_per_tick = max_dispatch_per_tick
|
||||
self.claim_timeout_minutes = claim_timeout_minutes
|
||||
self.default_task_timeout_minutes = default_task_timeout_minutes
|
||||
|
||||
self._tick_count: int = 0
|
||||
self._running: bool = False
|
||||
@@ -57,6 +72,9 @@ class Ticker:
|
||||
# 每个项目上次 tick 的 event count(用于僵尸检测 F8)
|
||||
self._last_event_counts: Dict[str, int] = {}
|
||||
|
||||
# 当前项目 ID(_dispatch_pending 需要)
|
||||
self._current_project_id: Optional[str] = None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 生命周期
|
||||
# ------------------------------------------------------------------
|
||||
@@ -170,8 +188,14 @@ class Ticker:
|
||||
"summary_before": {},
|
||||
"summary_after": {},
|
||||
"advanced": [],
|
||||
"dispatched": [],
|
||||
"review_dispatched": [],
|
||||
"zombie_reclaimed": [],
|
||||
}
|
||||
|
||||
# 保存当前 project_id(供 _dispatch_pending 使用)
|
||||
self._current_project_id = project_id
|
||||
|
||||
# 1. 扫描当前状态
|
||||
queries = Queries(db_path)
|
||||
result["summary_before"] = queries.task_summary()
|
||||
@@ -180,7 +204,21 @@ class Ticker:
|
||||
advanced = self._advance_dependencies(db_path)
|
||||
result["advanced"] = advanced
|
||||
|
||||
# 3. 写 daemon_tick 事件(直接用 get_connection,不再 Blackboard() → init_db)
|
||||
# 3. 僵尸/超时处理(在依赖推进后、调度前)
|
||||
zombie_reclaimed = self._check_timeouts(db_path)
|
||||
result["zombie_reclaimed"] = zombie_reclaimed
|
||||
|
||||
# 4. 调度 pending 任务
|
||||
if self.dispatcher and self.spawner:
|
||||
dispatched = await self._dispatch_pending(db_path, project_id)
|
||||
result["dispatched"] = dispatched
|
||||
|
||||
# 5. 调度审查任务
|
||||
if self.dispatcher and self.spawner:
|
||||
review_dispatched = await self._dispatch_reviews(db_path, project_id)
|
||||
result["review_dispatched"] = review_dispatched
|
||||
|
||||
# 6. 写 daemon_tick 事件(直接用 get_connection,不再 Blackboard() → init_db)
|
||||
conn = get_connection(db_path)
|
||||
try:
|
||||
conn.execute("BEGIN IMMEDIATE")
|
||||
@@ -193,7 +231,7 @@ class Ticker:
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
# 4. 扫描后状态
|
||||
# 7. 扫描后状态
|
||||
result["summary_after"] = queries.task_summary()
|
||||
|
||||
return result
|
||||
@@ -263,6 +301,174 @@ class Ticker:
|
||||
conn.commit()
|
||||
return True
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Agent 调度
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _dispatch_pending(self, db_path: Path,
                            project_id: str) -> List[str]:
    """Dispatch pending tasks to agents and mark them as claimed.

    At most ``self.max_dispatch_per_tick`` pending tasks are handed to
    ``self.dispatcher.dispatch``. A task is moved to ``claimed`` only when
    the dispatcher reports ``status == "dispatched"`` with a ``level`` of
    ``"full"`` or ``"escalate"``.

    Args:
        db_path: Path of the project database to read tasks from and to
            write status transitions to.
        project_id: Project identifier, forwarded to the dispatcher as
            ``project_config={"project_id": ...}``.

    Returns:
        IDs of the tasks that were dispatched and successfully
        transitioned to ``claimed``.
    """
    # No Blackboard instance is created here: nothing in this method uses
    # one, and elsewhere in this class Blackboard() is avoided because it
    # goes through init_db.
    pending = Queries(db_path).tasks_by_status("pending")
    dispatched: List[str] = []

    if not pending:
        return dispatched

    for task in pending[:self.max_dispatch_per_tick]:
        try:
            result = await self.dispatcher.dispatch(
                task,
                project_config={"project_id": project_id},
            )
            accepted = (
                result["status"] == "dispatched"
                and result["level"] in ("full", "escalate")
            )
            if not accepted:
                continue

            # Record the hand-off by moving the task to "claimed".
            conn = get_connection(db_path)
            try:
                ok = self._transition_status(
                    conn, task.id, "claimed",
                    agent="daemon",
                    detail={"dispatched_to": result["agent_id"],
                            "session_id": result.get("session_id")},
                )
            finally:
                conn.close()

            if ok:
                dispatched.append(task.id)
                logger.info("Dispatched %s to %s (session=%s)",
                            task.id, result["agent_id"],
                            result.get("session_id"))
            else:
                # The dispatcher already accepted the task, but the status
                # change was rejected; surface it instead of failing silently.
                logger.warning(
                    "Dispatched %s to %s but could not mark it claimed",
                    task.id, result["agent_id"])
        except Exception:
            # One failing task must not prevent the remaining ones from
            # being dispatched in this tick.
            logger.exception("Dispatch failed for %s", task.id)

    return dispatched
|
||||
|
||||
async def _dispatch_reviews(self, db_path: Path,
                            project_id: str) -> List[str]:
    """Dispatch a review agent for tasks waiting in the ``review`` status.

    For every task in ``review``:

    * tasks that already have review records are skipped;
    * tasks without any output are transitioned to ``failed`` and an
      observation explaining why is recorded;
    * all other tasks are sent to ``self.dispatcher.dispatch`` with
      ``action_type="review"``.

    Args:
        db_path: Path of the project database.
        project_id: Project identifier, forwarded to the dispatcher as
            ``project_config={"project_id": ...}``.

    Returns:
        IDs of the tasks for which a review was dispatched.
    """
    queries = Queries(db_path)
    bb = Blackboard(db_path)
    dispatched: List[str] = []

    for task in queries.tasks_by_status("review"):
        # A review already exists for this task: nothing to do.
        if bb.get_reviews(task.id):
            continue

        # Nothing to review: fail the task instead of dispatching a reviewer.
        if not bb.get_outputs(task.id):
            conn = get_connection(db_path)
            try:
                ok = self._transition_status(
                    conn, task.id, "failed",
                    agent="daemon",
                    detail={"reason": "no_output_for_review"},
                )
            finally:
                conn.close()

            # Only report the task as failed when the transition actually
            # happened; otherwise the observation would be misleading.
            if ok:
                bb.add_observation(
                    task.id, "daemon",
                    "任务进入 review 状态但没有产出物,自动标记为 failed",
                )
                logger.warning("Task %s in review but no output, marking failed",
                               task.id)
            continue

        # NOTE(review): the task status is not changed after a review is
        # dispatched, so until a review record exists the same task looks
        # eligible again on the next tick -- confirm the dispatcher
        # de-duplicates repeated review requests.
        try:
            result = await self.dispatcher.dispatch(
                task,
                action_type="review",
                project_config={"project_id": project_id},
            )
            if result["status"] == "dispatched":
                dispatched.append(task.id)
                logger.info("Dispatched review for %s to %s",
                            task.id, result["agent_id"])
        except Exception:
            # Keep going: one failed dispatch must not block other reviews.
            logger.exception("Review dispatch failed for %s", task.id)

    return dispatched
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 僵尸/超时处理
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _check_timeouts(self, db_path: Path) -> List[str]:
    """Reset or fail tasks that have been claimed/working for too long.

    * ``claimed`` tasks older than ``self.claim_timeout_minutes`` are put
      back to ``pending``.
    * ``working`` tasks that exceed their time budget are marked
      ``failed``. The budget is ``deadline - start`` when the task has a
      deadline at least one minute after its start, otherwise
      ``self.default_task_timeout_minutes``.

    Stored timestamps without timezone information are treated as UTC
    (they were previously compared against ``datetime.utcnow()``).
    Timezone-aware timestamps are converted to UTC, so both kinds can be
    compared safely.

    Args:
        db_path: Path of the project database.

    Returns:
        IDs of all tasks whose status was changed by this check (both the
        re-queued and the failed ones).
    """
    # Local import: the original block only had `datetime` in scope.
    from datetime import timezone

    def _parse_utc(raw, task_id, field):
        """Return *raw* as an aware UTC datetime, or None if unparseable."""
        try:
            parsed = datetime.fromisoformat(raw)
        except (ValueError, TypeError):
            logger.warning("Task %s has an unparseable %s value %r",
                           task_id, field, raw)
            return None
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    def _transition(task_id, new_status, detail):
        """Apply a daemon-initiated status change on a short-lived connection."""
        conn = get_connection(db_path)
        try:
            return self._transition_status(
                conn, task_id, new_status,
                agent="daemon",
                detail=detail,
            )
        finally:
            conn.close()

    queries = Queries(db_path)
    reclaimed: List[str] = []
    now = datetime.now(timezone.utc)

    # claimed for too long -> back to pending
    for task in queries.tasks_by_status("claimed"):
        if not task.claimed_at:
            continue
        claimed_time = _parse_utc(task.claimed_at, task.id, "claimed_at")
        if claimed_time is None:
            continue

        elapsed = (now - claimed_time).total_seconds() / 60.0
        if elapsed <= self.claim_timeout_minutes:
            continue

        ok = _transition(
            task.id, "pending",
            {"reason": "claim_timeout",
             "elapsed_minutes": round(elapsed, 1)},
        )
        if ok:
            reclaimed.append(task.id)
            logger.info("Reclaimed %s: claimed → pending (timeout %.1fm)",
                        task.id, elapsed)

    # working for too long -> failed
    for task in queries.tasks_by_status("working"):
        start_time_str = task.started_at or task.claimed_at
        if not start_time_str:
            continue
        start_time = _parse_utc(start_time_str, task.id, "start time")
        if start_time is None:
            continue

        # Per-task budget: a usable deadline wins, otherwise the default.
        timeout_minutes = self.default_task_timeout_minutes
        if task.deadline:
            deadline_time = _parse_utc(task.deadline, task.id, "deadline")
            if deadline_time is not None:
                budget = (deadline_time - start_time).total_seconds() / 60.0
                # A deadline less than a minute after the start (or before
                # it) is not a usable budget; keep the default instead.
                if budget >= 1:
                    timeout_minutes = budget

        elapsed = (now - start_time).total_seconds() / 60.0
        if elapsed <= timeout_minutes:
            continue

        ok = _transition(
            task.id, "failed",
            {"reason": "task_timeout",
             "elapsed_minutes": round(elapsed, 1),
             "timeout_minutes": round(timeout_minutes, 1)},
        )
        if ok:
            reclaimed.append(task.id)
            logger.warning("Task %s timed out (working %.1fm > %.1fm)",
                           task.id, elapsed, timeout_minutes)

    return reclaimed
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 手动 tick(API 端点触发)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user