#!/usr/bin/env python3 """ 简单直接下载 510300 数据到vn.py数据库 """ import sys import os import sqlite3 from datetime import datetime import pandas as pd import akshare as ak def main(): """主函数""" print("🚀 直接下载 510300 数据...") print("="*60) # 配置 symbol = "510300.SSE" db_path = "/Users/chufeng/.openclaw/workspace-zhaoyun/zhaoyun-data/data/database_test.db" # 创建数据库 print(f"\n🔧 创建数据库: {db_path}") os.makedirs(os.path.dirname(db_path), exist_ok=True) if os.path.exists(db_path): os.remove(db_path) print(" 删除旧数据库") # 创建数据库 conn = sqlite3.connect(db_path) cursor = conn.cursor() # 创建vn.py标准表结构 cursor.execute(""" CREATE TABLE dbbardata ( symbol TEXT NOT NULL, exchange TEXT, interval TEXT NOT NULL, datetime INTEGER NOT NULL, open REAL NOT NULL, high REAL NOT NULL, low REAL NOT NULL, close REAL NOT NULL, volume INTEGER NOT NULL, open_interest REAL, turnover REAL, PRIMARY KEY (symbol, interval, datetime) ); """) # 创建索引 cursor.execute("CREATE INDEX ix_dbbardata_symbol ON dbbardata(symbol);") cursor.execute("CREATE INDEX ix_dbbardata_symbol_interval ON dbbardata(symbol, interval);") conn.commit() print("✅ 数据库结构创建完成") # 下载数据 print(f"\n📥 开始下载 510300 日线数据...") print(f" 标的代码: 510300") try: # 使用akshare直接下载 df = ak.stock_zh_a_hist(symbol="510300", period="daily", start_date="20160101", end_date="20260330", adjust="qfq") print(f" 下载完成: {len(df)} 行") if not df.empty: print("\n数据预览:") print(df.head()) print("\n数据列名:", list(df.columns)) except Exception as e: print(f"❌ 下载失败: {e}") return False if df.empty: print("❌ 下载数据为空") return False # 导入到数据库 print(f"\n📊 导入数据到vn.py数据库...") imported = 0 # 检查列名 print(f" 原始列名: {list(df.columns)}") # AKShare返回的列名: 日期, 开盘, 收盘, 最高, 最低, 成交量, 成交额, 振幅, 涨跌幅, 涨跌额, 换手率 # 需要映射到vn.py格式 for idx, row in df.iterrows(): # 转换日期: 原始格式是 '2016-01-04' date_str = str(row['日期']) # 解析日期 try: if '-' in date_str: dt = datetime.strptime(date_str, '%Y-%m-%d') else: dt = datetime.strptime(date_str, '%Y%m%d') except Exception as e: print(f" 日期解析失败: {date_str}, 跳过") continue timestamp = int(dt.timestamp()) # 获取价格数据 open_price = float(row['开盘']) high_price = float(row['最高']) low_price = float(row['最低']) close_price = float(row['收盘']) volume = int(float(row['成交量'])) turnover = float(row['成交额']) # 插入 cursor.execute(""" INSERT INTO dbbardata ( symbol, exchange, interval, datetime, open, high, low, close, volume, turnover ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( symbol, 'SSE', '1d', timestamp, open_price, high_price, low_price, close_price, volume, turnover )) imported += 1 # 提交 conn.commit() # 验证 cursor.execute("SELECT COUNT(*) FROM dbbardata WHERE symbol = ?", (symbol,)) count = cursor.fetchone()[0] cursor.execute("SELECT MIN(datetime), MAX(datetime) FROM dbbardata WHERE symbol = ?", (symbol,)) min_ts, max_ts = cursor.fetchone() min_dt = datetime.fromtimestamp(min_ts).strftime('%Y-%m-%d') if min_ts else 'N/A' max_dt = datetime.fromtimestamp(max_ts).strftime('%Y-%m-%d') if max_ts else 'N/A' conn.close() print("\n" + "="*60) print("🎉 下载导入完成!") print(f"标的: {symbol}") print(f"数据库: {db_path}") print(f"总行数: {imported} (验证: {count})") print(f"时间范围: {min_dt} -> {max_dt}") print("="*60) print("\n下一步:") print("1. 检查回测API的数据路径配置") print("2. 重启API服务") print("3. 关羽将军重新运行回测") return True if __name__ == "__main__": success = main() sys.exit(0 if success else 1)