Files
sanguo_moziplus_v2/src/main.py
T
cfdaily b69636c408
Deploy / ci (push) Waiting to run
Deploy / deploy (push) Blocked by required conditions
Deploy / notify-deploy-failure (push) Blocked by required conditions
auto-sync: 2026-06-07 11:57:05
2026-06-07 11:57:05 +08:00

315 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""v2.6 主入口 - FastAPI + Daemon ticker 共享 asyncio event loop"""
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
import yaml
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
from src.daemon.spawner import AgentSpawner
from src.daemon.bootstrap import BootstrapBuilder
from src.daemon.dispatcher import Dispatcher
from src.daemon.counter import ActiveAgentCounter
from src.daemon.router import AgentRouter, AgentProfile
from src.daemon.health import HealthChecker
from src.daemon.experience import ExperienceDistiller, ExperienceStore
from src.daemon.inbox import InboxWatcher
from src.daemon.guardrails import GuardrailEngine
from src.utils import get_data_root
logger = logging.getLogger("moziplus-v2")
# ---------------------------------------------------------------------------
# 日志配置:确保 PM2 能捕获所有命名空间日志
# ---------------------------------------------------------------------------
_logging_configured = False
def _setup_logging():
"""配置 logging,把 moziplus-v2 命名空间接到 root handler"""
global _logging_configured
if _logging_configured:
return
_logging_configured = True
# 确保 moziplus-v2 命名空间日志输出到 stderr(PM2 捕获)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)s %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
))
for name in ("moziplus-v2", "moziplus-v2.ticker", "moziplus-v2.spawner",
"moziplus-v2.dispatcher", "moziplus-v2.counter",
"moziplus-v2.health", "moziplus-v2.inbox", "moziplus-v2.experience"):
lg = logging.getLogger(name)
lg.setLevel(logging.INFO)
lg.addHandler(handler)
lg.propagate = False # 避免重复
_setup_logging()
# ---------------------------------------------------------------------------
# 配置加载
# ---------------------------------------------------------------------------
DEFAULT_CONFIG_PATH = Path(__file__).parent.parent / "config" / "default.yaml"
def load_config() -> dict:
"""加载全局默认配置"""
if DEFAULT_CONFIG_PATH.exists():
with open(DEFAULT_CONFIG_PATH) as f:
return yaml.safe_load(f) or {}
return {}
config = load_config()
# ---------------------------------------------------------------------------
# 全局组件
# ---------------------------------------------------------------------------
DATA_ROOT = get_data_root()
ticker: Optional[Ticker] = None
def get_ticker() -> Optional[Ticker]:
"""获取全局 Ticker 实例"""
return ticker
def get_registry() -> ProjectRegistry:
"""获取全局项目注册表"""
return ProjectRegistry(DATA_ROOT)
# ---------------------------------------------------------------------------
# FastAPI 生命周期
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
"""启动 Daemon ticker,关闭时清理"""
global ticker
logger.info("moziplus-v2 starting...")
registry = get_registry()
# v2.7: 从旧 YAML 迁移
registry.migrate_from_yaml()
# v2.7: 自动发现新项目
discovered = registry.discover_projects()
if discovered:
logger.info("Auto-discovered projects: %s", discovered)
# v3.0: 自动发现 sanguo_projects 正式项目
sanguo_discovered = registry.discover_sanguo_projects()
if sanguo_discovered:
logger.info("Auto-discovered sanguo projects: %s", sanguo_discovered)
daemon_config = config.get("daemon", {})
tick_interval = daemon_config.get("tick_interval", 30)
max_dispatch = daemon_config.get("max_dispatch_per_tick", 3)
claim_timeout = daemon_config.get("claim_timeout_minutes", 5.0)
task_timeout = daemon_config.get("default_task_timeout_minutes", 30.0)
api_host = daemon_config.get("api_host", "127.0.0.1")
api_port = daemon_config.get("api_port", 8083)
# 创建 Agent 调度组件
counter = ActiveAgentCounter(
max_global=daemon_config.get("max_global_agents", 5),
max_per_session=daemon_config.get("max_per_session", 1),
max_concurrent_sessions=daemon_config.get("max_concurrent_sessions", 3),
default_cooldown_seconds=daemon_config.get("cooldown_seconds", 120),
)
# BootstrapBuilderL2 四段式引擎注入层,v2.1)
bootstrap_builder = BootstrapBuilder(max_tokens=4096)
spawner = AgentSpawner(
dry_run=False,
agent_timeout=daemon_config.get("agent_timeout", 630),
gateway_timeout=daemon_config.get("gateway_timeout", 600),
max_retries=daemon_config.get("max_retries", 3),
max_monitor_timeouts=daemon_config.get("max_monitor_timeouts", 3),
api_host=api_host,
api_port=api_port,
counter=counter,
bootstrap_builder=bootstrap_builder,
)
# 构建 Agent 能力画像(v2.6.1 路由)
agent_profiles_config = daemon_config.get("agent_profiles", {})
agent_profiles = {}
for aid, prof in agent_profiles_config.items():
agent_profiles[aid] = AgentProfile(
agent_id=aid,
capabilities=prof.get("capabilities", []),
capabilities_zh=prof.get("capabilities_zh", []),
can_review=prof.get("can_review", False),
max_concurrent=prof.get("max_concurrent", 1),
is_fallback=prof.get("is_fallback", False),
)
# 构建 LLM DriverMode A 路由)
# v3.0: routing 配置已移除(模糊路由 delegate 庞统)
# v3.0: 去掉 LLMDriver(独立 LLM 调用),模糊场景 delegate 庞统
# Router + Dispatcher
router = AgentRouter(
agent_profiles=agent_profiles,
counter=counter,
)
# #03: 注入 router 引用到 spawner(用于 Agent 能力画像)
spawner._router_ref = router
# 获取项目级 DB 路径(用于路由审计日志)
default_db_path = DATA_ROOT / "default" / "blackboard.db"
dispatcher = Dispatcher(
router=router,
spawner=spawner,
counter=counter,
db_path=default_db_path,
guardrails=GuardrailEngine(config_path=Path(__file__).parent.parent / "config" / "guardrails.yaml"),
)
# ── 集成模块 ──
# HealthChecker(僵尸检测)
health_checker = HealthChecker(
zombie_threshold=daemon_config.get("zombie_threshold", 20),
)
# ExperienceDistiller(经验自动蒸馏)
experience_config = config.get("experience", {})
experience_distiller = ExperienceDistiller(
store=ExperienceStore(store_path=DATA_ROOT / "experiences.jsonl"),
)
# InboxWatcher(即时事件监听)
inbox_config = config.get("inbox", {})
inbox_watcher = InboxWatcher(
inbox_path=DATA_ROOT / inbox_config.get("path", "inbox/daemon.jsonl"),
watch_interval=inbox_config.get("watch_interval", 1.0),
)
ticker = Ticker(
registry=registry,
tick_interval=tick_interval,
dispatcher=dispatcher,
spawner=spawner,
max_dispatch_per_tick=max_dispatch,
claim_timeout_minutes=claim_timeout,
default_task_timeout_minutes=task_timeout,
health_checker=health_checker,
experience_distiller=experience_distiller,
inbox_watcher=inbox_watcher,
)
# Phase 1 bug fix: spawner 引用 ticker 用于广播反馈追踪
spawner._ticker = ticker
await ticker.start()
agent_ids = list(agent_profiles.keys())
logger.info("Ticker started (interval=%ss, dispatch=%d/tick, agents=%s)",
tick_interval, max_dispatch, agent_ids)
yield
if ticker:
await ticker.stop()
logger.info("moziplus-v2 stopped")
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(
title="Sanguo MoziPlus v2",
description="AI Native DevOps Platform - Blackboard Architecture",
version="3.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# API 路由注册
# ---------------------------------------------------------------------------
from src.api.blackboard_routes import router as blackboard_router
from src.api.checkpoint_routes import router as checkpoint_router
from src.api.daemon_routes import router as daemon_router
from src.api.project_routes import router as project_router
from src.api.sse_routes import router as sse_router
from src.api.mail_routes import router as mail_router
from src.api.toolchain_routes import router as toolchain_router
app.include_router(blackboard_router)
app.include_router(checkpoint_router)
app.include_router(daemon_router)
app.include_router(project_router)
app.include_router(sse_router)
app.include_router(mail_router)
app.include_router(toolchain_router)
# ---------------------------------------------------------------------------
# 兼容端点
# ---------------------------------------------------------------------------
@app.get("/api/projects")
async def list_projects_compat():
"""兼容旧端点"""
from src.api.project_routes import _registry
reg = _registry()
return {"projects": {pid: info for pid, info in reg.list_projects().items()
if info.get("status") not in ("archived", "deleted")}}
# ---------------------------------------------------------------------------
# 静态文件服务(前端 dist/,F18 实现)
# ---------------------------------------------------------------------------
DIST_DIR = Path(__file__).parent / "frontend" / "dist"
if DIST_DIR.exists():
# v3.1: 缓存策略 - HTML 不缓存(确保新版本生效),JS/CSS 长缓存(Vite content hash 已处理)
import mimetypes
_static_app = StaticFiles(directory=str(DIST_DIR), html=True)
class CachedStaticFiles:
"""包装 StaticFiles,添加 Cache-Control 头"""
def __init__(self, app):
self._app = app
async def __call__(self, scope, receive, send):
original_send = send
async def patched_send(message):
if message.get("type") == "http.response.start":
headers = dict(message.get("headers", []))
path = scope.get("path", "")
if path.endswith(".html") or path == "/":
headers[b"cache-control"] = b"no-cache, no-store, must-revalidate"
elif any(path.endswith(ext) for ext in (".js", ".css", ".woff2", ".png", ".ico")):
headers[b"cache-control"] = b"public, max-age=31536000, immutable"
message["headers"] = list(headers.items())
await original_send(message)
await self._app(scope, receive, patched_send)
app.mount("/", CachedStaticFiles(_static_app), name="frontend")