Files
sanguo_vnpy/scripts/utils/import_parquet_to_vnpy.py
T
2026-04-12 10:20:09 +08:00

180 lines
5.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Import the parquet data downloaded by General Zhao Yun into the vn.py database.
"""
import pandas as pd
import sqlite3
import os
from datetime import datetime
# Defaults reproduce the locations this script was originally hard-wired to.
_DEFAULT_PARQUET_PATH = "/Users/chufeng/nas/stock-data/sanguo_quant_live/zhaoyun-data/data/raw/daily/sh510300_daily.parquet"
_DEFAULT_DB_PATH = "/Users/chufeng/.openclaw/workspace-zhaoyun/zhaoyun-data/data/database_test.db"

# Columns every source row is read from; ``amount`` is optional.
_REQUIRED_COLUMNS = ("date", "open", "high", "low", "close", "volume")


def _parse_bar_datetime(date_val):
    """Return the ``datetime`` for one value of the source ``date`` column.

    Accepts ``pd.Timestamp``, ``datetime`` and anything whose ``str()`` form
    is ``YYYY-MM-DD`` or ``YYYYMMDD``. Raises ``ValueError`` for any other
    text.
    """
    if isinstance(date_val, pd.Timestamp):
        return date_val.to_pydatetime()
    if isinstance(date_val, datetime):
        return date_val
    date_str = str(date_val)
    fmt = '%Y-%m-%d' if '-' in date_str else '%Y%m%d'
    return datetime.strptime(date_str, fmt)


def _row_to_record(row, symbol, exchange, interval, has_amount):
    """Build the parameter tuple for one ``dbbardata`` INSERT from a row."""
    # A naive datetime is interpreted in the local timezone by timestamp();
    # the verification step in main() reads it back with fromtimestamp(),
    # which uses the same zone.
    timestamp = int(_parse_bar_datetime(row['date']).timestamp())
    open_price = float(row['open'])
    high_price = float(row['high'])
    low_price = float(row['low'])
    close_price = float(row['close'])
    volume = int(float(row['volume']))
    # Turnover comes from the ``amount`` column when the source has one,
    # otherwise it is approximated as volume * close.
    turnover = float(row['amount']) if has_amount else volume * close_price
    return (
        symbol,
        exchange,
        interval,
        timestamp,
        open_price,
        high_price,
        low_price,
        close_price,
        volume,
        turnover,
    )


def _create_schema(cursor):
    """Create the ``dbbardata`` table and its two lookup indexes."""
    # NOTE(review): the upstream vnpy_sqlite adapter appears to store
    # ``interval`` as "d" and ``datetime`` as text - confirm that the
    # consumer of this file really expects "1d" and epoch seconds.
    cursor.execute("""
        CREATE TABLE dbbardata (
            symbol TEXT NOT NULL,
            exchange TEXT,
            interval TEXT NOT NULL,
            datetime INTEGER NOT NULL,
            open REAL NOT NULL,
            high REAL NOT NULL,
            low REAL NOT NULL,
            close REAL NOT NULL,
            volume INTEGER NOT NULL,
            open_interest REAL,
            turnover REAL,
            PRIMARY KEY (symbol, interval, datetime)
        );
    """)
    cursor.execute("CREATE INDEX ix_dbbardata_symbol ON dbbardata(symbol);")
    cursor.execute("CREATE INDEX ix_dbbardata_symbol_interval ON dbbardata(symbol, interval);")


def main(
    parquet_path=_DEFAULT_PARQUET_PATH,
    db_path=_DEFAULT_DB_PATH,
    symbol="510300.SSE",
    exchange="SSE",
    interval="1d",
):
    """Import daily bars from a parquet file into a freshly built SQLite file.

    Any existing file at ``db_path`` is deleted and recreated. Each source
    row becomes one ``dbbardata`` row; a row that cannot be converted or
    inserted is reported, counted and skipped. Progress and a verification
    summary are printed to stdout.

    Args:
        parquet_path: Source parquet file. It must contain the columns
            ``date``, ``open``, ``high``, ``low``, ``close`` and ``volume``;
            ``amount`` is used as turnover when present.
        db_path: SQLite file to (re)create.
        symbol: Value written to the ``symbol`` column of every row.
        exchange: Value written to the ``exchange`` column of every row.
        interval: Value written to the ``interval`` column of every row.

    Returns:
        ``True`` once the import and the summary have completed. Per-row
        failures are only reflected in the printed error count.

    Raises:
        KeyError: If a required column is missing. This is raised before
            the existing database file is removed.
        Exception: Whatever ``pd.read_parquet`` or ``sqlite3`` raise for an
            unreadable source or an unusable destination.
    """
    print("🚀 将parquet数据导入到vn.py数据库")
    print("="*60)

    print(f"源数据: {parquet_path}")
    print(f"目标数据库: {db_path}")
    print(f"标的: {symbol}")

    # Read the parquet source.
    print("\n📥 读取parquet数据...")
    df = pd.read_parquet(parquet_path)
    print(f" 读取成功: {len(df)}")

    # Validate before touching the destination so that a malformed source
    # never costs us the previous database.
    missing = [col for col in _REQUIRED_COLUMNS if col not in df.columns]
    if missing:
        raise KeyError(f"parquet file is missing required columns: {missing}")
    has_amount = 'amount' in df.columns

    # Recreate the database file from scratch.
    print("\n💾 创建vn.py数据库...")
    if os.path.exists(db_path):
        os.remove(db_path)
        print(" 删除旧数据库")
    db_dir = os.path.dirname(db_path)
    if db_dir:
        # dirname() is empty for a bare filename; makedirs("") would raise.
        os.makedirs(db_dir, exist_ok=True)

    imported = 0
    errors = 0
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        _create_schema(cursor)

        # Import row by row so that one bad row does not abort the run.
        print("\n📊 导入数据到vn.py数据库...")
        for idx, row in df.iterrows():
            try:
                record = _row_to_record(row, symbol, exchange, interval, has_amount)
                cursor.execute("""
                    INSERT INTO dbbardata (
                        symbol, exchange, interval, datetime,
                        open, high, low, close, volume, turnover
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, record)
            except (KeyError, TypeError, ValueError, OverflowError, sqlite3.Error) as e:
                print(f" ⚠️ 第{idx}行导入失败: {e}")
                errors += 1
                continue
            imported += 1
            if imported % 500 == 0:
                print(f" 已导入 {imported} 行...")

        conn.commit()

        # Read back what was stored to verify the import.
        print("\n🔍 验证导入结果...")
        cursor.execute("SELECT COUNT(*) FROM dbbardata WHERE symbol = ?", (symbol,))
        count = cursor.fetchone()[0]
        cursor.execute("SELECT MIN(datetime), MAX(datetime) FROM dbbardata WHERE symbol = ?", (symbol,))
        min_ts, max_ts = cursor.fetchone()
        cursor.execute("SELECT MIN(close), MAX(close), AVG(volume) FROM dbbardata WHERE symbol = ?", (symbol,))
        min_close, max_close, avg_volume = cursor.fetchone()
    finally:
        # Release the file handle even when the import fails part-way.
        conn.close()

    # The aggregates are NULL (None) when no rows were stored; compare with
    # None explicitly because timestamp 0 is a valid value.
    if min_ts is not None and max_ts is not None:
        min_dt = datetime.fromtimestamp(min_ts).strftime('%Y-%m-%d')
        max_dt = datetime.fromtimestamp(max_ts).strftime('%Y-%m-%d')
    else:
        min_dt = 'N/A'
        max_dt = 'N/A'

    # Summary.
    print("\n" + "="*60)
    print("✅ 导入完成!")
    print(f"源文件: {parquet_path}")
    print(f"目标数据库: {db_path}")
    print(f"标的: {symbol}")
    print(f"源数据行数: {len(df)}")
    print(f"成功导入: {imported}")
    print(f"导入失败: {errors}")
    print(f"数据库验证: {count}")
    print(f"时间范围: {min_dt} -> {max_dt}")
    if min_close is not None and max_close is not None and avg_volume is not None:
        print(f"价格范围: {min_close:.2f} ~ {max_close:.2f}")
        print(f"平均成交量: {avg_volume:.0f}")
    else:
        print("价格范围: N/A")
        print("平均成交量: N/A")
    print("="*60)

    # Report the size of the resulting database file.
    if os.path.exists(db_path):
        size_kb = os.path.getsize(db_path) / 1024
        print(f"\n📦 数据库文件大小: {size_kb:.1f} KB")

    print("\n🎯 完成!")
    print("下一步:")
    print("1. 确认数据库路径正确")
    print("2. 重启回测API服务")
    print("3. 关羽将军开始回测")
    return True
# Run the import only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()