20260327 今日开发完成:三个策略新增+结构化适配+消息风控+任务跟踪

This commit is contained in:
cfdaily
2026-03-27 16:44:01 +08:00
parent f043f123a1
commit dd1b06ba4a
41 changed files with 12115 additions and 0 deletions
@@ -0,0 +1,326 @@
#!/usr/bin/env python3
"""
简单稳定的分钟数据下载器
"""
import os
import sys
import time
import json
import logging
from datetime import datetime, timedelta
from typing import List, Optional
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
try:
import akshare as ak
AKSHARE_AVAILABLE = True
except ImportError:
AKSHARE_AVAILABLE = False
print("❌ AKShare未安装,请运行: pip install akshare")
sys.exit(1)
# Logging setup: root logger at INFO, each record rendered as
# "<timestamp> - <level> - <message>". The module-level ``logger`` is what
# the rest of this file writes to.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class SimpleMinuteDownloader:
    """Sequential, retrying downloader for A-share minute K-line data (AKShare).

    Each stock is fetched with ``ak.stock_zh_a_minute`` and written to
    ``<base_dir>/<timeframe>/`` as both Parquet and CSV. JSON progress and
    final reports are written to ``<base_dir>/reports/``.

    NOTE(review): ``start_date`` / ``end_date`` are only logged, and
    ``batch_size`` / ``max_workers`` are only stored; no method in this class
    uses them to filter data or to parallelise. Stocks are processed one at a
    time and whatever AKShare returns is saved as-is.
    """

    # Maps the user-facing timeframe label to the ``period`` argument passed
    # to ``ak.stock_zh_a_minute``.
    PERIOD_MAP = {
        '1min': '1',
        '5min': '5',
        '15min': '15',
        '30min': '30',
        '60min': '60',
    }
    # Period used when ``timeframe`` is not a key of PERIOD_MAP.
    DEFAULT_PERIOD = '15'

    def __init__(
        self,
        base_dir: str = "/Users/chufeng/nas/stock/minute_kline",
        timeframe: str = "15min",
        start_date: str = "2021-01-01",
        end_date: Optional[str] = None,
        batch_size: int = 100,
        max_workers: int = 5,
        retry_count: int = 3
    ):
        """Store the settings, create the output directories, reset counters.

        Args:
            base_dir: Root directory; ``<timeframe>/``, ``logs/`` and
                ``reports/`` sub-directories are created beneath it.
            timeframe: Bar size label, expected to be a key of ``PERIOD_MAP``.
            start_date: Start of the requested range (``YYYY-MM-DD``).
            end_date: End of the requested range; defaults to today's date.
            batch_size: Stored on the instance only.
            max_workers: Stored on the instance only.
            retry_count: Maximum download attempts per stock.
        """
        self.base_dir = base_dir
        self.timeframe = timeframe
        self.start_date = start_date
        self.end_date = end_date or datetime.now().strftime("%Y-%m-%d")
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.retry_count = retry_count

        # Output directories.
        self.data_dir = os.path.join(self.base_dir, self.timeframe)
        self.log_dir = os.path.join(self.base_dir, "logs")
        self.report_dir = os.path.join(self.base_dir, "reports")
        for directory in (self.data_dir, self.log_dir, self.report_dir):
            os.makedirs(directory, exist_ok=True)

        # Per-run counters; the elapsed-time clock starts at construction.
        self.download_stats = {
            "total_stocks": 0,
            "downloaded_stocks": 0,
            "failed_stocks": 0,
            "start_time": datetime.now(),
            "end_time": None
        }

        logger.info(f"初始化下载器: {self.timeframe} 数据")
        logger.info(f"数据目录: {self.data_dir}")
        # The original message ran the two dates together with no separator.
        logger.info(f"时间范围: {self.start_date} ~ {self.end_date}")
        if self.timeframe not in self.PERIOD_MAP:
            # The download still proceeds with DEFAULT_PERIOD, so make the
            # mismatch between the file label and the real bar size visible.
            logger.warning(
                f"未知时间粒度 {self.timeframe},将按 {self.DEFAULT_PERIOD} 分钟周期下载"
            )

    def get_all_a_stock_codes(self) -> List[str]:
        """Return A-share codes with an ``sh``/``sz`` market prefix.

        Codes come from ``ak.stock_info_a_code_name()``. Codes starting with
        ``6`` get ``sh``; codes starting with ``0`` or ``3`` get ``sz``; any
        other code is dropped. When AKShare returns nothing or raises, the
        synthetic list from ``_get_default_stock_codes`` is returned instead.
        """
        logger.info("获取A股代码列表...")
        try:
            stock_info = ak.stock_info_a_code_name()
            if stock_info is None or stock_info.empty:
                logger.warning("AKShare返回空数据,使用预定义列表")
                return self._get_default_stock_codes()

            # A missing 'code' column yields an empty result rather than an
            # exception, matching the per-row ``.get('code', '')`` lookup.
            raw_codes = stock_info['code'] if 'code' in stock_info.columns else []
            codes = []
            for raw in raw_codes:
                code = str(raw)
                if not code:
                    continue
                if code.startswith('6'):
                    codes.append(f"sh{code}")
                elif code.startswith(('0', '3')):
                    codes.append(f"sz{code}")
            logger.info(f"获取到 {len(codes)} 只A股代码")
            return codes
        except Exception as e:
            # Best effort: any failure talking to AKShare falls back to the
            # built-in list so the run can still proceed.
            logger.error(f"获取A股代码失败: {e}")
            return self._get_default_stock_codes()

    def _get_default_stock_codes(self) -> List[str]:
        """Build the fallback code list used when AKShare is unavailable.

        The list is synthetic: ``sh600001``-``sh600599``,
        ``sz000001``-``sz000299`` and ``sz300001``-``sz300099`` (997 codes).
        It is not checked against stocks that are actually listed.
        """
        codes = [f"sh600{i:03d}" for i in range(1, 600)]
        codes.extend(f"sz000{i:03d}" for i in range(1, 300))
        codes.extend(f"sz300{i:03d}" for i in range(1, 100))
        logger.info(f"使用默认股票代码列表: {len(codes)}")
        return codes

    def download_stock_data(self, stock_code: str) -> bool:
        """Download one stock's minute bars and save them as Parquet and CSV.

        Makes up to ``retry_count`` attempts, sleeping 2 seconds between
        failed attempts. An empty response is treated as final and is not
        retried.

        Args:
            stock_code: Prefixed code such as ``sh600000``.

        Returns:
            True when both files were written, False otherwise.
        """
        # Loop-invariant values, computed once instead of on every attempt.
        period = self.PERIOD_MAP.get(self.timeframe, self.DEFAULT_PERIOD)
        parquet_file = os.path.join(self.data_dir, f"{stock_code}_{self.timeframe}.parquet")
        csv_file = os.path.join(self.data_dir, f"{stock_code}_{self.timeframe}.csv")

        for attempt in range(self.retry_count):
            try:
                logger.info(f"下载 {stock_code} {self.timeframe} 数据...")
                data = ak.stock_zh_a_minute(
                    symbol=stock_code,
                    period=period,
                    adjust='hfq'
                )
                if data is None or data.empty:
                    logger.warning(f"{stock_code}: 数据为空")
                    return False

                # Normalise column labels by stripping stray whitespace.
                data = data.copy()
                data.columns = [col.strip() if isinstance(col, str) else col for col in data.columns]

                # Parquet is the primary format; the CSV copy is for eyeballing.
                data.to_parquet(parquet_file, compression='snappy')
                data.to_csv(csv_file, index=False)

                logger.info(f"{stock_code}: 下载成功 {len(data)} 条记录")
                logger.info(f" 保存文件: {parquet_file} ({os.path.getsize(parquet_file) // 1024} KB)")
                return True
            except Exception as e:
                logger.error(f"{stock_code}: 下载失败 (尝试 {attempt + 1}/{self.retry_count}) - {str(e)[:100]}")
                if attempt < self.retry_count - 1:
                    time.sleep(2)  # back off before the next attempt
        return False

    def download_all_stocks(self, all_stocks: bool = True):
        """Download every stock in turn and write progress/final reports.

        Args:
            all_stocks: When False, stocks that already have a
                ``<code>_<timeframe>.parquet`` file in the data directory are
                skipped (resume mode). When True, everything is downloaded
                again.
        """
        logger.info("="*70)
        logger.info("🚀 开始全量分钟数据下载")
        logger.info("="*70)

        stock_codes = self.get_all_a_stock_codes()
        if not stock_codes:
            logger.error("❌ 无法获取股票代码列表")
            return

        # Stocks that already have a Parquet file for this timeframe.
        suffix = f"_{self.timeframe}.parquet"
        downloaded_stocks = set()
        if os.path.exists(self.data_dir):
            downloaded_stocks = {
                name[:-len(suffix)]
                for name in os.listdir(self.data_dir)
                if name.endswith(suffix)
            }

        if not all_stocks and downloaded_stocks:
            remaining_stocks = [code for code in stock_codes if code not in downloaded_stocks]
            logger.info(f"已下载 {len(downloaded_stocks)} 只,剩余 {len(remaining_stocks)}")
            stock_codes = remaining_stocks

        # Count only what this run will attempt, so percentages and the ETA
        # are measured against the same population as the success/fail counters.
        self.download_stats["total_stocks"] = len(stock_codes)
        logger.info(f"📊 开始下载 {len(stock_codes)} 只股票")

        success_count = 0
        fail_count = 0
        for i, stock_code in enumerate(stock_codes, 1):
            logger.info(f"\n📈 进度: {i}/{len(stock_codes)} ({i/len(stock_codes)*100:.1f}%)")
            if self.download_stock_data(stock_code):
                success_count += 1
            else:
                fail_count += 1

            self.download_stats["downloaded_stocks"] = success_count
            self.download_stats["failed_stocks"] = fail_count

            # Persist a progress snapshot every 10 stocks and after the last one.
            if i % 10 == 0 or i == len(stock_codes):
                self._save_progress_report()

            time.sleep(0.5)  # throttle requests to the data source

        self.download_stats["end_time"] = datetime.now()
        self._save_final_report()

        # Nothing may have been attempted (resume mode with everything already
        # on disk); avoid dividing by zero in that case.
        attempted = success_count + fail_count
        success_rate = success_count / attempted * 100 if attempted else 0.0

        logger.info("="*70)
        logger.info(f"✅ 下载完成!")
        logger.info(f" 成功: {success_count}")
        logger.info(f" 失败: {fail_count}")
        logger.info(f" 成功率: {success_rate:.1f}%")
        logger.info(f" 总耗时: {self.download_stats['end_time'] - self.download_stats['start_time']}")
        logger.info("="*70)

    def _stats_snapshot(self) -> dict:
        """Return a copy of ``download_stats`` that ``json.dump`` can serialise.

        ``start_time`` / ``end_time`` are ``datetime`` objects, which the json
        module rejects; they are rendered as ISO-8601 strings here.
        """
        return {
            key: value.isoformat() if isinstance(value, datetime) else value
            for key, value in self.download_stats.items()
        }

    def _save_progress_report(self):
        """Write a timestamped ``progress_*.json`` snapshot to ``report_dir``."""
        now = datetime.now()
        report = {
            "timestamp": now.isoformat(),
            "timeframe": self.timeframe,
            "stats": self._stats_snapshot(),
            "progress": {
                "percentage": self.download_stats["downloaded_stocks"] / max(1, self.download_stats["total_stocks"]) * 100,
                "estimated_time_left": self._estimate_time_left()
            }
        }
        report_file = os.path.join(self.report_dir, f"progress_{self.timeframe}_{now.strftime('%Y%m%d_%H%M%S')}.json")
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)

    def _save_final_report(self):
        """Write the end-of-run ``final_*.json`` summary to ``report_dir``.

        Expects ``download_stats["end_time"]`` to have been set already.
        """
        now = datetime.now()
        report = {
            "timestamp": now.isoformat(),
            "timeframe": self.timeframe,
            "summary": self._stats_snapshot(),
            "duration": str(self.download_stats["end_time"] - self.download_stats["start_time"]),
            "success_rate": self.download_stats["downloaded_stocks"] / max(1, self.download_stats["total_stocks"]) * 100
        }
        report_file = os.path.join(self.report_dir, f"final_{self.timeframe}_{now.strftime('%Y%m%d_%H%M%S')}.json")
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        logger.info(f"📄 最终报告已保存: {report_file}")

    def _estimate_time_left(self) -> str:
        """Estimate the remaining run time from the average pace so far.

        Returns:
            A human-readable duration (seconds, minutes, or hours+minutes),
            or ``"未知"`` before any stock has been processed.
        """
        stats = self.download_stats
        # Failed stocks consumed time and are finished too, so they count
        # towards both the pace and the work already done.
        processed = stats["downloaded_stocks"] + stats["failed_stocks"]
        if processed == 0:
            return "未知"

        elapsed = (datetime.now() - stats["start_time"]).total_seconds()
        remaining_stocks = max(0, stats["total_stocks"] - processed)
        remaining_seconds = elapsed / processed * remaining_stocks

        if remaining_seconds < 60:
            return f"{int(remaining_seconds)}秒"
        if remaining_seconds < 3600:
            return f"{int(remaining_seconds/60)}分钟"
        return f"{int(remaining_seconds/3600)}小时{int((remaining_seconds%3600)/60)}分钟"
