50 lines
1.5 KiB
Python
50 lines
1.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
测试NAS环境和数据源可用性
|
|
"""
|
|
import sys
|
|
import os
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from minute_kline_collector import MinuteKlineCollector
|
|
|
|
print("="*70)
|
|
print("🧪 测试NAS环境和数据源")
|
|
print("="*70)
|
|
|
|
# 使用NAS路径初始化收集器
|
|
collector = MinuteKlineCollector(base_dir="/Users/chufeng/nas/stock/minute_kline")
|
|
|
|
# 环境测试
|
|
print("\n🔍 开始环境测试...")
|
|
test_results = collector.test_environment()
|
|
|
|
print("\n📊 环境测试结果:")
|
|
for key, value in test_results.items():
|
|
if isinstance(value, bool):
|
|
print(f" {key}: {'✅ 通过' if value else '❌ 失败'}")
|
|
|
|
if test_results.get("all_passed"):
|
|
print("\n✅ 环境测试全部通过!")
|
|
else:
|
|
print("\n❌ 环境测试有失败项,请检查配置")
|
|
sys.exit(1)
|
|
|
|
# 数据源测试
|
|
print("\n🔍 开始数据源测试...")
|
|
source_results = collector.test_data_source()
|
|
|
|
print("\n📊 数据源测试结果:")
|
|
for timeframe, result in source_results.get("timeframes", {}).items():
|
|
print(f" {timeframe}: {result['status']} - {result['record_count']} 条记录")
|
|
|
|
if all(result["status"] == "available" for result in source_results.get("timeframes", {}).values()):
|
|
print("\n✅ 数据源测试全部通过!")
|
|
else:
|
|
print("\n❌ 数据源测试有失败项,请检查网络")
|
|
sys.exit(1)
|
|
|
|
print("\n" + "="*70)
|
|
print("🎉 所有测试通过!NAS环境就绪,可以开始下载")
|
|
print("="*70)
|