test: monitor test
This commit is contained in:
Executable
+193
@@ -0,0 +1,193 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
简单的文件监控脚本
|
||||
使用轮询方式检查文件变化,触发同步
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import subprocess
|
||||
import logging
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# 配置
|
||||
PROJECT_DIR = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live"
|
||||
LOG_FILE = os.path.join(PROJECT_DIR, "simple-watcher.log")
|
||||
SYNC_SCRIPT = os.path.join(PROJECT_DIR, "auto-sync.sh")
|
||||
LOCK_FILE = "/tmp/sanguo_sync.lock"
|
||||
CHECK_INTERVAL = 2 # 检查间隔(秒)
|
||||
IGNORE_EXTENSIONS = ['.log', '.tmp', '~']
|
||||
IGNORE_DIRS = ['.git']
|
||||
|
||||
# 设置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='[%(asctime)s] %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler(LOG_FILE),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class FileWatcher:
|
||||
def __init__(self, directory):
|
||||
self.directory = Path(directory)
|
||||
self.last_modified = {}
|
||||
self.running = True
|
||||
|
||||
# 初始化文件状态
|
||||
self._init_file_states()
|
||||
|
||||
def _init_file_states(self):
|
||||
"""初始化文件修改时间记录"""
|
||||
for root, dirs, files in os.walk(self.directory):
|
||||
# 跳过忽略的目录
|
||||
dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
|
||||
|
||||
for file in files:
|
||||
# 跳过忽略的文件类型
|
||||
if any(file.endswith(ext) for ext in IGNORE_EXTENSIONS):
|
||||
continue
|
||||
|
||||
filepath = Path(root) / file
|
||||
try:
|
||||
self.last_modified[str(filepath)] = filepath.stat().st_mtime
|
||||
except (OSError, FileNotFoundError):
|
||||
pass
|
||||
|
||||
def _should_ignore(self, filepath):
|
||||
"""检查是否应该忽略该文件"""
|
||||
path_str = str(filepath)
|
||||
|
||||
# 检查文件扩展名
|
||||
if any(path_str.endswith(ext) for ext in IGNORE_EXTENSIONS):
|
||||
return True
|
||||
|
||||
# 检查目录
|
||||
for ignore_dir in IGNORE_DIRS:
|
||||
if f"/{ignore_dir}/" in path_str or path_str.endswith(f"/{ignore_dir}"):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def check_for_changes(self):
|
||||
"""检查文件变化"""
|
||||
changes_detected = False
|
||||
|
||||
for root, dirs, files in os.walk(self.directory):
|
||||
# 跳过忽略的目录
|
||||
dirs[:] = [d for d in dirs if d not in IGNORE_DIRS]
|
||||
|
||||
for file in files:
|
||||
filepath = Path(root) / file
|
||||
filepath_str = str(filepath)
|
||||
|
||||
# 检查是否应该忽略
|
||||
if self._should_ignore(filepath):
|
||||
continue
|
||||
|
||||
try:
|
||||
current_mtime = filepath.stat().st_mtime
|
||||
last_mtime = self.last_modified.get(filepath_str)
|
||||
|
||||
if last_mtime is None:
|
||||
# 新文件
|
||||
self.last_modified[filepath_str] = current_mtime
|
||||
changes_detected = True
|
||||
logger.info(f"New file detected: {filepath.relative_to(self.directory)}")
|
||||
elif current_mtime > last_mtime:
|
||||
# 文件被修改
|
||||
self.last_modified[filepath_str] = current_mtime
|
||||
changes_detected = True
|
||||
logger.info(f"File modified: {filepath.relative_to(self.directory)}")
|
||||
|
||||
except (OSError, FileNotFoundError):
|
||||
# 文件被删除
|
||||
if filepath_str in self.last_modified:
|
||||
del self.last_modified[filepath_str]
|
||||
changes_detected = True
|
||||
logger.info(f"File deleted: {filepath.relative_to(self.directory)}")
|
||||
|
||||
return changes_detected
|
||||
|
||||
def run_sync(self):
|
||||
"""运行同步脚本"""
|
||||
# 检查锁文件
|
||||
if os.path.exists(LOCK_FILE):
|
||||
logger.info("Sync already in progress, skipping...")
|
||||
return
|
||||
|
||||
# 创建锁文件
|
||||
try:
|
||||
with open(LOCK_FILE, 'w') as f:
|
||||
f.write(str(datetime.now()))
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
logger.info("Detected file change, running sync...")
|
||||
|
||||
# 运行同步脚本
|
||||
result = subprocess.run([SYNC_SCRIPT], capture_output=True, text=True)
|
||||
|
||||
if result.returncode == 0:
|
||||
logger.info("Sync completed successfully")
|
||||
else:
|
||||
logger.error(f"Sync failed with code {result.returncode}")
|
||||
if result.stderr:
|
||||
logger.error(f"Error output: {result.stderr}")
|
||||
|
||||
finally:
|
||||
# 删除锁文件
|
||||
try:
|
||||
os.remove(LOCK_FILE)
|
||||
except:
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
"""开始监控"""
|
||||
logger.info(f"Starting file watcher in {self.directory}")
|
||||
logger.info(f"Check interval: {CHECK_INTERVAL} seconds")
|
||||
logger.info(f"Sync script: {SYNC_SCRIPT}")
|
||||
|
||||
try:
|
||||
while self.running:
|
||||
if self.check_for_changes():
|
||||
self.run_sync()
|
||||
# 同步后等待几秒避免频繁触发
|
||||
time.sleep(3)
|
||||
|
||||
time.sleep(CHECK_INTERVAL)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("File watcher stopped by user")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error: {e}")
|
||||
raise
|
||||
|
||||
def stop(self):
|
||||
"""停止监控"""
|
||||
self.running = False
|
||||
|
||||
def main():
|
||||
# 确保同步脚本存在且可执行
|
||||
if not os.path.exists(SYNC_SCRIPT):
|
||||
logger.error(f"Sync script not found: {SYNC_SCRIPT}")
|
||||
sys.exit(1)
|
||||
|
||||
# 确保可执行
|
||||
if not os.access(SYNC_SCRIPT, os.X_OK):
|
||||
os.chmod(SYNC_SCRIPT, 0o755)
|
||||
|
||||
# 创建监控器
|
||||
watcher = FileWatcher(PROJECT_DIR)
|
||||
|
||||
# 开始监控
|
||||
watcher.start()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user