#!/usr/bin/env python3 """ 导入赵云提供的 510300.SSE 日线数据到 vnpy sqlite 数据库 按照赵云提供的步骤: 1. 读取 parquet 文件 2. 转换为 vnpy BarData 3. 写入数据库 """ import sys import types # 兼容性模块 print("🔧 [IMPORT] 加载vnpy.app兼容性模块...") vnpy_app_module = types.ModuleType('vnpy.app') sys.modules['vnpy.app'] = vnpy_app_module submodules = ['cta_strategy', 'cta_backtester', 'data_manager'] for name in submodules: full_name = f'vnpy.app.{name}' submodule = types.ModuleType(full_name) sys.modules[full_name] = submodule setattr(vnpy_app_module, name, submodule) from vnpy_ctastrategy import ( CtaTemplate, StopOrder, TickData, BarData, TradeData, OrderData, BarGenerator, ArrayManager, ) from vnpy.trader.constant import Direction, Offset, Interval, Exchange sys.modules['vnpy.app.cta_strategy'].CtaTemplate = CtaTemplate vnpy_app_module.CtaTemplate = CtaTemplate from vnpy_ctabacktester import BacktesterEngine sys.modules['vnpy.app.cta_backtester'].BacktesterEngine = BacktesterEngine vnpy_app_module.BacktesterEngine = BacktesterEngine print("✅ [IMPORT] vnpy.app兼容性模块加载完成!") import pandas as pd from vnpy.trader.object import BarData from vnpy.trader.database import get_database def main(): print("\n🚀 [IMPORT] 开始导入 510300.SSE 日线数据") # 1. 读取parquet文件 parquet_path = "/Users/chufeng/nas/stock-data/sanguo_quant_live/zhaoyun-data/data/raw/daily/sh510300_daily.parquet" print(f"\n📖 [IMPORT] 读取数据: {parquet_path}") df = pd.read_parquet(parquet_path) print(f"✅ [IMPORT] 读取完成,共 {len(df)} 行") print(f" 时间范围: {df['trade_date'].min()} ~ {df['trade_date'].max()}") print(f" 列: {list(df.columns)}") # 2. 转换为 vnpy BarData print(f"\n🔧 [IMPORT] 转换为 BarData...") bars = [] for idx, row in df.iterrows(): bar = BarData( symbol="510300", exchange=Exchange.SSE, interval=Interval.DAILY, datetime=row["trade_date"], open_price=row["open"], high_price=row["high"], low_price=row["low"], close_price=row["close"], volume=row["volume"], turnover=row["amount"], gateway_name="DATA" ) bars.append(bar) print(f"✅ [IMPORT] 转换完成,共 {len(bars)} 个BarData") # 3. 写入数据库 print(f"\n💾 [IMPORT] 写入数据库...") db = get_database() # 先统计原有数据 from datetime import datetime start_dt = datetime(2012, 5, 28) end_dt = datetime(2026, 3, 27) existing = db.load_bar_data("510300", Exchange.SSE, Interval.DAILY, start_dt, end_dt) print(f"⚠️ [IMPORT] 原有数据: {len(existing)} 条在这个范围内") # 保存数据 db.save_bar_data(bars) print(f"✅ [IMPORT] 保存完成,共写入 {len(bars)} 条") # 验证写入 existing_after = db.load_bar_data("510300", Exchange.SSE, Interval.DAILY, start_dt, end_dt) print(f"✅ [IMPORT] 写入后验证: {len(existing_after)} 条在这个范围内") # 验证目标时间范围 print(f"\n✅ [IMPORT] 验证目标时间范围 2021-01-01 ~ 2026-03-01:") start_target = datetime(2021, 1, 1) end_target = datetime(2026, 3, 1) target_bars = db.load_bar_data("510300", Exchange.SSE, Interval.DAILY, start_target, end_target) print(f" 找到 {len(target_bars)} 条数据") if len(target_bars) > 0: print(f" 第一条: {target_bars[0].datetime}") print(f" 最后一条: {target_bars[-1].datetime}") print(f" ✅ 满足需求!") print("\n🎉 [IMPORT] 导入完成!") if __name__ == '__main__': main()