#!/bin/bash
# Import the 510300.SSE daily bars into a SQLite test database.
#
# The Python import script below is written to /tmp/import_data.py via a
# quoted heredoc (no shell expansion inside it). The script is then copied
# into the sanguo_vnpy container on 192.168.2.154 over ssh and run there.
#
# Fail fast: stop at the first failing command or unset variable, instead of
# carrying on (and e.g. trying to run a script that was never written).
set -euo pipefail

# Import data inside the container.
echo "🚀 在容器内导入510300.SSE数据..."
echo "============================================================"

# Create the script file, then execute it.
cat > /tmp/import_data.py << 'EOF'
import os
import sqlite3
import sys
from datetime import datetime

import pandas as pd

# Announce the run before doing any work.
print("🚀 在容器内导入510300.SSE数据...")
print("=" * 60)

# Configuration: source parquet file, target SQLite file, and bar identity.
# NOTE(review): these are host-style /volume1/... paths, yet this script runs
# inside the sanguo_vnpy container; presumably the same paths are bind-mounted
# into it -- confirm against the container's volume configuration.
parquet_path = "/volume1/stock-data/sanguo_quant_live/zhaoyun-data/data/raw/daily/sh510300_daily.parquet"
db_path = "/volume1/stock/sanguo_vnpy/data/database_test.db"
symbol = "510300.SSE"
exchange = "SSE"
interval = "1d"

# Echo the effective configuration, one "label: value" line each.
for label, value in (
    ("源数据", parquet_path),
    ("目标数据库", db_path),
    ("标的", symbol),
):
    print(f"{label}: {value}")

# Check that the source file exists; abort with status 1 otherwise.
if not os.path.exists(parquet_path):
    print("❌ 源文件不存在: " + parquet_path)
    sys.exit(1)

print("✅ 源文件存在")

# Read the parquet file into a DataFrame.
print("\n📥 读取parquet数据...")
df = pd.read_parquet(parquet_path)
print(" 读取成功: " + str(len(df)) + " 行")

# Columns the import loop reads from every row. Check them once up front, so a
# schema mismatch fails with one clear message instead of one error per row.
REQUIRED_COLUMNS = ("date", "open", "high", "low", "close", "volume")
missing_columns = [name for name in REQUIRED_COLUMNS if name not in df.columns]
if missing_columns:
    print("❌ 缺少必要列: " + ", ".join(missing_columns))
    sys.exit(1)

# Create the target database from scratch.
print("\n💾 创建vn.py数据库...")

# Delete any previous file so a re-run never mixes old and new rows.
if os.path.exists(db_path):
    os.remove(db_path)
    print(" 删除旧数据库")

# For a bare filename dirname() is "", and os.makedirs("") raises.
db_dir = os.path.dirname(db_path)
if db_dir:
    os.makedirs(db_dir, exist_ok=True)

conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Bar table. datetime holds integer epoch seconds (see the import step).
# NOTE(review): the original comment calls this the vn.py standard schema;
# verify against the DbBarData model of the vn.py database backend actually in
# use (column types, key columns) before relying on vn.py to read it. The API
# service may depend on this exact layout, so do not change it blindly.
cursor.execute("""
    CREATE TABLE dbbardata (
        symbol TEXT NOT NULL,
        exchange TEXT,
        interval TEXT NOT NULL,
        datetime INTEGER NOT NULL,
        open REAL NOT NULL,
        high REAL NOT NULL,
        low REAL NOT NULL,
        close REAL NOT NULL,
        volume INTEGER NOT NULL,
        open_interest REAL,
        turnover REAL,
        PRIMARY KEY (symbol, interval, datetime)
    );
""")

# Secondary indexes for per-symbol and per-(symbol, interval) lookups.
cursor.execute("CREATE INDEX ix_dbbardata_symbol ON dbbardata(symbol);")
cursor.execute("CREATE INDEX ix_dbbardata_symbol_interval ON dbbardata(symbol, interval);")

# Import every row of df into dbbardata.
print("\n📊 导入数据...")


def parse_bar_datetime(date_val):
    """Convert one 'date' cell into a datetime.

    A pandas Timestamp is converted directly. Anything else is stringified and
    parsed as 'YYYY-MM-DD' when it contains '-', otherwise as 'YYYYMMDD'
    (e.g. a datetime.date, '20240102', or the int 20240102).
    Raises ValueError for any other text, including 'NaT'.
    """
    if isinstance(date_val, pd.Timestamp):
        return date_val.to_pydatetime()
    date_str = str(date_val)
    fmt = '%Y-%m-%d' if '-' in date_str else '%Y%m%d'
    return datetime.strptime(date_str, fmt)


