409 lines
14 KiB
Python
409 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
# 数据适配器测试工具 - 赵云数据工程工具
|
|
# 用于测试和验证数据转换适配器
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import time
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
# Put the project root on sys.path so that project-level modules can be
# imported when this file is run directly as a script. The root is taken to be
# four directory levels above this file's absolute path; the temporary names
# are deleted afterwards so the module namespace is left unchanged.
_project_root = os.path.abspath(__file__)
for _level in range(4):
    _project_root = os.path.dirname(_project_root)
sys.path.append(_project_root)
del _project_root, _level
|
|
|
|
class DataAdapterTester:
    """Test harness for data-conversion adapters.

    Runs a fixed AKShare -> vnPy adapter check plus any ``completeness`` /
    ``consistency`` cases listed under ``test_cases`` in the configuration.
    Every outcome, including early failures such as a missing module or a
    missing data file, is appended to ``self.results['tests']`` and tallied
    in ``self.results['summary']``.

    Result statuses:
        'passed' - every check held.
        'failed' - a check did not hold, or required input was missing.
        'error'  - an unexpected exception interrupted the test.
    """

    def __init__(self, config_file: Optional[str] = None):
        """Initialize the tester.

        Args:
            config_file: Optional path to a JSON configuration file. Its
                top-level keys shallowly override the built-in defaults.
        """
        # Configure root logging (a no-op if logging is already configured).
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        # Must exist before _load_config, which logs problems with the file.
        self.logger = logging.getLogger(__name__)

        self.config = self._load_config(config_file)

        # Accumulated results of every test run by this instance.
        self.results = {
            'timestamp': datetime.now().isoformat(),
            'tests': [],
            'summary': {
                'total': 0,
                'passed': 0,
                'failed': 0,
                'error': 0
            }
        }

    def _load_config(self, config_file: Optional[str]) -> Dict:
        """Load the configuration, merged over built-in defaults.

        Args:
            config_file: Path to a JSON file, or None/empty for defaults only.

        Returns:
            Dict: The configuration. Keys from the file replace default keys
            wholesale (shallow merge). If the file is missing, unreadable or
            not a JSON object, the problem is logged and the defaults are
            returned.
        """
        default_config = {
            'test_cases': [],
            'data_sources': {
                'jq_articles': './data/processed/jq_essence_articles/articles',
                'akshare_data': './data/raw/akshare'
            },
            'validation_rules': {
                'completeness': True,
                'consistency': True,
                'accuracy': True,
                'timeliness': True
            }
        }

        if not config_file:
            return default_config

        if not os.path.exists(config_file):
            # A mistyped path used to be ignored silently; surface it.
            self.logger.warning(f"配置文件不存在, 使用默认配置: {config_file}")
            return default_config

        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                user_config = json.load(f)
        except (OSError, ValueError) as e:  # JSONDecodeError/UnicodeDecodeError are ValueErrors
            self.logger.error(f"加载配置文件失败 {config_file}: {e}")
            return default_config

        if not isinstance(user_config, dict):
            self.logger.error(f"加载配置文件失败 {config_file}: 顶层必须是JSON对象")
            return default_config

        default_config.update(user_config)
        return default_config

    def _record_result(self, test_result: Dict) -> Dict:
        """Stamp ``end_time``, store the result and update the running summary.

        Every test method funnels through here, including its early-failure
        paths, so no outcome is missing from the report.

        Args:
            test_result: The result dict built by a test method.

        Returns:
            Dict: ``test_result`` itself, for convenient ``return``.
        """
        test_result['end_time'] = datetime.now().isoformat()
        self.results['tests'].append(test_result)

        summary = self.results['summary']
        summary['total'] += 1
        status = test_result['status']
        if status in ('passed', 'failed'):
            summary[status] += 1
        else:
            # 'error', or any result left un-finalised, counts as an error.
            summary['error'] += 1

        self.logger.info(f"测试完成: {test_result['name']} - 状态: {status}")
        return test_result

    def _load_json_object(self, data_path: Optional[str], test_result: Dict) -> Optional[Dict]:
        """Read ``data_path`` as a JSON object for a data-validation test.

        If the file is missing or its top level is not a JSON object, the
        problem is recorded in ``test_result`` (status ``'failed'``) and None
        is returned. Read/parse errors propagate so that the caller can
        classify them as ``'error'``.

        Args:
            data_path: Path of the JSON data file (may be None if the test
                case did not specify one).
            test_result: Result dict to record failures into.

        Returns:
            Optional[Dict]: The parsed object, or None on a recorded failure.
        """
        if not data_path or not os.path.exists(data_path):
            test_result['errors'].append(f"数据文件不存在: {data_path}")
            test_result['status'] = 'failed'
            return None

        with open(data_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if not isinstance(data, dict):
            test_result['errors'].append(
                f"数据顶层必须是JSON对象, 实际类型: {type(data).__name__}")
            test_result['status'] = 'failed'
            return None
        return data

    def _run_akshare_vnpy_checks(self, test_result: Dict) -> None:
        """Run the individual adapter checks, recording outcomes in ``test_result``.

        The checks are: the module imports, it exposes ``AKShareDataAdapter``,
        and that class can be instantiated. Stops at the first failed check
        and sets status ``'failed'``.
        """
        try:
            import akshare_vnpy_adapter
        except ImportError as e:
            test_result['errors'].append(f"模块导入失败: {e}")
            test_result['status'] = 'failed'
            return
        test_result['passed_tests'].append("模块导入成功")
        self.logger.info("AKShare-vnPy适配器导入成功")

        if not hasattr(akshare_vnpy_adapter, 'AKShareDataAdapter'):
            test_result['errors'].append("适配器类不存在")
            test_result['status'] = 'failed'
            return
        test_result['passed_tests'].append("适配器类存在")

        try:
            akshare_vnpy_adapter.AKShareDataAdapter()
            # TODO: exercise data-fetching methods once their API is settled,
            # e.g. adapter.get_stock_daily('000001').
        except Exception as e:
            test_result['errors'].append(f"适配器实例化失败: {e}")
            test_result['status'] = 'failed'
            return
        test_result['passed_tests'].append("适配器实例化成功")

    def test_akshare_vnpy_adapter(self) -> Dict:
        """Test the AKShare -> vnPy adapter module.

        Returns:
            Dict: The test result. It is also appended to ``self.results``.
        """
        test_name = "akshare_vnpy_adapter_test"
        self.logger.info(f"开始测试: {test_name}")

        test_result = {
            'name': test_name,
            'start_time': datetime.now().isoformat(),
            'status': 'pending',
            'errors': [],
            'warnings': [],
            'passed_tests': [],
            'failed_tests': []
        }

        try:
            self._run_akshare_vnpy_checks(test_result)
            if not test_result['errors']:
                test_result['status'] = 'passed'
                test_result['passed_tests'].append("所有测试通过")
        except Exception as e:
            test_result['status'] = 'error'
            test_result['errors'].append(f"测试执行异常: {e}")
            self.logger.error(f"测试执行异常: {e}")

        return self._record_result(test_result)

    def test_data_completeness(self, data_path: str, required_fields: List[str]) -> Dict:
        """Check that a JSON data file contains every required top-level key.

        Args:
            data_path: Path of the JSON data file (its top level must be an
                object).
            required_fields: Keys that must be present.

        Returns:
            Dict: The test result, including ``missing_fields`` and
            ``total_fields``. It is also appended to ``self.results``.
        """
        test_name = "data_completeness_test"
        self.logger.info(f"开始测试: {test_name} - 数据: {data_path}")

        test_result = {
            'name': test_name,
            'data_path': data_path,
            'start_time': datetime.now().isoformat(),
            'status': 'pending',
            'errors': [],
            'warnings': [],
            'passed_tests': [],
            'missing_fields': [],
            'total_fields': 0
        }

        try:
            data = self._load_json_object(data_path, test_result)
            if data is not None:
                missing_fields = [field for field in required_fields if field not in data]
                test_result['missing_fields'] = missing_fields
                test_result['total_fields'] = len(data)

                if missing_fields:
                    test_result['errors'].append(f"缺失必需字段: {missing_fields}")
                    test_result['status'] = 'failed'
                else:
                    test_result['passed_tests'].append(f"所有必需字段完整: {required_fields}")
                    test_result['status'] = 'passed'
        except Exception as e:
            test_result['status'] = 'error'
            test_result['errors'].append(f"测试执行异常: {e}")
            self.logger.error(f"完整性测试异常: {e}")

        return self._record_result(test_result)

    def test_data_consistency(self, data_path: str, expected_schema: Dict) -> Dict:
        """Check that top-level values in a JSON data file have the expected types.

        Args:
            data_path: Path of the JSON data file (its top level must be an
                object).
            expected_schema: Mapping of key -> expected Python type name as
                produced by ``type(value).__name__`` (e.g. 'int', 'str',
                'list', 'dict', 'float', 'bool', 'NoneType').

        Returns:
            Dict: The test result, including ``inconsistent_fields``. It is
            also appended to ``self.results``. Keys absent from the data are
            skipped with a warning; presence is the completeness test's job.
        """
        test_name = "data_consistency_test"
        self.logger.info(f"开始测试: {test_name} - 数据: {data_path}")

        test_result = {
            'name': test_name,
            'data_path': data_path,
            'start_time': datetime.now().isoformat(),
            'status': 'pending',
            'errors': [],
            'warnings': [],
            'passed_tests': [],
            'inconsistent_fields': []
        }

        try:
            data = self._load_json_object(data_path, test_result)
            if data is not None:
                inconsistent_fields = []
                for field, expected_type in expected_schema.items():
                    if field not in data:
                        test_result['warnings'].append(f"字段不存在, 跳过类型检查: {field}")
                        continue
                    actual_type = type(data[field]).__name__
                    if actual_type != expected_type:
                        inconsistent_fields.append({
                            'field': field,
                            'expected': expected_type,
                            'actual': actual_type
                        })

                test_result['inconsistent_fields'] = inconsistent_fields

                if inconsistent_fields:
                    test_result['errors'].append(f"字段类型不一致: {inconsistent_fields}")
                    test_result['status'] = 'failed'
                else:
                    test_result['passed_tests'].append("所有字段类型一致")
                    test_result['status'] = 'passed'
        except Exception as e:
            test_result['status'] = 'error'
            test_result['errors'].append(f"测试执行异常: {e}")
            self.logger.error(f"一致性测试异常: {e}")

        return self._record_result(test_result)

    def run_all_tests(self) -> Dict:
        """Run the adapter test plus every configured test case, then report.

        Returns:
            Dict: The full results, as returned by ``generate_test_report``.
        """
        self.logger.info("开始运行所有测试")

        self.test_akshare_vnpy_adapter()

        for test_case in self.config.get('test_cases', []):
            if not isinstance(test_case, dict):
                self.logger.warning(f"跳过无法识别的测试用例: {test_case}")
                continue

            test_type = test_case.get('type')
            data_path = test_case.get('data_path')

            if test_type == 'completeness' and 'required_fields' in test_case:
                self.test_data_completeness(data_path, test_case['required_fields'])
            elif test_type == 'consistency' and 'expected_schema' in test_case:
                self.test_data_consistency(data_path, test_case['expected_schema'])
            else:
                # Previously dropped silently; a malformed case should be visible.
                self.logger.warning(f"跳过无法识别的测试用例: {test_case}")

        report = self.generate_test_report()

        self.logger.info(f"所有测试完成 - 通过: {report['summary']['passed']}, "
                         f"失败: {report['summary']['failed']}, "
                         f"错误: {report['summary']['error']}")

        return report

    def generate_test_report(self, report_path: str = './data/running_data/test_report.json') -> Dict:
        """Recompute the summary from the stored results and write a report.

        Args:
            report_path: Destination JSON file. Parent directories are created
                as needed.

        Returns:
            Dict: ``self.results``. Its ``summary`` gains a ``pass_rate``
            string such as ``'66.7%'``.
        """
        statuses = [t['status'] for t in self.results['tests']]
        total_tests = len(statuses)
        passed_tests = statuses.count('passed')

        self.results['summary'] = {
            'total': total_tests,
            'passed': passed_tests,
            'failed': statuses.count('failed'),
            'error': statuses.count('error'),
            'pass_rate': f"{(passed_tests/total_tests*100):.1f}%" if total_tests > 0 else "0%"
        }

        self._write_results(report_path)
        self.logger.info(f"测试报告已保存: {report_path}")

        return self.results

    def _write_results(self, path: str) -> None:
        """Write ``self.results`` to ``path`` as UTF-8 JSON, creating parent dirs."""
        directory = os.path.dirname(path)
        if directory:  # a bare filename has no directory part to create
            os.makedirs(directory, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)

    def save_results(self, output_path: Optional[str] = None) -> str:
        """Save the current results to a JSON file.

        Args:
            output_path: Destination file. Defaults to a timestamped file
                under ``./data/running_data/``. A bare filename is written to
                the current directory.

        Returns:
            str: The path that was written.
        """
        if output_path is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_path = f'./data/running_data/test_results_{timestamp}.json'

        self._write_results(output_path)
        self.logger.info(f"测试结果已保存: {output_path}")
        return output_path
|
|
|
|
def main():
    """Run every configured adapter/data test and print a short summary.

    Detailed results are also saved to a timestamped JSON file.

    Returns:
        int: Process exit code - 1 if any test failed or errored, else 0.
    """
    tester = DataAdapterTester()

    # Run everything; the returned report carries the recomputed summary.
    summary = tester.run_all_tests()['summary']

    print(f"\n=== 测试摘要 ===")
    print(f"总测试数: {summary['total']}")
    print(f"通过: {summary['passed']}")
    print(f"失败: {summary['failed']}")
    print(f"错误: {summary['error']}")
    print(f"通过率: {summary['pass_rate']}")

    results_path = tester.save_results()
    print(f"\n详细结果已保存至: {results_path}")

    has_problems = summary['failed'] > 0 or summary['error'] > 0
    return 1 if has_problems else 0
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s status (0 = all passed, 1 = failures/errors) to the shell.
    sys.exit(main())