auto-sync: 2026-05-17 05:45:47

This commit is contained in:
cfdaily
2026-05-17 05:45:47 +08:00
parent 9300f14f9f
commit b186771e6f
+226
View File
@@ -0,0 +1,226 @@
"""Daemon Ticker — 30s 状态扫描 + 依赖推进 + 事件驱动
Tick 循环是整个 Daemon 的核心驱动:
1. 遍历所有 active 项目
2. 每个 tick 扫描黑板状态
3. 推进依赖链(blocked → pending
4. 写入 daemon_tick 事件
5. 调度 pending 任务(F9 实现)
"""
from __future__ import annotations
import asyncio
import json
import logging
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional
from src.blackboard.blackboard import Blackboard
from src.blackboard.db import get_connection
from src.blackboard.models import Task
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
logger = logging.getLogger("moziplus-v2.ticker")
class Ticker:
"""Daemon ticker 主循环"""
def __init__(
self,
registry: ProjectRegistry,
tick_interval: float = 30.0,
max_ticks: Optional[int] = None,
on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None,
):
"""
Args:
registry: 项目注册表
tick_interval: tick 间隔秒数
max_ticks: 测试用,限制最大 tick 次数
on_tick_complete: 每次 tick 完成后的回调(用于通知等)
"""
self.registry = registry
self.tick_interval = tick_interval
self.max_ticks = max_ticks
self.on_tick_complete = on_tick_complete
self._tick_count: int = 0
self._running: bool = False
self._task: Optional[asyncio.Task] = None
# 每个项目上次 tick 的 event count(用于僵尸检测 F8
self._last_event_counts: Dict[str, int] = {}
# ------------------------------------------------------------------
# 生命周期
# ------------------------------------------------------------------
async def start(self) -> None:
"""启动 tick 循环"""
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._loop())
logger.info("Ticker started (interval=%.1fs)", self.tick_interval)
async def stop(self) -> None:
"""停止 tick 循环"""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("Ticker stopped (ticks=%d)", self._tick_count)
@property
def tick_count(self) -> int:
return self._tick_count
@property
def is_running(self) -> bool:
return self._running
# ------------------------------------------------------------------
# 主循环
# ------------------------------------------------------------------
async def _loop(self) -> None:
"""Tick 主循环"""
while self._running:
try:
await self.tick()
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Tick %d error", self._tick_count + 1)
if self.max_ticks and self._tick_count >= self.max_ticks:
logger.info("Max ticks reached (%d), stopping", self.max_ticks)
self._running = False
break
try:
await asyncio.sleep(self.tick_interval)
except asyncio.CancelledError:
return
# ------------------------------------------------------------------
# 单次 tick
# ------------------------------------------------------------------
async def tick(self) -> Dict[str, Any]:
"""执行一次 tick,遍历所有 active 项目"""
self._tick_count += 1
tick_num = self._tick_count
results: Dict[str, Any] = {
"tick": tick_num,
"projects": {},
}
projects = self.registry.list_projects()
active_projects = {
pid: info for pid, info in projects.items()
if info.get("status") == "active"
}
for project_id, project_info in active_projects.items():
try:
pr = await self._tick_project(project_id, project_info)
results["projects"][project_id] = pr
except Exception as e:
logger.exception("Tick %d project %s error", tick_num, project_id)
results["projects"][project_id] = {"error": str(e)}
logger.debug("Tick %d complete: %d projects", tick_num, len(active_projects))
if self.on_tick_complete:
try:
await self.on_tick_complete()
except Exception:
logger.exception("on_tick_complete callback error")
return results
async def _tick_project(self, project_id: str,
project_info: Dict[str, Any]) -> Dict[str, Any]:
"""单项目的 tick 处理"""
project_dir = self.registry.root / project_id
db_path = project_dir / "blackboard.db"
if not db_path.exists():
return {"status": "no_db"}
queries = Queries(db_path)
result: Dict[str, Any] = {
"status": "ok",
"summary_before": {},
"summary_after": {},
"advanced": [],
}
# 1. 扫描当前状态
result["summary_before"] = queries.task_summary()
# 2. 依赖推进
advanced = self._advance_dependencies(db_path)
result["advanced"] = advanced
# 3. 写 daemon_tick 事件
bb = Blackboard(db_path)
bb.add_event(
event_type="daemon_tick",
detail={"tick": self._tick_count, "advanced_count": len(advanced)},
)
# 4. 扫描后状态
result["summary_after"] = queries.task_summary()
return result
# ------------------------------------------------------------------
# 依赖推进
# ------------------------------------------------------------------
def _advance_dependencies(self, db_path: Path) -> List[str]:
"""检查 blocked 任务,若所有依赖已完成则推进为 pending
Returns:
被推进的 task_id 列表
"""
queries = Queries(db_path)
blocked = queries.blocked_tasks_with_deps()
advanced: List[str] = []
if not blocked:
return advanced
bb = Blackboard(db_path)
for item in blocked:
if item["all_deps_done"]:
task_id = item["task_id"]
ok = bb.update_task_status(
task_id, "pending",
agent="daemon",
detail={"reason": "all_dependencies_done"},
)
if ok:
advanced.append(task_id)
logger.info("Advanced %s: blocked → pending", task_id)
return advanced
# ------------------------------------------------------------------
# 手动 tickAPI 端点触发)
# ------------------------------------------------------------------
async def manual_tick(self) -> Dict[str, Any]:
"""手动触发一次 tick"""
logger.info("Manual tick triggered")
result = await self.tick()
result["manual"] = True
return result