Files
sanguo_vnpy/test/backtest/download_final.py
T
2026-04-11 21:18:55 +08:00

202 lines
6.1 KiB
Python

#!/usr/bin/env python3
"""
最终版本 - 下载510300.SSE数据到vn.py数据库
使用正确的AKShare接口参数
"""
import sys
import os
import sqlite3
from datetime import datetime
import pandas as pd
import akshare as ak
def main():
"""主函数"""
print("🚀 最终版本 - 下载510300.SSE数据到vn.py数据库")
print("="*60)
# 配置
symbol = "510300.SSE"
db_path = "/Users/chufeng/.openclaw/workspace-zhaoyun/zhaoyun-data/data/database_test.db"
# 下载数据 - 使用新浪ETF接口
print("\n📥 下载510300ETF日线数据...")
print(f" 标的: 510300 沪深300ETF")
try:
# fund_etf_hist_sina 接口不需要start_date和end_date参数,返回全部历史
df = ak.fund_etf_hist_sina(symbol="510300")
print(f"✅ 下载成功: {len(df)}")
print(f" 列名: {list(df.columns)}")
if df.empty:
print("❌ 数据为空")
return False
print("\n数据预览:")
print(df.head())
print("\n数据尾部:")
print(df.tail())
except Exception as e:
print(f"❌ 下载失败: {e}")
# 尝试备用接口
print("\n🔄 尝试备用接口 - stock_zh_a_daily...")
try:
df = ak.stock_zh_a_daily(symbol="sh510300", adjust="qfq")
print(f"✅ 备用接口下载成功: {len(df)}")
print(f" 列名: {list(df.columns)}")
except Exception as e2:
print(f"❌ 备用接口也失败: {e2}")
return False
# 创建数据库
print(f"\n💾 创建vn.py数据库: {db_path}")
if os.path.exists(db_path):
os.remove(db_path)
print(" 删除旧数据库")
os.makedirs(os.path.dirname(db_path), exist_ok=True)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 创建vn.py标准表结构
cursor.execute("""
CREATE TABLE dbbardata (
symbol TEXT NOT NULL,
exchange TEXT,
interval TEXT NOT NULL,
datetime INTEGER NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume INTEGER NOT NULL,
open_interest REAL,
turnover REAL,
PRIMARY KEY (symbol, interval, datetime)
);
""")
# 创建索引
cursor.execute("CREATE INDEX ix_dbbardata_symbol ON dbbardata(symbol);")
cursor.execute("CREATE INDEX ix_dbbardata_symbol_interval ON dbbardata(symbol, interval);")
# 导入数据
print(f"\n📊 导入数据...")
# 新浪ETF接口返回字段: date, open, high, low, close, volume
imported = 0
for idx, row in df.iterrows():
# 转换日期
date_val = row['date']
if isinstance(date_val, pd.Timestamp):
dt = date_val.to_pydatetime()
else:
date_str = str(date_val)
try:
if '-' in date_str:
dt = datetime.strptime(date_str, '%Y-%m-%d')
else:
dt = datetime.strptime(date_str, '%Y%m%d')
except Exception as e:
print(f" 日期解析失败: {date_val}, 跳过")
continue
timestamp = int(dt.timestamp())
# 获取价格数据
open_price = float(row['open'])
high_price = float(row['high'])
low_price = float(row['low'])
close_price = float(row['close'])
volume = int(float(row['volume']))
turnover = float(row['volume']) * close_price # 估算成交额
# 插入
cursor.execute("""
INSERT INTO dbbardata (
symbol, exchange, interval, datetime,
open, high, low, close, volume, turnover
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
symbol,
'SSE',
'1d',
timestamp,
open_price,
high_price,
low_price,
close_price,
volume,
turnover
))
imported += 1
if imported % 500 == 0:
print(f" 已导入 {imported} 行...")
# 提交
conn.commit()
# 验证
cursor.execute("SELECT COUNT(*) FROM dbbardata WHERE symbol = ?", (symbol,))
count = cursor.fetchone()[0]
cursor.execute("SELECT MIN(datetime), MAX(datetime) FROM dbbardata WHERE symbol = ?", (symbol,))
min_ts, max_ts = cursor.fetchone()
if min_ts and max_ts:
min_dt = datetime.fromtimestamp(min_ts).strftime('%Y-%m-%d')
max_dt = datetime.fromtimestamp(max_ts).strftime('%Y-%m-%d')
else:
min_dt = 'N/A'
max_dt = 'N/A'
# 获取统计信息
cursor.execute("SELECT MIN(open), MAX(open), AVG(volume) FROM dbbardata WHERE symbol = ?", (symbol,))
min_open, max_open, avg_volume = cursor.fetchone()
conn.close()
print("\n" + "="*60)
print("✅ 下载导入完成!")
print(f"标的: {symbol}")
print(f"数据库路径: {db_path}")
print(f"总行数: {imported} (数据库验证: {count})")
print(f"时间范围: {min_dt} -> {max_dt}")
print("="*60)
print(f"\n📊 数据统计:")
print(f" 交易天数: {imported}")
print(f" 价格范围: {min_open:.2f} ~ {max_open:.2f}")
print(f" 平均成交量: {avg_volume:.0f}")
print(f" 最新日期: {max_dt}")
# 检查数据路径
print("\n" + "="*60)
print("🔍 数据路径检查:")
if os.path.exists(db_path):
size_mb = os.path.getsize(db_path) / (1024*1024)
print(f"✅ 数据库文件存在,大小: {size_mb:.2f} MB")
else:
print(f"❌ 数据库文件不存在: {db_path}")
return False
print("\n🎯 下一步:")
print("1. 确认API的数据路径配置是否正确")
print(f" 应该配置为: {db_path}")
print("2. 重启回测API服务")
print("3. 关羽将军重新运行回测")
print("="*60)
return True
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)