Files
2026-04-29 20:15:43 +08:00

193 lines
5.3 KiB
Bash
Executable File

#!/bin/bash
# Import the 510300.SSE daily bars into the database from inside the container.
#
# Strict mode: stop on the first failing command, on use of an unset variable,
# and on a failure in any pipeline stage, so that a failed step is never
# followed by later steps that depend on it.
set -euo pipefail

printf '%s\n' "🚀 在容器内导入510300.SSE数据..."
printf '%s\n' "============================================================"
# Write the Python import script to /tmp/import_data.py.
# The heredoc delimiter is quoted, so the shell performs no expansion inside
# it and the Python source is written out verbatim.
cat > /tmp/import_data.py << 'EOF'
"""Load the sh510300 daily parquet file into a fresh SQLite `dbbardata` table.

Steps: check the source file, read it with pandas, recreate the database file,
insert one row per bar, then print a verification summary. Rows that cannot be
converted or inserted are reported and counted, not fatal.
"""
import os
import sqlite3
import sys
from datetime import datetime

import pandas as pd

print("🚀 在容器内导入510300.SSE数据...")
print("=" * 60)

# Configuration
parquet_path = "/volume1/stock-data/sanguo_quant_live/zhaoyun-data/data/raw/daily/sh510300_daily.parquet"
db_path = "/volume1/stock/sanguo_vnpy/data/database_test.db"
symbol = "510300.SSE"
exchange = "SSE"
interval = "1d"

print("源数据: " + parquet_path)
print("目标数据库: " + db_path)
print("标的: " + symbol)

# Abort before touching anything when the source file is missing.
if not os.path.exists(parquet_path):
    print("❌ 源文件不存在: " + parquet_path)
    sys.exit(1)
print("✅ 源文件存在")

# Read the parquet data.
print("\n📥 读取parquet数据...")
df = pd.read_parquet(parquet_path)
print(" 读取成功: " + str(len(df)) + " 行")

# Validate the columns the import loop reads BEFORE the old database is
# deleted, so a malformed source file cannot leave us with no database at all.
required_columns = ("date", "open", "high", "low", "close", "volume")
missing_columns = [name for name in required_columns if name not in df.columns]
if missing_columns:
    print("❌ 源数据缺少列: " + ", ".join(missing_columns))
    sys.exit(1)

# 'amount' is optional; when absent, turnover is approximated per row.
has_amount = "amount" in df.columns

# Recreate the database file from scratch.
print("\n💾 创建vn.py数据库...")
if os.path.exists(db_path):
    os.remove(db_path)
    print(" 删除旧数据库")
os.makedirs(os.path.dirname(db_path), exist_ok=True)

conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()

    # Create the bar-data table.
    cursor.execute("""
        CREATE TABLE dbbardata (
            symbol TEXT NOT NULL,
            exchange TEXT,
            interval TEXT NOT NULL,
            datetime INTEGER NOT NULL,
            open REAL NOT NULL,
            high REAL NOT NULL,
            low REAL NOT NULL,
            close REAL NOT NULL,
            volume INTEGER NOT NULL,
            open_interest REAL,
            turnover REAL,
            PRIMARY KEY (symbol, interval, datetime)
        );
    """)

    # Create the indexes.
    cursor.execute("CREATE INDEX ix_dbbardata_symbol ON dbbardata(symbol);")
    cursor.execute("CREATE INDEX ix_dbbardata_symbol_interval ON dbbardata(symbol, interval);")

    # Import the rows.
    print("\n📊 导入数据...")
    imported = 0
    errors = 0
    for idx, row in df.iterrows():
        # Conversion and insertion share one try-block: a single bad row
        # (unparseable date, NaN volume, duplicate key, ...) is reported and
        # counted instead of aborting the whole import.
        try:
            # Resolve the bar date to a datetime.
            date_val = row["date"]
            if isinstance(date_val, pd.Timestamp):
                dt = date_val.to_pydatetime()
            else:
                date_str = str(date_val)
                if "-" in date_str:
                    dt = datetime.strptime(date_str, "%Y-%m-%d")
                else:
                    dt = datetime.strptime(date_str, "%Y%m%d")
            # A naive datetime is interpreted in the process's local timezone
            # by timestamp(); the verification step below converts back with
            # fromtimestamp(), which uses the same timezone.
            timestamp = int(dt.timestamp())

            # Price and volume fields.
            open_price = float(row["open"])
            high_price = float(row["high"])
            low_price = float(row["low"])
            close_price = float(row["close"])
            volume = int(float(row["volume"]))

            # Turnover: use 'amount' when present, else volume * close.
            if has_amount:
                turnover = float(row["amount"])
            else:
                turnover = volume * close_price

            cursor.execute(
                """
                INSERT INTO dbbardata (
                    symbol, exchange, interval, datetime,
                    open, high, low, close, volume, turnover
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    symbol,
                    exchange,
                    interval,
                    timestamp,
                    open_price,
                    high_price,
                    low_price,
                    close_price,
                    volume,
                    turnover,
                ),
            )
        except Exception as e:
            print(" ⚠️ 第" + str(idx) + "行导入失败: " + str(e))
            errors = errors + 1
            continue

        imported = imported + 1
        if imported % 500 == 0:
            print(" 已导入 " + str(imported) + " 行...")

    # Commit everything in one transaction.
    conn.commit()

    # Verify what actually landed in the table.
    print("\n🔍 验证导入结果...")
    cursor.execute("SELECT COUNT(*) FROM dbbardata WHERE symbol = ?", (symbol,))
    count = cursor.fetchone()[0]

    cursor.execute("SELECT MIN(datetime), MAX(datetime) FROM dbbardata WHERE symbol = ?", (symbol,))
    min_ts, max_ts = cursor.fetchone()
    # Compare against None explicitly: a timestamp of 0 is a valid value.
    if min_ts is not None and max_ts is not None:
        min_dt = datetime.fromtimestamp(min_ts).strftime("%Y-%m-%d")
        max_dt = datetime.fromtimestamp(max_ts).strftime("%Y-%m-%d")
    else:
        min_dt = "N/A"
        max_dt = "N/A"

    cursor.execute("SELECT MIN(close), MAX(close), AVG(volume) FROM dbbardata WHERE symbol = ?", (symbol,))
    min_close, max_close, avg_volume = cursor.fetchone()
finally:
    # Always release the connection, including when an exception propagates.
    conn.close()

# The aggregates are NULL (None) when no row was imported; formatting None
# with ':.2f' would raise, so fall back to 'N/A' in that case.
if min_close is None or max_close is None:
    price_range = "N/A"
else:
    price_range = f"{min_close:.2f}" + " ~ " + f"{max_close:.2f}"
if avg_volume is None:
    avg_volume_text = "N/A"
else:
    avg_volume_text = f"{avg_volume:.0f}"

# Summary
print("\n" + "=" * 60)
print("✅ 导入完成!")
print("源文件: " + parquet_path)
print("目标数据库: " + db_path)
print("标的: " + symbol)
print("源数据行数: " + str(len(df)))
print("成功导入: " + str(imported))
print("导入失败: " + str(errors))
print("数据库验证: " + str(count) + " 行")
print("时间范围: " + min_dt + " -> " + max_dt)
print("价格范围: " + price_range)
print("平均成交量: " + avg_volume_text)
print("=" * 60)

# Show the size of the resulting database file.
if os.path.exists(db_path):
    size_kb = os.path.getsize(db_path) / 1024
    print("\n📦 数据库文件大小: " + f"{size_kb:.1f} KB")

print("\n🎯 完成!")
print("数据库已创建在容器可访问路径: " + db_path)
print("现在可以重启API服务了")
EOF
# Install the generated script into the container and run it there.
remote_host="admin@192.168.2.154"
container_name="sanguo_vnpy"
local_script="/tmp/import_data.py"
container_script="/app/scripts/import_data.py"
# Single-quoted on purpose: $PATH must be expanded by the remote shell, where
# the docker binary lives in a package-specific directory.
remote_env='export PATH=$PATH:/var/packages/Docker/target/usr/bin'

# Stream the local file into the container over ssh stdin. A remote
# `docker cp /tmp/import_data.py ...` would read /tmp on the remote host, which
# only holds this file when the script happens to run on that same machine.
if ! ssh "$remote_host" \
  "${remote_env} && docker exec -i ${container_name} sh -c 'cat > ${container_script}'" \
  < "$local_script"; then
  echo "❌ 复制脚本到容器失败" >&2
  exit 1
fi

# Run the import inside the container; a failure is reported and turned into a
# non-zero exit status of this script.
if ! ssh "$remote_host" \
  "${remote_env} && docker exec ${container_name} python3 ${container_script}"; then
  echo "❌ 容器内导入脚本执行失败" >&2
  exit 1
fi