Files
sanguo_vnpy/scripts/utils/create_database_in_container.py
T
2026-04-11 21:18:55 +08:00

199 lines
5.1 KiB
Python

#!/usr/bin/env python3
"""Create the vn.py test database inside the container and import 510300.SSE daily bars.

``script_content`` holds a Python script that is executed by ``python3`` inside the
``sanguo_vnpy`` container. It reads the daily parquet file that already lives on the NAS:
/volume1/stock-data/sanguo_quant_live/zhaoyun-data/data/raw/daily/sh510300_daily.parquet
recreates the SQLite database file from scratch, inserts one row per bar into the
``dbbardata`` table, and prints a verification summary.
"""
script_content = r'''
import os
import sqlite3
import sys
from datetime import datetime

import pandas as pd

# Configuration (paths as seen from inside the container)
parquet_path = "/volume1/stock-data/sanguo_quant_live/zhaoyun-data/data/raw/daily/sh510300_daily.parquet"
db_path = "/volume1/stock/sanguo_vnpy/data/database_test.db"
symbol = "510300.SSE"
exchange = "SSE"
interval = "1d"

# NOTE(review): this hand-written schema (open/high/low/close columns, integer epoch
# datetime, symbol including the exchange suffix) may not match the table the
# vnpy_sqlite database model creates; confirm it is what the API service reads.
CREATE_TABLE_SQL = """
CREATE TABLE dbbardata (
    symbol TEXT NOT NULL,
    exchange TEXT,
    interval TEXT NOT NULL,
    datetime INTEGER NOT NULL,
    open REAL NOT NULL,
    high REAL NOT NULL,
    low REAL NOT NULL,
    close REAL NOT NULL,
    volume INTEGER NOT NULL,
    open_interest REAL,
    turnover REAL,
    PRIMARY KEY (symbol, interval, datetime)
);
"""

INSERT_SQL = """
INSERT INTO dbbardata (
    symbol, exchange, interval, datetime,
    open, high, low, close, volume, turnover
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""


def parse_date(value):
    """Return a naive datetime for a 'date' cell.

    Accepts a pandas Timestamp, or any value whose str() is 'YYYY-MM-DD' or 'YYYYMMDD'.
    Raises ValueError for any other format.
    """
    if isinstance(value, pd.Timestamp):
        return value.to_pydatetime()
    text = str(value)
    return datetime.strptime(text, "%Y-%m-%d" if "-" in text else "%Y%m%d")


def build_record(row):
    """Map one parquet row to the parameter tuple for INSERT_SQL.

    Raises KeyError/TypeError/ValueError/OverflowError when a required column is
    missing or cannot be converted.
    """
    dt = parse_date(row["date"])
    # Naive datetime -> epoch seconds in the container's local time zone; the
    # verification step converts back with fromtimestamp() in the same zone.
    timestamp = int(dt.timestamp())
    open_price = float(row["open"])
    high_price = float(row["high"])
    low_price = float(row["low"])
    close_price = float(row["close"])
    volume = int(float(row["volume"]))
    # Turnover: use the source 'amount' column when present and not NaN,
    # otherwise estimate it as volume * close.
    amount = row.get("amount")
    if amount is not None and pd.notna(amount):
        turnover = float(amount)
    else:
        turnover = volume * close_price
    return (
        symbol,
        exchange,
        interval,
        timestamp,
        open_price,
        high_price,
        low_price,
        close_price,
        volume,
        turnover,
    )


print("🚀 在容器内导入510300.SSE数据...")
print("="*60)
print(f"源数据: {parquet_path}")
print(f"目标数据库: {db_path}")
print(f"标的: {symbol}")

# Abort early when the source file is missing
if not os.path.exists(parquet_path):
    print(f"❌ 源文件不存在: {parquet_path}")
    sys.exit(1)
print("✅ 源文件存在")

print("\n📥 读取parquet数据...")
df = pd.read_parquet(parquet_path)
print(f" 读取成功: {len(df)} 行")

print("\n💾 创建vn.py数据库...")
# Always start from an empty database file
if os.path.exists(db_path):
    os.remove(db_path)
    print(" 删除旧数据库")
os.makedirs(os.path.dirname(db_path), exist_ok=True)

conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()
    cursor.execute(CREATE_TABLE_SQL)
    cursor.execute("CREATE INDEX ix_dbbardata_symbol ON dbbardata(symbol);")
    cursor.execute("CREATE INDEX ix_dbbardata_symbol_interval ON dbbardata(symbol, interval);")

    print("\n📊 导入数据...")
    imported = 0
    errors = 0
    for idx, row in df.iterrows():
        # A bad row (unparsable field or rejected insert) is counted and skipped
        # instead of aborting the whole import.
        try:
            cursor.execute(INSERT_SQL, build_record(row))
        except (KeyError, TypeError, ValueError, OverflowError, sqlite3.Error) as e:
            print(f" ⚠️ 第{idx}行导入失败: {e}")
            errors += 1
            continue
        imported += 1
        if imported % 500 == 0:
            print(f" 已导入 {imported} 行...")
    conn.commit()

    print("\n🔍 验证导入结果...")
    cursor.execute("SELECT COUNT(*) FROM dbbardata WHERE symbol = ?", (symbol,))
    count = cursor.fetchone()[0]
    cursor.execute("SELECT MIN(datetime), MAX(datetime) FROM dbbardata WHERE symbol = ?", (symbol,))
    min_ts, max_ts = cursor.fetchone()
    cursor.execute("SELECT MIN(close), MAX(close), AVG(volume) FROM dbbardata WHERE symbol = ?", (symbol,))
    min_close, max_close, avg_volume = cursor.fetchone()
finally:
    conn.close()

# SQL aggregates come back as NULL (None) when no row was imported
if min_ts is not None and max_ts is not None:
    min_dt = datetime.fromtimestamp(min_ts).strftime('%Y-%m-%d')
    max_dt = datetime.fromtimestamp(max_ts).strftime('%Y-%m-%d')
else:
    min_dt = 'N/A'
    max_dt = 'N/A'

print("\n" + "="*60)
print("✅ 导入完成!")
print(f"源文件: {parquet_path}")
print(f"目标数据库: {db_path}")
print(f"标的: {symbol}")
print(f"源数据行数: {len(df)}")
print(f"成功导入: {imported}")
print(f"导入失败: {errors}")
print(f"数据库验证: {count}")
print(f"时间范围: {min_dt} -> {max_dt}")
if min_close is not None:
    print(f"价格范围: {min_close:.2f} ~ {max_close:.2f}")
    print(f"平均成交量: {avg_volume:.0f}")
else:
    print("价格范围: N/A")
    print("平均成交量: N/A")
print("="*60)

if os.path.exists(db_path):
    size_kb = os.path.getsize(db_path) / 1024
    print(f"\n📦 数据库文件大小: {size_kb:.1f} KB")

print("\n🎯 完成!")
print(f"数据库已创建在容器可访问路径: {db_path}")
print("现在可以重启API服务了")

# Non-zero exit status lets the caller detect an import that produced no rows
if imported == 0:
    sys.exit(1)
'''
# Send the script to the container: ssh -> docker exec -i -> python3 reading from stdin.
import subprocess
import sys

print("🚀 在容器内创建数据库...")
print("="*60)

# Runs in the NAS login shell. Because we pass an argv list (no local shell),
# $PATH is expanded remotely and needs no escaping here.
remote_cmd = (
    "export PATH=$PATH:/var/packages/Docker/target/usr/bin"
    " && docker exec -i sanguo_vnpy python3 -"
)
# The script travels on stdin instead of being spliced into a double-quoted shell
# string, where every '"' inside the script used to break the quoting.
# encoding="utf-8" because the script contains Chinese text and emoji.
result = subprocess.run(
    ["ssh", "admin@192.168.2.154", remote_cmd],
    input=script_content,
    encoding="utf-8",
)
print("="*60)
# Propagate a remote failure instead of unconditionally reporting success
if result.returncode != 0:
    print(f"❌ 远程执行失败, 退出码: {result.returncode}")
    sys.exit(result.returncode)
print("完成!")