#!/usr/bin/env python3
"""Create a SQLite bar database inside the sanguo_vnpy container and import 510300.SSE data.

The source data file is already on the NAS:
/volume1/stock-data/sanguo_quant_live/zhaoyun-data/data/raw/daily/sh510300_daily.parquet

The import job (``script_content``) is sent over SSH to admin@192.168.2.154 and
run there by ``python3 -`` inside the ``sanguo_vnpy`` container (``docker exec -i``).
Inside the container it reads the parquet file above and recreates the SQLite
database /volume1/stock/sanguo_vnpy/data/database_test.db (table ``dbbardata``),
deleting any existing file at that path first.
"""
# Source code of the import job that runs *inside* the sanguo_vnpy container.
# It is executed remotely by `python3 -`, so it must be self-contained.
# A raw string is used so the text below is exactly what the remote
# interpreter sees: the "\n" escapes inside print() reach it unchanged,
# with no double-backslash bookkeeping.
#
# NOTE(review): despite the "vn.py" wording, the dbbardata layout created here
# (open/high/low/close columns, integer epoch-second `datetime`, symbol
# "510300.SSE", interval "1d") differs from vn.py's own vnpy_sqlite DbBarData
# model (open_price/high_price/..., DATETIME column, bare symbol + exchange,
# interval "d"). Keep it only if the consuming API service reads this custom
# layout - confirm.
script_content = r'''
import os
import sqlite3
import sys
from datetime import datetime

import pandas as pd

print("🚀 在容器内导入510300.SSE数据...")
print("="*60)

# ---- Configuration ----
parquet_path = "/volume1/stock-data/sanguo_quant_live/zhaoyun-data/data/raw/daily/sh510300_daily.parquet"
db_path = "/volume1/stock/sanguo_vnpy/data/database_test.db"
symbol = "510300.SSE"
exchange = "SSE"
interval = "1d"

# Columns every parquet row must provide; 'amount' (turnover) is optional.
REQUIRED_COLUMNS = ["date", "open", "high", "low", "close", "volume"]

print(f"源数据: {parquet_path}")
print(f"目标数据库: {db_path}")
print(f"标的: {symbol}")


def parse_bar_date(value):
    """Convert a 'date' cell (Timestamp, datetime, 'YYYY-MM-DD' or YYYYMMDD) to a datetime."""
    if isinstance(value, pd.Timestamp):
        return value.to_pydatetime()
    if isinstance(value, datetime):
        return value
    text = str(value)
    return datetime.strptime(text, "%Y-%m-%d" if "-" in text else "%Y%m%d")


def format_day(ts):
    """Render an epoch-second value as YYYY-MM-DD, or 'N/A' when there is none."""
    if ts is None:
        return "N/A"
    return datetime.fromtimestamp(ts).strftime("%Y-%m-%d")


# ---- Check the source file ----
if not os.path.exists(parquet_path):
    print(f"❌ 源文件不存在: {parquet_path}")
    # sys.exit instead of the site-injected exit(), which `python -S` lacks.
    sys.exit(1)

print(f"✅ 源文件存在")

# ---- Read parquet ----
print("\n📥 读取parquet数据...")
df = pd.read_parquet(parquet_path)
print(f" 读取成功: {len(df)} 行")

# Validate the columns *before* the old database is deleted, so bad input
# never destroys a previously good database.
missing = [col for col in REQUIRED_COLUMNS if col not in df.columns]
if missing:
    print(f"❌ 源数据缺少必需列: {missing}")
    sys.exit(1)
# Loop-invariant, so checked once rather than per row.
has_amount = "amount" in df.columns

# ---- Create the database ----
print(f"\n💾 创建vn.py数据库...")

if os.path.exists(db_path):
    os.remove(db_path)
    print(f" 删除旧数据库")

os.makedirs(os.path.dirname(db_path), exist_ok=True)

imported = 0
errors = 0

conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()

    # Bar table (named like vn.py's dbbardata, custom column layout).
    # The primary key makes a duplicate bar fail its INSERT.
    cursor.execute("""
        CREATE TABLE dbbardata (
            symbol TEXT NOT NULL,
            exchange TEXT,
            interval TEXT NOT NULL,
            datetime INTEGER NOT NULL,
            open REAL NOT NULL,
            high REAL NOT NULL,
            low REAL NOT NULL,
            close REAL NOT NULL,
            volume INTEGER NOT NULL,
            open_interest REAL,
            turnover REAL,
            PRIMARY KEY (symbol, interval, datetime)
        );
    """)
    cursor.execute("CREATE INDEX ix_dbbardata_symbol ON dbbardata(symbol);")
    cursor.execute("CREATE INDEX ix_dbbardata_symbol_interval ON dbbardata(symbol, interval);")

    # ---- Import rows ----
    print(f"\n📊 导入数据...")

    for idx, row in df.iterrows():
        # Parsing sits inside the try too, so one malformed row (bad date,
        # NaN/inf volume, duplicate bar, ...) is counted as an error instead
        # of aborting the whole import.
        try:
            dt = parse_bar_date(row["date"])
            # NOTE(review): for naive datetimes .timestamp() uses the
            # container's local timezone; format_day() reads back in the same
            # zone, but confirm the API service decodes values the same way.
            timestamp = int(dt.timestamp())
            open_price = float(row["open"])
            high_price = float(row["high"])
            low_price = float(row["low"])
            close_price = float(row["close"])
            volume = int(float(row["volume"]))
            # Fall back to volume * close when the source has no turnover column.
            turnover = float(row["amount"]) if has_amount else volume * close_price
            cursor.execute(
                """
                INSERT INTO dbbardata (
                    symbol, exchange, interval, datetime,
                    open, high, low, close, volume, turnover
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (symbol, exchange, interval, timestamp,
                 open_price, high_price, low_price, close_price, volume, turnover),
            )
        except (ValueError, TypeError, OverflowError, sqlite3.Error) as e:
            print(f" ⚠️ 第{idx}行导入失败: {e}")
            errors += 1
            continue

        imported += 1
        if imported % 500 == 0:
            print(f" 已导入 {imported} 行...")

    conn.commit()

    # ---- Verify (one query for all statistics) ----
    print("\n🔍 验证导入结果...")
    cursor.execute(
        "SELECT COUNT(*), MIN(datetime), MAX(datetime), MIN(close), MAX(close), AVG(volume) "
        "FROM dbbardata WHERE symbol = ?",
        (symbol,),
    )
    count, min_ts, max_ts, min_close, max_close, avg_volume = cursor.fetchone()
finally:
    # Close the connection even if table creation or the import fails.
    conn.close()

# Aggregates are NULL (None) when no row was imported; avoid formatting None.
price_range = "N/A" if min_close is None else f"{min_close:.2f} ~ {max_close:.2f}"
avg_volume_text = "N/A" if avg_volume is None else f"{avg_volume:.0f}"

# ---- Summary ----
print("\n" + "="*60)
print("✅ 导入完成!")
print(f"源文件: {parquet_path}")
print(f"目标数据库: {db_path}")
print(f"标的: {symbol}")
print(f"源数据行数: {len(df)}")
print(f"成功导入: {imported}")
print(f"导入失败: {errors}")
print(f"数据库验证: {count} 行")
print(f"时间范围: {format_day(min_ts)} -> {format_day(max_ts)}")
print(f"价格范围: {price_range}")
print(f"平均成交量: {avg_volume_text}")
print("="*60)

if count == 0:
    # Nothing usable was imported: report failure through the exit status.
    print("❌ 没有导入任何数据")
    sys.exit(1)

# Report the resulting database file size.
if os.path.exists(db_path):
    size_kb = os.path.getsize(db_path) / 1024
    print(f"\n📦 数据库文件大小: {size_kb:.1f} KB")

print("\n🎯 完成!")
print(f"数据库已创建在容器可访问路径: {db_path}")
print("现在可以重启API服务了")
'''
# ---- Launcher: run script_content inside the container over SSH ----
import subprocess
import sys

REMOTE_HOST = "admin@192.168.2.154"
CONTAINER = "sanguo_vnpy"
# The docker CLI is added to PATH explicitly for the remote (non-interactive)
# SSH shell; presumably a Synology package path - confirm if the NAS changes.
# `$PATH` is expanded by the *remote* shell: no local shell is involved.
REMOTE_CMD = (
    "export PATH=$PATH:/var/packages/Docker/target/usr/bin"
    f" && docker exec -i {CONTAINER} python3 -"
)

print("🚀 在容器内创建数据库...")
print("="*60)

# The script is fed through stdin (ssh -> docker exec -i -> `python3 -`)
# instead of being pasted into a heredoc inside a double-quoted shell
# argument: the script's own double quotes used to terminate that quoting,
# so the local shell mangled (or rejected) the command.
result = subprocess.run(
    ["ssh", REMOTE_HOST, REMOTE_CMD],
    input=script_content,
    encoding="utf-8",
    check=False,
)

print("="*60)
if result.returncode != 0:
    # Propagate the remote failure instead of claiming success.
    print(f"❌ 远程执行失败, 退出码: {result.returncode}")
    sys.exit(result.returncode)
print("完成!")
|