Files
sanguo_vnpy/scripts/utils/import_data_to_sqlite.py
T
2026-04-11 21:18:55 +08:00

116 lines
3.8 KiB
Python

#!/usr/bin/env python3
"""
导入赵云提供的 510300.SSE 日线数据到 vnpy sqlite 数据库
按照赵云提供的步骤:
1. 读取 parquet 文件
2. 转换为 vnpy BarData
3. 写入数据库
"""
import sys
import types
# Compatibility shim: register empty placeholder modules under the
# ``vnpy.app`` namespace in ``sys.modules`` and attach the classes imported
# from vnpy_ctastrategy / vnpy_ctabacktester to them.
# NOTE(review): presumably this keeps code written against the legacy
# ``vnpy.app.*`` import paths working -- confirm against the callers.
print("🔧 [IMPORT] 加载vnpy.app兼容性模块...")

# Create the parent placeholder and register it in a single statement.
vnpy_app_module = sys.modules['vnpy.app'] = types.ModuleType('vnpy.app')

submodules = ['cta_strategy', 'cta_backtester', 'data_manager']
for name in submodules:
    full_name = 'vnpy.app.' + name
    # Each placeholder is reachable both via sys.modules and as an
    # attribute of the parent placeholder module.
    submodule = sys.modules[full_name] = types.ModuleType(full_name)
    setattr(vnpy_app_module, name, submodule)

# These imports intentionally run after the placeholders are registered.
from vnpy_ctastrategy import (
    CtaTemplate,
    StopOrder,
    TickData,
    BarData,
    TradeData,
    OrderData,
    BarGenerator,
    ArrayManager,
)
from vnpy.trader.constant import Direction, Offset, Interval, Exchange

# Expose CtaTemplate on the cta_strategy placeholder and on the parent.
setattr(sys.modules['vnpy.app.cta_strategy'], 'CtaTemplate', CtaTemplate)
setattr(vnpy_app_module, 'CtaTemplate', CtaTemplate)

from vnpy_ctabacktester import BacktesterEngine

# Expose BacktesterEngine on the cta_backtester placeholder and on the parent.
setattr(sys.modules['vnpy.app.cta_backtester'], 'BacktesterEngine', BacktesterEngine)
setattr(vnpy_app_module, 'BacktesterEngine', BacktesterEngine)
print("✅ [IMPORT] vnpy.app兼容性模块加载完成!")
import pandas as pd
from vnpy.trader.object import BarData
from vnpy.trader.database import get_database
# Default location of the 510300 daily parquet file used when no path is given.
_DEFAULT_PARQUET_PATH = "/Users/chufeng/nas/stock-data/sanguo_quant_live/zhaoyun-data/data/raw/daily/sh510300_daily.parquet"
# Symbol written to and queried from the database.
_SYMBOL = "510300"


def _to_pydatetimes(values):
    """Return the entries of the Series *values* as ``datetime.datetime`` objects.

    Datetime-typed columns are used as they are. Any other dtype is rendered
    to text first, so that integer dates such as ``20210104`` are parsed as
    calendar dates rather than as numeric offsets from the epoch.

    Raises:
        ValueError: if any entry is missing after parsing.
    """
    if pd.api.types.is_datetime64_any_dtype(values):
        stamps = values
    else:
        stamps = pd.to_datetime(values.astype(str))
    if stamps.isna().any():
        raise ValueError("trade_date column contains missing dates")
    # Iterating a datetime Series yields pandas Timestamps; hand out plain
    # datetime objects instead.
    # NOTE(review): BarData.datetime is expected to be a standard datetime by
    # the vnpy database layer -- confirm against the installed backend.
    return [stamp.to_pydatetime() for stamp in stamps]


