#!/usr/bin/env python3
"""
A-share basic information collection script.

Fetches stock codes, names, listing dates, industry classification and other
basic attributes, then stores a raw and a processed snapshot as Parquet files
together with a small JSON sidecar describing each snapshot.
"""
import sys
import os
import time
import json
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional, Any
import logging

# Make the project's ``utils`` package importable when run as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

try:
    # DataUtils is imported by the original script; it is not used below.
    from utils.data_utils import DataUtils  # noqa: F401
    from utils.log_utils import LogUtils
except ImportError:
    # Keep the module importable without the project helpers; the collector
    # then falls back to the standard ``logging`` module.
    DataUtils = None
    LogUtils = None

# Default data root (unchanged from the original hard-coded value); it can be
# overridden per instance through the ``base_dir`` constructor argument.
DEFAULT_BASE_DIR = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data"

# Version tag stamped on every processed row.
DATA_VERSION = "1.0.0"


def _build_logger(name: str) -> logging.Logger:
    """Return the project logger, or a stdlib logger when LogUtils is missing."""
    if LogUtils is not None:
        return LogUtils.setup_logger(name)

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    fallback = logging.getLogger(name)
    fallback.warning("utils.log_utils 不可用,使用标准logging")
    return fallback


class AStockBasicInfoCollector:
    """Collector for A-share basic stock information."""

    # Columns every standardized frame is guaranteed to contain.
    STANDARD_COLUMNS = (
        'symbol',
        'name',
        'industry',
        'area',
        'market',
        'list_date',
        'delist_date',
        'exchange',
        'is_hs',
        'is_st',
        'status',
        'ts_code',
    )

    def __init__(self, config_path: Optional[str] = None, base_dir: Optional[str] = None):
        """Initialise the collector and create the output directories.

        Args:
            config_path: Optional path to a JSON file whose values override
                the built-in defaults.
            base_dir: Optional data root. Defaults to ``DEFAULT_BASE_DIR``.
        """
        self.logger = _build_logger('a_stock_basic_info')

        self.config = self._load_config(config_path)

        self.base_dir = base_dir or DEFAULT_BASE_DIR
        self.raw_dir = os.path.join(self.base_dir, "raw")
        self.processed_dir = os.path.join(self.base_dir, "processed")

        # Both save methods write into a "stock_info" sub-directory.
        os.makedirs(os.path.join(self.raw_dir, "stock_info"), exist_ok=True)
        os.makedirs(os.path.join(self.processed_dir, "stock_info"), exist_ok=True)

        self.logger.info("A股基础信息采集器初始化完成")

    @staticmethod
    def _deep_merge(base: Dict, override: Dict) -> Dict:
        """Return ``base`` with ``override`` merged in, recursing into dicts."""
        merged = dict(base)
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = AStockBasicInfoCollector._deep_merge(merged[key], value)
            else:
                merged[key] = value
        return merged

    def _load_config(self, config_path: Optional[str] = None) -> Dict:
        """Build the effective configuration.

        User values are merged recursively so that overriding one nested key
        (for example the Tushare token) keeps the remaining defaults.

        Args:
            config_path: Optional path to a JSON configuration file.

        Returns:
            Dict: The defaults, overlaid with the user configuration if it
            could be read.
        """
        default_config = {
            "data_sources": {
                "akshare": {
                    "enabled": True,
                    "rate_limit": 30,   # request interval, milliseconds
                    "max_retries": 3,
                    "retry_delay": 5    # retry delay, seconds
                },
                "tushare": {
                    "enabled": False,
                    "token": "",        # API token must be configured
                    "rate_limit": 500   # Tushare request limit
                }
            },
            "data_fields": {
                "basic_info": [
                    "symbol",       # stock code
                    "name",         # stock name
                    "industry",     # industry
                    "area",         # region
                    "market",       # board (main / ChiNext / STAR)
                    "list_date",    # listing date
                    "delist_date",  # delisting date (empty if still listed)
                    "exchange",     # exchange (SH/SZ)
                    "is_hs",        # Stock Connect eligibility
                    "is_st",        # ST / *ST flag
                    "status",       # L listed, D delisted, P suspended
                    "ts_code"       # Tushare code
                ]
            },
            "storage": {
                "raw_format": "parquet",
                "processed_format": "parquet",
                "compression": "snappy",
                "partition_by": ["year", "month"]
            }
        }

        if not config_path:
            return default_config

        if not os.path.exists(config_path):
            self.logger.warning(f"配置文件不存在: {config_path}")
            return default_config

        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                user_config = json.load(f)
            if not isinstance(user_config, dict):
                raise ValueError("配置文件顶层必须是JSON对象")
            merged_config = self._deep_merge(default_config, user_config)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            self.logger.warning(f"加载用户配置文件失败: {e}")
            return default_config

        self.logger.info(f"从 {config_path} 加载用户配置")
        return merged_config

    def _run_pipeline(self, stock_info_df: Optional[pd.DataFrame], source: str,
                      source_label: str) -> pd.DataFrame:
        """Standardize, persist and process one freshly fetched frame.

        Args:
            stock_info_df: Frame returned by the data source (may be None).
            source: Source identifier stored in file names and ``data_source``.
            source_label: Display name of the source used in log messages.

        Returns:
            pd.DataFrame: The processed frame, or an empty frame when the
            source returned nothing.
        """
        if stock_info_df is None or stock_info_df.empty:
            self.logger.warning(f"{source_label}返回数据为空")
            return pd.DataFrame()

        self.logger.info(f"成功采集 {len(stock_info_df)} 条股票基础信息")

        stock_info_df = self._standardize_columns(stock_info_df)
        self._save_raw_data(stock_info_df, "stock_basic_info", source)

        processed_df = self._process_basic_info(stock_info_df, source=source)
        self._save_processed_data(processed_df, "stock_basic_info")

        return processed_df

    def collect_basic_info_akshare(self) -> pd.DataFrame:
        """Collect A-share basic information through AKShare.

        Returns:
            pd.DataFrame: Processed stock information; empty on any failure.
        """
        self.logger.info("开始使用AKShare采集A股基础信息")

        try:
            # Imported lazily so the module loads without akshare installed.
            import akshare as ak

            self.logger.info("正在获取A股基础信息...")
            stock_info_df = ak.stock_info_a_code_name()

            return self._run_pipeline(stock_info_df, "akshare", "AKShare")

        except ImportError:
            self.logger.error("AKShare未安装,请安装: pip install akshare")
            return pd.DataFrame()

        except Exception as e:
            # Boundary handler: callers rely on an empty frame, not an exception.
            self.logger.error(f"使用AKShare采集数据失败: {e}")
            return pd.DataFrame()

    def collect_basic_info_tushare(self) -> pd.DataFrame:
        """Collect A-share basic information through Tushare.

        Requires ``data_sources.tushare.enabled`` to be true and a non-empty
        ``token`` in the configuration.

        Returns:
            pd.DataFrame: Processed stock information; empty on any failure.
        """
        self.logger.info("开始使用Tushare采集A股基础信息")

        try:
            # Imported lazily so the module loads without tushare installed.
            import tushare as ts

            tushare_config = self.config.get("data_sources", {}).get("tushare", {})
            if not tushare_config.get("enabled", False):
                self.logger.warning("Tushare未启用,请在配置中设置enabled: true并配置token")
                return pd.DataFrame()

            token = tushare_config.get("token", "")
            if not token:
                # Fail early with a clear message instead of an opaque API error.
                self.logger.warning("Tushare token未配置,请在配置中设置token")
                return pd.DataFrame()

            ts.set_token(token)
            pro = ts.pro_api()

            self.logger.info("正在获取A股基础信息...")

            # NOTE(review): `is_st` and `status` may not be valid stock_basic
            # output fields (the documented names appear to be `list_status`
            # and `exchange`) -- confirm against the Tushare documentation.
            stock_info_df = pro.stock_basic(
                exchange='',
                list_status='L',  # listed stocks only
                fields='ts_code,symbol,name,area,industry,market,list_date,delist_date,is_hs,is_st,status'
            )

            return self._run_pipeline(stock_info_df, "tushare", "Tushare")

        except ImportError:
            self.logger.error("Tushare未安装,请安装: pip install tushare")
            return pd.DataFrame()

        except Exception as e:
            # Boundary handler: callers rely on an empty frame, not an exception.
            self.logger.error(f"使用Tushare采集数据失败: {e}")
            return pd.DataFrame()

    def _standardize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalise column names to the standard schema.

        ``code`` is renamed to ``symbol``. ``ts_code`` is kept as its own
        column; it is only used to derive ``symbol`` (the part before the
        first dot) when the frame has neither ``symbol`` nor ``code``.
        Renaming ``ts_code`` to ``symbol`` would create a duplicate column
        for frames that already carry ``symbol``.

        Args:
            df: Frame as returned by the data source. It is not modified.

        Returns:
            pd.DataFrame: A new frame containing every column listed in
            ``STANDARD_COLUMNS``; columns the source lacks are filled with None.
        """
        standardized = df.copy()

        if 'symbol' not in standardized.columns:
            if 'code' in standardized.columns:
                standardized = standardized.rename(columns={'code': 'symbol'})
            elif 'ts_code' in standardized.columns:
                standardized['symbol'] = (
                    standardized['ts_code'].str.split('.', n=1).str[0]
                )

        for col in self.STANDARD_COLUMNS:
            if col not in standardized.columns:
                standardized[col] = None

        return standardized

    def _process_basic_info(self, df: pd.DataFrame, source: Optional[str] = None) -> pd.DataFrame:
        """Add typed dates and bookkeeping columns to a standardized frame.

        Args:
            df: Standardized frame; it is not modified.
            source: Name recorded in ``data_source``. When omitted, the
                previous behaviour is kept: "akshare" for a non-empty frame,
                "unknown" otherwise.

        Returns:
            pd.DataFrame: Copy of ``df`` with parsed date columns plus
            ``data_crawl_time``, ``data_source``, ``data_version``,
            ``processed_time`` and ``data_id``.
        """
        self.logger.info("开始处理基础信息数据")

        processed_df = df.copy()

        for col in ('list_date', 'delist_date'):
            if col in processed_df.columns:
                # Unparseable values become NaT instead of raising.
                processed_df[col] = pd.to_datetime(processed_df[col], errors='coerce')

        if source is None:
            source = "akshare" if len(df) > 0 else "unknown"

        # One timestamp for both columns so they cannot drift apart.
        now = datetime.now()
        processed_df['data_crawl_time'] = now
        processed_df['data_source'] = source
        processed_df['data_version'] = DATA_VERSION
        processed_df['processed_time'] = now

        # Identifier of the form "<symbol>_<data_version>".
        processed_df['data_id'] = processed_df['symbol'] + '_' + processed_df['data_version']

        self.logger.info("基础信息数据处理完成")

        return processed_df

    def _save_raw_data(self, df: pd.DataFrame, data_type: str, source: str):
        """Write the raw frame to Parquet plus a JSON sidecar.

        Failures are logged and swallowed so that a storage problem does not
        abort the collection run.

        Args:
            df: Frame to store; nothing is written when it is None or empty.
            data_type: Logical data set name used in the file name.
            source: Data source name used in the file name.
        """
        if df is None or df.empty:
            self.logger.warning("数据为空,不保存")
            return

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        target_dir = os.path.join(self.raw_dir, "stock_info")
        filepath = os.path.join(target_dir, f"{data_type}_{source}_{timestamp}.parquet")

        try:
            df.to_parquet(filepath, compression='snappy')
            self.logger.info(f"原始数据已保存: {filepath}")

            data_info = {
                'data_type': data_type,
                'source': source,
                'timestamp': timestamp,
                'file_size': os.path.getsize(filepath),
                'row_count': len(df),
                'save_time': datetime.now().isoformat()
            }

            info_path = os.path.join(target_dir, f"{data_type}_{source}_{timestamp}_info.json")
            with open(info_path, 'w', encoding='utf-8') as f:
                json.dump(data_info, f, ensure_ascii=False, indent=2)

            self.logger.info(f"数据存储信息已保存: {info_path}")

        except Exception as e:
            self.logger.error(f"保存原始数据失败: {e}")

    def _save_processed_data(self, df: pd.DataFrame, data_type: str):
        """Write the processed frame to Parquet plus a JSON sidecar.

        Failures are logged and swallowed so that a storage problem does not
        abort the collection run.

        Args:
            df: Frame to store; nothing is written when it is None or empty.
            data_type: Logical data set name used in the file name.
        """
        if df is None or df.empty:
            self.logger.warning("数据为空,不保存")
            return

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        target_dir = os.path.join(self.processed_dir, "stock_info")
        filepath = os.path.join(target_dir, f"{data_type}_processed_{timestamp}.parquet")

        try:
            df.to_parquet(filepath, compression='snappy')
            self.logger.info(f"处理后数据已保存: {filepath}")

            process_info = {
                'data_type': data_type,
                'timestamp': timestamp,
                'file_size': os.path.getsize(filepath),
                'row_count': len(df),
                'processed_time': datetime.now().isoformat(),
                'field_count': len(df.columns)
            }

            info_path = os.path.join(target_dir, f"{data_type}_processed_{timestamp}_info.json")
            with open(info_path, 'w', encoding='utf-8') as f:
                json.dump(process_info, f, ensure_ascii=False, indent=2)

            self.logger.info(f"数据存储信息已保存: {info_path}")

        except Exception as e:
            self.logger.error(f"保存处理后数据失败: {e}")

    @staticmethod
    def _value_counts(series: pd.Series, top: Optional[int] = None) -> Dict:
        """Return value frequencies as a dict with plain ``int`` counts."""
        counts = series.value_counts()
        if top is not None:
            counts = counts.head(top)
        return {key: int(count) for key, count in counts.items()}

    def generate_summary_report(self, df: pd.DataFrame) -> Dict:
        """Build a JSON-serialisable summary of a processed frame.

        Args:
            df: Frame containing at least ``symbol``, ``market``, ``industry``
                and ``status``; ``list_date`` is optional.

        Returns:
            Dict: Summary statistics, or ``{"error": ...}`` when the frame is
            empty or the statistics cannot be computed.
        """
        self.logger.info("生成数据摘要报告")

        if df is None or df.empty:
            return {"error": "数据为空"}

        try:
            summary = {
                'collection_time': datetime.now().isoformat(),
                'total_records': len(df),
                'unique_stocks': int(df['symbol'].nunique()),
                'market_distribution': self._value_counts(df['market']),
                'industry_distribution': self._value_counts(df['industry'], top=10),
                'status_distribution': self._value_counts(df['status']),
            }

            if 'list_date' in df.columns:
                # Re-parse defensively so an unprocessed frame works too, and
                # drop NaT so the year keys stay integers.
                list_dates = pd.to_datetime(df['list_date'], errors='coerce').dropna()
                if list_dates.empty:
                    summary['list_years'] = {
                        'min_list_date': None,
                        'max_list_date': None,
                        'by_year': {}
                    }
                else:
                    summary['list_years'] = {
                        'min_list_date': list_dates.min().isoformat(),
                        'max_list_date': list_dates.max().isoformat(),
                        'by_year': {
                            int(year): int(count)
                            for year, count in list_dates.dt.year.value_counts().items()
                        }
                    }

        except (KeyError, AttributeError, TypeError, ValueError) as e:
            self.logger.error(f"生成摘要报告失败: {e}")
            return {"error": f"生成摘要报告失败: {e}"}

        self.logger.info("数据摘要报告生成完成")

        return summary


def _count_parquet_files(directory: str) -> int:
    """Count the Parquet data files (not the JSON sidecars) in ``directory``."""
    return sum(1 for entry in os.listdir(directory) if entry.endswith(".parquet"))


def main():
    """Collect via AKShare and print a summary of the run."""
    collector = AStockBasicInfoCollector()

    basic_info_akshare = collector.collect_basic_info_akshare()

    if basic_info_akshare.empty:
        print("❌ 数据采集失败!")
        print("   可能原因:")
        print("   1. AKShare未安装: pip install akshare")
        print("   2. 网络连接问题")
        print("   3. 数据源暂时不可用")
        return

    summary = collector.generate_summary_report(basic_info_akshare)
    print(json.dumps(summary, ensure_ascii=False, indent=2))

    # Counts cover every snapshot in the directory, not only this run's.
    raw_count = _count_parquet_files(os.path.join(collector.raw_dir, "stock_info"))
    processed_count = _count_parquet_files(os.path.join(collector.processed_dir, "stock_info"))

    print("\n✅ 数据采集完成!")
    print("📊 统计信息:")
    print(f"   采集记录数: {len(basic_info_akshare)}")
    print(f"   原始数据文件: {raw_count}个")
    print(f"   处理后数据文件: {processed_count}个")
    print(f"   生成摘要报告: 包含{len(summary)}个统计指标")


if __name__ == "__main__":
    main()