Files
sanguo_moziplus_v2/src/main.py
T
2026-05-18 00:40:24 +08:00

254 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""v2.6 主入口 - FastAPI + Daemon ticker 共享 asyncio event loop"""
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
import yaml
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
from src.daemon.spawner import AgentSpawner
from src.daemon.dispatcher import Dispatcher
from src.daemon.counter import ActiveAgentCounter
from src.daemon.router import AgentRouter, AgentProfile, LLMDriver
from src.utils import get_data_root
logger = logging.getLogger("moziplus-v2")

# ---------------------------------------------------------------------------
# Logging setup: make sure PM2 can capture logs from every namespace
# ---------------------------------------------------------------------------
_logging_configured = False


def _setup_logging():
    """Attach one shared stream handler to the moziplus-v2 loggers.

    Each listed logger is set to INFO, receives the same handler instance,
    and has propagation switched off so a record is not emitted a second
    time by an ancestor logger. Only the first call does any work; later
    calls return immediately because the module-level flag is already set.
    """
    global _logging_configured
    if not _logging_configured:
        _logging_configured = True

        formatter = logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        # StreamHandler() without a stream argument writes to sys.stderr,
        # which is the stream the original comment expects PM2 to capture.
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)

        namespaces = (
            "moziplus-v2",
            "moziplus-v2.ticker",
            "moziplus-v2.spawner",
            "moziplus-v2.dispatcher",
            "moziplus-v2.counter",
        )
        for namespace in namespaces:
            namespace_logger = logging.getLogger(namespace)
            namespace_logger.setLevel(logging.INFO)
            namespace_logger.addHandler(stream_handler)
            namespace_logger.propagate = False  # avoid duplicate output


_setup_logging()
# ---------------------------------------------------------------------------
# 配置加载
# ---------------------------------------------------------------------------
DEFAULT_CONFIG_PATH = Path(__file__).parent.parent / "config" / "default.yaml"


def load_config(path: Optional[Path] = None) -> dict:
    """Load the global default configuration from a YAML file.

    Args:
        path: Location of the YAML file. Defaults to ``DEFAULT_CONFIG_PATH``
            when omitted, which is what existing callers rely on.

    Returns:
        The parsed top-level mapping, or an empty dict when the file does
        not exist or parses to an empty/falsy document.

    Raises:
        ValueError: If the document's top level is not a mapping. Callers
            use ``config.get(...)``, so anything else would only fail later
            with a less helpful ``AttributeError``.
    """
    config_path = DEFAULT_CONFIG_PATH if path is None else Path(path)
    try:
        # Explicit UTF-8: do not let the locale's default encoding decide
        # how non-ASCII text in the config is decoded.
        with open(config_path, encoding="utf-8") as f:
            loaded = yaml.safe_load(f) or {}
    except FileNotFoundError:
        # A missing config file is not an error; built-in defaults apply.
        return {}
    if not isinstance(loaded, dict):
        raise ValueError(
            f"Config file {config_path} must contain a mapping at the top "
            f"level, got {type(loaded).__name__}"
        )
    return loaded


config = load_config()
# ---------------------------------------------------------------------------
# 全局组件
# ---------------------------------------------------------------------------
# Data root returned by get_data_root(), resolved once when this module is
# imported; get_registry() and the lifespan handler both read it.
DATA_ROOT = get_data_root()
# Module-level Ticker handle; stays None until the lifespan handler assigns it.
ticker: Optional[Ticker] = None


def get_ticker() -> Optional[Ticker]:
    """Return the module-level Ticker, or None if it has not been created."""
    current_ticker = ticker
    return current_ticker
def get_registry() -> ProjectRegistry:
    """Return a project registry rooted at DATA_ROOT.

    A new ProjectRegistry instance is constructed on every call; nothing
    is cached here.
    """
    registry = ProjectRegistry(DATA_ROOT)
    return registry
# ---------------------------------------------------------------------------
# FastAPI 生命周期
# ---------------------------------------------------------------------------
def _build_agent_profiles(profiles_config: Optional[dict]) -> dict:
    """Turn the ``agent_profiles`` config mapping into AgentProfile objects keyed by agent id."""
    profiles = {}
    for agent_id, raw_profile in (profiles_config or {}).items():
        # A YAML key with an empty body (``some_agent:``) parses to None;
        # treat it as "all defaults" instead of crashing on ``.get``.
        settings = raw_profile or {}
        profiles[agent_id] = AgentProfile(
            agent_id=agent_id,
            capabilities=settings.get("capabilities", []),
            can_review=settings.get("can_review", False),
            max_concurrent=settings.get("max_concurrent", 1),
            is_fallback=settings.get("is_fallback", False),
        )
    return profiles


def _build_llm_driver(routing_config: Optional[dict]) -> Optional[LLMDriver]:
    """Create the routing LLMDriver, or return None when no routing model is configured."""
    routing = routing_config or {}
    model = routing.get("model", "")
    if not model:
        return None
    driver = LLMDriver(
        model=model,
        api_base=routing.get("api_base", ""),
        api_key=routing.get("api_key", ""),
        timeout=routing.get("timeout", 5.0),
        max_tokens=routing.get("max_tokens", 200),
        temperature=routing.get("temperature", 0.1),
    )
    logger.info("LLM routing driver initialized (model=%s)", model)
    return driver


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the daemon ticker on application startup and stop it on shutdown.

    Reads the ``daemon`` section of the module-level ``config``, builds the
    spawner, counter, router and dispatcher from it, stores the resulting
    Ticker in the module-level ``ticker`` variable and starts it. The ticker
    is stopped in a ``finally`` clause so that an exception or cancellation
    delivered while the application is running does not skip the cleanup.

    Args:
        app: The FastAPI application. It is not used in the body; the
            parameter is there because this function is passed as
            ``lifespan=`` to FastAPI.
    """
    global ticker
    logger.info("moziplus-v2 starting...")
    registry = get_registry()
    # v2.7: migrate from the legacy YAML
    registry.migrate_from_yaml()
    # v2.7: auto-discover new projects
    discovered = registry.discover_projects()
    if discovered:
        logger.info("Auto-discovered projects: %s", discovered)

    # ``or {}`` (not a ``.get`` default) so that a section present in the
    # YAML but left empty, which parses to None, falls back to defaults.
    daemon_config = config.get("daemon") or {}
    tick_interval = daemon_config.get("tick_interval", 30)
    max_dispatch = daemon_config.get("max_dispatch_per_tick", 3)
    claim_timeout = daemon_config.get("claim_timeout_minutes", 5.0)
    task_timeout = daemon_config.get("default_task_timeout_minutes", 30.0)
    api_host = daemon_config.get("api_host", "127.0.0.1")
    api_port = daemon_config.get("api_port", 8083)

    # Create the agent scheduling components
    spawner = AgentSpawner(
        dry_run=False,
        agent_timeout=daemon_config.get("agent_timeout", 600),
        api_host=api_host,
        api_port=api_port,
    )
    counter = ActiveAgentCounter(
        max_global=daemon_config.get("max_global_agents", 5),
        max_per_agent=daemon_config.get("max_per_agent", 2),
    )

    # Build the agent capability profiles (v2.6.1 routing)
    agent_profiles = _build_agent_profiles(daemon_config.get("agent_profiles"))

    # Build the LLM driver (Mode A routing)
    routing_config = daemon_config.get("routing") or {}
    routing_model = routing_config.get("model", "")
    llm_driver = _build_llm_driver(routing_config)

    # Build the Router + Dispatcher
    router = AgentRouter(
        agent_profiles=agent_profiles,
        llm_driver=llm_driver,
        counter=counter,
    )
    # Project-level DB path (used for the routing audit log)
    default_db_path = DATA_ROOT / "default" / "blackboard.db"
    dispatcher = Dispatcher(
        router=router,
        spawner=spawner,
        counter=counter,
        db_path=default_db_path,
    )

    ticker = Ticker(
        registry=registry,
        tick_interval=tick_interval,
        dispatcher=dispatcher,
        spawner=spawner,
        max_dispatch_per_tick=max_dispatch,
        claim_timeout_minutes=claim_timeout,
        default_task_timeout_minutes=task_timeout,
    )
    await ticker.start()
    agent_ids = list(agent_profiles.keys())
    logger.info("Ticker started (interval=%ss, dispatch=%d/tick, agents=%s, llm=%s)",
                tick_interval, max_dispatch, agent_ids, routing_model)

    try:
        yield
    finally:
        # Runs on normal shutdown and also when an exception is thrown
        # into the generator at the yield point.
        if ticker:
            await ticker.stop()
        logger.info("moziplus-v2 stopped")
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
# Application object; `lifespan` drives ticker startup and shutdown.
app = FastAPI(
    lifespan=lifespan,
    version="2.7.0",
    title="Sanguo MoziPlus v2",
    description="AI Native DevOps Platform - Blackboard Architecture",
)

