#!/usr/bin/env python3
# Recovered from git patch 41b8e5b11b0fed572dbc00bded3f440caef95d51
# (author: cfdaily, subject: "auto-sync: 2026-03-26 17:44:27"), which adds
# zhaoyun-data/scripts/data_acquisition/test_minute_data_sources.py.
"""
Minute K-line data source test script.

Probes each supported data source (AKShare, Tushare Pro) for minute-level
bars of one test symbol, scores the completeness of what comes back, and
writes a JSON report with per-source availability and recommendations.
"""
import sys
import os
import time
import json
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import logging
import warnings

warnings.filterwarnings('ignore')

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Columns that may carry the bar timestamp, in lookup order.
DATE_COLUMNS = ('trade_time', 'trade_date', 'day', 'date')
# Price columns whose gaps count against the quality score.
PRICE_COLUMNS = ('open', 'high', 'low', 'close')
# Columns a usable bar frame is expected to have.
REQUIRED_COLUMNS = PRICE_COLUMNS + ('volume',)

# Default output directory. This is the machine-specific path the script was
# written with; pass ``report_dir`` to ``main`` to write somewhere else.
DEFAULT_REPORT_DIR = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/reports"


class MinuteDataSourceTester:
    """Runs availability/quality probes against minute-bar data sources.

    Results of every probe are accumulated in ``self.results["data_sources"]``
    keyed by source name and summarised by :meth:`generate_test_report`.
    """

    def __init__(self):
        """Set up the test configuration and an empty result container."""
        logger.info("分钟数据源测试器初始化")

        # Test configuration
        self.test_symbol = '000001'  # Ping An Bank
        self.test_periods = ['1', '5', '15']  # bar sizes in minutes
        self.test_dates = {
            'recent': ('20240101', '20240110'),  # recent window
            'historical': ('20220101', '20220110')  # historical window
        }

        # Result record
        self.results = {
            "timestamp": datetime.now().isoformat(),
            "data_sources": {},
            "recommendations": []
        }

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _market_prefix(symbol: str) -> str:
        """Return the lowercase exchange prefix (``'sh'`` or ``'sz'``) for a code.

        Codes starting with 5, 6 or 9 are treated as Shanghai-listed and
        everything else as Shenzhen-listed. Both data sources derive their
        symbol from this one helper so they always query the same security.
        NOTE(review): Beijing-exchange codes are not handled - confirm
        whether they are ever needed here.
        """
        return 'sh' if symbol.startswith(('5', '6', '9')) else 'sz'

    @staticmethod
    def _raw_date_range(frame: pd.DataFrame) -> Dict:
        """Return ``{"start", "end"}`` from the first known date column.

        Values are converted to ``str`` so the result can be JSON-encoded
        whatever dtype the source returned; both are ``None`` when no known
        date column is present or the column holds only missing values.
        """
        for col in DATE_COLUMNS:
            if col in frame.columns:
                first, last = frame[col].min(), frame[col].max()
                return {
                    "start": None if pd.isna(first) else str(first),
                    "end": None if pd.isna(last) else str(last)
                }
        return {"start": None, "end": None}

    def _record_period_result(self, test_result: Dict, period_key: str,
                              period: str, minute_data) -> None:
        """Store availability and quality info for one fetched period."""
        if minute_data is not None and not minute_data.empty:
            test_result["availability"][period_key] = {
                "status": "available",
                "record_count": len(minute_data),
                "date_range": self._raw_date_range(minute_data)
            }
            test_result["data_quality"][period_key] = self._check_data_quality(minute_data, period)
            logger.info(f" ✅ {period_key}数据可用,{len(minute_data)}条记录")
        else:
            test_result["availability"][period_key] = {
                "status": "unavailable",
                "record_count": 0
            }
            test_result["issues"].append(f"{period_key}数据为空")
            logger.warning(f" ❌ {period_key}数据为空")

    @staticmethod
    def _record_period_error(test_result: Dict, period_key: str, exc: Exception) -> None:
        """Store a per-period fetch failure without aborting the other periods."""
        error_msg = f"{period_key}数据获取失败: {exc}"
        test_result["availability"][period_key] = {
            "status": "error",
            "error": str(exc)
        }
        test_result["issues"].append(error_msg)
        logger.error(f" ❌ {error_msg}")

    @staticmethod
    def _finalize_summary(test_result: Dict, label: str) -> None:
        """Derive ``summary`` / ``overall_status`` from the availability map.

        ``label`` is the display name inserted into the summary text. "good"
        requires at least one tested period so an empty run is never reported
        as fully available.
        """
        availability = test_result["availability"]
        available_count = sum(1 for v in availability.values() if v["status"] == "available")
        total_count = len(availability)

        if total_count > 0 and available_count == total_count:
            test_result["summary"] = f"✅ {label}分钟数据接口完全可用"
            test_result["overall_status"] = "good"
        elif available_count >= 2:
            test_result["summary"] = f"⚠️ {label}分钟数据接口部分可用"
            test_result["overall_status"] = "warning"
        else:
            test_result["summary"] = f"❌ {label}分钟数据接口有限"
            test_result["overall_status"] = "critical"

    # ------------------------------------------------------------------
    # AKShare
    # ------------------------------------------------------------------
    def _run_akshare_checks(self, ak, test_result: Dict) -> None:
        """Fetch every configured period plus a historical sample via AKShare."""
        symbol = f'{self._market_prefix(self.test_symbol)}{self.test_symbol}'

        for period in self.test_periods:
            period_key = f"{period}min"
            logger.info(f" 测试{period_key}数据...")

            try:
                minute_data = ak.stock_zh_a_minute(
                    symbol=symbol,
                    period=period,
                    adjust='hfq'
                )
                self._record_period_result(test_result, period_key, period, minute_data)
                time.sleep(1)  # throttle requests
            except Exception as e:
                self._record_period_error(test_result, period_key, e)

        # Historical reachability check. Note this requests *daily* bars for
        # the historical window, not minute bars.
        try:
            historical_data = ak.stock_zh_a_hist(
                symbol=self.test_symbol,
                period='daily',
                start_date=self.test_dates['historical'][0],
                end_date=self.test_dates['historical'][1]
            )

            if historical_data is not None and not historical_data.empty:
                test_result["historical_availability"] = "available"
                logger.info(f" 历史数据可用,{len(historical_data)}条记录")
            else:
                test_result["historical_availability"] = "unavailable"
                test_result["issues"].append("历史数据获取失败")
                logger.warning(f" ❌ 历史数据获取失败")

        except Exception as e:
            test_result["historical_availability"] = "error"
            test_result["issues"].append(f"历史数据测试异常: {e}")
            logger.error(f" 历史数据测试异常: {e}")

        self._finalize_summary(test_result, "AKShare")

    def test_akshare_minute_data(self) -> Dict:
        """Test the AKShare minute-data interface.

        Returns:
            Dict: the result record, which is also stored under
            ``self.results["data_sources"]["akshare"]``.
        """
        logger.info("开始测试AKShare分钟数据接口")

        test_result = {
            "source": "akshare",
            "test_time": datetime.now().isoformat(),
            "availability": {},
            "data_quality": {},
            "issues": [],
            "summary": ""
        }

        # Only the import itself is guarded by ImportError, so a failing
        # import deeper inside the library is not mislabelled "not installed".
        try:
            import akshare as ak
        except ImportError:
            test_result["summary"] = "❌ AKShare未安装"
            test_result["overall_status"] = "critical"
            test_result["issues"].append("未安装akshare库: pip install akshare")
            logger.error("AKShare未安装")
        else:
            try:
                self._run_akshare_checks(ak, test_result)
            except Exception as e:
                test_result["summary"] = f"❌ AKShare测试异常: {e}"
                test_result["overall_status"] = "error"
                test_result["issues"].append(f"测试过程异常: {e}")
                logger.error(f"AKShare测试异常: {e}")

        self.results["data_sources"]["akshare"] = test_result

        return test_result

    # ------------------------------------------------------------------
    # Tushare
    # ------------------------------------------------------------------
    def _run_tushare_checks(self, ts, test_result: Dict) -> None:
        """Fetch every configured period via Tushare Pro ``pro_bar``."""
        pro = ts.pro_api()
        symbol_code = f'{self.test_symbol}.{self._market_prefix(self.test_symbol).upper()}'

        for period in self.test_periods:
            period_key = f"{period}min"
            logger.info(f" 测试{period_key}数据...")

            try:
                # pro_bar expects freq such as '1min' / '5min', not a bare
                # number. NOTE(review): the Tushare minute examples use
                # 'YYYY-MM-DD HH:MM:SS' for start/end - confirm that the
                # 'YYYYMMDD' values used here are accepted.
                minute_data = ts.pro_bar(
                    ts_code=symbol_code,
                    api=pro,
                    freq=period_key,
                    start_date=self.test_dates['recent'][0],
                    end_date=self.test_dates['recent'][1],
                    adj='hfq'
                )
                self._record_period_result(test_result, period_key, period, minute_data)
                time.sleep(0.5)  # throttle requests
            except Exception as e:
                self._record_period_error(test_result, period_key, e)

        self._finalize_summary(test_result, "Tushare")

    def test_tushare_minute_data(self) -> Dict:
        """Test the Tushare minute-data interface (requires an API token).

        Returns:
            Dict: the result record, which is also stored under
            ``self.results["data_sources"]["tushare"]``.
        """
        logger.info("开始测试Tushare分钟数据接口")

        test_result = {
            "source": "tushare",
            "test_time": datetime.now().isoformat(),
            "availability": {},
            "data_quality": {},
            "issues": [],
            "summary": "",
            "requires_token": True
        }

        try:
            import tushare as ts
        except ImportError:
            test_result["summary"] = "❌ Tushare未安装"
            test_result["overall_status"] = "critical"
            test_result["issues"].append("未安装tushare库: pip install tushare")
            logger.error("Tushare未安装")
        else:
            try:
                # A missing token may come back as None or '', so test
                # truthiness rather than comparing with ''.
                token = ts.get_token() if hasattr(ts, 'get_token') else None
                if not token:
                    test_result["summary"] = "❌ Tushare API Token未配置"
                    test_result["overall_status"] = "warning"
                    test_result["issues"].append("需要配置Tushare Pro API Token")
                    logger.warning("Tushare API Token未配置,无法测试")
                else:
                    self._run_tushare_checks(ts, test_result)
            except Exception as e:
                test_result["summary"] = f"❌ Tushare测试异常: {e}"
                test_result["overall_status"] = "error"
                test_result["issues"].append(f"测试过程异常: {e}")
                logger.error(f"Tushare测试异常: {e}")

        self.results["data_sources"]["tushare"] = test_result

        return test_result

    # ------------------------------------------------------------------
    # Quality scoring
    # ------------------------------------------------------------------
    def _check_data_quality(self, data: pd.DataFrame, period: str) -> Dict:
        """Check the quality of one fetched frame.

        The input frame is not modified. All numeric results are plain Python
        numbers so the record can be JSON-encoded directly.

        Args:
            data: bar data as returned by the source.
            period: bar size; accepted for interface symmetry, not used by
                the current checks.

        Returns:
            Dict: ``record_count``, ``date_range`` (ISO strings), a
            ``missing_data`` breakdown, and ``quality_score``. The score is
            the mean of volume completeness and price completeness, and stays
            0 when there is no ``volume`` column or the frame is empty.
        """
        quality_result = {
            "record_count": len(data),
            "date_range": {},
            "missing_data": {},
            "quality_score": 0
        }

        try:
            # Date range: parse a copy of the first known date column.
            date_col = next((c for c in DATE_COLUMNS if c in data.columns), None)
            if date_col is not None:
                stamps = pd.to_datetime(data[date_col], errors='coerce')
                first, last = stamps.min(), stamps.max()
                quality_result["date_range"]["start"] = first.isoformat() if pd.notna(first) else None
                quality_result["date_range"]["end"] = last.isoformat() if pd.notna(last) else None

            # Required fields
            missing_fields = [c for c in REQUIRED_COLUMNS if c not in data.columns]
            if missing_fields:
                quality_result["missing_data"]["required_fields"] = missing_fields

            # Completeness
            if 'volume' in data.columns and len(data) > 0:
                rows = len(data)
                present_prices = [c for c in PRICE_COLUMNS if c in data.columns]

                volume_missing = int(data['volume'].isnull().sum())
                price_missing = sum(int(data[c].isnull().sum()) for c in present_prices)

                volume_completeness = 1 - (volume_missing / rows)
                # Divide by the price cells that actually exist, so a frame
                # lacking some price columns is not scored as more complete.
                if present_prices:
                    price_completeness = 1 - (price_missing / (len(present_prices) * rows))
                else:
                    price_completeness = 0.0

                quality_result["missing_data"]["volume_missing"] = volume_missing
                quality_result["missing_data"]["price_missing"] = price_missing
                quality_result["quality_score"] = float((volume_completeness + price_completeness) / 2)

        except Exception as e:
            # Best effort: a scoring failure must not discard the fetch result.
            logger.warning(f"数据质量检查失败: {e}")

        return quality_result

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def generate_test_report(self) -> Dict:
        """Build the final report from the accumulated source results.

        Returns:
            Dict: report with ``test_summary`` (which holds
            ``data_source_availability``), ``detailed_results``,
            ``recommendations``, ``next_steps`` and ``test_configuration``.
        """
        # Summarise per-source availability
        availability_summary = {}
        for source_name, source_result in self.results["data_sources"].items():
            availability = source_result.get("availability", {})
            available_count = sum(1 for v in availability.values() if v.get("status") == "available")
            total_count = len(availability)

            availability_summary[source_name] = {
                "available_periods": available_count,
                "total_periods": total_count,
                "availability_rate": available_count / total_count if total_count > 0 else 0,
                "overall_status": source_result.get("overall_status", "unknown")
            }

        # Overall score: mean availability rate across tested sources
        if availability_summary:
            overall_score = sum(info["availability_rate"] for info in availability_summary.values()) / len(availability_summary)
        else:
            overall_score = 0

        # Per-source recommendations
        recommendations = []

        if "akshare" in availability_summary:
            akshare_score = availability_summary["akshare"]["availability_rate"]
            if akshare_score >= 0.8:
                recommendations.append("✅ AKShare作为主要免费数据源,质量良好")
            elif akshare_score >= 0.5:
                recommendations.append("⚠️ AKShare作为备用数据源,部分功能可用")
            else:
                recommendations.append("❌ AKShare功能性有限,需要其他数据源补充")

        if "tushare" in availability_summary:
            tushare_score = availability_summary["tushare"]["availability_rate"]
            if tushare_score >= 0.9:
                recommendations.append("✅ Tushare Pro数据质量优秀,推荐作为主数据源")
            elif tushare_score >= 0.7:
                recommendations.append("⚠️ Tushare Pro可用,但有部分限制")
            else:
                recommendations.append("❌ Tushare Pro功能性受限,需验证Token权限")

        # Combined recommendation when both sources were tested
        if "akshare" in availability_summary and "tushare" in availability_summary:
            akshare_rate = availability_summary["akshare"]["availability_rate"]
            tushare_rate = availability_summary["tushare"]["availability_rate"]

            if tushare_rate > 0.9:
                recommendations.append("🚀 推荐方案:Tushare Pro为主,AKShare为补充")
            elif akshare_rate > 0.8 and tushare_rate < 0.7:
                recommendations.append("💰 成本方案:AKShare为主,Tushare为补充")
            else:
                recommendations.append("🔄 混合方案:AKShare和Tushare混合使用")

        # Final report
        final_report = {
            "report_timestamp": datetime.now().isoformat(),
            "test_summary": {
                "total_sources_tested": len(availability_summary),
                "overall_score": overall_score,
                "data_source_availability": availability_summary
            },
            "detailed_results": self.results["data_sources"],
            "recommendations": recommendations,
            "next_steps": [
                "验证历史数据获取深度",
                "测试批量下载效率",
                "建立数据质量监控机制"
            ],
            "test_configuration": {
                "test_symbol": self.test_symbol,
                "test_periods": self.test_periods,
                "test_dates": self.test_dates
            }
        }

        return final_report