def _frame_to_bars(df):
    """Build one daily 510300.SSE ``BarData`` per row of *df*.

    Reads the ``trade_date``, ``open``, ``high``, ``low``, ``close``,
    ``volume`` and ``amount`` columns; ``amount`` is stored as turnover.
    """
    value_columns = ("open", "high", "low", "close", "volume", "amount")
    # Convert each column once instead of boxing every row via iterrows().
    numeric = [df[column].astype(float).tolist() for column in value_columns]
    moments = _to_pydatetimes(df["trade_date"])
    return [
        BarData(
            symbol=_SYMBOL,
            exchange=Exchange.SSE,
            interval=Interval.DAILY,
            datetime=moment,
            open_price=open_px,
            high_price=high_px,
            low_price=low_px,
            close_price=close_px,
            volume=volume,
            turnover=amount,
            gateway_name="DATA",
        )
        for moment, open_px, high_px, low_px, close_px, volume, amount
        in zip(moments, *numeric)
    ]


def main(parquet_path: str = _DEFAULT_PARQUET_PATH) -> None:
    """Import 510300.SSE daily bars from a parquet file into the vnpy database.

    Steps: read the parquet file, convert every row to a ``BarData``, save
    the bars through ``get_database()``, then report how many bars the
    database holds for the imported range and for 2021-01-01 ~ 2026-03-01.

    Args:
        parquet_path: Parquet file to import; defaults to the original
            hard-coded location.

    Raises:
        ValueError: if the file contains no rows or has missing trade dates.
    """
    from datetime import datetime

    print("\n🚀 [IMPORT] 开始导入 510300.SSE 日线数据")

    # 1. Read the parquet file.
    print(f"\n📖 [IMPORT] 读取数据: {parquet_path}")
    df = pd.read_parquet(parquet_path)
    print(f"✅ [IMPORT] 读取完成,共 {len(df)}")
    if df.empty:
        # Fail before touching the database: there is nothing to save.
        raise ValueError(f"no rows found in {parquet_path}; nothing to import")
    print(f" 时间范围: {df['trade_date'].min()} ~ {df['trade_date'].max()}")
    print(f" 列: {list(df.columns)}")

    # 2. Convert the rows to vnpy BarData.
    print("\n🔧 [IMPORT] 转换为 BarData...")
    bars = _frame_to_bars(df)
    print(f"✅ [IMPORT] 转换完成,共 {len(bars)} 个BarData")

    # 3. Write to the database.
    print("\n💾 [IMPORT] 写入数据库...")
    db = get_database()

    # Count pre-existing rows over the range actually being imported. The
    # range is taken from the bars (before saving) so the check stays correct
    # when the parquet file is refreshed with newer data.
    start_dt = min(bar.datetime for bar in bars)
    end_dt = max(bar.datetime for bar in bars)
    existing = db.load_bar_data(_SYMBOL, Exchange.SSE, Interval.DAILY, start_dt, end_dt)
    print(f"⚠️ [IMPORT] 原有数据: {len(existing)} 条在这个范围内")

    # Save the bars.
    db.save_bar_data(bars)
    print(f"✅ [IMPORT] 保存完成,共写入 {len(bars)}")

    # Verify the write by re-reading the same range.
    existing_after = db.load_bar_data(_SYMBOL, Exchange.SSE, Interval.DAILY, start_dt, end_dt)
    print(f"✅ [IMPORT] 写入后验证: {len(existing_after)} 条在这个范围内")

    # Verify the target time range.
    print("\n✅ [IMPORT] 验证目标时间范围 2021-01-01 ~ 2026-03-01:")
    start_target = datetime(2021, 1, 1)
    end_target = datetime(2026, 3, 1)
    target_bars = db.load_bar_data(_SYMBOL, Exchange.SSE, Interval.DAILY, start_target, end_target)
    print(f" 找到 {len(target_bars)} 条数据")
    if target_bars:
        print(f" 第一条: {target_bars[0].datetime}")
        print(f" 最后一条: {target_bars[-1].datetime}")
        print(" ✅ 满足需求!")
    print("\n🎉 [IMPORT] 导入完成!")


if __name__ == '__main__':
    # An optional first command-line argument overrides the default parquet
    # path; with no argument the call is exactly main().
    main(*sys.argv[1:2])