auto-sync: 2026-04-06 22:45:02

This commit is contained in:
cfdaily
2026-04-06 22:45:02 +08:00
parent 49bc7b046e
commit 8ec36d47c7
7 changed files with 222 additions and 8 deletions
@@ -168,9 +168,11 @@ class AStockDailyDownloader:
try:
logger.debug(f"下载 {symbol} ({name}) 尝试 {attempt + 1}/{self.retry_count}")
# 获取日线数据
df = ak.stock_zh_a_daily(
symbol=symbol,
# 获取日线数据 - 使用 stock_zh_a_hist 接口(更稳定)
# symbol格式: 纯代码,不需要sh/sz前缀
df = ak.stock_zh_a_hist(
symbol=code,
period="daily",
start_date=self.start_date.strftime("%Y%m%d"),
end_date=self.end_date.strftime("%Y%m%d"),
adjust="hfq" # 后复权
@@ -184,9 +186,22 @@ class AStockDailyDownloader:
# 数据清理
df = df.copy()
# 重置索引,确保日期列存在
if 'date' not in df.columns:
df = df.reset_index()
# 转换中文列名到英文
column_mapping = {
'日期': 'date',
'股票代码': 'code',
'开盘': 'open',
'收盘': 'close',
'最高': 'high',
'最低': 'low',
'成交量': 'volume',
'成交额': 'amount',
'振幅': 'amplitude',
'涨跌幅': 'change_percent',
'涨跌额': 'change_amount',
'换手率': 'turnover'
}
df = df.rename(columns=column_mapping)
# 标准化列名
df.columns = [col.lower().strip() for col in df.columns]
@@ -126,7 +126,8 @@ print("3. 🚀 准备开始全量下载")
print("\n⏱️ 时间预估:")
estimated_hours = (config["stock_count"] - existing_files) / (config["batch_size"] * 60) * 2
print(f" 预计完成时间: {estimated_hours:.1f} 小时")
print(f" 预计完成日期: {(datetime.now().timestamp() + estimated_hours * 3600):%Y-%m-%d %H:%M}")
estimated_completion = datetime.fromtimestamp(datetime.now().timestamp() + estimated_hours * 3600)
print(f" 预计完成日期: {estimated_completion:%Y-%m-%d %H:%M}")
print("\n" + "="*70)
print("🎯 赵云立即开始执行全量下载!")
@@ -1,6 +1,6 @@
#!/bin/bash
# 赵云全量分钟数据下载启动脚本
# 开始时间: 2026-03-27 12:58:32
# 开始时间: 2026-04-06 22:44:42
cd /Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/scripts/data_acquisition
@@ -0,0 +1,61 @@
#!/usr/bin/env python3
"""
测试日线数据API
"""
import akshare as ak
import pandas as pd
import sys
print("测试AKShare日线数据API...")
print(f"AKShare版本: {ak.__version__}")
# 测试不同的参数格式
test_cases = [
{"symbol": "sh000001", "name": "上证指数-sh格式"},
{"symbol": "sz000001", "name": "平安银行-sz格式"},
{"symbol": "000001", "name": "平安银行-纯代码"},
]
for test in test_cases:
print(f"\n{'='*60}")
print(f"测试: {test['name']} - {test['symbol']}")
print(f"{'='*60}")
try:
df = ak.stock_zh_a_daily(
symbol=test['symbol'],
start_date="20240101",
end_date="20241231",
adjust="hfq"
)
if df is not None and not df.empty:
print(f"✅ 成功! 获取到 {len(df)} 条记录")
print(f"列名: {list(df.columns)}")
print(f"前5行:\n{df.head()}")
else:
print(f"❌ 失败! 返回空数据")
except Exception as e:
print(f"❌ 异常: {e}")
# 尝试新接口
print(f"\n{'='*60}")
print("尝试新接口: stock_zh_a_hist")
print(f"{'='*60}")
try:
df = ak.stock_zh_a_hist(
symbol="000001",
period="daily",
start_date="20240101",
end_date="20241231",
adjust="hfq"
)
if df is not None and not df.empty:
print(f"✅ 成功! 获取到 {len(df)} 条记录")
print(f"列名: {list(df.columns)}")
print(f"前5行:\n{df.head()}")
except Exception as e:
print(f"❌ 异常: {e}")
@@ -0,0 +1,53 @@
#!/usr/bin/env python3
"""
测试修复后的日线数据下载
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from a_stock_daily_data_downloader import AStockDailyDownloader
print("="*70)
print("🧪 测试修复后的日线数据下载")
print("="*70)
# 创建下载器,只测试10只股票
downloader = AStockDailyDownloader(
base_dir="/tmp/test_daily_download",
start_date="2024-01-01",
end_date="2024-12-31"
)
# 获取股票列表
stocks = downloader.get_all_a_stock_codes()
print(f"\n📊 获取到 {len(stocks)} 只股票")
# 测试前10只
print("\n🧪 测试前10只股票下载...")
success_count = 0
fail_count = 0
for i, stock in enumerate(stocks[:10]):
print(f"\n{i+1}. {stock['symbol']} {stock['name']}:", end=" ")
df = downloader.download_stock_daily(
stock['symbol'],
stock['code'],
stock['name']
)
if df is not None and not df.empty:
print(f"{len(df)} 条记录")
success_count += 1
else:
print(f"❌ 失败")
fail_count += 1
print(f"\n{'='*70}")
print(f"📊 测试结果: 成功 {success_count}, 失败 {fail_count}, 成功率 {success_count/10*100:.1f}%")
print(f"{'='*70}")
if success_count > 0:
print("\n🎉 修复成功!可以开始全量下载了")
else:
print("\n❌ 还有问题,需要继续修复")
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""
测试NAS环境和数据源可用性
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from minute_kline_collector import MinuteKlineCollector
print("="*70)
print("🧪 测试NAS环境和数据源")
print("="*70)
# 使用NAS路径初始化收集器
collector = MinuteKlineCollector(base_dir="/Users/chufeng/nas/stock/minute_kline")
# 环境测试
print("\n🔍 开始环境测试...")
test_results = collector.test_environment()
print("\n📊 环境测试结果:")
for key, value in test_results.items():
if isinstance(value, bool):
print(f" {key}: {'✅ 通过' if value else '❌ 失败'}")
if test_results.get("all_passed"):
print("\n✅ 环境测试全部通过!")
else:
print("\n❌ 环境测试有失败项,请检查配置")
sys.exit(1)
# 数据源测试
print("\n🔍 开始数据源测试...")
source_results = collector.test_data_source()
print("\n📊 数据源测试结果:")
for timeframe, result in source_results.get("timeframes", {}).items():
print(f" {timeframe}: {result['status']} - {result['record_count']} 条记录")
if all(result["status"] == "available" for result in source_results.get("timeframes", {}).values()):
print("\n✅ 数据源测试全部通过!")
else:
print("\n❌ 数据源测试有失败项,请检查网络")
sys.exit(1)
print("\n" + "="*70)
print("🎉 所有测试通过!NAS环境就绪,可以开始下载")
print("="*70)