def main(argv: Optional[List[str]] = None):
    """Command-line entry point: parse options and run a full download.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    import argparse

    parser = argparse.ArgumentParser(description="简单稳定的分钟数据下载器")
    # Restrict to the bar sizes the downloader knows how to request; any other
    # value would otherwise fall back to 15-minute data saved under the wrong
    # label.
    parser.add_argument(
        "--timeframe",
        default="15min",
        choices=["1min", "5min", "15min", "30min", "60min"],
        help="时间粒度: 1min, 5min, 15min, 30min, 60min",
    )
    parser.add_argument("--start-date", default="2021-01-01", help="开始日期")
    parser.add_argument("--end-date", default=None, help="结束日期")
    parser.add_argument("--batch-size", type=int, default=100, help="批次大小")
    parser.add_argument("--max-workers", type=int, default=5, help="最大工作线程数")
    parser.add_argument("--retry-count", type=int, default=3, help="单只股票最大重试次数")
    parser.add_argument("--all-stocks", action="store_true", help="下载所有股票(包括已下载的)")
    args = parser.parse_args(argv)

    print("="*70)
    print("🚀 赵云分钟数据下载器启动")
    print("="*70)

    downloader = SimpleMinuteDownloader(
        timeframe=args.timeframe,
        start_date=args.start_date,
        end_date=args.end_date,
        batch_size=args.batch_size,
        max_workers=args.max_workers,
        retry_count=args.retry_count,
    )
    # Without --all-stocks the run resumes: stocks already on disk are skipped.
    downloader.download_all_stocks(all_stocks=args.all_stocks)


if __name__ == "__main__":
    main()
@@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""
启动全量分钟数据下载
"""
import os
import sys
import time
import json
import subprocess
from datetime import datetime
# Launcher script: writes a config file and a bash start script, then spawns
# the start script as a detached background process whose output goes to a
# timestamped log file.
print("="*70)
print("🚀 赵云启动全量分钟数据下载")
print("="*70)