def main(report_dir: Optional[str] = None) -> str:
    """Run all data-source tests, print a summary and save the JSON report.

    Args:
        report_dir: directory for the report file; defaults to
            ``DEFAULT_REPORT_DIR``. Created if it does not exist.

    Returns:
        str: path of the written report file.
    """
    print("=" * 70)
    print("🧪 分钟K线数据源可用性测试")
    print("=" * 70)

    print("测试目的: 评估各数据源分钟数据的可用性、质量和完整性")
    print("测试范围: 1分钟, 5分钟, 15分钟粒度")
    print("测试股票: 000001 (平安银行)")
    print()

    tester = MinuteDataSourceTester()

    print("开始数据源测试...")
    print("-" * 50)

    print("1. 测试AKShare...")
    tester.test_akshare_minute_data()

    print()

    print("2. 测试Tushare Pro...")
    tester.test_tushare_minute_data()

    print()

    print("生成测试报告...")
    report = tester.generate_test_report()

    print()
    print("=" * 70)
    print("📋 测试结果摘要")
    print("=" * 70)

    # The availability map lives under "test_summary" in the report.
    availability = report["test_summary"]["data_source_availability"]
    for source_key, display_name in (("akshare", "AKShare"), ("tushare", "Tushare Pro")):
        if source_key not in availability:
            continue
        info = availability[source_key]
        print(f"📊 {display_name}:")
        print(f" 可用周期: {info['available_periods']}/{info['total_periods']}")
        print(f" 可用率: {info['availability_rate']*100:.1f}%")
        print(f" 状态: {info['overall_status']}")

    print()
    print("💡 推荐方案:")
    for rec in report["recommendations"]:
        print(f" • {rec}")

    print()
    print("🎯 下一步行动:")
    for step in report["next_steps"]:
        print(f" • {step}")

    # Save the report
    if report_dir is None:
        report_dir = DEFAULT_REPORT_DIR
    os.makedirs(report_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_file = os.path.join(report_dir, f"minute_data_source_test_report_{timestamp}.json")

    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    print()
    print(f"📄 详细报告已保存: {report_file}")

    print()
    print("=" * 70)
    print("🎯 测试完成")
    print("=" * 70)

    return report_file


if __name__ == "__main__":
    main()