112 lines
3.3 KiB
Python
112 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
快速VPN和AKShare可用性测试
|
|
"""
|
|
import sys
|
|
import os
|
|
import time
|
|
import subprocess
|
|
import json
|
|
from datetime import datetime
|
|
|
|
print("="*60)
|
|
print("🧪 快速VPN和AKShare可用性测试")
|
|
print("="*60)
|
|
|
|
# 测试VPN连接性
|
|
print("\n1. 测试VPN连接性...")
|
|
test_targets = [
|
|
("baidu", "www.baidu.com"),
|
|
("akshare_data", "push2his.eastmoney.com"),
|
|
("github", "github.com")
|
|
]
|
|
|
|
vpn_results = {}
|
|
for name, host in test_targets:
|
|
print(f" 测试 {name} ({host})...", end="")
|
|
try:
|
|
result = subprocess.run(
|
|
["curl", "-I", "-m", "5", f"http://{host}"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
print(" ✅ 连接正常")
|
|
vpn_results[name] = {"status": "success"}
|
|
else:
|
|
print(" ⚠️ 连接失败")
|
|
vpn_results[name] = {"status": "failed"}
|
|
except Exception as e:
|
|
print(f" ❌ 错误: {str(e)[:30]}")
|
|
vpn_results[name] = {"status": "error", "error": str(e)}
|
|
|
|
# 测试AKShare数据源
|
|
print("\n2. 测试AKShare数据源...")
|
|
try:
|
|
import akshare as ak
|
|
|
|
print(" AKShare版本:", ak.__version__)
|
|
|
|
# 测试1分钟数据
|
|
print(" 测试1分钟数据...", end="")
|
|
try:
|
|
data = ak.stock_zh_a_minute(symbol='sh000001', period='1', adjust='hfq')
|
|
if data is not None and not data.empty:
|
|
print(f" ✅ 成功获取 {len(data)} 条记录")
|
|
akshare_status = "success"
|
|
else:
|
|
print(" ⚠️ 数据为空")
|
|
akshare_status = "empty"
|
|
except Exception as e:
|
|
print(f" ❌ 错误: {str(e)[:30]}")
|
|
akshare_status = "error"
|
|
|
|
except ImportError:
|
|
print(" ❌ AKShare未安装,请运行: pip install akshare")
|
|
akshare_status = "not_installed"
|
|
|
|
# 结果总结
|
|
print("\n" + "="*60)
|
|
print("📋 测试结果总结")
|
|
print("="*60)
|
|
|
|
print("VPN连接性:")
|
|
success_count = sum(1 for v in vpn_results.values() if v.get("status") == "success")
|
|
for name, result in vpn_results.items():
|
|
status_icon = "✅" if result.get("status") == "success" else "❌"
|
|
print(f" {status_icon} {name}: {result.get('status')}")
|
|
|
|
print("\nAKShare可用性:")
|
|
if akshare_status == "success":
|
|
print(" ✅ AKShare数据源可用")
|
|
elif akshare_status == "empty":
|
|
print(" ⚠️ AKShare连接正常但数据为空")
|
|
elif akshare_status == "not_installed":
|
|
print(" ❌ AKShare未安装")
|
|
else:
|
|
print(f" ❌ AKShare异常: {akshare_status}")
|
|
|
|
print("\n" + "="*60)
|
|
print("🎯 总体评估:")
|
|
if success_count >= 2 and akshare_status == "success":
|
|
print("✅ 网络和VPN环境良好,可以开始分钟数据下载")
|
|
else:
|
|
print("⚠️ 网络或VPN环境需要优化")
|
|
|
|
print("\n🚀 下一步: 等待姜维将军提供NAS配置信息")
|
|
print("="*60)
|
|
|
|
# 保存测试结果
|
|
results = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"vpn_tests": vpn_results,
|
|
"akshare_status": akshare_status,
|
|
"overall_assessment": "good" if (success_count >= 2 and akshare_status == "success") else "needs_improvement"
|
|
}
|
|
|
|
result_file = os.path.join(os.path.dirname(__file__), "quick_test_results.json")
|
|
with open(result_file, 'w', encoding='utf-8') as f:
|
|
json.dump(results, f, indent=2)
|
|
|
|
print(f"\n📄 详细测试结果已保存: {result_file}") |