chore: update project structure for new workspace layout

This commit is contained in:
cfdaily
2026-03-25 23:07:52 +08:00
parent e18d0ed3e6
commit fd21c8e1a1
21 changed files with 1641 additions and 402 deletions
@@ -0,0 +1,389 @@
#!/usr/bin/env python3
# AKShare-vnPy数据适配器 - 赵云数据工程工具
# 将AKShare数据格式转换为vnPy兼容格式
import sys
import os
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union, Any
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class AKShareDataAdapter:
"""AKShare到vnPy的数据适配器"""
def __init__(self, config_path: str = None):
"""初始化适配器
Args:
config_path: 配置文件路径
"""
self.config = self._load_config(config_path)
self.data_cache = {}
# 尝试导入akshare(可选)
try:
import akshare as ak
self.ak = ak
self.akshare_available = True
logger.info("AKShare已成功导入")
except ImportError:
self.ak = None
self.akshare_available = False
logger.warning("AKShare未安装,将使用模拟数据")
def _load_config(self, config_path: str) -> Dict:
"""加载配置文件
Args:
config_path: 配置文件路径
Returns:
Dict: 配置信息
"""
default_config = {
'data_sources': {
'stock': {
'provider': 'akshare',
'fields_mapping': {
'date': 'date',
'open': 'open',
'high': 'high',
'low': 'low',
'close': 'close',
'volume': 'volume',
'amount': 'amount',
'turnover': 'turnover'
}
},
'index': {
'provider': 'akshare',
'fields_mapping': {
'date': 'date',
'open': 'open',
'high': 'high',
'low': 'low',
'close': 'close',
'volume': 'volume',
'amount': 'amount'
}
}
},
'vnpy_format': {
'datetime_format': '%Y-%m-%d',
'numeric_precision': 6,
'null_value': 0.0
},
'cache_settings': {
'enabled': True,
'ttl_hours': 24,
'cache_dir': './data/running_data/cache'
}
}
if config_path and os.path.exists(config_path):
try:
with open(config_path, 'r', encoding='utf-8') as f:
user_config = json.load(f)
default_config.update(user_config)
except Exception as e:
logger.error(f"加载配置文件失败 {config_path}: {e}")
return default_config
def get_stock_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
"""获取股票日线数据
Args:
symbol: 股票代码(如:000001
start_date: 开始日期(格式:YYYY-MM-DD
end_date: 结束日期(格式:YYYY-MM-DD
Returns:
pd.DataFrame: 转换后的vnPy格式数据
"""
logger.info(f"获取股票日线数据: {symbol} [{start_date} - {end_date}]")
try:
if self.akshare_available:
# 使用akshare获取数据
df = self.ak.stock_zh_a_hist(
symbol=symbol,
period="daily",
start_date=start_date,
end_date=end_date,
adjust="qfq" # 前复权
)
else:
# 模拟数据
df = self._generate_mock_stock_data(symbol, start_date, end_date)
# 转换数据格式
vnpy_df = self._convert_to_vnpy_format(df, 'stock')
logger.info(f"股票数据获取成功: {symbol}, 数据量: {len(vnpy_df)}")
return vnpy_df
except Exception as e:
logger.error(f"获取股票数据失败 {symbol}: {e}")
# 返回空DataFrame
return pd.DataFrame()
def _generate_mock_stock_data(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
"""生成模拟股票数据(当akshare不可用时)
Args:
symbol: 股票代码
start_date: 开始日期
end_date: 结束日期
Returns:
pd.DataFrame: 模拟数据
"""
# 生成日期范围
dates = pd.date_range(start=start_date, end=end_date, freq='D')
# 生成模拟数据
data = {
'日期': dates,
'开盘': np.random.uniform(10, 100, len(dates)),
'收盘': np.random.uniform(10, 100, len(dates)),
'最高': np.random.uniform(10, 100, len(dates)),
'最低': np.random.uniform(10, 100, len(dates)),
'成交量': np.random.uniform(10000, 1000000, len(dates)),
'成交额': np.random.uniform(100000, 10000000, len(dates)),
'振幅': np.random.uniform(0.1, 5.0, len(dates)),
'涨跌幅': np.random.uniform(-5.0, 5.0, len(dates)),
'涨跌额': np.random.uniform(-5.0, 5.0, len(dates)),
'换手率': np.random.uniform(0.1, 10.0, len(dates))
}
df = pd.DataFrame(data)
return df
def _convert_to_vnpy_format(self, df: pd.DataFrame, data_type: str) -> pd.DataFrame:
"""转换为vnPy格式
Args:
df: 原始数据DataFrame
data_type: 数据类型(stock, index等)
Returns:
pd.DataFrame: 转换后的数据
"""
if df.empty:
return df
# 获取字段映射
mapping = self.config['data_sources'].get(data_type, {}).get('fields_mapping', {})
# 创建新的DataFrame
vnpy_data = {}
for vnpy_field, source_field in mapping.items():
if source_field in df.columns:
vnpy_data[vnpy_field] = df[source_field]
else:
# 如果字段不存在,填充默认值
logger.warning(f"字段 {source_field} 不存在,使用默认值填充 {vnpy_field}")
vnpy_data[vnpy_field] = np.nan
vnpy_df = pd.DataFrame(vnpy_data)
# 确保日期列为datetime类型
if 'date' in vnpy_df.columns:
vnpy_df['date'] = pd.to_datetime(vnpy_df['date'])
# 处理空值
null_value = self.config['vnpy_format'].get('null_value', 0.0)
vnpy_df = vnpy_df.fillna(null_value)
# 设置数值精度
numeric_precision = self.config['vnpy_format'].get('numeric_precision', 6)
for col in vnpy_df.select_dtypes(include=[np.number]).columns:
vnpy_df[col] = vnpy_df[col].round(numeric_precision)
# 按日期排序
if 'date' in vnpy_df.columns:
vnpy_df = vnpy_df.sort_values('date').reset_index(drop=True)
return vnpy_df
def get_index_daily(self, index_symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
"""获取指数日线数据
Args:
index_symbol: 指数代码(如:000001.SH
start_date: 开始日期
end_date: 结束日期
Returns:
pd.DataFrame: 转换后的vnPy格式数据
"""
logger.info(f"获取指数日线数据: {index_symbol} [{start_date} - {end_date}]")
try:
if self.akshare_available:
# 使用akshare获取数据
df = self.ak.index_zh_a_hist(
symbol=index_symbol,
period="daily",
start_date=start_date,
end_date=end_date
)
else:
# 模拟数据
df = self._generate_mock_index_data(index_symbol, start_date, end_date)
# 转换数据格式
vnpy_df = self._convert_to_vnpy_format(df, 'index')
logger.info(f"指数数据获取成功: {index_symbol}, 数据量: {len(vnpy_df)}")
return vnpy_df
except Exception as e:
logger.error(f"获取指数数据失败 {index_symbol}: {e}")
return pd.DataFrame()
def _generate_mock_index_data(self, index_symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
"""生成模拟指数数据
Args:
index_symbol: 指数代码
start_date: 开始日期
end_date: 结束日期
Returns:
pd.DataFrame: 模拟数据
"""
# 生成日期范围
dates = pd.date_range(start=start_date, end=end_date, freq='D')
# 生成模拟数据
data = {
'日期': dates,
'开盘': np.random.uniform(3000, 4000, len(dates)),
'收盘': np.random.uniform(3000, 4000, len(dates)),
'最高': np.random.uniform(3000, 4000, len(dates)),
'最低': np.random.uniform(3000, 4000, len(dates)),
'成交量': np.random.uniform(1000000, 10000000, len(dates)),
'成交额': np.random.uniform(10000000, 100000000, len(dates))
}
df = pd.DataFrame(data)
return df
def export_to_vnpy_csv(self, df: pd.DataFrame, symbol: str, output_dir: str = None) -> str:
"""导出为vnPy CSV格式
Args:
df: 数据DataFrame
symbol: 标的代码
output_dir: 输出目录
Returns:
str: 输出文件路径
"""
if df.empty:
logger.warning(f"数据为空,跳过导出: {symbol}")
return ""
if output_dir is None:
output_dir = './data/running_data/vnpy_import'
os.makedirs(output_dir, exist_ok=True)
# 生成文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"vnpy_{symbol}_{timestamp}.csv"
output_path = os.path.join(output_dir, filename)
# 保存为CSV
df.to_csv(output_path, index=False, encoding='utf-8-sig')
logger.info(f"数据已导出为vnPy CSV格式: {output_path}")
return output_path
def export_to_vnpy_database(self, df: pd.DataFrame, symbol: str, table_name: str = None) -> bool:
"""导出到vnPy数据库格式(模拟)
Args:
df: 数据DataFrame
symbol: 标的代码
table_name: 数据库表名
Returns:
bool: 是否成功
"""
if df.empty:
logger.warning(f"数据为空,跳过数据库导出: {symbol}")
return False
# 这里可以集成vnPy的数据库接口
# 示例:保存为JSON文件
if table_name is None:
table_name = f"vnpy_data_{symbol}"
output_dir = './data/running_data/vnpy_database'
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f"{table_name}.json")
# 转换为字典格式
data_dict = {
'symbol': symbol,
'table_name': table_name,
'export_time': datetime.now().isoformat(),
'data_count': len(df),
'data': df.to_dict(orient='records')
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data_dict, f, ensure_ascii=False, indent=2)
logger.info(f"数据已导出为vnPy数据库格式: {output_path}")
return True
def main():
"""示例使用"""
adapter = AKShareDataAdapter()
# 示例:获取股票数据
stock_data = adapter.get_stock_daily(
symbol='000001',
start_date='2024-01-01',
end_date='2024-01-31'
)
if not stock_data.empty:
print(f"股票数据获取成功,数据量: {len(stock_data)}")
print(stock_data.head())
# 导出为vnPy CSV格式
csv_path = adapter.export_to_vnpy_csv(stock_data, '000001')
print(f"CSV导出路径: {csv_path}")
else:
print("股票数据获取失败")
# 示例:获取指数数据
index_data = adapter.get_index_daily(
index_symbol='000001.SH',
start_date='2024-01-01',
end_date='2024-01-31'
)
if not index_data.empty:
print(f"\n指数数据获取成功,数据量: {len(index_data)}")
print(index_data.head())
if __name__ == "__main__":
main()
@@ -0,0 +1,219 @@
#!/usr/bin/env python3
# 批量数据下载器 - 赵云数据工程工具
# 用于批量下载聚宽文章、金融数据等
import requests
import time
import json
import os
from typing import List, Dict, Optional
import logging
from datetime import datetime
class BatchDownloader:
"""批量数据下载器"""
def __init__(self, output_dir: str = "./data/raw"):
self.output_dir = output_dir
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def download_jq_articles(self, article_links: List[str], delay: float = 1.0) -> Dict:
"""下载聚宽文章
Args:
article_links: 文章链接列表
delay: 请求延迟(秒)
Returns:
Dict: 下载结果统计
"""
results = {
'total': len(article_links),
'success': 0,
'failed': 0,
'articles': []
}
for i, link in enumerate(article_links, 1):
try:
self.logger.info(f"下载文章 {i}/{len(article_links)}: {link}")
# 模拟请求
response = self.session.get(link, timeout=10)
response.raise_for_status()
# 解析文章内容
article_data = self._parse_jq_article(response.text)
# 保存文章
article_id = f"article_{i:03d}"
save_path = os.path.join(self.output_dir, f"{article_id}.json")
with open(save_path, 'w', encoding='utf-8') as f:
json.dump(article_data, f, ensure_ascii=False, indent=2)
results['success'] += 1
results['articles'].append({
'id': article_id,
'url': link,
'save_path': save_path,
'timestamp': datetime.now().isoformat()
})
self.logger.info(f"文章 {article_id} 下载成功")
except Exception as e:
self.logger.error(f"下载失败 {link}: {e}")
results['failed'] += 1
# 请求延迟
if i < len(article_links):
time.sleep(delay)
return results
def _parse_jq_article(self, html_content: str) -> Dict:
"""解析聚宽文章内容
Args:
html_content: HTML内容
Returns:
Dict: 解析后的文章数据
"""
# 这里简化处理,实际需要HTML解析
return {
'title': f"聚宽文章 - {datetime.now().strftime('%Y%m%d_%H%M%S')}",
'content': "文章内容解析逻辑待实现",
'metadata': {
'source': 'joinquant',
'crawl_time': datetime.now().isoformat(),
'status': 'raw'
}
}
def download_financial_data(self, symbols: List[str], start_date: str, end_date: str) -> Dict:
"""下载金融数据
Args:
symbols: 股票代码列表
start_date: 开始日期
end_date: 结束日期
Returns:
Dict: 下载结果
"""
results = {}
for symbol in symbols:
try:
self.logger.info(f"下载金融数据: {symbol}")
# 这里可以集成akshare、tushare等数据源
# 示例数据
data = {
'symbol': symbol,
'start_date': start_date,
'end_date': end_date,
'data': [] # 实际数据
}
# 保存数据
save_path = os.path.join(self.output_dir, f"financial_{symbol}_{start_date}_{end_date}.json")
with open(save_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
results[symbol] = {
'status': 'success',
'save_path': save_path
}
except Exception as e:
self.logger.error(f"下载金融数据失败 {symbol}: {e}")
results[symbol] = {
'status': 'failed',
'error': str(e)
}
return results
def resume_download(self, log_file: str) -> Dict:
"""断点续传
Args:
log_file: 下载日志文件
Returns:
Dict: 续传结果
"""
self.logger.info(f"尝试断点续传: {log_file}")
try:
with open(log_file, 'r', encoding='utf-8') as f:
log_data = json.load(f)
# 找出失败的下载项
failed_items = [item for item in log_data.get('items', [])
if item.get('status') == 'failed']
if not failed_items:
self.logger.info("没有失败的下载项")
return {'status': 'completed', 'failed': 0}
self.logger.info(f"发现 {len(failed_items)} 个失败的下载项,尝试重新下载")
# 重新下载失败的项
success_count = 0
for item in failed_items:
try:
# 重新下载逻辑
# ...
success_count += 1
except Exception as e:
self.logger.error(f"重新下载失败: {e}")
return {
'status': f'resumed {success_count}/{len(failed_items)}',
'total_failed': len(failed_items),
'resumed': success_count,
'still_failed': len(failed_items) - success_count
}
except Exception as e:
self.logger.error(f"断点续传失败: {e}")
return {'status': 'failed', 'error': str(e)}
def main():
"""示例使用"""
downloader = BatchDownloader()
# 示例:下载聚宽文章
article_links = [
"https://www.joinquant.com/view/community/detail/12345",
"https://www.joinquant.com/view/community/detail/67890"
]
results = downloader.download_jq_articles(article_links)
print(f"下载结果: {json.dumps(results, ensure_ascii=False, indent=2)}")
# 示例:下载金融数据
stock_symbols = ['000001', '000002']
financial_results = downloader.download_financial_data(
stock_symbols, '2024-01-01', '2024-03-01'
)
print(f"金融数据下载结果: {json.dumps(financial_results, ensure_ascii=False, indent=2)}")
if __name__ == "__main__":
main()
@@ -0,0 +1,409 @@
#!/usr/bin/env python3
# 数据适配器测试工具 - 赵云数据工程工具
# 用于测试和验证数据转换适配器
import sys
import os
import json
import time
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional
# 添加项目路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
class DataAdapterTester:
"""数据适配器测试工具"""
def __init__(self, config_file: str = None):
"""初始化测试工具
Args:
config_file: 配置文件路径
"""
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
# 加载配置
self.config = self._load_config(config_file)
# 测试结果存储
self.results = {
'timestamp': datetime.now().isoformat(),
'tests': [],
'summary': {
'total': 0,
'passed': 0,
'failed': 0,
'error': 0
}
}
def _load_config(self, config_file: str) -> Dict:
"""加载配置文件
Args:
config_file: 配置文件路径
Returns:
Dict: 配置信息
"""
default_config = {
'test_cases': [],
'data_sources': {
'jq_articles': './data/processed/jq_essence_articles/articles',
'akshare_data': './data/raw/akshare'
},
'validation_rules': {
'completeness': True,
'consistency': True,
'accuracy': True,
'timeliness': True
}
}
if config_file and os.path.exists(config_file):
try:
with open(config_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
default_config.update(user_config)
except Exception as e:
self.logger.error(f"加载配置文件失败 {config_file}: {e}")
return default_config
def test_akshare_vnpy_adapter(self) -> Dict:
"""测试AKShare到vnPy的适配器
Returns:
Dict: 测试结果
"""
test_name = "akshare_vnpy_adapter_test"
self.logger.info(f"开始测试: {test_name}")
test_result = {
'name': test_name,
'start_time': datetime.now().isoformat(),
'status': 'pending',
'errors': [],
'warnings': [],
'passed_tests': [],
'failed_tests': []
}
try:
# 尝试导入适配器
try:
import akshare_vnpy_adapter
test_result['passed_tests'].append("模块导入成功")
self.logger.info("AKShare-vnPy适配器导入成功")
except ImportError as e:
test_result['errors'].append(f"模块导入失败: {e}")
test_result['status'] = 'failed'
return test_result
# 检查适配器类
if hasattr(akshare_vnpy_adapter, 'AKShareDataAdapter'):
test_result['passed_tests'].append("适配器类存在")
# 测试数据获取方法
adapter = akshare_vnpy_adapter.AKShareDataAdapter()
# 测试股票数据获取
try:
# 这里可以根据实际情况调用方法
# 示例:test_stock_data = adapter.get_stock_daily('000001')
test_result['passed_tests'].append("适配器实例化成功")
except Exception as e:
test_result['errors'].append(f"适配器方法调用失败: {e}")
test_result['status'] = 'failed'
else:
test_result['errors'].append("适配器类不存在")
test_result['status'] = 'failed'
# 更新测试状态
if not test_result['errors']:
test_result['status'] = 'passed'
test_result['passed_tests'].append("所有测试通过")
except Exception as e:
test_result['status'] = 'error'
test_result['errors'].append(f"测试执行异常: {e}")
self.logger.error(f"测试执行异常: {e}")
test_result['end_time'] = datetime.now().isoformat()
self.results['tests'].append(test_result)
# 更新统计信息
self.results['summary']['total'] += 1
if test_result['status'] == 'passed':
self.results['summary']['passed'] += 1
elif test_result['status'] == 'failed':
self.results['summary']['failed'] += 1
else:
self.results['summary']['error'] += 1
self.logger.info(f"测试完成: {test_name} - 状态: {test_result['status']}")
return test_result
def test_data_completeness(self, data_path: str, required_fields: List[str]) -> Dict:
"""测试数据完整性
Args:
data_path: 数据文件路径
required_fields: 必需字段列表
Returns:
Dict: 测试结果
"""
test_name = "data_completeness_test"
self.logger.info(f"开始测试: {test_name} - 数据: {data_path}")
test_result = {
'name': test_name,
'data_path': data_path,
'start_time': datetime.now().isoformat(),
'status': 'pending',
'errors': [],
'warnings': [],
'missing_fields': [],
'total_fields': 0
}
try:
# 加载数据
if not os.path.exists(data_path):
test_result['errors'].append(f"数据文件不存在: {data_path}")
test_result['status'] = 'failed'
return test_result
with open(data_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 检查必需字段
missing_fields = []
for field in required_fields:
if field not in data:
missing_fields.append(field)
test_result['missing_fields'] = missing_fields
test_result['total_fields'] = len(data.keys())
if missing_fields:
test_result['errors'].append(f"缺失必需字段: {missing_fields}")
test_result['status'] = 'failed'
else:
test_result['passed_tests'] = [f"所有必需字段完整: {required_fields}"]
test_result['status'] = 'passed'
except Exception as e:
test_result['status'] = 'error'
test_result['errors'].append(f"测试执行异常: {e}")
self.logger.error(f"完整性测试异常: {e}")
test_result['end_time'] = datetime.datetime.now().isoformat()
self.results['tests'].append(test_result)
# 更新统计信息
self.results['summary']['total'] += 1
if test_result['status'] == 'passed':
self.results['summary']['passed'] += 1
elif test_result['status'] == 'failed':
self.results['summary']['failed'] += 1
else:
self.results['summary']['error'] += 1
return test_result
def test_data_consistency(self, data_path: str, expected_schema: Dict) -> Dict:
"""测试数据一致性
Args:
data_path: 数据文件路径
expected_schema: 期望的数据模式
Returns:
Dict: 测试结果
"""
test_name = "data_consistency_test"
self.logger.info(f"开始测试: {test_name} - 数据: {data_path}")
test_result = {
'name': test_name,
'data_path': data_path,
'start_time': datetime.now().isoformat(),
'status': 'pending',
'errors': [],
'warnings': [],
'inconsistent_fields': []
}
try:
# 加载数据
if not os.path.exists(data_path):
test_result['errors'].append(f"数据文件不存在: {data_path}")
test_result['status'] = 'failed'
return test_result
with open(data_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 检查数据类型一致性
inconsistent_fields = []
for field, expected_type in expected_schema.items():
if field in data:
actual_type = type(data[field]).__name__
if actual_type != expected_type:
inconsistent_fields.append({
'field': field,
'expected': expected_type,
'actual': actual_type
})
test_result['inconsistent_fields'] = inconsistent_fields
if inconsistent_fields:
test_result['errors'].append(f"字段类型不一致: {inconsistent_fields}")
test_result['status'] = 'failed'
else:
test_result['passed_tests'] = ["所有字段类型一致"]
test_result['status'] = 'passed'
except Exception as e:
test_result['status'] = 'error'
test_result['errors'].append(f"测试执行异常: {e}")
self.logger.error(f"一致性测试异常: {e}")
test_result['end_time'] = datetime.datetime.now().isoformat()
self.results['tests'].append(test_result)
# 更新统计信息
self.results['summary']['total'] += 1
if test_result['status'] == 'passed':
self.results['summary']['passed'] += 1
elif test_result['status'] == 'failed':
self.results['summary']['failed'] += 1
else:
self.results['summary']['error'] += 1
return test_result
def run_all_tests(self) -> Dict:
"""运行所有测试
Returns:
Dict: 所有测试结果
"""
self.logger.info("开始运行所有测试")
# 运行适配器测试
self.test_akshare_vnpy_adapter()
# 如果有配置文件中的测试用例
for test_case in self.config.get('test_cases', []):
test_type = test_case.get('type')
data_path = test_case.get('data_path')
if test_type == 'completeness' and 'required_fields' in test_case:
self.test_data_completeness(data_path, test_case['required_fields'])
elif test_type == 'consistency' and 'expected_schema' in test_case:
self.test_data_consistency(data_path, test_case['expected_schema'])
# 生成测试报告
report = self.generate_test_report()
self.logger.info(f"所有测试完成 - 通过: {report['summary']['passed']}, "
f"失败: {report['summary']['failed']}, "
f"错误: {report['summary']['error']}")
return report
def generate_test_report(self) -> Dict:
"""生成测试报告
Returns:
Dict: 测试报告
"""
# 计算测试统计
total_tests = len(self.results['tests'])
passed_tests = len([t for t in self.results['tests'] if t['status'] == 'passed'])
failed_tests = len([t for t in self.results['tests'] if t['status'] == 'failed'])
error_tests = len([t for t in self.results['tests'] if t['status'] == 'error'])
# 更新摘要信息
self.results['summary'] = {
'total': total_tests,
'passed': passed_tests,
'failed': failed_tests,
'error': error_tests,
'pass_rate': f"{(passed_tests/total_tests*100):.1f}%" if total_tests > 0 else "0%"
}
# 保存测试报告
report_path = './data/running_data/test_report.json'
os.makedirs(os.path.dirname(report_path), exist_ok=True)
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(self.results, f, ensure_ascii=False, indent=2)
self.logger.info(f"测试报告已保存: {report_path}")
return self.results
def save_results(self, output_path: str = None) -> str:
"""保存测试结果
Args:
output_path: 输出路径
Returns:
str: 保存的文件路径
"""
if output_path is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_path = f'./data/running_data/test_results_{timestamp}.json'
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(self.results, f, ensure_ascii=False, indent=2)
self.logger.info(f"测试结果已保存: {output_path}")
return output_path
def main():
"""主函数"""
tester = DataAdapterTester()
# 运行所有测试
report = tester.run_all_tests()
# 打印测试摘要
summary = report['summary']
print(f"\n=== 测试摘要 ===")
print(f"总测试数: {summary['total']}")
print(f"通过: {summary['passed']}")
print(f"失败: {summary['failed']}")
print(f"错误: {summary['error']}")
print(f"通过率: {summary['pass_rate']}")
# 保存详细结果
results_path = tester.save_results()
print(f"\n详细结果已保存至: {results_path}")
# 返回测试状态
if summary['failed'] > 0 or summary['error'] > 0:
return 1
else:
return 0
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)