auto-sync: 2026-03-26 11:40:53

This commit is contained in:
cfdaily
2026-03-26 11:40:53 +08:00
parent f138d0a988
commit 68836379ba
@@ -0,0 +1,637 @@
#!/usr/bin/env python3
"""
A股财务数据采集脚本
获取资产负债表、利润表、现金流量表核心指标
计算PE、PB、ROE、股息率等常用估值指标
"""
import sys
import os
import time
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import logging
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
warnings.filterwarnings('ignore')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class AStockFinancialDataCollector:
"""A股财务数据采集器"""
def __init__(self):
"""初始化采集器"""
logger.info("A股财务数据采集器初始化")
# 基础路径
self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data"
self.raw_dir = os.path.join(self.base_dir, "raw", "financial_reports")
self.processed_dir = os.path.join(self.base_dir, "processed", "financial_indicators")
# 确保目录存在
os.makedirs(self.raw_dir, exist_ok=True)
os.makedirs(self.processed_dir, exist_ok=True)
# 创建时间分区目录
self._create_date_directories()
# 数据采集时间
self.collection_time = datetime.now()
# 默认时间范围
self.start_date = "2010-01-01"
self.end_date = self.collection_time.strftime('%Y-%m-%d')
logger.info(f"财务数据采集时间范围: {self.start_date}{self.end_date}")
def _create_date_directories(self):
"""创建日期分区目录"""
# 创建季度目录(2010Q1-2026Q4
for year in range(2010, 2027):
for quarter in range(1, 5):
quarter_dir = os.path.join(self.raw_dir, f"{year}_Q{quarter}")
os.makedirs(quarter_dir, exist_ok=True)
# 创建年度目录
for year in range(2010, 2027):
year_dir = os.path.join(self.raw_dir, "annual", str(year))
os.makedirs(year_dir, exist_ok=True)
def load_stock_list(self) -> List[str]:
"""加载股票代码列表
Returns:
List[str]: 股票代码列表
"""
logger.info("加载股票代码列表")
# 检查是否有已保存的股票基础信息
processed_dir = os.path.join(self.base_dir, "processed", "stock_info")
if os.path.exists(processed_dir):
# 查找最新的处理文件
parquet_files = [f for f in os.listdir(processed_dir) if f.endswith('.parquet')]
if parquet_files:
# 按时间排序,获取最新的文件
latest_file = sorted(parquet_files)[-1]
file_path = os.path.join(processed_dir, latest_file)
try:
df = pd.read_parquet(file_path)
if 'symbol' in df.columns:
stock_list = df['symbol'].dropna().unique().tolist()
logger.info(f"从处理文件加载到 {len(stock_list)} 只股票代码")
return stock_list
except Exception as e:
logger.warning(f"读取处理文件失败: {e}")
# 如果处理文件不存在或读取失败,尝试从AKShare获取
try:
import akshare as ak
stock_list_df = ak.stock_info_a_code_name()
if stock_list_df is not None and not stock_list_df.empty:
stock_list = stock_list_df['code'].dropna().unique().tolist()
logger.info(f"从AKShare获取到 {len(stock_list)} 只股票代码")
return stock_list
except Exception as e:
logger.error(f"获取股票列表失败: {e}")
return []
def collect_financial_indicators(self, symbol: str) -> Optional[pd.DataFrame]:
"""采集单只股票的财务指标
Args:
symbol: 股票代码
Returns:
Optional[pd.DataFrame]: 财务指标数据
"""
logger.debug(f"开始采集股票 {symbol} 的财务指标")
try:
import akshare as ak
# 获取财务指标数据
financial_data = ak.stock_financial_analysis_indicator(symbol=symbol)
if financial_data is None or financial_data.empty:
logger.warning(f"股票 {symbol} 未获取到财务指标数据")
return None
# 添加股票代码
financial_data['symbol'] = symbol
# 添加采集时间
financial_data['collection_time'] = self.collection_time
logger.debug(f"股票 {symbol} 成功采集 {len(financial_data)} 条财务指标数据")
return financial_data
except Exception as e:
logger.error(f"采集股票 {symbol} 财务指标失败: {e}")
return None
def collect_balance_sheet(self, symbol: str) -> Optional[pd.DataFrame]:
"""采集资产负债表数据
Args:
symbol: 股票代码
Returns:
Optional[pd.DataFrame]: 资产负债表数据
"""
logger.debug(f"开始采集股票 {symbol} 的资产负债表")
try:
import akshare as ak
# 获取资产负债表数据
balance_sheet = ak.stock_balance_sheet_by_report_em(symbol=symbol)
if balance_sheet is None or balance_sheet.empty:
logger.warning(f"股票 {symbol} 未获取到资产负债表")
return None
# 添加股票代码
balance_sheet['symbol'] = symbol
# 添加采集时间
balance_sheet['collection_time'] = self.collection_time
logger.debug(f"股票 {symbol} 成功采集 {len(balance_sheet)} 条资产负债表数据")
return balance_sheet
except Exception as e:
logger.error(f"采集股票 {symbol} 资产负债表失败: {e}")
return None
def collect_income_statement(self, symbol: str) -> Optional[pd.DataFrame]:
"""采集利润表数据
Args:
symbol: 股票代码
Returns:
Optional[pd.DataFrame]: 利润表数据
"""
logger.debug(f"开始采集股票 {symbol} 的利润表")
try:
import akshare as ak
# 获取利润表数据
income_statement = ak.stock_profit_sheet_by_report_em(symbol=symbol)
if income_statement is None or income_statement.empty:
logger.warning(f"股票 {symbol} 未获取到利润表")
return None
# 添加股票代码
income_statement['symbol'] = symbol
# 添加采集时间
income_statement['collection_time'] = self.collection_time
logger.debug(f"股票 {symbol} 成功采集 {len(income_statement)} 条利润表数据")
return income_statement
except Exception as e:
logger.error(f"采集股票 {symbol} 利润表失败: {e}")
return None
def collect_cash_flow(self, symbol: str) -> Optional[pd.DataFrame]:
"""采集现金流量表数据
Args:
symbol: 股票代码
Returns:
Optional[pd.DataFrame]: 现金流量表数据
"""
logger.debug(f"开始采集股票 {symbol} 的现金流量表")
try:
import akshare as ak
# 获取现金流量表数据
cash_flow = ak.stock_cash_flow_sheet_by_report_em(symbol=symbol)
if cash_flow is None or cash_flow.empty:
logger.warning(f"股票 {symbol} 未获取到现金流量表")
return None
# 添加股票代码
cash_flow['symbol'] = symbol
# 添加采集时间
cash_flow['collection_time'] = self.collection_time
logger.debug(f"股票 {symbol} 成功采集 {len(cash_flow)} 条现金流量表数据")
return cash_flow
except Exception as e:
logger.error(f"采集股票 {symbol} 现金流量表失败: {e}")
return None
def batch_collect_financial_data(self, stock_list: List[str] = None, max_workers: int = 3) -> Dict:
"""批量采集财务数据
Args:
stock_list: 股票代码列表
max_workers: 最大并发线程数
Returns:
Dict: 批量采集结果
"""
logger.info(f"开始批量采集A股财务数据,最大并发数: {max_workers}")
if stock_list is None:
stock_list = self.load_stock_list()
if not stock_list:
logger.error("股票列表为空,无法采集")
return {
"success": False,
"error": "股票列表为空",
"collected": 0,
"failed": 0
}
total_stocks = len(stock_list)
logger.info(f"需要采集 {total_stocks} 只股票的财务数据")
# 分批处理(财务数据请求较慢,分批小批量)
batch_size = 20
batches = []
for i in range(0, total_stocks, batch_size):
batches.append(stock_list[i:i + batch_size])
logger.info(f"共分为 {len(batches)} 个批次")
financial_indicators_all = []
balance_sheets_all = []
income_statements_all = []
cash_flows_all = []
success_count = 0
failed_count = 0
failed_symbols = []
for batch_idx, batch in enumerate(batches):
logger.info(f"处理批次 {batch_idx + 1}/{len(batches)},包含 {len(batch)} 只股票")
batch_start_time = time.time()
# 使用线程池并发采集
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交财务指标采集任务
future_to_data_type = {}
for symbol in batch:
# 提交四种财务数据采集任务
future_to_data_type[executor.submit(self.collect_financial_indicators, symbol)] = (symbol, 'indicators')
future_to_data_type[executor.submit(self.collect_balance_sheet, symbol)] = (symbol, 'balance_sheet')
future_to_data_type[executor.submit(self.collect_income_statement, symbol)] = (symbol, 'income_statement')
future_to_data_type[executor.submit(self.collect_cash_flow, symbol)] = (symbol, 'cash_flow')
# 处理完成的任务
for future in as_completed(future_to_data_type):
symbol, data_type = future_to_data_type[future]
try:
result = future.result(timeout=30)
if result is not None and not result.empty:
if data_type == 'indicators':
financial_indicators_all.append(result)
elif data_type == 'balance_sheet':
balance_sheets_all.append(result)
elif data_type == 'income_statement':
income_statements_all.append(result)
elif data_type == 'cash_flow':
cash_flows_all.append(result)
logger.debug(f"股票 {symbol} {data_type} 采集成功,{len(result)} 条数据")
else:
logger.warning(f"股票 {symbol} {data_type} 采集失败或数据为空")
except Exception as e:
logger.error(f"股票 {symbol} {data_type} 采集异常: {e}")
# 检查这一批次是否至少有一种数据采集成功
if (len(financial_indicators_all) > 0 or len(balance_sheets_all) > 0 or
len(income_statements_all) > 0 or len(cash_flows_all) > 0):
success_count += len(batch)
else:
failed_count += len(batch)
failed_symbols.extend(batch)
batch_time = time.time() - batch_start_time
logger.info(f"批次 {batch_idx + 1} 完成,耗时 {batch_time:.2f}")
# 避免请求过快
time.sleep(5)
# 保存各种财务数据
saved_files = []
if financial_indicators_all:
indicators_df = pd.concat(financial_indicators_all, ignore_index=True)
saved_files.extend(self._save_financial_data(indicators_df, "financial_indicators"))
if balance_sheets_all:
balance_df = pd.concat(balance_sheets_all, ignore_index=True)
saved_files.extend(self._save_financial_data(balance_df, "balance_sheet"))
if income_statements_all:
income_df = pd.concat(income_statements_all, ignore_index=True)
saved_files.extend(self._save_financial_data(income_df, "income_statement"))
if cash_flows_all:
cash_flow_df = pd.concat(cash_flows_all, ignore_index=True)
saved_files.extend(self._save_financial_data(cash_flow_df, "cash_flow"))
# 计算估值指标
if financial_indicators_all:
valuation_df = self._calculate_valuation_indicators(financial_indicators_all)
if valuation_df is not None and not valuation_df.empty:
saved_files.extend(self._save_valuation_indicators(valuation_df))
# 生成结果报告
result = {
"success": True,
"total_stocks": total_stocks,
"collected": success_count,
"failed": failed_count,
"failed_symbols": failed_symbols,
"indicators_records": len(financial_indicators_all) if financial_indicators_all else 0,
"balance_sheet_records": len(balance_sheets_all) if balance_sheets_all else 0,
"income_statement_records": len(income_statements_all) if income_statements_all else 0,
"cash_flow_records": len(cash_flows_all) if cash_flows_all else 0,
"collection_time": self.collection_time.isoformat(),
"saved_files": saved_files
}
logger.info(f"财务数据采集完成: 成功 {success_count}/{total_stocks},失败 {failed_count}")
return result
def _save_financial_data(self, df: pd.DataFrame, data_type: str) -> List[str]:
"""保存财务数据
Args:
df: 财务数据
data_type: 数据类型
Returns:
List[str]: 保存的文件路径列表
"""
saved_files = []
if df is None or df.empty:
return saved_files
try:
# 保存原始数据
timestamp = self.collection_time.strftime('%Y%m%d_%H%M%S')
filename = f"{data_type}_{timestamp}.parquet"
filepath = os.path.join(self.raw_dir, filename)
df.to_parquet(filepath, compression='snappy')
saved_files.append(filepath)
logger.info(f"{data_type} 已保存: {filepath}")
# 保存处理后数据
processed_filename = f"{data_type}_processed_{timestamp}.parquet"
processed_filepath = os.path.join(self.processed_dir, processed_filename)
# 处理数据(标准化等)
processed_df = self._process_financial_data(df, data_type)
processed_df.to_parquet(processed_filepath, compression='snappy')
saved_files.append(processed_filepath)
logger.info(f"处理后 {data_type} 已保存: {processed_filepath}")
except Exception as e:
logger.error(f"保存 {data_type} 数据失败: {e}")
return saved_files
def _process_financial_data(self, df: pd.DataFrame, data_type: str) -> pd.DataFrame:
"""处理财务数据
Args:
df: 原始财务数据
data_type: 数据类型
Returns:
pd.DataFrame: 处理后的财务数据
"""
processed_df = df.copy()
try:
# 标准化日期列名
date_columns = ['公告日期', '报告期', 'end_date', 'report_date']
for col in date_columns:
if col in processed_df.columns:
processed_df['report_date'] = pd.to_datetime(processed_df[col], errors='coerce')
break
# 添加处理时间
processed_df['processed_time'] = datetime.now()
# 添加数据版本
processed_df['data_version'] = '1.0.0'
# 根据数据类型进行特定处理
if data_type == 'financial_indicators':
# 标准化关键指标名称
indicator_mapping = {
'基本每股收益': 'eps_basic',
'稀释每股收益': 'eps_diluted',
'每股净资产': 'bps',
'每股经营现金流': 'cfps',
'净资产收益率': 'roe',
'总资产报酬率': 'roa',
'销售毛利率': 'gross_margin',
'销售净利率': 'net_margin',
'资产负债率': 'debt_ratio',
'流动比率': 'current_ratio',
'速动比率': 'quick_ratio'
}
# 重命名列
for chinese_name, english_name in indicator_mapping.items():
if chinese_name in processed_df.columns:
processed_df[english_name] = pd.to_numeric(processed_df[chinese_name], errors='coerce')
logger.info(f"{data_type} 数据处理完成,共 {len(processed_df)} 条记录")
except Exception as e:
logger.error(f"处理 {data_type} 数据失败: {e}")
return processed_df
def _calculate_valuation_indicators(self, financial_indicators: List[pd.DataFrame]) -> Optional[pd.DataFrame]:
"""计算估值指标
Args:
financial_indicators: 财务指标数据列表
Returns:
Optional[pd.DataFrame]: 估值指标数据
"""
if not financial_indicators:
return None
try:
# 合并所有财务指标数据
all_indicators = pd.concat(financial_indicators, ignore_index=True)
# 创建估值指标DataFrame
valuation_df = pd.DataFrame()
# 提取必要信息
if 'symbol' in all_indicators.columns:
valuation_df['symbol'] = all_indicators['symbol']
if '报告期' in all_indicators.columns:
valuation_df['report_date'] = pd.to_datetime(all_indicators['报告期'], errors='coerce')
# 计算PE(市盈率) - 需要市场价格数据,这里先假设有市值数据
# 实际应用中需要结合日线数据计算
# 计算PB(市净率) - 需要市净率数据
if '市净率' in all_indicators.columns:
valuation_df['pb_ratio'] = pd.to_numeric(all_indicators['市净率'], errors='coerce')
# 计算ROE(净资产收益率)
if '净资产收益率' in all_indicators.columns:
valuation_df['roe'] = pd.to_numeric(all_indicators['净资产收益率'], errors='coerce')
# 计算股息率(需要分红数据)
# 这里先占位,实际需要分红数据和股价数据
# 添加计算时间
valuation_df['calculation_time'] = datetime.now()
logger.info(f"估值指标计算完成,共 {len(valuation_df)} 条记录")
return valuation_df
except Exception as e:
logger.error(f"计算估值指标失败: {e}")
return None
def _save_valuation_indicators(self, valuation_df: pd.DataFrame) -> List[str]:
"""保存估值指标数据
Args:
valuation_df: 估值指标数据
Returns:
List[str]: 保存的文件路径列表
"""
saved_files = []
if valuation_df is None or valuation_df.empty:
return saved_files
try:
# 保存估值指标数据
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"valuation_indicators_{timestamp}.parquet"
filepath = os.path.join(self.processed_dir, "valuation", filename)
# 确保目录存在
os.makedirs(os.path.dirname(filepath), exist_ok=True)
valuation_df.to_parquet(filepath, compression='snappy')
saved_files.append(filepath)
logger.info(f"估值指标已保存: {filepath}")
except Exception as e:
logger.error(f"保存估值指标失败: {e}")
return saved_files
def main():
"""主函数"""
print("=" * 70)
print("📊 A股财务数据采集")
print("=" * 70)
# 创建采集器
collector = AStockFinancialDataCollector()
# 获取股票列表
print("获取股票代码列表...")
stock_list = collector.load_stock_list()
if not stock_list:
print("❌ 未获取到股票列表,请检查基础信息采集")
return
print(f"✅ 获取到 {len(stock_list)} 只股票代码")
print("开始批量采集财务数据(先测试10只)...")
# 批量采集(先测试小批量)
result = collector.batch_collect_financial_data(
stock_list=stock_list[:10], # 先测试10只
max_workers=3
)
# 输出结果
print("\n" + "=" * 70)
print("📋 采集结果")
print("=" * 70)
if result.get("success", False):
print(f"✅ 财务数据采集成功!")
print(f"📈 统计信息:")
print(f" 股票总数: {result.get('total_stocks', 0)}")
print(f" 采集成功: {result.get('collected', 0)}")
print(f" 采集失败: {result.get('failed', 0)}")
print(f" 财务指标记录: {result.get('indicators_records', 0)}")
print(f" 资产负债表记录: {result.get('balance_sheet_records', 0)}")
print(f" 利润表记录: {result.get('income_statement_records', 0)}")
print(f" 现金流量表记录: {result.get('cash_flow_records', 0)}")
saved_files = result.get("saved_files", [])
print(f"💾 保存文件: {len(saved_files)}")
# 保存采集报告
report_dir = os.path.join(collector.base_dir, "reports")
os.makedirs(report_dir, exist_ok=True)
report_file = os.path.join(report_dir, f"financial_data_collection_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f"\n📄 报告文件: {report_file}")
else:
print(f"❌ 财务数据采集失败")
print(f"错误信息: {result.get('error', '未知错误')}")
print("=" * 70)
if __name__ == "__main__":
main()