INSERT_SQL = """
    INSERT INTO dbbardata (
        symbol, exchange, interval, datetime,
        open, high, low, close, volume, turnover
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

# `'amount' in row` tests the Series labels, i.e. df's column names, so the
# answer is the same for every row; compute it once.
has_amount = 'amount' in df.columns

imported = 0
errors = 0

for idx, row in df.iterrows():
    # Parsing lives inside the try as well as the INSERT. Previously a bad cell
    # (e.g. a NaT date or a NaN volume) raised outside it and aborted the whole
    # import instead of being counted as a failed row.
    try:
        # NOTE(review): for naive datetimes, .timestamp() uses the container's
        # local timezone; the verification step reads them back the same way.
        timestamp = int(parse_bar_datetime(row['date']).timestamp())

        open_price = float(row['open'])
        high_price = float(row['high'])
        low_price = float(row['low'])
        close_price = float(row['close'])
        volume = int(float(row['volume']))

        # Turnover: use the source 'amount' column when present, else estimate it.
        if has_amount:
            turnover = float(row['amount'])
        else:
            turnover = volume * close_price

        cursor.execute(INSERT_SQL, (
            symbol,
            exchange,
            interval,
            timestamp,
            open_price,
            high_price,
            low_price,
            close_price,
            volume,
            turnover,
        ))
    except Exception as e:
        print(" ⚠️ 第" + str(idx) + "行导入失败: " + str(e))
        errors += 1
        continue

    imported += 1
    if imported % 500 == 0:
        print(" 已导入 " + str(imported) + " 行...")

# Commit all inserted rows in a single transaction.
conn.commit()

# Verify the import by reading the stored rows back, then close the database.
print("\n🔍 验证导入结果...")

# One query for the row count, the time range, the close range and the mean
# volume of this symbol. With no matching rows COUNT is 0 and the others are NULL.
cursor.execute(
    "SELECT COUNT(*), MIN(datetime), MAX(datetime), MIN(close), MAX(close), AVG(volume) "
    "FROM dbbardata WHERE symbol = ?",
    (symbol,),
)
count, min_ts, max_ts, min_close, max_close, avg_volume = cursor.fetchone()

# Compare against None rather than testing truthiness: 0 is a valid epoch
# timestamp and must not be reported as N/A.
if min_ts is not None and max_ts is not None:
    min_dt = datetime.fromtimestamp(min_ts).strftime('%Y-%m-%d')
    max_dt = datetime.fromtimestamp(max_ts).strftime('%Y-%m-%d')
else:
    min_dt = 'N/A'
    max_dt = 'N/A'

conn.close()

# Final report.


def format_aggregate(value, spec):
    """Format a SQL aggregate (MIN/MAX/AVG) for display.

    These aggregates are None when no rows matched, e.g. an empty source or
    every row failing. Formatting None with a numeric spec raises TypeError,
    which would crash the report, so None is shown as 'N/A'.
    """
    if value is None:
        return "N/A"
    return format(value, spec)


print("\n" + "=" * 60)
print("✅ 导入完成!")
print("源文件: " + parquet_path)
print("目标数据库: " + db_path)
print("标的: " + symbol)
print("源数据行数: " + str(len(df)))
print("成功导入: " + str(imported))
print("导入失败: " + str(errors))
print("数据库验证: " + str(count) + " 行")
print("时间范围: " + min_dt + " -> " + max_dt)
print("价格范围: " + format_aggregate(min_close, ".2f") + " ~ " + format_aggregate(max_close, ".2f"))
print("平均成交量: " + format_aggregate(avg_volume, ".0f"))
print("=" * 60)

# Report the size of the resulting database file.
if os.path.exists(db_path):
    size_kb = os.path.getsize(db_path) / 1024
    print("\n📦 数据库文件大小: " + f"{size_kb:.1f} KB")

print("\n🎯 完成!")
print("数据库已创建在容器可访问路径: " + db_path)
print("现在可以重启API服务了")
EOF

# Execute the script on the NAS, inside the sanguo_vnpy container.
readonly REMOTE_HOST="admin@192.168.2.154"
readonly CONTAINER="sanguo_vnpy"
# Prepend-free PATH extension so `docker` resolves on the remote (the Synology
# Docker package dir is presumably absent from the non-interactive ssh PATH).
# Single quotes keep $PATH literal here, so it expands on the remote side.
# shellcheck disable=SC2016
readonly DOCKER_ENV='export PATH=$PATH:/var/packages/Docker/target/usr/bin'

# The heredoc above wrote /tmp/import_data.py on the machine running this
# script. A remote `docker cp /tmp/import_data.py ...` would read the REMOTE
# host's /tmp instead, so stream the local file over ssh stdin straight into
# the container. This works from any machine.
ssh "$REMOTE_HOST" "$DOCKER_ENV && docker exec -i $CONTAINER sh -c 'cat > /app/scripts/import_data.py'" \
  < /tmp/import_data.py

ssh "$REMOTE_HOST" "$DOCKER_ENV && docker exec $CONTAINER python3 /app/scripts/import_data.py"