#!/usr/bin/env python3
# Data adapter testing tool - Zhao Yun data engineering toolkit.
# Used to test and validate data conversion adapters.

import sys
import os
import json
import time
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional

# Make the directory three levels above this file's directory importable.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))


class DataAdapterTester:
    """Run adapter and data-quality checks and accumulate their results.

    Every ``test_*`` method returns its own result dict and also appends it
    to ``self.results['tests']`` while updating ``self.results['summary']``.
    """

    def __init__(self, config_file: Optional[str] = None):
        """Set up logging, load the configuration and reset the results.

        Args:
            config_file: Optional path to a JSON configuration file. When it
                is missing or unreadable the built-in defaults are used.
        """
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

        self.config = self._load_config(config_file)

        # Accumulated results of every test executed by this instance.
        self.results = {
            'timestamp': datetime.now().isoformat(),
            'tests': [],
            'summary': {
                'total': 0,
                'passed': 0,
                'failed': 0,
                'error': 0
            }
        }

    def _load_config(self, config_file: Optional[str]) -> Dict:
        """Return the default configuration overlaid with the user's file.

        The overlay is shallow: a top-level key in the user file replaces the
        whole default value for that key.

        Args:
            config_file: Path to a JSON file, or None to use defaults only.

        Returns:
            Dict: The effective configuration.
        """
        default_config = {
            'test_cases': [],
            'data_sources': {
                'jq_articles': './data/processed/jq_essence_articles/articles',
                'akshare_data': './data/raw/akshare'
            },
            'validation_rules': {
                'completeness': True,
                'consistency': True,
                'accuracy': True,
                'timeliness': True
            }
        }

        if config_file and os.path.exists(config_file):
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    user_config = json.load(f)
                if not isinstance(user_config, dict):
                    # A list/scalar cannot be merged meaningfully into the
                    # defaults; report it through the normal failure path.
                    raise TypeError(
                        f"expected a JSON object, got {type(user_config).__name__}"
                    )
                default_config.update(user_config)
            except (OSError, ValueError, TypeError) as e:
                # Best effort: a broken config file falls back to defaults.
                self.logger.error(f"加载配置文件失败 {config_file}: {e}")

        return default_config

    def _record_result(self, test_result: Dict) -> Dict:
        """Stamp the end time, store the result and update the summary.

        Any status other than 'passed' or 'failed' is counted as 'error'.

        Args:
            test_result: Result dict containing at least a 'status' key.

        Returns:
            Dict: The same ``test_result`` object, for convenient returning.
        """
        test_result['end_time'] = datetime.now().isoformat()
        self.results['tests'].append(test_result)

        summary = self.results['summary']
        summary['total'] += 1
        status = test_result['status']
        bucket = status if status in ('passed', 'failed') else 'error'
        summary[bucket] += 1
        return test_result

    @staticmethod
    def _load_json_object(data_path: str) -> Dict:
        """Load ``data_path`` as JSON and require a top-level object.

        Args:
            data_path: Path of the JSON file to read.

        Returns:
            Dict: The decoded JSON object.

        Raises:
            TypeError: If the top-level JSON value is not an object.
            OSError: If the file cannot be read.
            ValueError: If the file is not valid JSON.
        """
        with open(data_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise TypeError(
                f"数据顶层必须是JSON对象, 实际为: {type(data).__name__}"
            )
        return data

    def test_akshare_vnpy_adapter(self) -> Dict:
        """Check that the AKShare-to-vnPy adapter imports and instantiates.

        Returns:
            Dict: The test result; 'status' is 'passed', 'failed' or 'error'.
        """
        test_name = "akshare_vnpy_adapter_test"
        self.logger.info(f"开始测试: {test_name}")

        test_result = {
            'name': test_name,
            'start_time': datetime.now().isoformat(),
            'status': 'pending',
            'errors': [],
            'warnings': [],
            'passed_tests': [],
            'failed_tests': []
        }

        try:
            try:
                import akshare_vnpy_adapter
            except ImportError as e:
                test_result['errors'].append(f"模块导入失败: {e}")
                test_result['status'] = 'failed'
            else:
                test_result['passed_tests'].append("模块导入成功")
                self.logger.info("AKShare-vnPy适配器导入成功")

                if hasattr(akshare_vnpy_adapter, 'AKShareDataAdapter'):
                    test_result['passed_tests'].append("适配器类存在")
                    try:
                        # Instantiation is the only adapter call made here;
                        # real data-fetching calls can be added as needed,
                        # e.g. adapter.get_stock_daily('000001').
                        akshare_vnpy_adapter.AKShareDataAdapter()
                        test_result['passed_tests'].append("适配器实例化成功")
                    except Exception as e:
                        test_result['errors'].append(f"适配器方法调用失败: {e}")
                        test_result['status'] = 'failed'
                else:
                    test_result['errors'].append("适配器类不存在")
                    test_result['status'] = 'failed'

            if not test_result['errors']:
                test_result['status'] = 'passed'
                test_result['passed_tests'].append("所有测试通过")

        except Exception as e:
            test_result['status'] = 'error'
            test_result['errors'].append(f"测试执行异常: {e}")
            self.logger.error(f"测试执行异常: {e}")

        # Always record the outcome, including the import-failure path.
        self._record_result(test_result)
        self.logger.info(f"测试完成: {test_name} - 状态: {test_result['status']}")
        return test_result

    def test_data_completeness(self, data_path: str, required_fields: List[str]) -> Dict:
        """Check that a JSON object file contains every required field.

        Args:
            data_path: Path of the JSON data file.
            required_fields: Top-level keys that must be present.

        Returns:
            Dict: The test result; 'missing_fields' lists absent keys.
        """
        test_name = "data_completeness_test"
        self.logger.info(f"开始测试: {test_name} - 数据: {data_path}")

        test_result = {
            'name': test_name,
            'data_path': data_path,
            'start_time': datetime.now().isoformat(),
            'status': 'pending',
            'errors': [],
            'warnings': [],
            'missing_fields': [],
            'total_fields': 0
        }

        try:
            if not os.path.exists(data_path):
                test_result['errors'].append(f"数据文件不存在: {data_path}")
                test_result['status'] = 'failed'
            else:
                data = self._load_json_object(data_path)

                missing_fields = [
                    field for field in required_fields if field not in data
                ]
                test_result['missing_fields'] = missing_fields
                test_result['total_fields'] = len(data)

                if missing_fields:
                    test_result['errors'].append(f"缺失必需字段: {missing_fields}")
                    test_result['status'] = 'failed'
                else:
                    test_result['passed_tests'] = [f"所有必需字段完整: {required_fields}"]
                    test_result['status'] = 'passed'

        except Exception as e:
            test_result['status'] = 'error'
            test_result['errors'].append(f"测试执行异常: {e}")
            self.logger.error(f"完整性测试异常: {e}")

        return self._record_result(test_result)

    def test_data_consistency(self, data_path: str, expected_schema: Dict) -> Dict:
        """Check that present fields have the expected Python type names.

        Fields named in the schema but absent from the data are ignored;
        presence is the job of ``test_data_completeness``.

        Args:
            data_path: Path of the JSON data file.
            expected_schema: Mapping of field name to the expected
                ``type(value).__name__`` (for example 'int' or 'str').

        Returns:
            Dict: The test result; 'inconsistent_fields' lists mismatches.
        """
        test_name = "data_consistency_test"
        self.logger.info(f"开始测试: {test_name} - 数据: {data_path}")

        test_result = {
            'name': test_name,
            'data_path': data_path,
            'start_time': datetime.now().isoformat(),
            'status': 'pending',
            'errors': [],
            'warnings': [],
            'inconsistent_fields': []
        }

        try:
            if not os.path.exists(data_path):
                test_result['errors'].append(f"数据文件不存在: {data_path}")
                test_result['status'] = 'failed'
            else:
                data = self._load_json_object(data_path)

                inconsistent_fields = []
                for field, expected_type in expected_schema.items():
                    if field not in data:
                        continue
                    actual_type = type(data[field]).__name__
                    if actual_type != expected_type:
                        inconsistent_fields.append({
                            'field': field,
                            'expected': expected_type,
                            'actual': actual_type
                        })

                test_result['inconsistent_fields'] = inconsistent_fields

                if inconsistent_fields:
                    test_result['errors'].append(f"字段类型不一致: {inconsistent_fields}")
                    test_result['status'] = 'failed'
                else:
                    test_result['passed_tests'] = ["所有字段类型一致"]
                    test_result['status'] = 'passed'

        except Exception as e:
            test_result['status'] = 'error'
            test_result['errors'].append(f"测试执行异常: {e}")
            self.logger.error(f"一致性测试异常: {e}")

        return self._record_result(test_result)

    def run_all_tests(self) -> Dict:
        """Run the adapter test plus every configured test case.

        Returns:
            Dict: The full report produced by ``generate_test_report``.
        """
        self.logger.info("开始运行所有测试")

        self.test_akshare_vnpy_adapter()

        # Run the test cases declared in the configuration file, if any.
        for test_case in self.config.get('test_cases', []):
            test_type = test_case.get('type')
            data_path = test_case.get('data_path')

            if test_type == 'completeness' and 'required_fields' in test_case:
                self.test_data_completeness(data_path, test_case['required_fields'])
            elif test_type == 'consistency' and 'expected_schema' in test_case:
                self.test_data_consistency(data_path, test_case['expected_schema'])
            else:
                # Make misconfigured cases visible instead of dropping them.
                self.logger.warning(f"跳过无法识别的测试用例: {test_case}")

        report = self.generate_test_report()

        self.logger.info(f"所有测试完成 - 通过: {report['summary']['passed']}, "
                         f"失败: {report['summary']['failed']}, "
                         f"错误: {report['summary']['error']}")

        return report

    def generate_test_report(self) -> Dict:
        """Recompute the summary and write the report to disk.

        The report is written to ``./data/running_data/test_report.json``
        relative to the current working directory.

        Returns:
            Dict: ``self.results`` with a refreshed 'summary' that also
            carries a 'pass_rate' string.
        """
        tests = self.results['tests']
        total_tests = len(tests)
        passed_tests = sum(1 for t in tests if t['status'] == 'passed')
        failed_tests = sum(1 for t in tests if t['status'] == 'failed')
        error_tests = sum(1 for t in tests if t['status'] == 'error')

        self.results['summary'] = {
            'total': total_tests,
            'passed': passed_tests,
            'failed': failed_tests,
            'error': error_tests,
            'pass_rate': f"{(passed_tests/total_tests*100):.1f}%" if total_tests > 0 else "0%"
        }

        report_path = './data/running_data/test_report.json'
        os.makedirs(os.path.dirname(report_path), exist_ok=True)

        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)

        self.logger.info(f"测试报告已保存: {report_path}")

        return self.results

    def save_results(self, output_path: Optional[str] = None) -> str:
        """Write the accumulated results to a JSON file.

        Args:
            output_path: Destination file. Defaults to a timestamped file
                under ``./data/running_data``.

        Returns:
            str: The path that was written.
        """
        if output_path is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_path = f'./data/running_data/test_results_{timestamp}.json'

        # A bare file name has no directory part; os.makedirs('') would raise.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)

        self.logger.info(f"测试结果已保存: {output_path}")
        return output_path


def main():
    """Run every test, print a summary and return a process exit code.

    Returns:
        int: 1 if any test failed or errored, otherwise 0.
    """
    tester = DataAdapterTester()

    report = tester.run_all_tests()

    summary = report['summary']
    print("\n=== 测试摘要 ===")
    print(f"总测试数: {summary['total']}")
    print(f"通过: {summary['passed']}")
    print(f"失败: {summary['failed']}")
    print(f"错误: {summary['error']}")
    print(f"通过率: {summary['pass_rate']}")

    results_path = tester.save_results()
    print(f"\n详细结果已保存至: {results_path}")

    return 1 if summary['failed'] > 0 or summary['error'] > 0 else 0


if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)