From 68836379ba38517f8e355aed6266999052c13ee2 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Thu, 26 Mar 2026 11:40:53 +0800 Subject: [PATCH] auto-sync: 2026-03-26 11:40:53 --- .../a_stock_financial_data.py | 637 ++++++++++++++++++ 1 file changed, 637 insertions(+) create mode 100644 zhaoyun-data/scripts/data_acquisition/a_stock_financial_data.py diff --git a/zhaoyun-data/scripts/data_acquisition/a_stock_financial_data.py b/zhaoyun-data/scripts/data_acquisition/a_stock_financial_data.py new file mode 100644 index 000000000..67ec844a9 --- /dev/null +++ b/zhaoyun-data/scripts/data_acquisition/a_stock_financial_data.py @@ -0,0 +1,637 @@ +#!/usr/bin/env python3 +""" +A股财务数据采集脚本 +获取资产负债表、利润表、现金流量表核心指标 +计算PE、PB、ROE、股息率等常用估值指标 +""" +import sys +import os +import time +import json +import pandas as pd +import numpy as np +from datetime import datetime, timedelta +from typing import List, Dict, Optional +import logging +import warnings +from concurrent.futures import ThreadPoolExecutor, as_completed + +warnings.filterwarnings('ignore') + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class AStockFinancialDataCollector: + """A股财务数据采集器""" + + def __init__(self): + """初始化采集器""" + logger.info("A股财务数据采集器初始化") + + # 基础路径 + self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data" + self.raw_dir = os.path.join(self.base_dir, "raw", "financial_reports") + self.processed_dir = os.path.join(self.base_dir, "processed", "financial_indicators") + + # 确保目录存在 + os.makedirs(self.raw_dir, exist_ok=True) + os.makedirs(self.processed_dir, exist_ok=True) + + # 创建时间分区目录 + self._create_date_directories() + + # 数据采集时间 + self.collection_time = datetime.now() + + # 默认时间范围 + self.start_date = "2010-01-01" + self.end_date = self.collection_time.strftime('%Y-%m-%d') + + logger.info(f"财务数据采集时间范围: {self.start_date} 至 {self.end_date}") + + def _create_date_directories(self): + """创建日期分区目录""" + # 创建季度目录(2010Q1-2026Q4) + for year in range(2010, 2027): + for quarter in range(1, 5): + quarter_dir = os.path.join(self.raw_dir, f"{year}_Q{quarter}") + os.makedirs(quarter_dir, exist_ok=True) + + # 创建年度目录 + for year in range(2010, 2027): + year_dir = os.path.join(self.raw_dir, "annual", str(year)) + os.makedirs(year_dir, exist_ok=True) + + def load_stock_list(self) -> List[str]: + """加载股票代码列表 + + Returns: + List[str]: 股票代码列表 + """ + logger.info("加载股票代码列表") + + # 检查是否有已保存的股票基础信息 + processed_dir = os.path.join(self.base_dir, "processed", "stock_info") + if os.path.exists(processed_dir): + # 查找最新的处理文件 + parquet_files = [f for f in os.listdir(processed_dir) if f.endswith('.parquet')] + + if parquet_files: + # 按时间排序,获取最新的文件 + latest_file = sorted(parquet_files)[-1] + file_path = os.path.join(processed_dir, latest_file) + + try: + df = pd.read_parquet(file_path) + if 'symbol' in df.columns: + stock_list = df['symbol'].dropna().unique().tolist() + logger.info(f"从处理文件加载到 {len(stock_list)} 只股票代码") + return stock_list + except Exception as e: + logger.warning(f"读取处理文件失败: {e}") + + # 如果处理文件不存在或读取失败,尝试从AKShare获取 + try: + import akshare as ak + stock_list_df = ak.stock_info_a_code_name() + if stock_list_df is not None and not stock_list_df.empty: + stock_list = stock_list_df['code'].dropna().unique().tolist() + logger.info(f"从AKShare获取到 {len(stock_list)} 只股票代码") + return stock_list + except Exception as e: + logger.error(f"获取股票列表失败: {e}") + + return [] + + def collect_financial_indicators(self, symbol: str) -> Optional[pd.DataFrame]: + """采集单只股票的财务指标 + + Args: + symbol: 股票代码 + + Returns: + Optional[pd.DataFrame]: 财务指标数据 + """ + logger.debug(f"开始采集股票 {symbol} 的财务指标") + + try: + import akshare as ak + + # 获取财务指标数据 + financial_data = ak.stock_financial_analysis_indicator(symbol=symbol) + + if financial_data is None or financial_data.empty: + logger.warning(f"股票 {symbol} 未获取到财务指标数据") + return None + + # 添加股票代码 + financial_data['symbol'] = symbol + + # 添加采集时间 + financial_data['collection_time'] = self.collection_time + + logger.debug(f"股票 {symbol} 成功采集 {len(financial_data)} 条财务指标数据") + + return financial_data + + except Exception as e: + logger.error(f"采集股票 {symbol} 财务指标失败: {e}") + return None + + def collect_balance_sheet(self, symbol: str) -> Optional[pd.DataFrame]: + """采集资产负债表数据 + + Args: + symbol: 股票代码 + + Returns: + Optional[pd.DataFrame]: 资产负债表数据 + """ + logger.debug(f"开始采集股票 {symbol} 的资产负债表") + + try: + import akshare as ak + + # 获取资产负债表数据 + balance_sheet = ak.stock_balance_sheet_by_report_em(symbol=symbol) + + if balance_sheet is None or balance_sheet.empty: + logger.warning(f"股票 {symbol} 未获取到资产负债表") + return None + + # 添加股票代码 + balance_sheet['symbol'] = symbol + + # 添加采集时间 + balance_sheet['collection_time'] = self.collection_time + + logger.debug(f"股票 {symbol} 成功采集 {len(balance_sheet)} 条资产负债表数据") + + return balance_sheet + + except Exception as e: + logger.error(f"采集股票 {symbol} 资产负债表失败: {e}") + return None + + def collect_income_statement(self, symbol: str) -> Optional[pd.DataFrame]: + """采集利润表数据 + + Args: + symbol: 股票代码 + + Returns: + Optional[pd.DataFrame]: 利润表数据 + """ + logger.debug(f"开始采集股票 {symbol} 的利润表") + + try: + import akshare as ak + + # 获取利润表数据 + income_statement = ak.stock_profit_sheet_by_report_em(symbol=symbol) + + if income_statement is None or income_statement.empty: + logger.warning(f"股票 {symbol} 未获取到利润表") + return None + + # 添加股票代码 + income_statement['symbol'] = symbol + + # 添加采集时间 + income_statement['collection_time'] = self.collection_time + + logger.debug(f"股票 {symbol} 成功采集 {len(income_statement)} 条利润表数据") + + return income_statement + + except Exception as e: + logger.error(f"采集股票 {symbol} 利润表失败: {e}") + return None + + def collect_cash_flow(self, symbol: str) -> Optional[pd.DataFrame]: + """采集现金流量表数据 + + Args: + symbol: 股票代码 + + Returns: + Optional[pd.DataFrame]: 现金流量表数据 + """ + logger.debug(f"开始采集股票 {symbol} 的现金流量表") + + try: + import akshare as ak + + # 获取现金流量表数据 + cash_flow = ak.stock_cash_flow_sheet_by_report_em(symbol=symbol) + + if cash_flow is None or cash_flow.empty: + logger.warning(f"股票 {symbol} 未获取到现金流量表") + return None + + # 添加股票代码 + cash_flow['symbol'] = symbol + + # 添加采集时间 + cash_flow['collection_time'] = self.collection_time + + logger.debug(f"股票 {symbol} 成功采集 {len(cash_flow)} 条现金流量表数据") + + return cash_flow + + except Exception as e: + logger.error(f"采集股票 {symbol} 现金流量表失败: {e}") + return None + + def batch_collect_financial_data(self, stock_list: List[str] = None, max_workers: int = 3) -> Dict: + """批量采集财务数据 + + Args: + stock_list: 股票代码列表 + max_workers: 最大并发线程数 + + Returns: + Dict: 批量采集结果 + """ + logger.info(f"开始批量采集A股财务数据,最大并发数: {max_workers}") + + if stock_list is None: + stock_list = self.load_stock_list() + + if not stock_list: + logger.error("股票列表为空,无法采集") + return { + "success": False, + "error": "股票列表为空", + "collected": 0, + "failed": 0 + } + + total_stocks = len(stock_list) + logger.info(f"需要采集 {total_stocks} 只股票的财务数据") + + # 分批处理(财务数据请求较慢,分批小批量) + batch_size = 20 + batches = [] + for i in range(0, total_stocks, batch_size): + batches.append(stock_list[i:i + batch_size]) + + logger.info(f"共分为 {len(batches)} 个批次") + + financial_indicators_all = [] + balance_sheets_all = [] + income_statements_all = [] + cash_flows_all = [] + + success_count = 0 + failed_count = 0 + failed_symbols = [] + + for batch_idx, batch in enumerate(batches): + logger.info(f"处理批次 {batch_idx + 1}/{len(batches)},包含 {len(batch)} 只股票") + + batch_start_time = time.time() + + # 使用线程池并发采集 + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # 提交财务指标采集任务 + future_to_data_type = {} + + for symbol in batch: + # 提交四种财务数据采集任务 + future_to_data_type[executor.submit(self.collect_financial_indicators, symbol)] = (symbol, 'indicators') + future_to_data_type[executor.submit(self.collect_balance_sheet, symbol)] = (symbol, 'balance_sheet') + future_to_data_type[executor.submit(self.collect_income_statement, symbol)] = (symbol, 'income_statement') + future_to_data_type[executor.submit(self.collect_cash_flow, symbol)] = (symbol, 'cash_flow') + + # 处理完成的任务 + for future in as_completed(future_to_data_type): + symbol, data_type = future_to_data_type[future] + + try: + result = future.result(timeout=30) + + if result is not None and not result.empty: + if data_type == 'indicators': + financial_indicators_all.append(result) + elif data_type == 'balance_sheet': + balance_sheets_all.append(result) + elif data_type == 'income_statement': + income_statements_all.append(result) + elif data_type == 'cash_flow': + cash_flows_all.append(result) + + logger.debug(f"股票 {symbol} {data_type} 采集成功,{len(result)} 条数据") + else: + logger.warning(f"股票 {symbol} {data_type} 采集失败或数据为空") + + except Exception as e: + logger.error(f"股票 {symbol} {data_type} 采集异常: {e}") + + # 检查这一批次是否至少有一种数据采集成功 + if (len(financial_indicators_all) > 0 or len(balance_sheets_all) > 0 or + len(income_statements_all) > 0 or len(cash_flows_all) > 0): + success_count += len(batch) + else: + failed_count += len(batch) + failed_symbols.extend(batch) + + batch_time = time.time() - batch_start_time + logger.info(f"批次 {batch_idx + 1} 完成,耗时 {batch_time:.2f}秒") + + # 避免请求过快 + time.sleep(5) + + # 保存各种财务数据 + saved_files = [] + + if financial_indicators_all: + indicators_df = pd.concat(financial_indicators_all, ignore_index=True) + saved_files.extend(self._save_financial_data(indicators_df, "financial_indicators")) + + if balance_sheets_all: + balance_df = pd.concat(balance_sheets_all, ignore_index=True) + saved_files.extend(self._save_financial_data(balance_df, "balance_sheet")) + + if income_statements_all: + income_df = pd.concat(income_statements_all, ignore_index=True) + saved_files.extend(self._save_financial_data(income_df, "income_statement")) + + if cash_flows_all: + cash_flow_df = pd.concat(cash_flows_all, ignore_index=True) + saved_files.extend(self._save_financial_data(cash_flow_df, "cash_flow")) + + # 计算估值指标 + if financial_indicators_all: + valuation_df = self._calculate_valuation_indicators(financial_indicators_all) + if valuation_df is not None and not valuation_df.empty: + saved_files.extend(self._save_valuation_indicators(valuation_df)) + + # 生成结果报告 + result = { + "success": True, + "total_stocks": total_stocks, + "collected": success_count, + "failed": failed_count, + "failed_symbols": failed_symbols, + "indicators_records": len(financial_indicators_all) if financial_indicators_all else 0, + "balance_sheet_records": len(balance_sheets_all) if balance_sheets_all else 0, + "income_statement_records": len(income_statements_all) if income_statements_all else 0, + "cash_flow_records": len(cash_flows_all) if cash_flows_all else 0, + "collection_time": self.collection_time.isoformat(), + "saved_files": saved_files + } + + logger.info(f"财务数据采集完成: 成功 {success_count}/{total_stocks},失败 {failed_count}") + + return result + + def _save_financial_data(self, df: pd.DataFrame, data_type: str) -> List[str]: + """保存财务数据 + + Args: + df: 财务数据 + data_type: 数据类型 + + Returns: + List[str]: 保存的文件路径列表 + """ + saved_files = [] + + if df is None or df.empty: + return saved_files + + try: + # 保存原始数据 + timestamp = self.collection_time.strftime('%Y%m%d_%H%M%S') + filename = f"{data_type}_{timestamp}.parquet" + filepath = os.path.join(self.raw_dir, filename) + + df.to_parquet(filepath, compression='snappy') + saved_files.append(filepath) + logger.info(f"{data_type} 已保存: {filepath}") + + # 保存处理后数据 + processed_filename = f"{data_type}_processed_{timestamp}.parquet" + processed_filepath = os.path.join(self.processed_dir, processed_filename) + + # 处理数据(标准化等) + processed_df = self._process_financial_data(df, data_type) + processed_df.to_parquet(processed_filepath, compression='snappy') + saved_files.append(processed_filepath) + logger.info(f"处理后 {data_type} 已保存: {processed_filepath}") + + except Exception as e: + logger.error(f"保存 {data_type} 数据失败: {e}") + + return saved_files + + def _process_financial_data(self, df: pd.DataFrame, data_type: str) -> pd.DataFrame: + """处理财务数据 + + Args: + df: 原始财务数据 + data_type: 数据类型 + + Returns: + pd.DataFrame: 处理后的财务数据 + """ + processed_df = df.copy() + + try: + # 标准化日期列名 + date_columns = ['公告日期', '报告期', 'end_date', 'report_date'] + for col in date_columns: + if col in processed_df.columns: + processed_df['report_date'] = pd.to_datetime(processed_df[col], errors='coerce') + break + + # 添加处理时间 + processed_df['processed_time'] = datetime.now() + + # 添加数据版本 + processed_df['data_version'] = '1.0.0' + + # 根据数据类型进行特定处理 + if data_type == 'financial_indicators': + # 标准化关键指标名称 + indicator_mapping = { + '基本每股收益': 'eps_basic', + '稀释每股收益': 'eps_diluted', + '每股净资产': 'bps', + '每股经营现金流': 'cfps', + '净资产收益率': 'roe', + '总资产报酬率': 'roa', + '销售毛利率': 'gross_margin', + '销售净利率': 'net_margin', + '资产负债率': 'debt_ratio', + '流动比率': 'current_ratio', + '速动比率': 'quick_ratio' + } + + # 重命名列 + for chinese_name, english_name in indicator_mapping.items(): + if chinese_name in processed_df.columns: + processed_df[english_name] = pd.to_numeric(processed_df[chinese_name], errors='coerce') + + logger.info(f"{data_type} 数据处理完成,共 {len(processed_df)} 条记录") + + except Exception as e: + logger.error(f"处理 {data_type} 数据失败: {e}") + + return processed_df + + def _calculate_valuation_indicators(self, financial_indicators: List[pd.DataFrame]) -> Optional[pd.DataFrame]: + """计算估值指标 + + Args: + financial_indicators: 财务指标数据列表 + + Returns: + Optional[pd.DataFrame]: 估值指标数据 + """ + if not financial_indicators: + return None + + try: + # 合并所有财务指标数据 + all_indicators = pd.concat(financial_indicators, ignore_index=True) + + # 创建估值指标DataFrame + valuation_df = pd.DataFrame() + + # 提取必要信息 + if 'symbol' in all_indicators.columns: + valuation_df['symbol'] = all_indicators['symbol'] + + if '报告期' in all_indicators.columns: + valuation_df['report_date'] = pd.to_datetime(all_indicators['报告期'], errors='coerce') + + # 计算PE(市盈率) - 需要市场价格数据,这里先假设有市值数据 + # 实际应用中需要结合日线数据计算 + + # 计算PB(市净率) - 需要市净率数据 + if '市净率' in all_indicators.columns: + valuation_df['pb_ratio'] = pd.to_numeric(all_indicators['市净率'], errors='coerce') + + # 计算ROE(净资产收益率) + if '净资产收益率' in all_indicators.columns: + valuation_df['roe'] = pd.to_numeric(all_indicators['净资产收益率'], errors='coerce') + + # 计算股息率(需要分红数据) + # 这里先占位,实际需要分红数据和股价数据 + + # 添加计算时间 + valuation_df['calculation_time'] = datetime.now() + + logger.info(f"估值指标计算完成,共 {len(valuation_df)} 条记录") + + return valuation_df + + except Exception as e: + logger.error(f"计算估值指标失败: {e}") + return None + + def _save_valuation_indicators(self, valuation_df: pd.DataFrame) -> List[str]: + """保存估值指标数据 + + Args: + valuation_df: 估值指标数据 + + Returns: + List[str]: 保存的文件路径列表 + """ + saved_files = [] + + if valuation_df is None or valuation_df.empty: + return saved_files + + try: + # 保存估值指标数据 + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + filename = f"valuation_indicators_{timestamp}.parquet" + filepath = os.path.join(self.processed_dir, "valuation", filename) + + # 确保目录存在 + os.makedirs(os.path.dirname(filepath), exist_ok=True) + + valuation_df.to_parquet(filepath, compression='snappy') + saved_files.append(filepath) + logger.info(f"估值指标已保存: {filepath}") + + except Exception as e: + logger.error(f"保存估值指标失败: {e}") + + return saved_files + + +def main(): + """主函数""" + print("=" * 70) + print("📊 A股财务数据采集") + print("=" * 70) + + # 创建采集器 + collector = AStockFinancialDataCollector() + + # 获取股票列表 + print("获取股票代码列表...") + stock_list = collector.load_stock_list() + + if not stock_list: + print("❌ 未获取到股票列表,请检查基础信息采集") + return + + print(f"✅ 获取到 {len(stock_list)} 只股票代码") + print("开始批量采集财务数据(先测试10只)...") + + # 批量采集(先测试小批量) + result = collector.batch_collect_financial_data( + stock_list=stock_list[:10], # 先测试10只 + max_workers=3 + ) + + # 输出结果 + print("\n" + "=" * 70) + print("📋 采集结果") + print("=" * 70) + + if result.get("success", False): + print(f"✅ 财务数据采集成功!") + print(f"📈 统计信息:") + print(f" 股票总数: {result.get('total_stocks', 0)}") + print(f" 采集成功: {result.get('collected', 0)}") + print(f" 采集失败: {result.get('failed', 0)}") + print(f" 财务指标记录: {result.get('indicators_records', 0)}") + print(f" 资产负债表记录: {result.get('balance_sheet_records', 0)}") + print(f" 利润表记录: {result.get('income_statement_records', 0)}") + print(f" 现金流量表记录: {result.get('cash_flow_records', 0)}") + + saved_files = result.get("saved_files", []) + print(f"💾 保存文件: {len(saved_files)} 个") + + # 保存采集报告 + report_dir = os.path.join(collector.base_dir, "reports") + os.makedirs(report_dir, exist_ok=True) + + report_file = os.path.join(report_dir, f"financial_data_collection_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json") + + with open(report_file, 'w', encoding='utf-8') as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + print(f"\n📄 报告文件: {report_file}") + + else: + print(f"❌ 财务数据采集失败") + print(f"错误信息: {result.get('error', '未知错误')}") + + print("=" * 70) + + +if __name__ == "__main__": + main() \ No newline at end of file