Files
sanguo_moziplus_v2/src/main.py
T
cfdaily 25863634c2 fix: v3.0→HEAD review 修复 — handler 注册 + review verdict + skill 全文注入
基于庞统+司马懿背靠背 review,修复 6 个问题:

P0 致命:
- A1: _legacy_on_complete 补回 review verdict 处理(approved→done,非 approved→@mention assignee)
- A2: 添加 TaskTypeRegistry.register() 启动初始化(注册 Task/Mail/Toolchain handler)

P1 中等:
- B11-1: RoleSkillSection 从索引提示改为全文注入(对齐设计 §2.3 + BootstrapBuilder 行为)
- A8: retry prompt is_mail 硬编码改走 TaskTypeRegistry handler 判断

P2 低:
- _mail_* 4 个方法添加 DEPRECATED 注释
- ticker.py handler check_completion 代码块缩进对齐(28→24 空格)

测试:394 passed, 0 failed
Review reports: docs/design/review-v3-vs-head-{pangtong,simayi}.md
2026-06-11 08:08:20 +08:00

340 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""v2.6 主入口 - FastAPI + Daemon ticker 共享 asyncio event loop"""
from __future__ import annotations
from src.api.toolchain_routes import router as toolchain_router
from src.api.mail_routes import router as mail_router
from src.api.sse_routes import router as sse_router
from src.api.project_routes import router as project_router
from src.api.daemon_routes import router as daemon_router
from src.api.checkpoint_routes import router as checkpoint_router
from src.api.blackboard_routes import router as blackboard_router
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
import yaml
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
from src.daemon.task_handler import TaskHandler
from src.daemon.mail_handler import MailHandler
from src.daemon.toolchain_handler import ToolchainHandler
from src.daemon.task_type_registry import TaskTypeRegistry
from src.daemon.spawner import AgentSpawner
from src.daemon.bootstrap import BootstrapBuilder
from src.daemon.dispatcher import Dispatcher
from src.daemon.counter import ActiveAgentCounter
from src.daemon.router import AgentRouter, AgentProfile
from src.daemon.health import HealthChecker
from src.daemon.experience import ExperienceDistiller, ExperienceStore
from src.daemon.inbox import InboxWatcher
from src.daemon.guardrails import GuardrailEngine
from src.utils import get_data_root
logger = logging.getLogger("moziplus-v2")

# ---------------------------------------------------------------------------
# Logging setup: make sure PM2 can capture logs from every namespace
# ---------------------------------------------------------------------------
_logging_configured = False


def _setup_logging():
    """Attach one shared stream handler to every known moziplus-v2 logger.

    The function is idempotent: a module-level flag makes every call after
    the first a no-op, so handlers are never attached twice.

    Each listed logger is set to INFO, receives the same ``StreamHandler``
    (which writes to stderr by default, where PM2 picks it up) and has
    propagation disabled so records are not emitted a second time by an
    ancestor logger.
    """
    global _logging_configured
    if _logging_configured:
        return
    _logging_configured = True

    formatter = logging.Formatter(
        "%(asctime)s %(levelname)s %(name)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    namespaces = (
        "moziplus-v2",
        "moziplus-v2.ticker",
        "moziplus-v2.spawner",
        "moziplus-v2.dispatcher",
        "moziplus-v2.counter",
        "moziplus-v2.health",
        "moziplus-v2.inbox",
        "moziplus-v2.experience",
    )
    for namespace in namespaces:
        namespaced_logger = logging.getLogger(namespace)
        namespaced_logger.setLevel(logging.INFO)
        namespaced_logger.addHandler(stream_handler)
        # Avoid duplicate output through parent loggers.
        namespaced_logger.propagate = False


_setup_logging()
# ---------------------------------------------------------------------------
# 配置加载
# ---------------------------------------------------------------------------
DEFAULT_CONFIG_PATH = Path(__file__).parent.parent / "config" / "default.yaml"


def load_config() -> dict:
    """Load the global default configuration from ``DEFAULT_CONFIG_PATH``.

    Returns:
        The parsed YAML document, or an empty dict when the file does not
        exist or parses to an empty/falsy document.

    The file is opened explicitly as UTF-8 so that non-ASCII values are
    decoded the same way on every host, independent of the process locale.
    """
    try:
        with open(DEFAULT_CONFIG_PATH, encoding="utf-8") as f:
            # safe_load returns None for an empty document; normalise to {}.
            return yaml.safe_load(f) or {}
    except FileNotFoundError:
        # A missing config file is not an error: run with built-in defaults.
        return {}


config = load_config()
# ---------------------------------------------------------------------------
# 全局组件
# ---------------------------------------------------------------------------
# Root data directory, resolved once at import time via get_data_root().
# Within this module it anchors the project registry, the default
# blackboard DB, the experience store and the daemon inbox file.
DATA_ROOT = get_data_root()
# Process-wide Ticker; stays None until lifespan() assigns it during startup.
ticker: Optional[Ticker] = None


def get_ticker() -> Optional[Ticker]:
    """Return the module-level Ticker instance.

    Returns:
        The Ticker assigned by ``lifespan()``, or ``None`` if the application
        has not gone through startup yet.
    """
    return ticker
def get_registry() -> ProjectRegistry:
    """Build a project registry rooted at ``DATA_ROOT``.

    Returns:
        A ``ProjectRegistry`` for ``DATA_ROOT``. A new instance is
        constructed on every call; nothing is cached in this module.
    """
    project_registry = ProjectRegistry(DATA_ROOT)
    return project_registry
# ---------------------------------------------------------------------------
# FastAPI 生命周期
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the daemon Ticker for the lifetime of the app and stop it on exit.

    Startup (before ``yield``):
      1. Prepare the project registry (YAML migration plus project discovery).
      2. Read the ``daemon`` and ``inbox`` sections of the global ``config``.
      3. Build the scheduling components: counter, bootstrap builder, spawner,
         agent profiles, router, dispatcher, health checker, experience
         distiller and inbox watcher.
      4. Register the Task/Mail/Toolchain handlers with ``TaskTypeRegistry``.
      5. Create the Ticker, publish it in the module-level ``ticker`` global
         and start it.

    Shutdown (after ``yield``): the Ticker is stopped inside a ``finally``
    block so it is also stopped when an exception or cancellation is thrown
    into the generator while the application is running.

    Args:
        app: The FastAPI application (supplied by the framework; unused here).
    """
    global ticker
    logger.info("moziplus-v2 starting...")

    registry = get_registry()
    # v2.7: migrate from the legacy YAML layout
    registry.migrate_from_yaml()
    # v2.7: auto-discover new projects
    discovered = registry.discover_projects()
    if discovered:
        logger.info("Auto-discovered projects: %s", discovered)
    # v3.0: auto-discover official sanguo_projects projects
    sanguo_discovered = registry.discover_sanguo_projects()
    if sanguo_discovered:
        logger.info("Auto-discovered sanguo projects: %s", sanguo_discovered)

    daemon_config = config.get("daemon", {})
    tick_interval = daemon_config.get("tick_interval", 30)
    max_dispatch = daemon_config.get("max_dispatch_per_tick", 3)
    claim_timeout = daemon_config.get("claim_timeout_minutes", 5.0)
    task_timeout = daemon_config.get("default_task_timeout_minutes", 30.0)
    api_host = daemon_config.get("api_host", "127.0.0.1")
    api_port = daemon_config.get("api_port", 8083)

    # Agent scheduling components
    counter = ActiveAgentCounter(
        max_global=daemon_config.get("max_global_agents", 5),
        max_per_session=daemon_config.get("max_per_session", 1),
        max_concurrent_sessions=daemon_config.get(
            "max_concurrent_sessions", 3),
        default_cooldown_seconds=daemon_config.get("cooldown_seconds", 120),
    )

    # BootstrapBuilder (L2 four-section engine injection layer, v2.1)
    bootstrap_builder = BootstrapBuilder(max_tokens=4096)

    spawner = AgentSpawner(
        dry_run=False,
        agent_timeout=daemon_config.get("agent_timeout", 630),
        gateway_timeout=daemon_config.get("gateway_timeout", 600),
        max_retries=daemon_config.get("max_retries", 3),
        max_monitor_timeouts=daemon_config.get("max_monitor_timeouts", 3),
        api_host=api_host,
        api_port=api_port,
        counter=counter,
        bootstrap_builder=bootstrap_builder,
    )

    # Build agent capability profiles (v2.6.1 routing)
    agent_profiles_config = daemon_config.get("agent_profiles", {})
    agent_profiles = {
        aid: AgentProfile(
            agent_id=aid,
            capabilities=prof.get("capabilities", []),
            capabilities_zh=prof.get("capabilities_zh", []),
            can_review=prof.get("can_review", False),
            max_concurrent=prof.get("max_concurrent", 1),
            is_fallback=prof.get("is_fallback", False),
        )
        for aid, prof in agent_profiles_config.items()
    }

    # v3.0: the routing config and the standalone LLMDriver (independent LLM
    # calls, Mode A routing) were removed; ambiguous routing is delegated to
    # the Pangtong agent.
    # Router + Dispatcher
    router = AgentRouter(
        agent_profiles=agent_profiles,
        counter=counter,
    )
    # #03: inject the router reference into the spawner (used for agent
    # capability profiles)
    spawner._router_ref = router

    # Project-level DB path (used for the routing audit log)
    default_db_path = DATA_ROOT / "default" / "blackboard.db"
    dispatcher = Dispatcher(
        router=router,
        spawner=spawner,
        counter=counter,
        db_path=default_db_path,
        guardrails=GuardrailEngine(
            config_path=Path(__file__).parent.parent /
            "config" /
            "guardrails.yaml"),
    )

    # -- Integrated modules --
    # HealthChecker (zombie detection)
    health_checker = HealthChecker(
        zombie_threshold=daemon_config.get("zombie_threshold", 20),
    )

    # ExperienceDistiller (automatic experience distillation)
    experience_distiller = ExperienceDistiller(
        store=ExperienceStore(store_path=DATA_ROOT / "experiences.jsonl"),
    )

    # InboxWatcher (immediate event listening)
    inbox_config = config.get("inbox", {})
    inbox_watcher = InboxWatcher(
        inbox_path=DATA_ROOT / inbox_config.get("path", "inbox/daemon.jsonl"),
        watch_interval=inbox_config.get("watch_interval", 1.0),
    )

    # [Step 5] Register TaskType handlers (must happen before the ticker starts)
    TaskTypeRegistry.register(TaskHandler())
    TaskTypeRegistry.register(MailHandler())
    TaskTypeRegistry.register(ToolchainHandler())

    ticker = Ticker(
        registry=registry,
        tick_interval=tick_interval,
        dispatcher=dispatcher,
        spawner=spawner,
        max_dispatch_per_tick=max_dispatch,
        claim_timeout_minutes=claim_timeout,
        default_task_timeout_minutes=task_timeout,
        health_checker=health_checker,
        experience_distiller=experience_distiller,
        inbox_watcher=inbox_watcher,
    )
    # Phase 1 bug fix: the spawner keeps a reference to the ticker for
    # broadcast feedback tracking
    spawner._ticker = ticker

    await ticker.start()
    agent_ids = list(agent_profiles.keys())
    logger.info("Ticker started (interval=%ss, dispatch=%d/tick, agents=%s)",
                tick_interval, max_dispatch, agent_ids)

    try:
        yield
    finally:
        # Runs on normal shutdown and when an error/cancellation is thrown
        # into the generator, so the ticker is never left running.
        if ticker:
            await ticker.stop()
        logger.info("moziplus-v2 stopped")
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
# The ASGI application object; startup and shutdown are delegated to lifespan().
app = FastAPI(
    title="Sanguo MoziPlus v2",
    description="AI Native DevOps Platform - Blackboard Architecture",
    version="3.0.0",
    lifespan=lifespan,
)
# CORS is fully open here: any origin, method and header, with credentials
# allowed.
# NOTE(review): FastAPI's CORS documentation states that wildcard
# origins/methods/headers should not be combined with allow_credentials=True.
# Confirm whether credentialed cross-origin requests are actually needed and,
# if so, list the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# API 路由注册
# ---------------------------------------------------------------------------
# Register every API router on the app, in this fixed order.
for _api_router in (
    blackboard_router,
    checkpoint_router,
    daemon_router,
    project_router,
    sse_router,
    mail_router,
    toolchain_router,
):
    app.include_router(_api_router)