# Launcher configuration.
config = {
    "base_dir": "/Users/chufeng/nas/stock/minute_kline",
    "data_source": "akshare",
    "timeframe": "15min",  # start with 15-minute bars
    "date_range": {
        "start": "2021-01-01",
        "end": "2026-03-27"
    },
    "stock_count": 5500,  # configured size of the whole A-share market
    "download_mode": "full",
    "batch_size": 100,
    "max_workers": 5,
    "retry_count": 3,
    "log_file": "/Users/chufeng/nas/stock/minute_kline/logs/full_download_{}.log".format(
        datetime.now().strftime("%Y%m%d_%H%M%S")
    )
}

# The config file and the log file are both written below, so their
# directories must exist first (creating <base_dir>/logs creates <base_dir>).
os.makedirs(os.path.dirname(config["log_file"]), exist_ok=True)

# Persist the configuration.
config_file = os.path.join(config["base_dir"], "full_download_config.json")
with open(config_file, 'w', encoding='utf-8') as f:
    json.dump(config, f, ensure_ascii=False, indent=2)

print("📊 配置信息:")
print(f" 存储路径: {config['base_dir']}")
print(f" 数据粒度: {config['timeframe']}")
print(f" 股票数量: {config['stock_count']}")
# The original message ran the two dates together with no separator.
print(f" 时间范围: {config['date_range']['start']} ~ {config['date_range']['end']}")
print(f" 下载模式: {config['download_mode']}")
print(f" 批次大小: {config['batch_size']} 只/批")
print(f" 并发数: {config['max_workers']}")

