Files
2026-04-29 20:15:43 +08:00

114 lines
3.4 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
sanguo_quant_live 文件自动监听
文件变化自动同步到 NAS → 重建容器 → 运行回测 → 保存结果
完全无人值守,不需要任何手动操作!
启动:
nohup python auto_watcher.py > auto_watcher.log 2>&1 &
停止:
pkill -f auto_watcher.py
"""
import os
import sys
import time
import subprocess
from datetime import datetime
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# 配置
PROJECT_DIR = Path("/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live")
CI_CD_SCRIPT = Path("/Users/chufeng/.openclaw/workspace-jiangwei/sync_and_redeploy.sh")
DEBOUNCE_SECONDS = 30 # 防抖,避免多次触发
class SanguoChangeHandler(FileSystemEventHandler):
def __init__(self):
self.last_deploy = 0
self.debounce_seconds = DEBOUNCE_SECONDS
def on_any_event(self, event):
# 忽略这些文件
if any([
event.src_path.endswith('.git/'),
event.src_path.endswith('.__pycache__/'),
event.src_path.endswith('.ipynb_checkpoints/'),
event.src_path.endswith('.DS_Store'),
event.src_path.endswith('.log'),
event.src_path.endswith('backtest_results/'),
]):
return
# 只关心这些类型的文件
if not any([
event.src_path.endswith('.py'),
event.src_path.endswith('.md'),
event.src_path.endswith('.ipynb'),
event.src_path.endswith('.yml'),
event.src_path.endswith('.yaml'),
event.src_path.endswith('Dockerfile'),
event.src_path.endswith('docker-compose.yml'),
]):
return
# 防抖处理
now = time.time()
if now - self.last_deploy < self.debounce_seconds:
return
self.last_deploy = now
print("\n" + "="*60)
print(f"📝 检测到文件变化: {event.src_path}")
print(f"🕐 时间: {datetime.now()}")
print("🚀 自动触发部署回测...")
print("="*60 + "\n")
# 执行自动部署
try:
result = subprocess.run([str(CI_CD_SCRIPT)], capture_output=False)
if result.returncode == 0:
print("\n" + "✅ 自动化部署回测完成!")
else:
print("\n" + "⚠️ 部署过程中有错误,请检查日志")
print("\n" + "="*60 + "\n")
except Exception as e:
print(f"\n❌ 部署失败: {e}")
print("\n" + "="*60 + "\n")
def main():
print("============================================")
print(" sanguo_quant_live 自动监听服务")
print("============================================")
print()
print(f"📂 监听目录: {PROJECT_DIR}")
print(f"📜 部署脚本: {CI_CD_SCRIPT}")
print()
print("🔍 开始监听文件变化...")
print("任何代码修改(策略 OR 框架 OR 配置)都会自动触发部署回测!")
print()
event_handler = SanguoChangeHandler()
observer = Observer()
observer.schedule(event_handler, str(PROJECT_DIR), recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == "__main__":
main()