diff --git a/zhaoyun-data/scripts/nas_deployment/quick_vpn_test.py b/zhaoyun-data/scripts/nas_deployment/quick_vpn_test.py new file mode 100644 index 000000000..c09e4bebd --- /dev/null +++ b/zhaoyun-data/scripts/nas_deployment/quick_vpn_test.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +""" +快速VPN和AKShare可用性测试 +""" +import sys +import os +import time +import subprocess +import json +from datetime import datetime + +print("="*60) +print("🧪 快速VPN和AKShare可用性测试") +print("="*60) + +# 测试VPN连接性 +print("\n1. 测试VPN连接性...") +test_targets = [ + ("baidu", "www.baidu.com"), + ("akshare_data", "push2his.eastmoney.com"), + ("github", "github.com") +] + +vpn_results = {} +for name, host in test_targets: + print(f" 测试 {name} ({host})...", end="") + try: + result = subprocess.run( + ["curl", "-I", "-m", "5", f"http://{host}"], + capture_output=True, + text=True, + timeout=10 + ) + if result.returncode == 0: + print(" ✅ 连接正常") + vpn_results[name] = {"status": "success"} + else: + print(" ⚠️ 连接失败") + vpn_results[name] = {"status": "failed"} + except Exception as e: + print(f" ❌ 错误: {str(e)[:30]}") + vpn_results[name] = {"status": "error", "error": str(e)} + +# 测试AKShare数据源 +print("\n2. 测试AKShare数据源...") +try: + import akshare as ak + + print(" AKShare版本:", ak.__version__) + + # 测试1分钟数据 + print(" 测试1分钟数据...", end="") + try: + data = ak.stock_zh_a_minute(symbol='sh000001', period='1', adjust='hfq') + if data is not None and not data.empty: + print(f" ✅ 成功获取 {len(data)} 条记录") + akshare_status = "success" + else: + print(" ⚠️ 数据为空") + akshare_status = "empty" + except Exception as e: + print(f" ❌ 错误: {str(e)[:30]}") + akshare_status = "error" + +except ImportError: + print(" ❌ AKShare未安装,请运行: pip install akshare") + akshare_status = "not_installed" + +# 结果总结 +print("\n" + "="*60) +print("📋 测试结果总结") +print("="*60) + +print("VPN连接性:") +success_count = sum(1 for v in vpn_results.values() if v.get("status") == "success") +for name, result in vpn_results.items(): + status_icon = "✅" if result.get("status") == "success" else "❌" + print(f" {status_icon} {name}: {result.get('status')}") + +print("\nAKShare可用性:") +if akshare_status == "success": + print(" ✅ AKShare数据源可用") +elif akshare_status == "empty": + print(" ⚠️ AKShare连接正常但数据为空") +elif akshare_status == "not_installed": + print(" ❌ AKShare未安装") +else: + print(f" ❌ AKShare异常: {akshare_status}") + +print("\n" + "="*60) +print("🎯 总体评估:") +if success_count >= 2 and akshare_status == "success": + print("✅ 网络和VPN环境良好,可以开始分钟数据下载") +else: + print("⚠️ 网络或VPN环境需要优化") + +print("\n🚀 下一步: 等待姜维将军提供NAS配置信息") +print("="*60) + +# 保存测试结果 +results = { + "timestamp": datetime.now().isoformat(), + "vpn_tests": vpn_results, + "akshare_status": akshare_status, + "overall_assessment": "good" if (success_count >= 2 and akshare_status == "success") else "needs_improvement" +} + +result_file = os.path.join(os.path.dirname(__file__), "quick_test_results.json") +with open(result_file, 'w', encoding='utf-8') as f: + json.dump(results, f, indent=2) + +print(f"\n📄 详细测试结果已保存: {result_file}") \ No newline at end of file