212 lines
7.1 KiB
Python
Executable File
212 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
简单的文件监控脚本
|
|
使用轮询方式检查文件变化,触发同步
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import subprocess
|
|
import logging
|
|
import threading
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# --- Configuration ---------------------------------------------------------
# Root of the project tree; the log file and the sync script live beneath it.
PROJECT_DIR = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live"
# Directory containing this script.
SELF_DIR = os.path.dirname(os.path.abspath(__file__))

LOCK_FILE = "/tmp/sanguo_sync.lock"
CHECK_INTERVAL = 60  # polling interval in seconds (one minute)

LOG_FILE = os.path.join(PROJECT_DIR, "simple-watcher.log")
SYNC_SCRIPT = os.path.join(PROJECT_DIR, "management/sanguo_auto_sync/auto-sync.sh")

# File-name suffixes and directory names that are excluded from watching.
IGNORE_EXTENSIONS = [
    '.log',
    '.tmp',
    '~',
    '.pyc',
]
IGNORE_DIRS = [
    '.git',
    '__pycache__',
    'venv',
    '.venv',
    'node_modules',
]

# --- Logging ---------------------------------------------------------------
# Every record goes both to the log file and to the console stream.
_file_handler = logging.FileHandler(LOG_FILE)
_console_handler = logging.StreamHandler()
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(message)s',
    handlers=[_file_handler, _console_handler],
)
logger = logging.getLogger(__name__)
|
|
|
|
class FileWatcher:
    """Poll a directory tree for file changes and run a sync script on change.

    The watcher keeps a ``{path: mtime}`` snapshot of every non-ignored file
    under ``directory``. Each poll compares the tree against that snapshot,
    logs new / modified / deleted files, and runs the sync script when
    anything changed.

    Args:
        directory: Root of the tree to watch.
        ignore_dirs: Directory names to skip; defaults to ``IGNORE_DIRS``.
        ignore_extensions: File-name suffixes to skip; defaults to
            ``IGNORE_EXTENSIONS``.
        check_interval: Seconds between polls; defaults to ``CHECK_INTERVAL``.
        sync_script: Executable to run on change; defaults to ``SYNC_SCRIPT``.
        lock_file: Path of the lock file guarding the sync; defaults to
            ``LOCK_FILE``.
    """

    # Same logger object as the module-level ``logger`` (both come from
    # ``logging.getLogger(__name__)``); kept on the class so the methods do
    # not depend on a module global.
    _log = logging.getLogger(__name__)

    def __init__(self, directory, *, ignore_dirs=None, ignore_extensions=None,
                 check_interval=None, sync_script=None, lock_file=None):
        self.directory = Path(directory)
        # The module-level defaults are only looked up when no override is given.
        self.ignore_dirs = frozenset(
            IGNORE_DIRS if ignore_dirs is None else ignore_dirs
        )
        # ``str.endswith`` accepts a tuple, so all suffixes are tested in one call.
        self.ignore_extensions = tuple(
            IGNORE_EXTENSIONS if ignore_extensions is None else ignore_extensions
        )
        self.check_interval = (
            CHECK_INTERVAL if check_interval is None else check_interval
        )
        self.sync_script = SYNC_SCRIPT if sync_script is None else sync_script
        self.lock_file = LOCK_FILE if lock_file is None else lock_file

        self.last_modified = {}
        self.running = True
        # Set by stop() so that a waiting start() loop wakes up immediately.
        self._stop_event = threading.Event()

        # Record the initial state so pre-existing files are not reported as new.
        self._init_file_states()

    def _iter_files(self):
        """Yield the ``Path`` of every non-ignored file under the watched root."""
        for root, dirs, files in os.walk(self.directory):
            # Prune in place so os.walk never descends into ignored directories.
            dirs[:] = [d for d in dirs if d not in self.ignore_dirs]
            for name in files:
                filepath = Path(root) / name
                if not self._should_ignore(filepath):
                    yield filepath

    def _display_path(self, filepath):
        """Return ``filepath`` relative to the watched root when possible."""
        try:
            return filepath.relative_to(self.directory)
        except ValueError:
            # Not located under the watched root: show the path as given.
            return filepath

    def _init_file_states(self):
        """Record the current modification time of every watched file."""
        for filepath in self._iter_files():
            try:
                self.last_modified[str(filepath)] = filepath.stat().st_mtime
            except OSError:
                # The file vanished (or became unreadable) between listing and
                # stat; if it reappears, the next poll reports it as new.
                continue

    def _should_ignore(self, filepath):
        """Return True if ``filepath`` must be excluded from watching.

        A path is ignored when its file name ends with one of the ignored
        suffixes, or when any component *below the watched root* is an ignored
        directory name. Components above the root are deliberately not
        examined, so a project that itself lives under e.g. ``venv/`` is still
        watched.
        """
        path = Path(filepath)
        if path.name.endswith(self.ignore_extensions):
            return True

        try:
            parts = path.relative_to(self.directory).parts
        except ValueError:
            # Path is outside the watched root; fall back to all components.
            parts = path.parts
        return any(part in self.ignore_dirs for part in parts)

    def check_for_changes(self):
        """Compare the tree with the snapshot and update the snapshot.

        Returns:
            True if at least one file was added, modified or deleted since the
            previous call, otherwise False.
        """
        changes_detected = False

        # Step 1: walk the tree to find new and modified files.
        for filepath in self._iter_files():
            key = str(filepath)
            try:
                current_mtime = filepath.stat().st_mtime
            except OSError:
                # The file disappeared between listing and stat.
                if key in self.last_modified:
                    del self.last_modified[key]
                    changes_detected = True
                    self._log.info(
                        "File deleted: %s", self._display_path(filepath)
                    )
                continue

            last_mtime = self.last_modified.get(key)
            if last_mtime is None:
                self._log.info(
                    "New file detected: %s", self._display_path(filepath)
                )
            elif current_mtime != last_mtime:
                # "!=" rather than ">": a file replaced by an older copy has
                # changed too, even though its mtime moved backwards.
                self._log.info(
                    "File modified: %s", self._display_path(filepath)
                )
            else:
                continue

            self.last_modified[key] = current_mtime
            changes_detected = True

        # Step 2: reverse check - drop snapshot entries whose file is gone.
        for key in list(self.last_modified):
            filepath = Path(key)
            if filepath.exists():
                continue
            del self.last_modified[key]
            changes_detected = True
            self._log.info("File deleted: %s", self._display_path(filepath))

        return changes_detected

    def run_sync(self):
        """Run the sync script unless another sync holds the lock file.

        Returns:
            True if the sync script was run (whatever its exit status), False
            if the run was skipped because the lock file already exists.

        Raises:
            OSError: If the sync script cannot be started.
        """
        lock_held = False
        try:
            # O_CREAT | O_EXCL makes "check and create" a single atomic step,
            # so two processes cannot both believe they own the lock.
            fd = os.open(
                self.lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o666
            )
        except FileExistsError:
            self._log.info("Sync already in progress, skipping...")
            return False
        except OSError as exc:
            # Best effort, as before: sync anyway, but say why there is no lock.
            self._log.warning(
                "Could not create lock file %s: %s", self.lock_file, exc
            )
        else:
            lock_held = True
            try:
                with os.fdopen(fd, "w") as lock:
                    lock.write(str(datetime.now()))
            except OSError as exc:
                # The timestamp is informational; the lock itself is in place.
                self._log.warning(
                    "Could not write to lock file %s: %s", self.lock_file, exc
                )

        try:
            self._log.info("Detected file change, running sync...")

            result = subprocess.run(
                [self.sync_script], capture_output=True, text=True
            )

            if result.returncode == 0:
                self._log.info("Sync completed successfully")
            else:
                self._log.error("Sync failed with code %s", result.returncode)
                if result.stderr:
                    self._log.error("Error output: %s", result.stderr)
        finally:
            # Only remove a lock this call created.
            if lock_held:
                try:
                    os.remove(self.lock_file)
                except OSError as exc:
                    self._log.warning(
                        "Could not remove lock file %s: %s", self.lock_file, exc
                    )

        return True

    def start(self):
        """Poll until stop() is called or the user interrupts the process.

        Raises:
            Exception: Any unexpected error is logged and re-raised.
        """
        self._log.info("Starting file watcher in %s", self.directory)
        self._log.info("Check interval: %s seconds", self.check_interval)
        self._log.info("Sync script: %s", self.sync_script)

        # check_for_changes() updates the snapshot as it reports, so a change
        # whose sync was skipped (lock held) must be remembered here or it
        # would never be synced.
        sync_pending = False

        try:
            while self.running:
                if self.check_for_changes():
                    sync_pending = True

                if sync_pending and self.run_sync():
                    sync_pending = False
                    # Brief pause after a sync to avoid rapid re-triggering.
                    self._stop_event.wait(3)

                # Event.wait sleeps like time.sleep but returns as soon as
                # stop() is called.
                self._stop_event.wait(self.check_interval)

        except KeyboardInterrupt:
            self._log.info("File watcher stopped by user")
        except Exception as e:
            self._log.error("Unexpected error: %s", e)
            raise

    def stop(self):
        """Ask the polling loop in start() to finish."""
        self.running = False
        self._stop_event.set()
|
|
|
|
def main():
    """Check the sync script, then watch the project directory.

    Exits with status 1 if the sync script does not exist. If the script
    exists but is not executable, its mode is set to 0o755 before watching
    starts.
    """
    script_present = os.path.exists(SYNC_SCRIPT)
    if not script_present:
        logger.error(f"Sync script not found: {SYNC_SCRIPT}")
        sys.exit(1)

    # The watcher runs the script directly, so it needs the execute bit.
    is_executable = os.access(SYNC_SCRIPT, os.X_OK)
    if not is_executable:
        os.chmod(SYNC_SCRIPT, 0o755)

    # Build the watcher for the project tree and enter its polling loop.
    FileWatcher(PROJECT_DIR).start()


if __name__ == "__main__":
    main()