From ec34b5984ef52d14f4187e7de0dab5dd5bee65de Mon Sep 17 00:00:00 2001 From: cfdaily Date: Tue, 19 May 2026 23:28:59 +0800 Subject: [PATCH] auto-sync: 2026-05-19 23:28:59 --- src/daemon/ticker.py | 135 +------------------------------------------ 1 file changed, 1 insertion(+), 134 deletions(-) diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index aad1097..320764f 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -40,9 +40,6 @@ class Ticker: max_dispatch_per_tick: int = 3, claim_timeout_minutes: float = 5.0, default_task_timeout_minutes: float = 30.0, - health_checker: Optional[Any] = None, - inbox_watcher: Optional[Any] = None, - experience_distiller: Optional[Any] = None, ): """ Args: @@ -79,14 +76,6 @@ class Ticker: # 当前项目 ID(_dispatch_pending 需要) self._current_project_id: Optional[str] = None - # 集成模块 - self.health_checker = health_checker - self.inbox_watcher = inbox_watcher - self.experience_distiller = experience_distiller - - # InboxWatcher 即时 tick 事件 - self._inbox_event: asyncio.Event = asyncio.Event() - # ------------------------------------------------------------------ # 生命周期 # ------------------------------------------------------------------ @@ -96,30 +85,12 @@ class Ticker: if self._running: return self._running = True - - # 启动 InboxWatcher - if self.inbox_watcher: - try: - await self.inbox_watcher.start() - logger.info("InboxWatcher started") - except Exception: - logger.exception("Failed to start InboxWatcher") - self._task = asyncio.create_task(self._loop()) logger.info("Ticker started (interval=%.1fs)", self.tick_interval) async def stop(self) -> None: """停止 tick 循环""" self._running = False - - # 停止 InboxWatcher - if self.inbox_watcher: - try: - await self.inbox_watcher.stop() - logger.info("InboxWatcher stopped") - except Exception: - logger.exception("Failed to stop InboxWatcher") - if self._task: self._task.cancel() try: @@ -155,16 +126,8 @@ class Ticker: self._running = False break - # 等待下一个 tick,但 InboxWatcher 事件可提前唤醒 try: - await asyncio.wait_for( - self._inbox_event.wait(), - timeout=self.tick_interval, - ) - self._inbox_event.clear() - logger.debug("Immediate tick triggered by inbox event") - except asyncio.TimeoutError: - pass # 正常超时,继续下一次 tick + await asyncio.sleep(self.tick_interval) except asyncio.CancelledError: return @@ -278,26 +241,6 @@ class Ticker: # 8. 扫描后状态 result["summary_after"] = queries.task_summary() - # 9. HealthChecker 僵尸检测 - if self.health_checker: - try: - health_result = self.health_checker.check( - project_id, db_path, self._tick_count) - result["health"] = health_result - if not health_result.get("healthy", True): - logger.warning("Health check: %s unhealthy (zombie=%s)", - project_id, health_result.get("zombie")) - except Exception: - logger.exception("HealthChecker error for %s", project_id) - - # 10. ExperienceDistiller 经验蒸馏 - if self.experience_distiller: - try: - distilled = self._distill_completed_tasks(db_path, project_id) - result["distilled"] = distilled - except Exception: - logger.exception("ExperienceDistiller error for %s", project_id) - return result # ------------------------------------------------------------------ @@ -625,82 +568,6 @@ class Ticker: except Exception: return False - # ------------------------------------------------------------------ - # 经验蒸馏 - # ------------------------------------------------------------------ - - def _distill_completed_tasks(self, db_path: Path, - project_id: str) -> List[str]: - """从已完成的任务中蒸馏经验""" - queries = Queries(db_path) - distilled_ids: List[str] = [] - - # 找 done/failed/cancelled 的任务 - for status in ("done", "failed", "cancelled"): - tasks = queries.tasks_by_status(status) - for task in tasks: - # 只蒸馏有产出的任务 - bb = Blackboard(db_path) - outputs = bb.get_outputs(task.id) - if not outputs: - continue - - # 获取 review 结果(如果有) - reviews = bb.get_reviews(task.id) - review_result = None - if reviews: - latest = reviews[-1] - review_result = latest if isinstance(latest, dict) else {"verdict": "unknown"} - - # 格式化产出 - output_dicts = [] - for out in outputs: - output_dicts.append({ - "content": out.get("content", "") if isinstance(out, dict) else "", - "path": out.get("content_path", "") if isinstance(out, dict) else "", - }) - - try: - experiences = self.experience_distiller.distill_from_task( - task_id=task.id, - task_title=task.title, - task_type=getattr(task, 'task_type', None), - outputs=output_dicts, - review_result=review_result, - agent_id=task.current_agent or task.assignee, - ) - if experiences: - distilled_ids.append(task.id) - logger.info("Distilled %d experiences from task %s", - len(experiences), task.id) - except Exception: - logger.exception("Distill failed for task %s", task.id) - - return distilled_ids - - # ------------------------------------------------------------------ - # InboxWatcher 即时 tick 回调 - # ------------------------------------------------------------------ - - async def _on_inbox_event(self, event: Dict[str, Any]) -> None: - """InboxWatcher 事件回调:处理事件 + 触发即时 tick""" - logger.info("Inbox event received: type=%s project=%s", - event.get("type"), event.get("project_id")) - - # 1. 通过 default_inbox_callback 处理事件(更新黑板) - try: - from src.daemon.inbox import default_inbox_callback - await default_inbox_callback( - event, - registry_root=self.registry.root, - initialized_dbs=self._initialized_dbs, - ) - except Exception: - logger.exception("Inbox callback error") - - # 2. 设置 event flag,触发即时 tick - self._inbox_event.set() - # ------------------------------------------------------------------ # 手动 tick(API 端点触发) # ------------------------------------------------------------------