auto-sync: 2026-03-26 17:44:27

This commit is contained in:
cfdaily
2026-03-26 17:44:27 +08:00
parent f204ad974a
commit 41b8e5b11b
@@ -0,0 +1,513 @@
#!/usr/bin/env python3
"""
分钟K线数据源测试脚本
测试各数据源分钟数据的可用性、质量和完整性
"""
import sys
import os
import time
import json
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import logging
import warnings
# Suppress every warning category for the lifetime of the process.
warnings.filterwarnings('ignore')

# Configure the root logger: INFO level, "time - logger - level - message".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class MinuteDataSourceTester:
    """Probe minute-level K-line data sources and summarize what they deliver.

    Each ``test_*`` method queries one provider for every period listed in
    ``self.test_periods``, records availability and a simple completeness
    score, and stores its result under ``self.results["data_sources"]``.
    ``generate_test_report`` then aggregates whatever has been stored.
    """

    # Candidate timestamp columns, checked in order; the first one present wins.
    _DATE_COLUMNS = ('trade_time', 'trade_date', 'day', 'date')
    # Price columns whose null counts feed the completeness score.
    _PRICE_COLUMNS = ('open', 'high', 'low', 'close')
    # Columns every usable bar frame is expected to carry.
    _REQUIRED_COLUMNS = ('open', 'high', 'low', 'close', 'volume')

    def __init__(self):
        """Set up the test configuration and an empty result container."""
        logger.info("分钟数据源测试器初始化")
        # Test configuration
        self.test_symbol = '000001'  # Ping An Bank
        self.test_periods = ['1', '5', '15']  # bar sizes in minutes
        self.test_dates = {
            'recent': ('20240101', '20240110'),  # recent window
            'historical': ('20220101', '20220110')  # historical window
        }
        # Accumulated results, filled in by the test_* methods
        self.results = {
            "timestamp": datetime.now().isoformat(),
            "data_sources": {},
            "recommendations": []
        }

    @staticmethod
    def _exchange_of(symbol: str) -> str:
        """Return the lowercase exchange code ('sh' or 'sz') for a stock code.

        Codes starting with '6' are treated as Shanghai, everything else as
        Shenzhen.
        NOTE(review): this covers the common SH/SZ A-share prefixes only;
        extend it if other boards (e.g. Beijing) need to be tested.
        """
        return 'sh' if str(symbol).startswith('6') else 'sz'

    def _raw_date_range(self, data: pd.DataFrame) -> Tuple[Optional[str], Optional[str]]:
        """Return (start, end) of the first known date column as strings.

        Values are stringified so the result is always JSON-serializable;
        ``(None, None)`` is returned when no known date column exists.
        """
        for col in self._DATE_COLUMNS:
            if col in data.columns:
                low, high = data[col].min(), data[col].max()
                return (
                    None if pd.isna(low) else str(low),
                    None if pd.isna(high) else str(high),
                )
        return None, None

    def _record_period_result(self, test_result: Dict, period_key: str,
                              period: str, minute_data) -> None:
        """Store availability and quality info for one fetched period.

        Args:
            test_result: per-source result dict being filled in.
            period_key: label such as ``"5min"`` used as dictionary key.
            period: raw period value forwarded to the quality check.
            minute_data: frame returned by the provider (may be None/empty).
        """
        if minute_data is None or minute_data.empty:
            test_result["availability"][period_key] = {
                "status": "unavailable",
                "record_count": 0
            }
            test_result["issues"].append(f"{period_key}数据为空")
            logger.warning(f"{period_key}数据为空")
            return

        start, end = self._raw_date_range(minute_data)
        test_result["availability"][period_key] = {
            "status": "available",
            "record_count": len(minute_data),
            "date_range": {"start": start, "end": end}
        }
        test_result["data_quality"][period_key] = self._check_data_quality(minute_data, period)
        logger.info(f"{period_key}数据可用,{len(minute_data)}条记录")

    @staticmethod
    def _record_period_error(test_result: Dict, period_key: str, error: Exception) -> None:
        """Mark one period as failed and remember the error message."""
        error_msg = f"{period_key}数据获取失败: {error}"
        test_result["availability"][period_key] = {
            "status": "error",
            "error": str(error)
        }
        test_result["issues"].append(error_msg)
        logger.error(f"{error_msg}")

    @staticmethod
    def _finalize_summary(test_result: Dict, label: str) -> None:
        """Derive ``summary`` and ``overall_status`` from the availability map.

        All periods available -> "good"; at least two -> "warning";
        otherwise -> "critical".
        """
        availability = test_result["availability"]
        available_count = sum(1 for v in availability.values() if v["status"] == "available")
        total_count = len(availability)
        if available_count == total_count:
            test_result["summary"] = f"✅ {label}分钟数据接口完全可用"
            test_result["overall_status"] = "good"
        elif available_count >= 2:
            test_result["summary"] = f"⚠️ {label}分钟数据接口部分可用"
            test_result["overall_status"] = "warning"
        else:
            test_result["summary"] = f"❌ {label}分钟数据接口有限"
            test_result["overall_status"] = "critical"

    def test_akshare_minute_data(self) -> Dict:
        """Test the AKShare minute-bar interface for every configured period.

        Returns:
            Dict: the per-source result, also stored in
            ``self.results["data_sources"]["akshare"]``.
        """
        logger.info("开始测试AKShare分钟数据接口")
        test_result = {
            "source": "akshare",
            "test_time": datetime.now().isoformat(),
            "availability": {},
            "data_quality": {},
            "issues": [],
            "summary": ""
        }
        try:
            import akshare as ak

            # The minute interface wants an exchange-prefixed code; 000001 is
            # a Shenzhen listing, so the prefix is derived from the code
            # instead of hard-coding 'sh' (which would address an index).
            prefixed_symbol = f'{self._exchange_of(self.test_symbol)}{self.test_symbol}'

            # Availability per bar size
            for period in self.test_periods:
                period_key = f"{period}min"
                logger.info(f" 测试{period_key}数据...")
                try:
                    minute_data = ak.stock_zh_a_minute(
                        symbol=prefixed_symbol,
                        period=period,
                        adjust='hfq'
                    )
                    self._record_period_result(test_result, period_key, period, minute_data)
                    time.sleep(1)  # throttle requests
                except Exception as e:
                    self._record_period_error(test_result, period_key, e)

            # Historical fetch (daily bars over the 'historical' window)
            try:
                historical_data = ak.stock_zh_a_hist(
                    symbol=self.test_symbol,
                    period='daily',
                    start_date=self.test_dates['historical'][0],
                    end_date=self.test_dates['historical'][1]
                )
                if historical_data is not None and not historical_data.empty:
                    test_result["historical_availability"] = "available"
                    logger.info(f" 历史数据可用,{len(historical_data)}条记录")
                else:
                    test_result["historical_availability"] = "unavailable"
                    test_result["issues"].append("历史数据获取失败")
                    logger.warning(f" ❌ 历史数据获取失败")
            except Exception as e:
                test_result["historical_availability"] = "error"
                test_result["issues"].append(f"历史数据测试异常: {e}")
                logger.error(f" 历史数据测试异常: {e}")

            self._finalize_summary(test_result, "AKShare")
        except ImportError:
            test_result["summary"] = "❌ AKShare未安装"
            test_result["overall_status"] = "critical"
            test_result["issues"].append("未安装akshare库: pip install akshare")
            logger.error("AKShare未安装")
        except Exception as e:
            test_result["summary"] = f"❌ AKShare测试异常: {e}"
            test_result["overall_status"] = "error"
            test_result["issues"].append(f"测试过程异常: {e}")
            logger.error(f"AKShare测试异常: {e}")
        self.results["data_sources"]["akshare"] = test_result
        return test_result

    def test_tushare_minute_data(self) -> Dict:
        """Test the Tushare minute-bar interface (requires an API token).

        Returns:
            Dict: the per-source result, also stored in
            ``self.results["data_sources"]["tushare"]``.
        """
        logger.info("开始测试Tushare分钟数据接口")
        test_result = {
            "source": "tushare",
            "test_time": datetime.now().isoformat(),
            "availability": {},
            "data_quality": {},
            "issues": [],
            "summary": "",
            "requires_token": True
        }
        try:
            import tushare as ts

            # A missing token may come back as None as well as '', so test
            # truthiness rather than comparing against the empty string.
            token = ts.get_token() if hasattr(ts, 'get_token') else None
            if not token:
                test_result["summary"] = "❌ Tushare API Token未配置"
                test_result["overall_status"] = "warning"
                test_result["issues"].append("需要配置Tushare Pro API Token")
                logger.warning("Tushare API Token未配置,无法测试")
                self.results["data_sources"]["tushare"] = test_result
                return test_result

            # Build the client once and reuse it for every request
            pro = ts.pro_api()
            symbol_code = f'{self.test_symbol}.{self._exchange_of(self.test_symbol).upper()}'

            # Availability per bar size
            for period in self.test_periods:
                period_key = f"{period}min"
                logger.info(f" 测试{period_key}数据...")
                try:
                    minute_data = ts.pro_bar(
                        ts_code=symbol_code,
                        api=pro,
                        # Minute frequencies are spelled '1min', '5min', ...
                        freq=period_key,
                        start_date=self.test_dates['recent'][0],
                        end_date=self.test_dates['recent'][1],
                        adj='hfq'
                    )
                    self._record_period_result(test_result, period_key, period, minute_data)
                    time.sleep(0.5)  # throttle requests
                except Exception as e:
                    self._record_period_error(test_result, period_key, e)

            self._finalize_summary(test_result, "Tushare")
        except ImportError:
            test_result["summary"] = "❌ Tushare未安装"
            test_result["overall_status"] = "critical"
            test_result["issues"].append("未安装tushare库: pip install tushare")
            logger.error("Tushare未安装")
        except Exception as e:
            test_result["summary"] = f"❌ Tushare测试异常: {e}"
            test_result["overall_status"] = "error"
            test_result["issues"].append(f"测试过程异常: {e}")
            logger.error(f"Tushare测试异常: {e}")
        self.results["data_sources"]["tushare"] = test_result
        return test_result

    def _check_data_quality(self, data: pd.DataFrame, period: str) -> Dict:
        """Compute simple completeness metrics for a bar frame.

        The input frame is not modified. All numeric values in the result are
        plain Python numbers so the result can be written with ``json``.

        Args:
            data: bar data to inspect.
            period: bar size label (currently not used by the checks).

        Returns:
            Dict: record count, date range (ISO strings), missing-field
            information and a ``quality_score`` between 0 and 1.
        """
        quality_result = {
            "record_count": len(data),
            "date_range": {},
            "missing_data": {},
            "quality_score": 0
        }
        try:
            # Date range: parse into a separate series so the caller's frame
            # keeps its original column values.
            for col in self._DATE_COLUMNS:
                if col in data.columns:
                    stamps = pd.to_datetime(data[col], errors='coerce')
                    earliest, latest = stamps.min(), stamps.max()
                    quality_result["date_range"]["start"] = earliest.isoformat() if pd.notna(earliest) else None
                    quality_result["date_range"]["end"] = latest.isoformat() if pd.notna(latest) else None
                    break

            # Required fields
            missing_fields = [col for col in self._REQUIRED_COLUMNS if col not in data.columns]
            if missing_fields:
                quality_result["missing_data"]["required_fields"] = missing_fields

            # Completeness (only evaluated when a volume column exists)
            row_count = len(data)
            if 'volume' in data.columns and row_count > 0:
                # int(...) turns numpy integers into JSON-serializable ints
                volume_missing = int(data['volume'].isnull().sum())
                price_missing = int(sum(
                    data[col].isnull().sum() for col in self._PRICE_COLUMNS if col in data.columns
                ))
                volume_completeness = 1 - (volume_missing / row_count)
                price_completeness = 1 - (price_missing / (4 * row_count))
                quality_result["missing_data"]["volume_missing"] = volume_missing
                quality_result["missing_data"]["price_missing"] = price_missing
                quality_result["quality_score"] = float((volume_completeness + price_completeness) / 2)
        except Exception as e:
            # Best effort: a failed check must not abort the source test
            logger.warning(f"数据质量检查失败: {e}")
        return quality_result

    def generate_test_report(self) -> Dict:
        """Aggregate all stored source results into one report dictionary.

        Returns:
            Dict: report with ``test_summary`` (including the per-source
            ``data_source_availability`` map), ``detailed_results``,
            ``recommendations``, ``next_steps`` and ``test_configuration``.
        """
        # Per-source availability summary
        availability_summary = {}
        for source_name, source_result in self.results["data_sources"].items():
            period_results = source_result.get("availability", {})
            available_count = sum(1 for v in period_results.values() if v.get("status") == "available")
            total_count = len(period_results)
            availability_summary[source_name] = {
                "available_periods": available_count,
                "total_periods": total_count,
                "availability_rate": available_count / total_count if total_count > 0 else 0,
                "overall_status": source_result.get("overall_status", "unknown")
            }

        # Overall score: mean availability rate across tested sources
        if availability_summary:
            overall_score = sum(info["availability_rate"] for info in availability_summary.values()) / len(availability_summary)
        else:
            overall_score = 0

        # Per-source recommendations
        recommendations = []
        if "akshare" in availability_summary:
            akshare_score = availability_summary["akshare"]["availability_rate"]
            if akshare_score >= 0.8:
                recommendations.append("✅ AKShare作为主要免费数据源,质量良好")
            elif akshare_score >= 0.5:
                recommendations.append("⚠️ AKShare作为备用数据源,部分功能可用")
            else:
                recommendations.append("❌ AKShare功能性有限,需要其他数据源补充")
        if "tushare" in availability_summary:
            tushare_score = availability_summary["tushare"]["availability_rate"]
            if tushare_score >= 0.9:
                recommendations.append("✅ Tushare Pro数据质量优秀,推荐作为主数据源")
            elif tushare_score >= 0.7:
                recommendations.append("⚠️ Tushare Pro可用,但有部分限制")
            else:
                recommendations.append("❌ Tushare Pro功能性受限,需验证Token权限")

        # Combined recommendation when both sources were tested
        if "akshare" in availability_summary and "tushare" in availability_summary:
            akshare_rate = availability_summary["akshare"]["availability_rate"]
            tushare_rate = availability_summary["tushare"]["availability_rate"]
            if tushare_rate > 0.9:
                recommendations.append("🚀 推荐方案:Tushare Pro为主,AKShare为补充")
            elif akshare_rate > 0.8 and tushare_rate < 0.7:
                recommendations.append("💰 成本方案:AKShare为主,Tushare为补充")
            else:
                recommendations.append("🔄 混合方案:AKShare和Tushare混合使用")

        # Final report
        final_report = {
            "report_timestamp": datetime.now().isoformat(),
            "test_summary": {
                "total_sources_tested": len(availability_summary),
                "overall_score": overall_score,
                "data_source_availability": availability_summary
            },
            "detailed_results": self.results["data_sources"],
            "recommendations": recommendations,
            "next_steps": [
                "验证历史数据获取深度",
                "测试批量下载效率",
                "建立数据质量监控机制"
            ],
            "test_configuration": {
                "test_symbol": self.test_symbol,
                "test_periods": self.test_periods,
                "test_dates": self.test_dates
            }
        }
        return final_report