# Count Parquet files already present for this timeframe.
print("\n🔍 检查已有数据...")
data_dir = os.path.join(config["base_dir"], config["timeframe"])
existing_files = 0
if os.path.exists(data_dir):
    existing_files = len([f for f in os.listdir(data_dir) if f.endswith('.parquet')])
# Never report a negative backlog when more files exist than stock_count.
pending_stocks = max(0, config["stock_count"] - existing_files)
print(f" 已下载股票: {existing_files}")
print(f" 待下载股票: {pending_stocks}")

print("\n🎯 启动下载命令:")
print("="*70)

# Template of the bash start script.
# `python3 -c` does not define __file__, so the embedded snippet puts the
# current directory (set by the `cd` above it) on sys.path instead.
start_script = """#!/bin/bash
# 赵云全量分钟数据下载启动脚本
# 开始时间: {started}
cd /Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/scripts/data_acquisition || exit 1
echo "🚀 开始全量分钟数据下载..."
echo "📊 目标: 下载{stock_count}只A股的{timeframe}数据"
echo "⏱️ 开始时间: $(date)"
# 使用稳定下载器开始下载
python3 -c "
import sys
import os
sys.path.insert(0, os.getcwd())
from minute_kline_collector import MinuteKlineCollector
collector = MinuteKlineCollector(
    base_dir='{base_dir}',
    timeframe='{timeframe}',
    start_date='{start_date}',
    end_date='{end_date}',
    batch_size={batch_size},
    max_workers={max_workers},
    retry_count={retry_count}
)
print('🎯 赵云开始全量下载任务...')
collector.download_all_stocks()
print('✅ 全量下载任务完成!')
"
echo "⏱️ 结束时间: $(date)"
echo "📈 下载总结: 请查看 {base_dir}/reports/"
""".format(
    started=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    stock_count=config["stock_count"],
    timeframe=config["timeframe"],
    base_dir=config["base_dir"],
    start_date=config["date_range"]["start"],
    end_date=config["date_range"]["end"],
    batch_size=config["batch_size"],
    max_workers=config["max_workers"],
    retry_count=config["retry_count"],
)