# ---------------------------------------------------------------------------
# 健康检查端点
# ---------------------------------------------------------------------------
@app.get("/api/healthz")
async def healthz():
    """Lightweight liveness probe.

    Returns a constant payload and touches no other component; the route
    declares no authentication dependency.
    """
    payload = {"status": "ok"}
    return payload
# ---------------------------------------------------------------------------
# 兼容端点
# ---------------------------------------------------------------------------
@app.get("/api/projects")
async def list_projects_compat():
    """Backward-compatible project listing endpoint.

    Returns:
        ``{"projects": {project_id: info, ...}}`` containing every project
        from the registry whose ``status`` is neither ``"archived"`` nor
        ``"deleted"``.
    """
    # Imported at call time rather than at module import time.
    from src.api.project_routes import _registry

    hidden_statuses = ("archived", "deleted")
    visible_projects = {}
    for project_id, project_info in _registry().list_projects().items():
        if project_info.get("status") in hidden_statuses:
            continue
        visible_projects[project_id] = project_info
    return {"projects": visible_projects}
# ---------------------------------------------------------------------------
# 静态文件服务(前端 dist/,F18 实现)
# ---------------------------------------------------------------------------
DIST_DIR = Path(__file__).parent / "frontend" / "dist"


class CachedStaticFiles:
    """ASGI wrapper that adds a ``Cache-Control`` header to static responses.

    v3.1 caching policy:
      * HTML documents (paths ending in ``.html`` or ``/``) are never cached,
        so a newly deployed frontend takes effect immediately.
      * Asset files (``.js``, ``.css``, ``.woff2``, ``.png``, ``.ico``) get a
        one-year immutable policy, but only on successful responses
        (200/206/304), so an error response for an asset path is never pinned
        in caches.
      * Everything else is passed through unchanged.

    Response headers are kept as a list of pairs, so repeated headers (for
    example several ``set-cookie`` entries) are preserved. Any existing
    ``cache-control`` entry is replaced.
    """

    _NO_CACHE = b"no-cache, no-store, must-revalidate"
    _LONG_CACHE = b"public, max-age=31536000, immutable"
    _HTML_SUFFIXES = (".html", "/")
    _ASSET_SUFFIXES = (".js", ".css", ".woff2", ".png", ".ico")
    _CACHEABLE_STATUSES = (200, 206, 304)

    def __init__(self, app):
        """Wrap ``app``, the ASGI application whose responses get the header."""
        self._app = app

    def _cache_policy(self, path, status):
        """Return the Cache-Control value for ``path``/``status``, or None."""
        if path.endswith(self._HTML_SUFFIXES):
            return self._NO_CACHE
        if (path.endswith(self._ASSET_SUFFIXES)
                and status in self._CACHEABLE_STATUSES):
            return self._LONG_CACHE
        return None

    async def __call__(self, scope, receive, send):
        """ASGI entry point: delegate to the wrapped app with a patched send."""

        async def patched_send(message):
            # Only the response-start message carries status and headers.
            if message.get("type") == "http.response.start":
                policy = self._cache_policy(
                    scope.get("path", ""), message.get("status"))
                if policy is not None:
                    headers = [
                        (name, value)
                        for name, value in message.get("headers", [])
                        if name.lower() != b"cache-control"
                    ]
                    headers.append((b"cache-control", policy))
                    message["headers"] = headers
            await send(message)

        await self._app(scope, receive, patched_send)


if DIST_DIR.exists():
    # Serve the built frontend only when the dist directory is present.
    _static_app = StaticFiles(directory=str(DIST_DIR), html=True)
    app.mount("/", CachedStaticFiles(_static_app), name="frontend")