#!/usr/bin/env python3 """ 修复AKShare下载问题 - 尝试不同的接口 """ import sys import os import sqlite3 from datetime import datetime import pandas as pd import akshare as ak def test_different_interfaces(): """测试不同的接口""" print("🧪 测试不同的AKShare接口获取510300数据...") print("="*60) # 测试1: stock_zh_a_hist 接口 print("\n1. 测试 stock_zh_a_hist 接口:") try: df = ak.stock_zh_a_hist(symbol="510300", period="daily", start_date="20240101", end_date="20241231", adjust="qfq") print(f" 成功: {len(df)} 行") if not df.empty: print(f" 列名: {list(df.columns)}") print(f" 前3行:\n{df.head(3)}") return df except Exception as e: print(f" 失败: {e}") # 测试2: stock_zh_a_daily 接口 print("\n2. 测试 stock_zh_a_daily 接口:") try: df = ak.stock_zh_a_daily(symbol="sh510300", adjust="qfq") print(f" 成功: {len(df)} 行") if not df.empty: print(f" 列名: {list(df.columns)}") print(f" 前3行:\n{df.head(3)}") return df except Exception as e: print(f" 失败: {e}") # 测试3: 指数接口 print("\n3. 测试 index_zh_a_hist 接口:") try: df = ak.index_zh_a_hist(symbol="000300", period="daily", start_date="20240101", end_date="20241231") print(f" 成功: {len(df)} 行 (沪深300指数)") if not df.empty: print(f" 列名: {list(df.columns)}") except Exception as e: print(f" 失败: {e}") # 测试4: fund ETF接口 print("\n4. 测试 fund_etf_hist_sina 接口:") try: df = ak.fund_etf_hist_sina(symbol="510300", start_date="20240101", end_date="20241231") print(f" 成功: {len(df)} 行") if not df.empty: print(f" 列名: {list(df.columns)}") print(f" 前3行:\n{df.head(3)}") return df except Exception as e: print(f" 失败: {e}") return None def download_to_vnpy(): """下载到vn.py数据库""" print("\n" + "="*60) print("📥 下载510300.SSE数据到vn.py数据库") print("="*60) # 使用 fund_etf_hist_sina 接口(SINA接口更稳定) print("\n使用 fund_etf_hist_sina 接口下载...") try: df = ak.fund_etf_hist_sina(symbol="510300", start_date="20160101", end_date="20260330") print(f"✅ 下载成功: {len(df)} 行") print(f"列名: {list(df.columns)}") if df.empty: print("❌ 数据为空") return False print("\n数据预览:") print(df.head()) except Exception as e: print(f"❌ 下载失败: {e}") return False # 创建数据库 db_path = "/Users/chufeng/.openclaw/workspace-zhaoyun/zhaoyun-data/data/database_test.db" symbol = "510300.SSE" print(f"\n💾 导入到数据库: {db_path}") if os.path.exists(db_path): os.remove(db_path) print(" 删除旧数据库") os.makedirs(os.path.dirname(db_path), exist_ok=True) conn = sqlite3.connect(db_path) cursor = conn.cursor() # 创建vn.py标准表结构 cursor.execute(""" CREATE TABLE dbbardata ( symbol TEXT NOT NULL, exchange TEXT, interval TEXT NOT NULL, datetime INTEGER NOT NULL, open REAL NOT NULL, high REAL NOT NULL, low REAL NOT NULL, close REAL NOT NULL, volume INTEGER NOT NULL, open_interest REAL, turnover REAL, PRIMARY KEY (symbol, interval, datetime) ); """) # 创建索引 cursor.execute("CREATE INDEX ix_dbbardata_symbol ON dbbardata(symbol);") cursor.execute("CREATE INDEX ix_dbbardata_symbol_interval ON dbbardata(symbol, interval);") # 导入数据 # SINA ETF接口返回: date, open, high, low, close, volume imported = 0 for idx, row in df.iterrows(): # 转换日期 date_str = str(row['date']) try: if '-' in date_str: dt = datetime.strptime(date_str, '%Y-%m-%d') else: dt = datetime.strptime(date_str, '%Y%m%d') except Exception as e: print(f" 日期解析失败: {date_str}, 跳过") continue timestamp = int(dt.timestamp()) # SINA ETF接口字段: date, open, high, low, close, volume open_price = float(row['open']) high_price = float(row['high']) low_price = float(row['low']) close_price = float(row['close']) volume = int(float(row['volume'])) turnover = float(row.get('volume', 0)) * close_price # 估算成交额 cursor.execute(""" INSERT INTO dbbardata ( symbol, exchange, interval, datetime, open, high, low, close, volume, turnover ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( symbol, 'SSE', '1d', timestamp, open_price, high_price, low_price, close_price, volume, turnover )) imported += 1 # 提交 conn.commit() # 验证 cursor.execute("SELECT COUNT(*) FROM dbbardata WHERE symbol = ?", (symbol,)) count = cursor.fetchone()[0] cursor.execute("SELECT MIN(datetime), MAX(datetime) FROM dbbardata WHERE symbol = ?", (symbol,)) min_ts, max_ts = cursor.fetchone() min_dt = datetime.fromtimestamp(min_ts).strftime('%Y-%m-%d') if min_ts else 'N/A' max_dt = datetime.fromtimestamp(max_ts).strftime('%Y-%m-%d') if max_ts else 'N/A' conn.close() print("\n" + "="*60) print("✅ 下载导入完成!") print(f"标的: {symbol}") print(f"数据库: {db_path}") print(f"总行数: {imported} (验证: {count})") print(f"时间范围: {min_dt} -> {max_dt}") print("="*60) # 显示统计信息 print(f"\n📊 数据统计:") print(f" 交易天数: {len(df)}") print(f" 最早日期: {min_dt}") print(f" 最新日期: {max_dt}") print(f" 首行收盘价: {close_price:.2f} (最新)") return True def main(): """主函数""" # 测试接口 df = test_different_interfaces() # 下载完整数据 success = download_to_vnpy() if success: print("\n🎉 完成!现在数据已导入到vn.py数据库") print("请重启API服务,关羽将军可以重新回测了") else: print("\n❌ 下载失败,请检查网络或AKShare配置") return success if __name__ == "__main__": success = main() sys.exit(0 if success else 1)