auto-sync: 2026-03-26 11:29:53

This commit is contained in:
cfdaily
2026-03-26 11:29:53 +08:00
parent 2e5abaaa54
commit 7f71b06a08
@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
A股日线行情数据采集脚本
获取全市场A股日线行情数据(2010年至今)
"""
import sys
import os
import time
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings
warnings.filterwarnings('ignore')
# 添加项目路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from utils.data_utils import DataUtils
from utils.log_utils import LogUtils
from utils.progress_bar import ProgressBar
class AStockDailyDataCollector:
"""A股日线行情数据采集器"""
def __init__(self, config_path: str = None):
"""初始化采集器
Args:
config_path: 配置文件路径
"""
# 配置日志
self.logger = LogUtils.setup_logger('a_stock_daily_data')
# 加载配置
self.config = self._load_config(config_path)
# 基础路径
self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data"
self.raw_dir = os.path.join(self.base_dir, "raw", "a_stock_daily")
self.processed_dir = os.path.join(self.base_dir, "processed", "a_stock_daily")
# 确保目录存在
os.makedirs(self.raw_dir, exist_ok=True)
os.makedirs(self.processed_dir, exist_ok=True)
# 创建数据分区目录(按年)
for year in range(2010, 2027):
year_dir = os.path.join(self.raw_dir, str(year))
os.makedirs(year_dir, exist_ok=True)
processed_year_dir = os.path.join(self.processed_dir, str(year))
os.makedirs(processed_year_dir, exist_ok=True)
self.logger.info("A股日线数据采集器初始化完成")
def _load_config(self, config_path: Optional[str] = None) -> Dict:
"""加载配置
Args:
config_path: 配置文件路径
Returns:
Dict: 配置信息
"""
default_config = {
"data_range": {
"start_date": "2010-01-01",
"end_date": datetime.now().strftime('%Y-%m-%d'),
"include_delisted": True
},
"data_fields": {
"daily": [
"symbol", # 股票代码
"date", # 交易日期
"open", # 开盘价
"high", # 最高价
"low", # 最低价
"close", # 收盘价
"volume", # 成交量
"amount", # 成交额
"adj_factor", # 复权因子
"trade_status" # 交易状态
]
},
"collection": {
"batch_size": 50,
"max_workers": 10,
"retry_attempts": 3,
"chunk_size": 500,
"delay_between_requests": 0.5
},
"storage": {
"partition_by": ["year", "month"],
"compression": "snappy",
"file_format": "parquet"
}
}
if config_path and os.path.exists(config_path):
try:
with open(config_path, 'r', encoding='utf-8') as f:
user_config = json.load(f)
default_config.update(user_config)
self.logger.info(f"{config_path} 加载用户配置")
except Exception as e:
self.logger.warning(f"加载用户配置文件失败: {e}")
return default_config
def collect_stock_list(self) -> List[str]:
"""获取A股股票代码列表
Returns:
List[str]: 股票代码列表
"""
self.logger.info("开始获取A股股票代码列表")
try:
# 使用AKShare获取A股列表
import akshare as ak
self.logger.info("正在获取A股列表...")
stock_list_df = ak.stock_info_a_code_name()
if stock_list_df is not None and not stock_list_df.empty:
stock_codes = stock_list_df['code'].tolist()
self.logger.info(f"成功获取 {len(stock_codes)} 只A股代码")
return stock_codes
else:
self.logger.warning("未获取到A股列表")
return []
except ImportError:
self.logger.error("AKShare未安装,请安装: pip install akshare")
return []
except Exception as e:
self.logger.error(f"获取股票列表失败: {e}")
return []
def collect_daily