auto-sync: 2026-03-26 11:35:20

This commit is contained in:
cfdaily
2026-03-26 11:35:20 +08:00
parent 88b0055981
commit 5cb232a739
@@ -8,241 +8,197 @@ import os
import time
import json
import pandas as pd
import numpy as np
from datetime import datetime
from typing import List, Dict, Optional, Any
import logging
import warnings
# 添加项目路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
warnings.filterwarnings('ignore')
from utils.data_utils import DataUtils
from utils.log_utils import LogUtils
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class AStockBasicInfoCollector:
"""A股基础信息数据采集器"""
def __init__(self, config_path: str = None):
"""初始化采集器
Args:
config_path: 配置文件路径
"""
# 配置日志
self.logger = LogUtils.setup_logger('a_stock_basic_info')
# 加载配置
self.config = self._load_config(config_path)
def __init__(self):
"""初始化采集器"""
logger.info("A股基础信息采集器初始化")
# 基础路径
self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data"
self.raw_dir = os.path.join(self.base_dir, "raw")
self.processed_dir = os.path.join(self.base_dir, "processed")
self.raw_dir = os.path.join(self.base_dir, "raw", "stock_info")
self.processed_dir = os.path.join(self.base_dir, "processed", "stock_info")
# 确保目录存在
os.makedirs(os.path.join(self.raw_dir, "stock_info"), exist_ok=True)
os.makedirs(os.path.join(self.processed_dir, "stock_info"), exist_ok=True)
os.makedirs(self.raw_dir, exist_ok=True)
os.makedirs(self.processed_dir, exist_ok=True)
self.logger.info("A股基础信息采集器初始化完成")
# 数据采集时间
self.collection_time = datetime.now()
logger.info("采集器初始化完成")
def _load_config(self, config_path: Optional[str] = None) -> Dict:
"""加载配置
Args:
config_path: 配置文件路径
Returns:
Dict: 配置信息
"""
default_config = {
"data_sources": {
"akshare": {
"enabled": True,
"rate_limit": 30, # 请求间隔毫秒
"max_retries": 3,
"retry_delay": 5 # 重试延迟秒
},
"tushare": {
"enabled": False,
"token": "", # 需要配置API Token
"rate_limit": 500 # Tushare的请求限制
}
},
"data_fields": {
"basic_info": [
"symbol", # 股票代码
"name", # 股票名称
"industry", # 所属行业
"area", # 地区
"market", # 市场类型(主板/创业板/科创板)
"list_date", # 上市日期
"delist_date", # 退市日期(如未退市则为空)
"exchange", # 交易所(SH/SZ
"is_hs", # 是否沪深港通标的
"is_st", # 是否ST/*ST股票
"status", # 上市状态(L上市 D退市 P暂停上市)
"ts_code" # Tushare代码
]
},
"storage": {
"raw_format": "parquet",
"processed_format": "parquet",
"compression": "snappy",
"partition_by": ["year", "month"]
}
}
if config_path and os.path.exists(config_path):
try:
with open(config_path, 'r', encoding='utf-8') as f:
user_config = json.load(f)
default_config.update(user_config)
self.logger.info(f"{config_path} 加载用户配置")
except Exception as e:
self.logger.warning(f"加载用户配置文件失败: {e}")
return default_config
def collect_basic_info_akshare(self) -> pd.DataFrame:
"""使用AKShare采集A股基础信息
def collect_basic_info(self) -> pd.DataFrame:
"""采集A股基础信息
Returns:
pd.DataFrame: 股票基础信息数据
"""
self.logger.info("开始使用AKShare采集A股基础信息")
logger.info("开始采集A股基础信息")
try:
# 尝试导入akshare
import akshare as ak
self.logger.info("正在获取A股基础信息...")
# 1. 获取A股代码和名称
logger.info("获取A股代码和名称...")
stock_list = ak.stock_info_a_code_name()
# 采集A股基础信息
stock_info_df = ak.stock_info_a_code_name()
if stock_list is None or stock_list.empty:
logger.error("未获取到股票列表")
return pd.DataFrame()
# 检查数据是否有效
if stock_info_df is not None and not stock_info_df.empty:
self.logger.info(f"成功采集 {len(stock_info_df)} 条股票基础信息")
logger.info(f"获取到 {len(stock_list)} 只股票基本信息")
# 2. 获取更详细的股票信息
logger.info("获取详细股票信息...")
stock_info_list = []
for idx, row in stock_list.iterrows():
if idx % 100 == 0:
logger.info(f"处理第 {idx+1}/{len(stock_list)} 只股票")
# 标准化字段名
stock_info_df = self._standardize_columns(stock_info_df)
try:
# 获取单只股票详细信息
stock_detail = ak.stock_individual_info_em(symbol=row['code'])
if stock_detail is not None and not stock_detail.empty:
# 转换为字典
stock_dict = {
'code': row['code'],
'name': row['name'],
'collection_time': self.collection_time
}
# 添加详细信息
for _, detail_row in stock_detail.iterrows():
key = detail_row['item']
value = detail_row['value']
stock_dict[key] = value
stock_info_list.append(stock_dict)
# 避免请求过快
time.sleep(0.1)
except Exception as e:
logger.warning(f"处理股票 {row['code']} 时出错: {e}")
continue
# 转换为DataFrame
if stock_info_list:
stock_df = pd.DataFrame(stock_info_list)
logger.info(f"成功采集 {len(stock_df)} 只股票详细信息")
# 保存原始数据
self._save_raw_data(stock_info_df, "stock_basic_info", "akshare")
self._save_raw_data(stock_df)
# 处理数据
processed_df = self._process_basic_info(stock_info_df)
processed_df = self._process_basic_info(stock_df)
# 保存处理后数据
self._save_processed_data(processed_df, "stock_basic_info")
self._save_processed_data(processed_df)
return processed_df
else:
self.logger.warning("AKShare返回数据为空")
logger.error("未采集到任何股票详细信息")
return pd.DataFrame()
except ImportError:
self.logger.error("AKShare未安装,请安装: pip install akshare")
return pd.DataFrame()
except ImportError:
logger.error("AKShare未安装,请安装: pip install akshare")
return pd.DataFrame()
except Exception as e:
self.logger.error(f"使用AKShare采集数据失败: {e}")
logger.error(f"采集基础信息失败: {e}")
return pd.DataFrame()
def collect_basic_info_tushare(self) -> pd.DataFrame:
"""使用Tushare采集A股基础信息
def collect_industry_info(self) -> pd.DataFrame:
"""采集行业分类信息
Returns:
pd.DataFrame: 股票基础信息数据
pd.DataFrame: 行业分类数据
"""
self.logger.info("开始使用Tushare采集A股基础信息")
logger.info("开始采集行业分类信息")
try:
# 尝试导入tushare
import tushare as ts
import akshare as ak
# 检查是否配置了API Token
if not self.config.get("data_sources", {}).get("tushare", {}).get("enabled", False):
self.logger.warning("Tushare未启用,请在配置中设置enabled: true并配置token")
return pd.DataFrame()
# 获取申万行业分类
logger.info("获取申万行业分类...")
sw_industry = ak.stock_board_industry_sw_spot()
# 设置API Token
ts.set_token(self.config["data_sources"]["tushare"]["token"])
pro = ts.pro_api()
if sw_industry is not None and not sw_industry.empty:
logger.info(f"获取到 {len(sw_industry)} 个申万行业")
# 保存行业数据
industry_file = os.path.join(self.raw_dir, f"sw_industry_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.csv")
sw_industry.to_csv(industry_file, index=False, encoding='utf-8-sig')
logger.info(f"申万行业数据已保存: {industry_file}")
self.logger.info("正在获取A股基础信息...")
# 获取证监会行业分类
logger.info("获取证监会行业分类...")
csrc_industry = ak.stock_board_industry_csrc_spot()
# 采集A股基础信息
stock_info_df = pro.stock_basic(
exchange='',
list_status='L', # 上市状态
fields='ts_code,symbol,name,area,industry,market,list_date,delist_date,is_hs,is_st,status'
)
if csrc_industry is not None and not csrc_industry.empty:
logger.info(f"获取到 {len(csrc_industry)} 个证监会行业")
# 保存行业数据
industry_file = os.path.join(self.raw_dir, f"csrc_industry_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.csv")
csrc_industry.to_csv(industry_file, index=False, encoding='utf-8-sig')
logger.info(f"证监会行业数据已保存: {industry_file}")
# 检查数据是否有效
if stock_info_df is not None and not stock_info_df.empty:
self.logger.info(f"成功采集 {len(stock_info_df)} 条股票基础信息")
# 标准化字段名
stock_info_df = self._standardize_columns(stock_info_df)
# 保存原始数据
self._save_raw_data(stock_info_df, "stock_basic_info", "tushare")
# 处理数据
processed_df = self._process_basic_info(stock_info_df)
# 保存处理后数据
self._save_processed_data(processed_df, "stock_basic_info")
return processed_df
# 合并行业信息
industry_info = pd.DataFrame()
if sw_industry is not None:
industry_info = sw_industry.copy()
else:
self.logger.warning("Tushare返回数据为空")
return pd.DataFrame()
except ImportError:
self.logger.error("Tushare未安装,请安装: pip install tushare")
return pd.DataFrame()
return industry_info
except Exception as e:
self.logger.error(f"使用Tushare采集数据失败: {e}")
logger.error(f"采集行业信息失败: {e}")
return pd.DataFrame()
def _standardize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""标准化数据列名
def _save_raw_data(self, df: pd.DataFrame):
"""保存原始数据
Args:
df: 原始数据
Returns:
pd.DataFrame: 标准化后的数据
"""
# 定义标准列名映射
column_mapping = {
'code': 'symbol',
'ts_code': 'symbol',
'name': 'name',
'industry': 'industry',
'area': 'area',
'market': 'market',
'list_date': 'list_date',
'delist_date': 'delist_date',
'exchange': 'exchange',
'is_hs': 'is_hs',
'is_st': 'is_st',
'status': 'status'
}
if df is None or df.empty:
logger.warning("数据为空,不保存原始数据")
return
# 重命名列
df = df.rename(columns=column_mapping)
# 确保所有标准列都存在,缺失的填充为空
for col in column_mapping.values():
if col not in df.columns:
df[col] = None
return df
try:
# 保存为CSV格式
csv_file = os.path.join(self.raw_dir, f"stock_basic_info_raw_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.csv")
df.to_csv(csv_file, index=False, encoding='utf-8-sig')
logger.info(f"原始数据已保存: {csv_file}")
# 保存为JSON格式
json_file = os.path.join(self.raw_dir, f"stock_basic_info_raw_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.json")
df.to_json(json_file, orient='records', force_ascii=False, indent=2)
logger.info(f"原始JSON数据已保存: {json_file}")
# 保存为Parquet格式
parquet_file = os.path.join(self.raw_dir, f"stock_basic_info_raw_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.parquet")
df.to_parquet(parquet_file, compression='snappy')
logger.info(f"原始Parquet数据已保存: {parquet_file}")
except Exception as e:
logger.error(f"保存原始数据失败: {e}")
def _process_basic_info(self, df: pd.DataFrame) -> pd.DataFrame:
"""处理基础信息数据
@@ -253,188 +209,268 @@ class AStockBasicInfoCollector:
Returns:
pd.DataFrame: 处理后数据
"""
self.logger.info("开始处理基础信息数据")
logger.info("开始处理基础信息数据")
# 创建处理后的DataFrame
processed_df = df.copy()
# 标准化日期格式
date_columns = ['list_date', 'delist_date']
for col in date_columns:
if col in processed_df.columns:
# 转换日期格式
processed_df[col] = pd.to_datetime(processed_df[col], errors='coerce')
# 添加数据采集时间
processed_df['data_crawl_time'] = datetime.now()
# 添加数据源信息
processed_df['data_source'] = "akshare" if len(df) > 0 else "unknown"
# 添加数据版本
processed_df['data_version'] = "1.0.0"
# 添加处理时间
processed_df['processed_time'] = datetime.now()
# 创建唯一标识符
processed_df['data_id'] = processed_df['symbol'] + '_' + processed_df['data_version']
self.logger.info("基础信息数据处理完成")
return processed_df
def _save_raw_data(self, df: pd.DataFrame, data_type: str, source: str):
"""保存原始数据
Args:
df: 数据DataFrame
data_type: 数据类型
source: 数据源名称
"""
if df is None or df.empty:
self.logger.warning("数据为空,不保存")
return
return df
# 创建文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{data_type}_{source}_{timestamp}.parquet"
filepath = os.path.join(self.raw_dir, "stock_info", filename)
# 保存为Parquet格式
try:
df.to_parquet(filepath, compression='snappy')
self.logger.info(f"原始数据已保存: {filepath}")
# 创建副本
processed_df = df.copy()
# 记录数据存储信息
data_info = {
'data_type': data_type,
'source': source,
'timestamp': timestamp,
'file_size': os.path.getsize(filepath),
'row_count': len(df),
'save_time': datetime.now().isoformat()
# 标准化列名
column_mapping = {
'code': 'symbol',
'name': 'name',
'上市日期': 'list_date',
'行业分类': 'industry',
'总市值': 'total_market_cap',
'流通市值': 'circulating_market_cap',
'市盈率': 'pe_ratio',
'市净率': 'pb_ratio',
'涨跌幅': 'change_pct',
'成交量': 'volume',
'成交额': 'turnover'
}
# 保存数据存储信息
info_path = os.path.join(self.raw_dir, "stock_info", f"{data_type}_{source}_{timestamp}_info.json")
with open(info_path, 'w', encoding='utf-8') as f:
json.dump(data_info, f, ensure_ascii=False, indent=2)
# 重命名列
processed_df = processed_df.rename(columns=column_mapping)
self.logger.info(f"数据存储信息已保存: {info_path}")
# 添加标准字段(如果不存在)
standard_fields = ['symbol', 'name', 'list_date', 'industry', 'exchange', 'status']
for field in standard_fields:
if field not in processed_df.columns:
processed_df[field] = None
# 处理日期格式
if 'list_date' in processed_df.columns:
processed_df['list_date'] = pd.to_datetime(processed_df['list_date'], errors='coerce')
# 处理数值字段
numeric_fields = ['total_market_cap', 'circulating_market_cap', 'pe_ratio', 'pb_ratio']
for field in numeric_fields:
if field in processed_df.columns:
processed_df[field] = pd.to_numeric(processed_df[field], errors='coerce')
# 添加数据质量标记
processed_df['data_quality'] = 'good'
processed_df['processed_time'] = datetime.now()
processed_df['data_version'] = '1.0.0'
# 添加数据源信息
processed_df['data_source'] = 'akshare'
# 创建唯一ID
processed_df['stock_id'] = processed_df['symbol'].astype(str) + '_' + processed_df['data_version']
logger.info(f"数据处理完成,共 {len(processed_df)} 条记录")
return processed_df
except Exception as e:
self.logger.error(f"保存原始数据失败: {e}")
logger.error(f"处理基础信息数据失败: {e}")
return df
def _save_processed_data(self, df: pd.DataFrame, data_type: str):
def _save_processed_data(self, df: pd.DataFrame):
"""保存处理后数据
Args:
df: 处理后数据
data_type: 数据类型
"""
if df is None or df.empty:
self.logger.warning("数据为空,不保存")
logger.warning("数据为空,不保存处理后数据")
return
# 创建文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S")
filename = f"{data_type}_processed_{timestamp}.parquet"
filepath = os.path.join(self.processed_dir, "stock_info", filename)
# 保存为Parquet格式
try:
df.to_parquet(filepath, compression='snappy')
self.logger.info(f"处理后数据已保存: {filepath}")
# 保存为Parquet格式(主要格式)
parquet_file = os.path.join(self.processed_dir, f"stock_basic_info_processed_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.parquet")
df.to_parquet(parquet_file, compression='snappy')
logger.info(f"处理后数据已保存(Parquet: {parquet_file}")
# 记录处理信息
process_info = {
'data_type': data_type,
'timestamp': timestamp,
'file_size': os.path.getsize(filepath),
'row_count': len(df),
'processed_time': datetime.now().isoformat(),
'field_count': len(df.columns)
}
# 保存为CSV格式(便于查看)
csv_file = os.path.join(self.processed_dir, f"stock_basic_info_processed_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.csv")
df.to_csv(csv_file, index=False, encoding='utf-8-sig')
logger.info(f"处理后数据已保存(CSV: {csv_file}")
# 保存处理信息
info_path = os.path.join(self.processed_dir, "stock_info", f"{data_type}_processed_{timestamp}_info.json")
with open(info_path, 'w', encoding='utf-8') as f:
json.dump(process_info, f, ensure_ascii=False, indent=2)
self.logger.info(f"数据存储信息已保存: {info_path}")
# 保存数据摘要
summary = self._generate_summary(df)
summary_file = os.path.join(self.processed_dir, f"stock_basic_info_summary_{self.collection_time.strftime('%Y%m%d_%H%M%S')}.json")
with open(summary_file, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
logger.info(f"数据摘要已保存: {summary_file}")
except Exception as e:
self.logger.error(f"保存处理后数据失败: {e}")
logger.error(f"保存处理后数据失败: {e}")
def generate_summary_report(self, df: pd.DataFrame) -> Dict:
"""生成数据摘要报告
def _generate_summary(self, df: pd.DataFrame) -> dict:
"""生成数据摘要
Args:
df: 数据
Returns:
Dict: 数据摘要报告
dict: 数据摘要
"""
self.logger.info("生成数据摘要报告")
if df is None or df.empty:
return {"error": "数据为空"}
try:
# 统计数据
summary = {
'collection_time': datetime.now().isoformat(),
'total_records': len(df),
'unique_stocks': df['symbol'].nunique(),
'market_distribution': df['market'].value_counts().to_dict(),
'industry_distribution': df['industry'].value_counts().head(10).to_dict(),
'status_distribution': df['status'].value_counts().to_dict(),
"collection_time": self.collection_time.isoformat(),
"total_records": len(df),
"unique_symbols": df['symbol'].nunique() if 'symbol' in df.columns else 0,
"industry_distribution": {},
"market_cap_stats": {},
"data_quality": {}
}
# 行业分布统计
if 'industry' in df.columns:
industry_counts = df['industry'].value_counts().head(10).to_dict()
summary["industry_distribution"] = industry_counts
# 市值统计
if 'total_market_cap' in df.columns:
market_cap_series = df['total_market_cap'].dropna()
if not market_cap_series.empty:
summary["market_cap_stats"] = {
"mean": float(market_cap_series.mean()),
"median": float(market_cap_series.median()),
"min": float(market_cap_series.min()),
"max": float(market_cap_series.max()),
"std": float(market_cap_series.std())
}
# 数据质量统计
if 'data_quality' in df.columns:
quality_counts = df['data_quality'].value_counts().to_dict()
summary["data_quality"] = quality_counts
# 交易所分布
if 'exchange' in df.columns:
exchange_counts = df['exchange'].value_counts().to_dict()
summary["exchange_distribution"] = exchange_counts
# 数据字段信息
summary["data_fields"] = {
"total_fields": len(df.columns),
"field_list": list(df.columns),
"field_types": {col: str(df[col].dtype) for col in df.columns}
}
logger.info("数据摘要生成完成")
return summary
except Exception as e:
logger.error(f"生成数据摘要失败: {e}")
return {"error": str(e)}
def run(self) -> dict:
"""运行采集器
Returns:
dict: 采集结果
"""
logger.info("开始执行A股基础信息采集任务")
result = {
"success": False,
"data_collected": False,
"summary": {},
"files_saved": [],
"error": None
}
# 添加时间信息
if 'list_date' in df.columns:
summary['list_years'] = {
'min_list_date': df['list_date'].min().isoformat() if pd.notna(df['list_date'].min()) else None,
'max_list_date': df['list_date'].max().isoformat() if pd.notna(df['list_date'].max()) else None,
'by_year': df['list_date'].dt.year.value_counts().to_dict() if pd.notna(df['list_date']).any() else {}
}
try:
# 采集基础信息
basic_info_df = self.collect_basic_info()
if basic_info_df is not None and not basic_info_df.empty:
result["data_collected"] = True
result["records_collected"] = len(basic_info_df)
# 采集行业信息
industry_df = self.collect_industry_info()
if industry_df is not None and not industry_df.empty:
result["industry_records"] = len(industry_df)
# 生成摘要
summary = self._generate_summary(basic_info_df)
result["summary"] = summary
# 获取保存的文件列表
raw_files = [f for f in os.listdir(self.raw_dir) if f.endswith(('.csv', '.json', '.parquet'))]
processed_files = [f for f in os.listdir(self.processed_dir) if f.endswith(('.csv', '.json', '.parquet'))]
result["files_saved"] = {
"raw": raw_files,
"processed": processed_files
}
result["success"] = True
result["message"] = f"成功采集 {len(basic_info_df)} 只股票基础信息"
logger.info("A股基础信息采集任务完成")
else:
result["error"] = "未采集到有效数据"
logger.error("未采集到有效数据")
except Exception as e:
result["error"] = str(e)
logger.error(f"采集任务执行失败: {e}")
self.logger.info("数据摘要报告生成完成")
return summary
return result
def main():
"""主函数"""
print("=" * 60)
print("📊 A股基础信息数据采集")
print("=" * 60)
# 创建采集器
collector = AStockBasicInfoCollector()
# 采集基础信息数据
basic_info_akshare = collector.collect_basic_info_akshare()
# 运行采集
print("开始采集数据...")
result = collector.run()
if not basic_info_akshare.empty:
# 生成摘要报告
summary = collector.generate_summary_report(basic_info_akshare)
# 输出结果
print("\n" + "=" * 60)
print("📋 采集结果")
print("=" * 60)
if result["success"]:
print(f"{result['message']}")
print(f"📈 记录数: {result.get('records_collected', 0)}")
# 输出摘要报告
print(json.dumps(summary, ensure_ascii=False, indent=2))
summary = result.get("summary", {})
if summary:
print(f"📊 行业数量: {len(summary.get('industry_distribution', {}))}")
print(f"🏦 市值统计: 平均{summary.get('market_cap_stats', {}).get('mean', 0):,.2f}亿元")
# 获取保存的原始数据路径
raw_files = os.listdir(os.path.join(collector.raw_dir, "stock_info"))
processed_files = os.listdir(os.path.join(collector.processed_dir, "stock_info"))
files = result.get("files_saved", {})
if files:
print(f"💾 保存文件:")
print(f" 原始数据: {len(files.get('raw', []))}个文件")
print(f" 处理后数据: {len(files.get('processed', []))}个文件")
print(f"\n✅ 数据采集完成!")
print(f"📊 统计信息:")
print(f" 采集记录数: {len(basic_info_akshare)}")
print(f" 原始数据文件: {len(raw_files)}")
print(f" 处理后数据文件: {len(processed_files)}")
print(f" 生成摘要报告: 包含{len(summary)}个统计指标")
# 输出保存路径
print(f"📁 数据保存位置:")
print(f" 原始数据: {collector.raw_dir}")
print(f" 处理后数据: {collector.processed_dir}")
else:
print("数据采集失败!")
print(" 可能原因:")
print(" 1. AKShare安装: pip install akshare")
print(" 2. 网络连接问题")
print(" 3. 数据源暂时不可用")
print(f"❌ 采集失败: {result.get('error', '未知错误')}")
print("⚠️ 请检查:")
print(" 1. AKShare是否安装: pip install akshare")
print(" 2. 网络连接是否正常")
print(" 3. 数据源是否可用")
print("=" * 60)
if __name__ == "__main__":
main()