# NOTE(review): wildcard origins are combined with allow_credentials=True.
# FastAPI's CORS documentation advises listing origins explicitly when
# credentials are allowed; confirm this permissive policy is intended.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
# ---------------------------------------------------------------------------
# API 路由注册
# ---------------------------------------------------------------------------
# NOTE(review): these imports sit below the `app` definition and not at
# the top of the file; presumably this avoids a circular import with the
# route modules. Confirm before moving them.
from src.api.blackboard_routes import router as blackboard_router
from src.api.daemon_routes import router as daemon_router
from src.api.project_routes import router as project_router
from src.api.sse_routes import router as sse_router
from src.api.card_routes import router as card_router
from src.api.mail_routes import router as mail_router

# Register every API router on the app, in the same order as before.
for _api_router in (
    blackboard_router,
    daemon_router,
    project_router,
    sse_router,
    card_router,
    mail_router,
):
    app.include_router(_api_router)
# ---------------------------------------------------------------------------
# 兼容端点
# ---------------------------------------------------------------------------
@app.get("/api/projects")
async def list_projects_compat():
    """Legacy-compatible endpoint: list every project whose status is not "archived"."""
    from src.api.project_routes import _registry

    visible_projects = {}
    for project_id, project_info in _registry().list_projects().items():
        if project_info.get("status") == "archived":
            continue
        visible_projects[project_id] = project_info
    return {"projects": visible_projects}
# ---------------------------------------------------------------------------
# 静态文件服务(前端 dist/,F18 实现)
# ---------------------------------------------------------------------------
# Built frontend bundle, expected at <this file's directory>/frontend/dist.
DIST_DIR = Path(__file__).parent.joinpath("frontend", "dist")

# Serve the bundle only when it has been built. The mount is at "/" and is
# added last in this file, after the API routers are registered.
if DIST_DIR.exists():
    _frontend_files = StaticFiles(directory=str(DIST_DIR), html=True)
    app.mount("/", _frontend_files, name="frontend")