From 5cb232a7398ef5c78ac2f2578108d7c452dfce2f Mon Sep 17 00:00:00 2001 From: cfdaily Date: Thu, 26 Mar 2026 11:35:20 +0800 Subject: [PATCH] auto-sync: 2026-03-26 11:35:20 --- .../data_acquisition/a_stock_basic_info.py | 650 +++++++++--------- 1 file changed, 343 insertions(+), 307 deletions(-) diff --git a/zhaoyun-data/scripts/data_acquisition/a_stock_basic_info.py b/zhaoyun-data/scripts/data_acquisition/a_stock_basic_info.py index 028f9ed30..c6140d233 100644 --- a/zhaoyun-data/scripts/data_acquisition/a_stock_basic_info.py +++ b/zhaoyun-data/scripts/data_acquisition/a_stock_basic_info.py @@ -8,241 +8,197 @@ import os import time import json import pandas as pd +import numpy as np from datetime import datetime -from typing import List, Dict, Optional, Any import logging +import warnings -# 添加项目路径 -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +warnings.filterwarnings('ignore') -from utils.data_utils import DataUtils -from utils.log_utils import LogUtils +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) class AStockBasicInfoCollector: """A股基础信息数据采集器""" - def __init__(self, config_path: str = None): - """初始化采集器 - - Args: - config_path: 配置文件路径 - """ - # 配置日志 - self.logger = LogUtils.setup_logger('a_stock_basic_info') - - # 加载配置 - self.config = self._load_config(config_path) + def __init__(self): + """初始化采集器""" + logger.info("A股基础信息采集器初始化") # 基础路径 self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data" - self.raw_dir = os.path.join(self.base_dir, "raw") - self.processed_dir = os.path.join(self.base_dir, "processed") + self.raw_dir = os.path.join(self.base_dir, "raw", "stock_info") + self.processed_dir = os.path.join(self.base_dir, "processed", "stock_info") # 确保目录存在 - os.makedirs(os.path.join(self.raw_dir, "stock_info"), exist_ok=True) - os.makedirs(os.path.join(self.processed_dir, "stock_info"), exist_ok=True) + os.makedirs(self.raw_dir, exist_ok=True) + os.makedirs(self.processed_dir, exist_ok=True) - self.logger.info("A股基础信息采集器初始化完成") + # 数据采集时间 + self.collection_time = datetime.now() + + logger.info("采集器初始化完成") - def _load_config(self, config_path: Optional[str] = None) -> Dict: - """加载配置 - - Args: - config_path: 配置文件路径 - - Returns: - Dict: 配置信息 - """ - default_config = { - "data_sources": { - "akshare": { - "enabled": True, - "rate_limit": 30, # 请求间隔毫秒 - "max_retries": 3, - "retry_delay": 5 # 重试延迟秒 - }, - "tushare": { - "enabled": False, - "token": "", # 需要配置API Token - "rate_limit": 500 # Tushare的请求限制 - } - }, - "data_fields": { - "basic_info": [ - "symbol", # 股票代码 - "name", # 股票名称 - "industry", # 所属行业 - "area", # 地区 - "market", # 市场类型(主板/创业板/科创板) - "list_date", # 上市日期 - "delist_date", # 退市日期(如未退市则为空) - "exchange", # 交易所(SH/SZ) - "is_hs", # 是否沪深港通标的 - "is_st", # 是否ST/*ST股票 - "status", # 上市状态(L上市 D退市 P暂停上市) - "ts_code" # Tushare代码 - ] - }, - "storage": { - "raw_format": "parquet", - "processed_format": "parquet", - "compression": "snappy", - "partition_by": ["year", "month"] - } - } - - if config_path and os.path.exists(config_path): - try: - with open(config_path, 'r', encoding='utf-8') as f: - user_config = json.load(f) - default_config.update(user_config) - self.logger.info(f"从 {config_path} 加载用户配置") - except Exception as e: - self.logger.warning(f"加载用户配置文件失败: {e}") - - return default_config - - def collect_basic_info_akshare(self) -> pd.DataFrame: - """使用AKShare采集A股基础信息 + def collect_basic_info(self) -> pd.DataFrame: + """采集A股基础信息 Returns: pd.DataFrame: 股票基础信息数据 """ - self.logger.info("开始使用AKShare采集A股基础信息") + logger.info("开始采集A股基础信息") try: - # 尝试导入akshare import akshare as ak - self.logger.info("正在获取A股基础信息...") + # 1. 获取A股代码和名称 + logger.info("获取A股代码和名称...") + stock_list = ak.stock_info_a_code_name() - # 采集A股基础信息 - stock_info_df = ak.stock_info_a_code_name() + if stock_list is None or stock_list.empty: + logger.error("未获取到股票列表") + return pd.DataFrame() - # 检查数据是否有效 - if stock_info_df is not None and not stock_info_df.empty: - self.logger.info(f"成功采集 {len(stock_info_df)} 条股票基础信息") + logger.info(f"获取到 {len(stock_list)} 只股票基本信息") + + # 2. 获取更详细的股票信息 + logger.info("获取详细股票信息...") + stock_info_list = [] + + for idx, row in stock_list.iterrows(): + if idx % 100 == 0: + logger.info(f"处理第 {idx+1}/{len(stock_list)} 只股票") - # 标准化字段名 - stock_info_df = self._standardize_columns(stock_info_df) + try: + # 获取单只股票详细信息 + stock_detail = ak.stock_individual_info_em(symbol=row['code']) + + if stock_detail is not None and not stock_detail.empty: + # 转换为字典 + stock_dict = { + 'code': row['code'], + 'name': row['name'], + 'collection_time': self.collection_time + } + + # 添加详细信息 + for _, detail_row in stock_detail.iterrows(): + key = detail_row['item'] + value = detail_row['value'] + stock_dict[key] = value + + stock_info_list.append(stock_dict) + + # 避免请求过快 + time.sleep(0.1) + + except Exception as e: + logger.warning(f"处理股票 {row['code']} 时出错: {e}") + continue + + # 转换为DataFrame + if stock_info_list: + stock_df = pd.DataFrame(stock_info_list) + logger.info(f"成功采集 {len(stock_df)} 只股票详细信息") # 保存原始数据 - self._save_raw_data(stock_info_df, "stock_basic_info", "akshare") + self._save_raw_data(stock_df) # 处理数据 - processed_df = self._process_basic_info(stock_info_df) + processed_df = self._process_basic_info(stock_df) # 保存处理后数据 - self._save_processed_data(processed_df, "stock_basic_info") + self._save_processed_data(processed_df) return processed_df - else: - self.logger.warning("AKShare返回数据为空") + logger.error("未采集到任何股票详细信息") return pd.DataFrame() - - except ImportError: - self.logger.error("AKShare未安装,请安装: pip install akshare") - return pd.DataFrame() + except ImportError: + logger.error("AKShare未安装,请安装: pip install akshare") + return pd.DataFrame() except Exception as e: - self.logger.error(f"使用AKShare采集数据失败: {e}") + logger.error(f"采集基础信息失败: {e}") return pd.DataFrame() - def collect_basic_info_tushare(self) -> pd.DataFrame: - """使用Tushare采集A股基础信息 + def collect_industry_info(self) -> pd.DataFrame: + """采集行业分类信息 Returns: - pd.DataFrame: 股票基础信息数据 + pd.DataFrame: 行业分类数据 """ - self.logger.info("开始使用Tushare采集A股基础信息") + logger.info("开始采集行业分类信息") try: - # 尝试导入tushare - import tushare as ts + import akshare as ak - # 检查是否配置了API Token - if not self.config.get("data_sources", {}).get("tushare", {}).get("enabled", False): - self.logger.warning("Tushare未启用,请在配置中设置enabled: true并配置token") - return pd.DataFrame() + # 获取申万行业分类 + logger.info("获取申万行业分类...") + sw_industry = ak.stock_board_industry_sw_spot() - # 设置API Token - ts.set_token(self.config["data_sources"]["tushare"]["token"]) - pro = ts.pro_api() + if sw_industry is not None and not sw_industry.empty: + logger.info(f"获取到 {len(sw_industry)} 个申万行业") + + # 保存行业数据 + industry_file = os.path.join(self.raw_dir, f"sw_industry_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.csv") + sw_industry.to_csv(industry_file, index=False, encoding='utf-8-sig') + logger.info(f"申万行业数据已保存: {industry_file}") - self.logger.info("正在获取A股基础信息...") + # 获取证监会行业分类 + logger.info("获取证监会行业分类...") + csrc_industry = ak.stock_board_industry_csrc_spot() - # 采集A股基础信息 - stock_info_df = pro.stock_basic( - exchange='', - list_status='L', # 上市状态 - fields='ts_code,symbol,name,area,industry,market,list_date,delist_date,is_hs,is_st,status' - ) + if csrc_industry is not None and not csrc_industry.empty: + logger.info(f"获取到 {len(csrc_industry)} 个证监会行业") + + # 保存行业数据 + industry_file = os.path.join(self.raw_dir, f"csrc_industry_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.csv") + csrc_industry.to_csv(industry_file, index=False, encoding='utf-8-sig') + logger.info(f"证监会行业数据已保存: {industry_file}") - # 检查数据是否有效 - if stock_info_df is not None and not stock_info_df.empty: - self.logger.info(f"成功采集 {len(stock_info_df)} 条股票基础信息") - - # 标准化字段名 - stock_info_df = self._standardize_columns(stock_info_df) - - # 保存原始数据 - self._save_raw_data(stock_info_df, "stock_basic_info", "tushare") - - # 处理数据 - processed_df = self._process_basic_info(stock_info_df) - - # 保存处理后数据 - self._save_processed_data(processed_df, "stock_basic_info") - - return processed_df + # 合并行业信息 + industry_info = pd.DataFrame() + if sw_industry is not None: + industry_info = sw_industry.copy() - else: - self.logger.warning("Tushare返回数据为空") - return pd.DataFrame() - - except ImportError: - self.logger.error("Tushare未安装,请安装: pip install tushare") - return pd.DataFrame() + return industry_info except Exception as e: - self.logger.error(f"使用Tushare采集数据失败: {e}") + logger.error(f"采集行业信息失败: {e}") return pd.DataFrame() - def _standardize_columns(self, df: pd.DataFrame) -> pd.DataFrame: - """标准化数据列名 + def _save_raw_data(self, df: pd.DataFrame): + """保存原始数据 Args: df: 原始数据 - - Returns: - pd.DataFrame: 标准化后的数据 """ - # 定义标准列名映射 - column_mapping = { - 'code': 'symbol', - 'ts_code': 'symbol', - 'name': 'name', - 'industry': 'industry', - 'area': 'area', - 'market': 'market', - 'list_date': 'list_date', - 'delist_date': 'delist_date', - 'exchange': 'exchange', - 'is_hs': 'is_hs', - 'is_st': 'is_st', - 'status': 'status' - } + if df is None or df.empty: + logger.warning("数据为空,不保存原始数据") + return - # 重命名列 - df = df.rename(columns=column_mapping) - - # 确保所有标准列都存在,缺失的填充为空 - for col in column_mapping.values(): - if col not in df.columns: - df[col] = None - - return df + try: + # 保存为CSV格式 + csv_file = os.path.join(self.raw_dir, f"stock_basic_info_raw_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.csv") + df.to_csv(csv_file, index=False, encoding='utf-8-sig') + logger.info(f"原始数据已保存: {csv_file}") + + # 保存为JSON格式 + json_file = os.path.join(self.raw_dir, f"stock_basic_info_raw_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.json") + df.to_json(json_file, orient='records', force_ascii=False, indent=2) + logger.info(f"原始JSON数据已保存: {json_file}") + + # 保存为Parquet格式 + parquet_file = os.path.join(self.raw_dir, f"stock_basic_info_raw_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.parquet") + df.to_parquet(parquet_file, compression='snappy') + logger.info(f"原始Parquet数据已保存: {parquet_file}") + + except Exception as e: + logger.error(f"保存原始数据失败: {e}") def _process_basic_info(self, df: pd.DataFrame) -> pd.DataFrame: """处理基础信息数据 @@ -253,188 +209,268 @@ class AStockBasicInfoCollector: Returns: pd.DataFrame: 处理后数据 """ - self.logger.info("开始处理基础信息数据") + logger.info("开始处理基础信息数据") - # 创建处理后的DataFrame - processed_df = df.copy() - - # 标准化日期格式 - date_columns = ['list_date', 'delist_date'] - for col in date_columns: - if col in processed_df.columns: - # 转换日期格式 - processed_df[col] = pd.to_datetime(processed_df[col], errors='coerce') - - # 添加数据采集时间 - processed_df['data_crawl_time'] = datetime.now() - - # 添加数据源信息 - processed_df['data_source'] = "akshare" if len(df) > 0 else "unknown" - - # 添加数据版本 - processed_df['data_version'] = "1.0.0" - - # 添加处理时间 - processed_df['processed_time'] = datetime.now() - - # 创建唯一标识符 - processed_df['data_id'] = processed_df['symbol'] + '_' + processed_df['data_version'] - - self.logger.info("基础信息数据处理完成") - - return processed_df - - def _save_raw_data(self, df: pd.DataFrame, data_type: str, source: str): - """保存原始数据 - - Args: - df: 数据DataFrame - data_type: 数据类型 - source: 数据源名称 - """ if df is None or df.empty: - self.logger.warning("数据为空,不保存") - return + return df - # 创建文件名 - timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - filename = f"{data_type}_{source}_{timestamp}.parquet" - filepath = os.path.join(self.raw_dir, "stock_info", filename) - - # 保存为Parquet格式 try: - df.to_parquet(filepath, compression='snappy') - self.logger.info(f"原始数据已保存: {filepath}") + # 创建副本 + processed_df = df.copy() - # 记录数据存储信息 - data_info = { - 'data_type': data_type, - 'source': source, - 'timestamp': timestamp, - 'file_size': os.path.getsize(filepath), - 'row_count': len(df), - 'save_time': datetime.now().isoformat() + # 标准化列名 + column_mapping = { + 'code': 'symbol', + 'name': 'name', + '上市日期': 'list_date', + '行业分类': 'industry', + '总市值': 'total_market_cap', + '流通市值': 'circulating_market_cap', + '市盈率': 'pe_ratio', + '市净率': 'pb_ratio', + '涨跌幅': 'change_pct', + '成交量': 'volume', + '成交额': 'turnover' } - # 保存数据存储信息 - info_path = os.path.join(self.raw_dir, "stock_info", f"{data_type}_{source}_{timestamp}_info.json") - with open(info_path, 'w', encoding='utf-8') as f: - json.dump(data_info, f, ensure_ascii=False, indent=2) + # 重命名列 + processed_df = processed_df.rename(columns=column_mapping) - self.logger.info(f"数据存储信息已保存: {info_path}") + # 添加标准字段(如果不存在) + standard_fields = ['symbol', 'name', 'list_date', 'industry', 'exchange', 'status'] + for field in standard_fields: + if field not in processed_df.columns: + processed_df[field] = None + + # 处理日期格式 + if 'list_date' in processed_df.columns: + processed_df['list_date'] = pd.to_datetime(processed_df['list_date'], errors='coerce') + + # 处理数值字段 + numeric_fields = ['total_market_cap', 'circulating_market_cap', 'pe_ratio', 'pb_ratio'] + for field in numeric_fields: + if field in processed_df.columns: + processed_df[field] = pd.to_numeric(processed_df[field], errors='coerce') + + # 添加数据质量标记 + processed_df['data_quality'] = 'good' + processed_df['processed_time'] = datetime.now() + processed_df['data_version'] = '1.0.0' + + # 添加数据源信息 + processed_df['data_source'] = 'akshare' + + # 创建唯一ID + processed_df['stock_id'] = processed_df['symbol'].astype(str) + '_' + processed_df['data_version'] + + logger.info(f"数据处理完成,共 {len(processed_df)} 条记录") + + return processed_df except Exception as e: - self.logger.error(f"保存原始数据失败: {e}") + logger.error(f"处理基础信息数据失败: {e}") + return df - def _save_processed_data(self, df: pd.DataFrame, data_type: str): + def _save_processed_data(self, df: pd.DataFrame): """保存处理后数据 Args: df: 处理后数据 - data_type: 数据类型 """ if df is None or df.empty: - self.logger.warning("数据为空,不保存") + logger.warning("数据为空,不保存处理后数据") return - # 创建文件名 - timestamp = datetime.now().strftime('%Y%m%d_%H%M%S") - filename = f"{data_type}_processed_{timestamp}.parquet" - filepath = os.path.join(self.processed_dir, "stock_info", filename) - - # 保存为Parquet格式 try: - df.to_parquet(filepath, compression='snappy') - self.logger.info(f"处理后数据已保存: {filepath}") + # 保存为Parquet格式(主要格式) + parquet_file = os.path.join(self.processed_dir, f"stock_basic_info_processed_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.parquet") + df.to_parquet(parquet_file, compression='snappy') + logger.info(f"处理后数据已保存(Parquet): {parquet_file}") - # 记录处理信息 - process_info = { - 'data_type': data_type, - 'timestamp': timestamp, - 'file_size': os.path.getsize(filepath), - 'row_count': len(df), - 'processed_time': datetime.now().isoformat(), - 'field_count': len(df.columns) - } + # 保存为CSV格式(便于查看) + csv_file = os.path.join(self.processed_dir, f"stock_basic_info_processed_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.csv") + df.to_csv(csv_file, index=False, encoding='utf-8-sig') + logger.info(f"处理后数据已保存(CSV): {csv_file}") - # 保存处理信息 - info_path = os.path.join(self.processed_dir, "stock_info", f"{data_type}_processed_{timestamp}_info.json") - with open(info_path, 'w', encoding='utf-8') as f: - json.dump(process_info, f, ensure_ascii=False, indent=2) - - self.logger.info(f"数据存储信息已保存: {info_path}") + # 保存数据摘要 + summary = self._generate_summary(df) + summary_file = os.path.join(self.processed_dir, f"stock_basic_info_summary_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.json") + with open(summary_file, 'w', encoding='utf-8') as f: + json.dump(summary, f, ensure_ascii=False, indent=2) + logger.info(f"数据摘要已保存: {summary_file}") except Exception as e: - self.logger.error(f"保存处理后数据失败: {e}") + logger.error(f"保存处理后数据失败: {e}") - def generate_summary_report(self, df: pd.DataFrame) -> Dict: - """生成数据摘要报告 + def _generate_summary(self, df: pd.DataFrame) -> dict: + """生成数据摘要 Args: df: 数据 Returns: - Dict: 数据摘要报告 + dict: 数据摘要 """ - self.logger.info("生成数据摘要报告") - if df is None or df.empty: return {"error": "数据为空"} try: - # 统计数据 summary = { - 'collection_time': datetime.now().isoformat(), - 'total_records': len(df), - 'unique_stocks': df['symbol'].nunique(), - 'market_distribution': df['market'].value_counts().to_dict(), - 'industry_distribution': df['industry'].value_counts().head(10).to_dict(), - 'status_distribution': df['status'].value_counts().to_dict(), + "collection_time": self.collection_time.isoformat(), + "total_records": len(df), + "unique_symbols": df['symbol'].nunique() if 'symbol' in df.columns else 0, + "industry_distribution": {}, + "market_cap_stats": {}, + "data_quality": {} + } + + # 行业分布统计 + if 'industry' in df.columns: + industry_counts = df['industry'].value_counts().head(10).to_dict() + summary["industry_distribution"] = industry_counts + + # 市值统计 + if 'total_market_cap' in df.columns: + market_cap_series = df['total_market_cap'].dropna() + if not market_cap_series.empty: + summary["market_cap_stats"] = { + "mean": float(market_cap_series.mean()), + "median": float(market_cap_series.median()), + "min": float(market_cap_series.min()), + "max": float(market_cap_series.max()), + "std": float(market_cap_series.std()) + } + + # 数据质量统计 + if 'data_quality' in df.columns: + quality_counts = df['data_quality'].value_counts().to_dict() + summary["data_quality"] = quality_counts + + # 交易所分布 + if 'exchange' in df.columns: + exchange_counts = df['exchange'].value_counts().to_dict() + summary["exchange_distribution"] = exchange_counts + + # 数据字段信息 + summary["data_fields"] = { + "total_fields": len(df.columns), + "field_list": list(df.columns), + "field_types": {col: str(df[col].dtype) for col in df.columns} + } + + logger.info("数据摘要生成完成") + + return summary + + except Exception as e: + logger.error(f"生成数据摘要失败: {e}") + return {"error": str(e)} + + def run(self) -> dict: + """运行采集器 + + Returns: + dict: 采集结果 + """ + logger.info("开始执行A股基础信息采集任务") + + result = { + "success": False, + "data_collected": False, + "summary": {}, + "files_saved": [], + "error": None } - # 添加时间信息 - if 'list_date' in df.columns: - summary['list_years'] = { - 'min_list_date': df['list_date'].min().isoformat() if pd.notna(df['list_date'].min()) else None, - 'max_list_date': df['list_date'].max().isoformat() if pd.notna(df['list_date'].max()) else None, - 'by_year': df['list_date'].dt.year.value_counts().to_dict() if pd.notna(df['list_date']).any() else {} - } + try: + # 采集基础信息 + basic_info_df = self.collect_basic_info() + + if basic_info_df is not None and not basic_info_df.empty: + result["data_collected"] = True + result["records_collected"] = len(basic_info_df) + + # 采集行业信息 + industry_df = self.collect_industry_info() + if industry_df is not None and not industry_df.empty: + result["industry_records"] = len(industry_df) + + # 生成摘要 + summary = self._generate_summary(basic_info_df) + result["summary"] = summary + + # 获取保存的文件列表 + raw_files = [f for f in os.listdir(self.raw_dir) if f.endswith(('.csv', '.json', '.parquet'))] + processed_files = [f for f in os.listdir(self.processed_dir) if f.endswith(('.csv', '.json', '.parquet'))] + + result["files_saved"] = { + "raw": raw_files, + "processed": processed_files + } + + result["success"] = True + result["message"] = f"成功采集 {len(basic_info_df)} 只股票基础信息" + + logger.info("A股基础信息采集任务完成") + + else: + result["error"] = "未采集到有效数据" + logger.error("未采集到有效数据") + + except Exception as e: + result["error"] = str(e) + logger.error(f"采集任务执行失败: {e}") - self.logger.info("数据摘要报告生成完成") - - return summary + return result + def main(): """主函数""" + print("=" * 60) + print("📊 A股基础信息数据采集") + print("=" * 60) + + # 创建采集器 collector = AStockBasicInfoCollector() - # 采集基础信息数据 - basic_info_akshare = collector.collect_basic_info_akshare() + # 运行采集 + print("开始采集数据...") + result = collector.run() - if not basic_info_akshare.empty: - # 生成摘要报告 - summary = collector.generate_summary_report(basic_info_akshare) + # 输出结果 + print("\n" + "=" * 60) + print("📋 采集结果") + print("=" * 60) + + if result["success"]: + print(f"✅ {result['message']}") + print(f"📈 记录数: {result.get('records_collected', 0)}") - # 输出摘要报告 - print(json.dumps(summary, ensure_ascii=False, indent=2)) + summary = result.get("summary", {}) + if summary: + print(f"📊 行业数量: {len(summary.get('industry_distribution', {}))}") + print(f"🏦 市值统计: 平均{summary.get('market_cap_stats', {}).get('mean', 0):,.2f}亿元") - # 获取保存的原始数据路径 - raw_files = os.listdir(os.path.join(collector.raw_dir, "stock_info")) - processed_files = os.listdir(os.path.join(collector.processed_dir, "stock_info")) + files = result.get("files_saved", {}) + if files: + print(f"💾 保存文件:") + print(f" 原始数据: {len(files.get('raw', []))}个文件") + print(f" 处理后数据: {len(files.get('processed', []))}个文件") - print(f"\n✅ 数据采集完成!") - print(f"📊 统计信息:") - print(f" 采集记录数: {len(basic_info_akshare)}") - print(f" 原始数据文件: {len(raw_files)}个") - print(f" 处理后数据文件: {len(processed_files)}个") - print(f" 生成摘要报告: 包含{len(summary)}个统计指标") + # 输出保存路径 + print(f"📁 数据保存位置:") + print(f" 原始数据: {collector.raw_dir}") + print(f" 处理后数据: {collector.processed_dir}") else: - print("❌ 数据采集失败!") - print(" 可能原因:") - print(" 1. AKShare未安装: pip install akshare") - print(" 2. 网络连接问题") - print(" 3. 数据源暂时不可用") + print(f"❌ 采集失败: {result.get('error', '未知错误')}") + print("⚠️ 请检查:") + print(" 1. AKShare是否安装: pip install akshare") + print(" 2. 网络连接是否正常") + print(" 3. 数据源是否可用") + + print("=" * 60) + if __name__ == "__main__": main() \ No newline at end of file