#!/usr/bin/env python3 """ 数据格式转换工具 - 姜维 功能:将赵云将军的本地数据格式转换为vn.py兼容格式 """ import pandas as pd import os import glob import json import logging from datetime import datetime from pathlib import Path # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('data_convert_tool.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class DataConverter: """ 数据格式转换器 赵云格式 → vn.py格式 """ # 赵云数据字段映射到vn.py字段 FIELD_MAPPING = { # 基本字段 'date': 'datetime', 'open': 'open_price', 'high': 'high_price', 'low': 'low_price', 'close': 'close_price', 'volume': 'volume', 'amount': 'turnover', # 注意:vn.py中turnover是成交额 'turnover': 'turnover_rate', # 换手率 # 可选字段 'outstanding_share': 'outstanding_share', 'year': 'year', # 财务数据字段 'pe_ttm': 'pe_ttm', 'pb': 'pb', 'roe': 'roe', 'total_market_cap': 'total_market_cap', 'circulating_market_cap': 'circulating_market_cap', } # 必需字段 REQUIRED_FIELDS = ['date', 'open', 'high', 'low', 'close', 'volume'] def __init__(self, zhaoyun_data_dir: str, output_dir: str): """ 初始化转换器 Args: zhaoyun_data_dir: 赵云数据目录 output_dir: 输出目录 """ self.zhaoyun_dir = zhaoyun_data_dir self.output_dir = output_dir # 创建输出目录 os.makedirs(output_dir, exist_ok=True) # 子目录结构 self.subdirs = { 'daily': 'daily', 'minute': 'minute', 'financial': 'financial', 'stock_info': 'stock_info', } for subdir in self.subdirs.values(): os.makedirs(os.path.join(output_dir, subdir), exist_ok=True) def analyze_zhaoyun_structure(self) -> dict: """ 分析赵云数据目录结构 Returns: 结构分析报告 """ report = { 'timestamp': datetime.now().isoformat(), 'zhaoyun_dir': self.zhaoyun_dir, 'exists': os.path.exists(self.zhaoyun_dir), 'subdirectories': {}, 'file_counts': {}, 'sample_files': {}, 'data_quality': {}, } if not report['exists']: logger.error(f"赵云数据目录不存在: {self.zhaoyun_dir}") return report # 分析子目录 for subdir in ['raw/daily', 'raw/financial', 'raw/stock_info', 'raw/minute_kline']: full_path = os.path.join(self.zhaoyun_dir, subdir) if os.path.exists(full_path): # 统计文件 parquet_files = list(glob.glob(os.path.join(full_path, '**/*.parquet'), recursive=True)) csv_files = list(glob.glob(os.path.join(full_path, '**/*.csv'), recursive=True)) report['subdirectories'][subdir] = { 'path': full_path, 'parquet_count': len(parquet_files), 'csv_count': len(csv_files), 'total_files': len(parquet_files) + len(csv_files), } # 取样分析 if parquet_files: sample_file = parquet_files[0] try: df = pd.read_parquet(sample_file) report['sample_files'][subdir] = { 'file': sample_file, 'rows': len(df), 'columns': list(df.columns), 'dtypes': str(df.dtypes.to_dict()), 'date_range': { 'min': str(df['date'].min()) if 'date' in df.columns else 'N/A', 'max': str(df['date'].max()) if 'date' in df.columns else 'N/A', } if 'date' in df.columns else {}, } except Exception as e: report['sample_files'][subdir] = {'error': str(e)} logger.info(f"赵云数据结构分析完成") return report def convert_daily_data(self, year: int = None, symbols: list = None, limit: int = None): """ 转换日线数据 Args: year: 指定年份,None表示所有年份 symbols: 指定股票代码列表,None表示所有股票 limit: 限制转换数量(用于测试) """ daily_dir = os.path.join(self.zhaoyun_dir, 'raw/daily') if not os.path.exists(daily_dir): logger.error(f"赵云日线数据目录不存在: {daily_dir}") return # 确定年份范围 if year: years = [str(year)] else: years = [d for d in os.listdir(daily_dir) if os.path.isdir(os.path.join(daily_dir, d))] years.sort() logger.info(f"开始转换日线数据,年份: {years}") total_converted = 0 total_failed = 0 for year_dir in years: year_path = os.path.join(daily_dir, year_dir) output_year_path = os.path.join(self.output_dir, 'daily', year_dir) os.makedirs(output_year_path, exist_ok=True) # 查找所有parquet文件 parquet_files = glob.glob(os.path.join(year_path, '*.parquet')) if symbols: # 过滤指定股票 filtered_files = [] for file in parquet_files: file_name = os.path.basename(file) # 从文件名提取股票代码 if 'sh' in file_name: symbol = file_name.split('_')[0][2:] + '.SH' elif 'sz' in file_name: symbol = file_name.split('_')[0][2:] + '.SZ' elif 'bj' in file_name: symbol = file_name.split('_')[0][2:] + '.BJ' else: symbol = file_name.split('_')[0] if symbol in symbols or symbol.replace('.SH', '').replace('.SZ', '').replace('.BJ', '') in symbols: filtered_files.append(file) parquet_files = filtered_files if limit: parquet_files = parquet_files[:limit] logger.info(f"转换 {year_dir} 年数据,共 {len(parquet_files)} 个文件") for file_idx, file_path in enumerate(parquet_files, 1): try: # 从文件名提取信息 file_name = os.path.basename(file_path) # 解析股票代码和交易所 if file_name.startswith('sh'): symbol = file_name[2:8] # 提取6位数字代码 exchange = 'SH' elif file_name.startswith('sz'): symbol = file_name[2:8] exchange = 'SZ' elif file_name.startswith('bj'): symbol = file_name[2:8] exchange = 'BJ' else: symbol = file_name.split('_')[0] exchange = 'SH' # 默认 # 读取数据 df = pd.read_parquet(file_path) # 检查必需字段 missing_fields = [field for field in self.REQUIRED_FIELDS if field not in df.columns] if missing_fields: logger.warning(f"文件 {file_name} 缺少必需字段: {missing_fields}") total_failed += 1 continue # 创建vn.py格式DataFrame vnpy_df = pd.DataFrame() # 转换字段 for zhaoyun_field, vnpy_field in self.FIELD_MAPPING.items(): if zhaoyun_field in df.columns: vnpy_df[vnpy_field] = df[zhaoyun_field] # 特殊处理datetime字段 if 'datetime' not in vnpy_df.columns and 'date' in df.columns: vnpy_df['datetime'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d %H:%M:%S') # 添加标识字段 vnpy_df['symbol'] = symbol vnpy_df['exchange'] = exchange vnpy_df['interval'] = '1d' # 添加唯一ID(可选) vnpy_df['id'] = range(1, len(vnpy_df) + 1) # 输出文件名 output_file = os.path.join(output_year_path, f"{exchange}{symbol}_daily_vnpy.parquet") # 保存为parquet vnpy_df.to_parquet(output_file, index=False) total_converted += 1 if file_idx % 100 == 0 or file_idx == len(parquet_files): logger.info(f"进度: {year_dir}年 {file_idx}/{len(parquet_files)} 转换: {total_converted} 失败: {total_failed}") except Exception as e: logger.error(f"转换文件失败 {file_path}: {e}") total_failed += 1 logger.info(f"日线数据转换完成: 成功 {total_converted}, 失败 {total_failed}") # 保存转换报告 report = { 'conversion_date': datetime.now().isoformat(), 'zhaoyun_dir': daily_dir, 'output_dir': os.path.join(self.output_dir, 'daily'), 'years_converted': years, 'total_converted': total_converted, 'total_failed': total_failed, 'symbols_converted': symbols if symbols else 'ALL', } report_file = os.path.join(self.output_dir, 'daily_conversion_report.json') with open(report_file, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) logger.info(f"转换报告已保存: {report_file}") def convert_stock_info(self): """转换股票基础信息""" stock_info_dir = os.path.join(self.zhaoyun_dir, 'raw/stock_info') if not os.path.exists(stock_info_dir): logger.warning(f"赵云股票信息目录不存在: {stock_info_dir}") return # 查找股票信息文件 stock_files = glob.glob(os.path.join(stock_info_dir, '*.parquet')) + \ glob.glob(os.path.join(stock_info_dir, '*.csv')) if not stock_files: logger.warning(f"未找到股票信息文件") return logger.info(f"开始转换股票信息,共 {len(stock_files)} 个文件") all_stock_info = [] for file_path in stock_files: try: # 读取文件 if file_path.endswith('.parquet'): df = pd.read_parquet(file_path) else: df = pd.read_csv(file_path) # 标准化字段名 column_mapping = { '代码': 'symbol', '名称': 'name', '行业': 'industry', '市场': 'market', '上市日期': 'list_date', '总市值': 'total_market_cap', '流通市值': 'circulating_market_cap', '市盈率': 'pe', '市净率': 'pb', 'ROE': 'roe', } df = df.rename(columns={k: v for k, v in column_mapping.items() if k in df.columns}) # 添加exchange字段 if 'symbol' in df.columns: df['exchange'] = df['symbol'].apply(lambda x: 'SH' if str(x).startswith('6') else 'SZ') all_stock_info.append(df) logger.info(f"转换股票信息文件: {os.path.basename(file_path)} ({len(df)} 条记录)") except Exception as e: logger.error(f"转换股票信息失败 {file_path}: {e}") if all_stock_info: # 合并所有数据 combined_df = pd.concat(all_stock_info, ignore_index=True) # 去重 if 'symbol' in combined_df.columns: combined_df = combined_df.drop_duplicates(subset=['symbol']) # 保存 output_file = os.path.join(self.output_dir, 'stock_info', 'stock_basic_info_vnpy.parquet') combined_df.to_parquet(output_file, index=False) logger.info(f"股票信息转换完成: {output_file} ({len(combined_df)} 只股票)") def create_config_file(self): """创建vn.py配置文件""" config = { 'data_source': 'zhaoyun_local_data', 'data_directory': os.path.abspath(self.output_dir), 'priority': 'local_first', 'fields_mapping': self.FIELD_MAPPING, 'created_at': datetime.now().isoformat(), 'description': '赵云本地数据 → vn.py格式转换配置', 'usage': { 'daily_data_path': '{data_directory}/daily/{year}/{exchange}{symbol}_daily_vnpy.parquet', 'stock_info_path': '{data_directory}/stock_info/stock_basic_info_vnpy.parquet', 'python_import': 'from vnpy_local_data_adapter import VnpyLocalDataAdapter', 'init_code': 'adapter = VnpyLocalDataAdapter(use_local_first=True)', } } config_file = os.path.join(self.output_dir, 'vnpy_data_config.json') with open(config_file, 'w', encoding='utf-8') as f: json.dump(config, f, ensure_ascii=False, indent=2) logger.info(f"vn.py配置文件已创建: {config_file}") # 创建使用说明 readme = f"""# vn.py本地数据使用说明 ## 数据来源 - 原始数据:赵云将军下载的A股数据 - 转换工具:姜维数据格式转换器 - 输出格式:vn.py兼容的parquet格式 ## 目录结构 ``` {self.output_dir}/ ├── daily/ # 日线数据 │ ├── 2010/ # 按年分区 │ ├── 2011/ │ └── ... ├── stock_info/ # 股票基础信息 │ └── stock_basic_info_vnpy.parquet ├── vnpy_data_config.json # 配置文件 └── daily_conversion_report.json # 转换报告 ``` ## 使用方法 ### 1. 在vn.py策略中使用 ```python from vnpy_local_data_adapter import VnpyLocalDataAdapter # 创建适配器(优先使用本地数据) adapter = VnpyLocalDataAdapter(use_local_first=True) # 获取数据 data = adapter.get_daily_data("000001.SZ", "2024-01-01", "2024-03-01") ``` ### 2. 直接读取数据 ```python import pandas as pd # 读取日线数据 file_path = "{self.output_dir}/daily/2024/SH600000_daily_vnpy.parquet" df = pd.read_parquet(file_path) # 读取股票信息 stock_info_path = "{self.output_dir}/stock_info/stock_basic_info_vnpy.parquet" stock_info = pd.read_parquet(stock_info_path) ``` ### 3. 验证数据结构 ```python from vnpy_local_data_adapter import VnpyLocalDataAdapter adapter = VnpyLocalDataAdapter() result = adapter.verify_local_data_structure("000001.SZ") print(result) ``` ## 数据更新 1. 联系赵云将军更新原始数据 2. 运行数据转换工具更新vn.py格式数据 3. 验证数据完整性 ## 注意事项 - 本地数据优先,缺失时自动回退到akshare - 数据文件按年分区,提高查询效率 - 定期检查数据完整性 **维护者**: 姜维(后勤总督) **数据源**: 赵云(数据工程将军) **最后更新**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} """ readme_file = os.path.join(self.output_dir, 'README.md') with open(readme_file, 'w', encoding='utf-8') as f: f.write(readme) logger.info(f"使用说明已创建: {readme_file}") def main(): """主函数""" print("=" * 60) print("赵云数据 → vn.py格式转换工具") print("=" * 60) # 配置路径 ZHAOYUN_DATA_DIR = "/Users/chufeng/nas/stock/sanguo_vnpy/zhaoyun-data/data" OUTPUT_DIR = "/Users/chufeng/.openclaw/workspace-jiangwei/vnpy_local_data" # 创建转换器 converter = DataConverter(ZHAOYUN_DATA_DIR, OUTPUT_DIR) # 1. 分析数据结构 print("\n1. 分析赵云数据结构...") structure_report = converter.analyze_zhaoyun_structure() if not structure_report['exists']: print(f"❌ 赵云数据目录不存在: {ZHAOYUN_DATA_DIR}") return print(f"✅ 赵云数据目录有效") for subdir, info in structure_report['subdirectories'].items(): print(f" {subdir}: {info['total_files']} 个文件") # 2. 转换日线数据(测试模式,只转换2024年的前10个文件) print("\n2. 转换日线数据(测试模式)...") converter.convert_daily_data(year=2024, limit=10) # 3. 转换股票信息 print("\n3. 转换股票信息...") converter.convert_stock_info() # 4. 创建配置文件 print("\n4. 创建配置文件...") converter.create_config_file() print("\n" + "=" * 60) print("转换完成!") print(f"输出目录: {OUTPUT_DIR}") print("=" * 60) print("\n下一步操作:") print("1. 将 vnpy_local_data_adapter.py 集成到vn.py策略中") print("2. 配置数据路径: vnpy_data_config.json") print("3. 测试数据加载: python test_vnpy_data.py") print("4. 联系赵云将军更新数据") print("=" * 60) if __name__ == "__main__": main()