#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ sanguo_quant_live 文件自动监听 文件变化自动同步到 NAS → 重建容器 → 运行回测 → 保存结果 完全无人值守,不需要任何手动操作! 启动: nohup python auto_watcher.py > auto_watcher.log 2>&1 & 停止: pkill -f auto_watcher.py """ import os import sys import time import subprocess from datetime import datetime from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # 配置 PROJECT_DIR = Path("/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live") CI_CD_SCRIPT = Path("/Users/chufeng/.openclaw/workspace-jiangwei/sync_and_redeploy.sh") DEBOUNCE_SECONDS = 30 # 防抖,避免多次触发 class SanguoChangeHandler(FileSystemEventHandler): def __init__(self): self.last_deploy = 0 self.debounce_seconds = DEBOUNCE_SECONDS def on_any_event(self, event): # 忽略这些文件 if any([ event.src_path.endswith('.git/'), event.src_path.endswith('.__pycache__/'), event.src_path.endswith('.ipynb_checkpoints/'), event.src_path.endswith('.DS_Store'), event.src_path.endswith('.log'), event.src_path.endswith('backtest_results/'), ]): return # 只关心这些类型的文件 if not any([ event.src_path.endswith('.py'), event.src_path.endswith('.md'), event.src_path.endswith('.ipynb'), event.src_path.endswith('.yml'), event.src_path.endswith('.yaml'), event.src_path.endswith('Dockerfile'), event.src_path.endswith('docker-compose.yml'), ]): return # 防抖处理 now = time.time() if now - self.last_deploy < self.debounce_seconds: return self.last_deploy = now print("\n" + "="*60) print(f"📝 检测到文件变化: {event.src_path}") print(f"🕐 时间: {datetime.now()}") print("🚀 自动触发部署回测...") print("="*60 + "\n") # 执行自动部署 try: result = subprocess.run([str(CI_CD_SCRIPT)], capture_output=False) if result.returncode == 0: print("\n" + "✅ 自动化部署回测完成!") else: print("\n" + "⚠️ 部署过程中有错误,请检查日志") print("\n" + "="*60 + "\n") except Exception as e: print(f"\n❌ 部署失败: {e}") print("\n" + "="*60 + "\n") def main(): print("============================================") print(" sanguo_quant_live 自动监听服务") print("============================================") print() print(f"📂 监听目录: {PROJECT_DIR}") print(f"📜 部署脚本: {CI_CD_SCRIPT}") print() print("🔍 开始监听文件变化...") print("任何代码修改(策略 OR 框架 OR 配置)都会自动触发部署回测!") print() event_handler = SanguoChangeHandler() observer = Observer() observer.schedule(event_handler, str(PROJECT_DIR), recursive=True) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join() if __name__ == "__main__": main()