def _print_source_summary(title, info):
    """Print the availability figures of one data source under `title`."""
    print(f"📊 {title}:")
    print(f" 可用周期: {info['available_periods']}/{info['total_periods']}")
    print(f" 可用率: {info['availability_rate']*100:.1f}%")
    print(f" 状态: {info['overall_status']}")


def _json_default(value):
    """Convert date-like objects and numpy-style scalars for ``json.dump``.

    Objects exposing ``isoformat`` are written as ISO strings, objects
    exposing ``item`` (numpy scalars) as their Python value; anything else is
    rejected so unexpected types are not silently stringified.
    """
    if hasattr(value, "isoformat"):
        return value.isoformat()
    if hasattr(value, "item"):
        return value.item()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def main(report_dir: Optional[str] = None):
    """Run all data-source tests, print a summary and save a JSON report.

    Args:
        report_dir: directory the report is written to. When omitted, the
            original hard-coded project location is used.
    """
    print("=" * 70)
    print("🧪 分钟K线数据源可用性测试")
    print("=" * 70)
    print("测试目的: 评估各数据源分钟数据的可用性、质量和完整性")
    print("测试范围: 1分钟, 5分钟, 15分钟粒度")
    print("测试股票: 000001 (平安银行)")
    print()

    # Create the tester
    tester = MinuteDataSourceTester()

    # Run the tests; each call stores its result on the tester
    print("开始数据源测试...")
    print("-" * 50)
    print("1. 测试AKShare...")
    tester.test_akshare_minute_data()
    print()
    print("2. 测试Tushare Pro...")
    tester.test_tushare_minute_data()
    print()

    # Build the report
    print("生成测试报告...")
    report = tester.generate_test_report()
    print()
    print("=" * 70)
    print("📋 测试结果摘要")
    print("=" * 70)

    # The availability map lives under "test_summary" in the report
    availability = report["test_summary"]["data_source_availability"]
    if "akshare" in availability:
        _print_source_summary("AKShare", availability["akshare"])
    if "tushare" in availability:
        _print_source_summary("Tushare Pro", availability["tushare"])
    print()
    print("💡 推荐方案:")
    for rec in report["recommendations"]:
        print(f"{rec}")
    print()
    print("🎯 下一步行动:")
    for step in report["next_steps"]:
        print(f"{step}")

    # Save the report
    if report_dir is None:
        report_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/reports"
    os.makedirs(report_dir, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_file = os.path.join(report_dir, f"minute_data_source_test_report_{timestamp}.json")
    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(report, f, ensure_ascii=False, indent=2, default=_json_default)
    print()
    print(f"📄 详细报告已保存: {report_file}")
    print()
    print("=" * 70)
    print("🎯 测试完成")
    print("=" * 70)


if __name__ == "__main__":
    main()