"""v2.6 主入口 - FastAPI + Daemon ticker 共享 asyncio event loop""" from __future__ import annotations import logging from contextlib import asynccontextmanager from pathlib import Path from typing import Optional import yaml from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from src.blackboard.registry import ProjectRegistry from src.daemon.ticker import Ticker from src.daemon.spawner import AgentSpawner from src.daemon.dispatcher import Dispatcher from src.daemon.counter import ActiveAgentCounter from src.daemon.router import AgentRouter, AgentProfile, LLMDriver from src.utils import get_data_root logger = logging.getLogger("moziplus-v2") # --------------------------------------------------------------------------- # 日志配置:确保 PM2 能捕获所有命名空间日志 # --------------------------------------------------------------------------- _logging_configured = False def _setup_logging(): """配置 logging,把 moziplus-v2 命名空间接到 root handler""" global _logging_configured if _logging_configured: return _logging_configured = True # 确保 moziplus-v2 命名空间日志输出到 stderr(PM2 捕获) handler = logging.StreamHandler() handler.setFormatter(logging.Formatter( "%(asctime)s %(levelname)s %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", )) for name in ("moziplus-v2", "moziplus-v2.ticker", "moziplus-v2.spawner", "moziplus-v2.dispatcher", "moziplus-v2.counter"): lg = logging.getLogger(name) lg.setLevel(logging.INFO) lg.addHandler(handler) lg.propagate = False # 避免重复 _setup_logging() # --------------------------------------------------------------------------- # 配置加载 # --------------------------------------------------------------------------- DEFAULT_CONFIG_PATH = Path(__file__).parent.parent / "config" / "default.yaml" def load_config() -> dict: """加载全局默认配置""" if DEFAULT_CONFIG_PATH.exists(): with open(DEFAULT_CONFIG_PATH) as f: return yaml.safe_load(f) or {} return {} config = load_config() # --------------------------------------------------------------------------- # 全局组件 # --------------------------------------------------------------------------- DATA_ROOT = get_data_root() ticker: Optional[Ticker] = None def get_ticker() -> Optional[Ticker]: """获取全局 Ticker 实例""" return ticker def get_registry() -> ProjectRegistry: """获取全局项目注册表""" return ProjectRegistry(DATA_ROOT) # --------------------------------------------------------------------------- # FastAPI 生命周期 # --------------------------------------------------------------------------- @asynccontextmanager async def lifespan(app: FastAPI): """启动 Daemon ticker,关闭时清理""" global ticker logger.info("moziplus-v2 starting...") registry = get_registry() # v2.7: 从旧 YAML 迁移 registry.migrate_from_yaml() # v2.7: 自动发现新项目 discovered = registry.discover_projects() if discovered: logger.info("Auto-discovered projects: %s", discovered) daemon_config = config.get("daemon", {}) tick_interval = daemon_config.get("tick_interval", 30) max_dispatch = daemon_config.get("max_dispatch_per_tick", 3) claim_timeout = daemon_config.get("claim_timeout_minutes", 5.0) task_timeout = daemon_config.get("default_task_timeout_minutes", 30.0) api_host = daemon_config.get("api_host", "127.0.0.1") api_port = daemon_config.get("api_port", 8083) # 创建 Agent 调度组件 spawner = AgentSpawner( dry_run=False, agent_timeout=daemon_config.get("agent_timeout", 600), api_host=api_host, api_port=api_port, ) counter = ActiveAgentCounter( max_global=daemon_config.get("max_global_agents", 5), max_per_agent=daemon_config.get("max_per_agent", 2), ) # 构建 Agent 能力画像(v2.6.1 路由) agent_profiles_config = daemon_config.get("agent_profiles", {}) agent_profiles = {} for aid, prof in agent_profiles_config.items(): agent_profiles[aid] = AgentProfile( agent_id=aid, capabilities=prof.get("capabilities", []), can_review=prof.get("can_review", False), max_concurrent=prof.get("max_concurrent", 1), is_fallback=prof.get("is_fallback", False), ) # 构建 LLM Driver(Mode A 路由) routing_config = daemon_config.get("routing", {}) llm_driver = None routing_model = routing_config.get("model", "") if routing_model: llm_driver = LLMDriver( model=routing_model, api_base=routing_config.get("api_base", ""), api_key=routing_config.get("api_key", ""), timeout=routing_config.get("timeout", 5.0), max_tokens=routing_config.get("max_tokens", 200), temperature=routing_config.get("temperature", 0.1), ) logger.info("LLM routing driver initialized (model=%s)", routing_model) # 构建 Router + Dispatcher router = AgentRouter( agent_profiles=agent_profiles, llm_driver=llm_driver, counter=counter, ) # 获取项目级 DB 路径(用于路由审计日志) default_db_path = DATA_ROOT / "default" / "blackboard.db" dispatcher = Dispatcher( router=router, spawner=spawner, counter=counter, db_path=default_db_path, ) ticker = Ticker( registry=registry, tick_interval=tick_interval, dispatcher=dispatcher, spawner=spawner, max_dispatch_per_tick=max_dispatch, claim_timeout_minutes=claim_timeout, default_task_timeout_minutes=task_timeout, ) await ticker.start() agent_ids = list(agent_profiles.keys()) logger.info("Ticker started (interval=%ss, dispatch=%d/tick, agents=%s, llm=%s)", tick_interval, max_dispatch, agent_ids, routing_model) yield if ticker: await ticker.stop() logger.info("moziplus-v2 stopped") # --------------------------------------------------------------------------- # FastAPI app # --------------------------------------------------------------------------- app = FastAPI( title="Sanguo MoziPlus v2", description="AI Native DevOps Platform - Blackboard Architecture", version="2.7.0", lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # --------------------------------------------------------------------------- # API 路由注册 # --------------------------------------------------------------------------- from src.api.blackboard_routes import router as blackboard_router from src.api.daemon_routes import router as daemon_router from src.api.project_routes import router as project_router from src.api.sse_routes import router as sse_router from src.api.mail_routes import router as mail_router app.include_router(blackboard_router) app.include_router(daemon_router) app.include_router(project_router) app.include_router(sse_router) app.include_router(mail_router) # --------------------------------------------------------------------------- # 兼容端点 # --------------------------------------------------------------------------- @app.get("/api/projects") async def list_projects_compat(): """兼容旧端点""" from src.api.project_routes import _registry reg = _registry() return {"projects": {pid: info for pid, info in reg.list_projects().items() if info.get("status") != "archived"}} # --------------------------------------------------------------------------- # 静态文件服务(前端 dist/,F18 实现) # --------------------------------------------------------------------------- DIST_DIR = Path(__file__).parent / "frontend" / "dist" if DIST_DIR.exists(): app.mount("/", StaticFiles(directory=str(DIST_DIR), html=True), name="frontend")