# Write the start script next to this file and make it executable.
script_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "start_full_download.sh")
with open(script_file, 'w', encoding='utf-8') as f:
    f.write(start_script)
os.chmod(script_file, 0o755)
print(f"启动脚本: {script_file}")
print("="*70)

print("\n📋 执行步骤:")
print("1. ✅ 配置文件已生成")
print("2. ✅ 启动脚本已创建")
print("3. 🚀 准备开始全量下载")

print("\n⏱️ 时间预估:")
estimated_hours = pending_stocks / (config["batch_size"] * 60) * 2
# A strftime-style format spec only works on datetime objects, not on the
# float returned by timestamp(); convert back before formatting.
estimated_finish = datetime.fromtimestamp(datetime.now().timestamp() + estimated_hours * 3600)
print(f" 预计完成时间: {estimated_hours:.1f} 小时")
print(f" 预计完成日期: {estimated_finish:%Y-%m-%d %H:%M}")

print("\n" + "="*70)
print("🎯 赵云立即开始执行全量下载!")
print("="*70)

# Launch the download in the background.
print("\n🚀 启动下载进程...")
launched = False
try:
    # Popen already returns without waiting, so no shell "&" is needed (as a
    # list element it would only be handed to the script as an argument).
    # One log handle is shared by stdout and stderr so their output cannot
    # overwrite each other; the child keeps its own copy of the descriptor
    # after the parent's handle is closed.
    with open(config["log_file"], 'w', encoding='utf-8') as log_handle:
        subprocess.Popen(
            ["nohup", "bash", script_file],
            stdin=subprocess.DEVNULL,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # detach from this terminal's session
        )
    launched = True
    print(f"✅ 下载进程已启动,日志文件: {config['log_file']}")
    print("📊 可以使用以下命令监控进度:")
    print(f" tail -f {config['log_file']}")
except (OSError, subprocess.SubprocessError) as e:
    print(f"❌ 启动失败: {e}")
    print("\n💡 备用方案: 手动运行启动脚本")
    print(f" bash {script_file}")

# Only confirm the launch when the process was actually started.
if launched:
    print("\n" + "="*70)
    print("📡 赵云确认: 全量分钟数据下载已启动!")
    print("="*70)
@@ -0,0 +1,35 @@
#!/bin/bash
# Zhaoyun full minute-data download start script
# Started: 2026-03-27 12:58:32

# Abort if the working directory is missing; the Python import below relies
# on being run from this directory.
cd /Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/scripts/data_acquisition || exit 1
echo "🚀 开始全量分钟数据下载..."
echo "📊 目标: 下载5500只A股的15min数据"
echo "⏱️ 开始时间: $(date)"

# Run the collector. `python3 -c` does not define __file__, so the current
# directory (set by the cd above) is put on sys.path instead.
python3 -c "
import sys
import os
sys.path.insert(0, os.getcwd())
from minute_kline_collector import MinuteKlineCollector
collector = MinuteKlineCollector(
    base_dir='/Users/chufeng/nas/stock/minute_kline',
    timeframe='15min',
    start_date='2021-01-01',
    end_date='2026-03-27',
    batch_size=100,
    max_workers=5,
    retry_count=3
)
print('🎯 赵云开始全量下载任务...')
collector.download_all_stocks()
print('✅ 全量下载任务完成!')
"
echo "⏱️ 结束时间: $(date)"
echo "📈 下载总结: 请查看 /Users/chufeng/nas/stock/minute_kline/reports/"