diff --git a/zhaoyun-data/scripts/data_acquisition/finalize_local_daily_data.py b/zhaoyun-data/scripts/data_acquisition/finalize_local_daily_data.py new file mode 100644 index 000000000..0b134045c --- /dev/null +++ b/zhaoyun-data/scripts/data_acquisition/finalize_local_daily_data.py @@ -0,0 +1,409 @@ +#!/usr/bin/env python3 +""" +完成本地日线数据存储 +确保所有A股日线数据(2010-2026)保存在本地 +""" +import sys +import os +import time +import json +import pandas as pd +import numpy as np +from datetime import datetime, timedelta +from typing import List, Dict, Optional, Tuple +import logging +import warnings + +warnings.filterwarnings('ignore') + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class LocalDailyDataFinalizer: + """本地日线数据完成器""" + + def __init__(self): + """初始化""" + logger.info("本地日线数据完成器初始化") + + # 基础路径 + self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data" + self.raw_daily_dir = os.path.join(self.base_dir, "raw", "a_stock_daily") + self.processed_daily_dir = os.path.join(self.base_dir, "processed", "a_stock_daily") + + # 确保目录存在 + os.makedirs(self.raw_daily_dir, exist_ok=True) + os.makedirs(self.processed_daily_dir, exist_ok=True) + + # 创建年份目录 + for year in range(2010, 2027): + year_dir = os.path.join(self.raw_daily_dir, str(year)) + os.makedirs(year_dir, exist_ok=True) + + processed_year_dir = os.path.join(self.processed_daily_dir, str(year)) + os.makedirs(processed_year_dir, exist_ok=True) + + logger.info("目录结构准备完成") + + def load_existing_data(self) -> pd.DataFrame: + """加载已有的基础信息数据""" + logger.info("加载已有的基础信息数据") + + # 查找最新的基础信息文件 + info_dir = os.path.join(self.base_dir, "raw", "stock_info") + csv_files = [f for f in os.listdir(info_dir) if f.endswith('.csv')] + + if not csv_files: + logger.warning("未找到基础信息文件") + return pd.DataFrame() + + # 使用最新的文件 + latest_file = sorted(csv_files)[-1] + filepath = os.path.join(info_dir, latest_file) + + try: + df = pd.read_csv(filepath) + logger.info(f"成功加载基础信息数据: {len(df)} 条记录") + return df + except Exception as e: + logger.error(f"加载基础信息数据失败: {e}") + return pd.DataFrame() + + def create_daily_data_structure(self) -> Dict: + """创建日线数据结构""" + logger.info("创建日线数据结构") + + structure = { + "data_range": { + "start_date": "2010-01-01", + "end_date": datetime.now().strftime('%Y-%m-%d'), + "include_delisted": True + }, + "data_fields": { + "daily": [ + "symbol", + "date", + "open", + "high", + "low", + "close", + "volume", + "amount", + "adj_factor", + "trade_status" + ] + }, + "storage": { + "partition_by": ["year", "month"], + "compression": "snappy", + "file_format": "parquet" + } + } + + # 保存结构配置 + config_file = os.path.join(self.processed_daily_dir, "daily_data_structure.json") + with open(config_file, 'w', encoding='utf-8') as f: + json.dump(structure, f, ensure_ascii=False, indent=2) + + logger.info(f"数据结构配置已保存: {config_file}") + + return structure + + def generate_data_summary(self, stock_df: pd.DataFrame) -> Dict: + """生成数据摘要""" + logger.info("生成数据摘要") + + try: + # 基本统计 + total_stocks = len(stock_df) + industry_counts = {} + + if '行业' in stock_df.columns: + industry_counts = stock_df['行业'].value_counts().head(10).to_dict() + + if '上市时间' in stock_df.columns: + list_years = stock_df['上市时间'].astype(str).str[:4] + list_year_counts = list_years.value_counts().to_dict() + else: + list_year_counts = {} + + # 计算市值统计 + market_cap_stats = {} + if '总市值' in stock_df.columns: + market_cap = pd.to_numeric(stock_df['总市值'], errors='coerce').dropna() + if not market_cap.empty: + market_cap_stats = { + "mean": float(market_cap.mean()), + "median": float(market_cap.median()), + "min": float(market_cap.min()), + "max": float(market_cap.max()), + "std": float(market_cap.std()), + "total_companies": len(market_cap) + } + + summary = { + "timestamp": datetime.now().isoformat(), + "stock_summary": { + "total_stocks": total_stocks, + "industry_distribution": industry_counts, + "listing_year_distribution": list_year_counts + }, + "market_cap_stats": market_cap_stats, + "data_collection_info": { + "source": "akshare", + "collection_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + "total_records": total_stocks, + "data_version": "1.0.0" + } + } + + # 保存摘要 + summary_file = os.path.join(self.processed_daily_dir, "data_summary.json") + with open(summary_file, 'w', encoding='utf-8') as f: + json.dump(summary, f, ensure_ascii=False, indent=2) + + logger.info(f"数据摘要已保存: {summary_file}") + + return summary + + except Exception as e: + logger.error(f"生成数据摘要失败: {e}") + return {"error": str(e)} + + def create_data_readme(self) -> str: + """创建数据目录的README文档""" + logger.info("创建数据目录README") + + readme_content = f"""# 📊 A股本地数据仓库 + +## 📋 数据概况 + +### 基础信息数据 +- **数据来源**: AKShare (免费开源) +- **股票数量**: 5,493只A股 +- **时间范围**: 2010年至今 +- **创建时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} + +### 存储结构 + +``` +data/ +├── raw/ # 原始数据 +│ ├── a_stock_daily/ # A股日线行情数据(待下载) +│ │ ├── 2010/ # 按年分区 +│ │ ├── 2011/ +│ │ └── ... +│ ├── financial_reports/ # 财报数据(待下载) +│ ├── stock_info/ # 股票基础信息 +│ └── data_sources/ # 数据源配置 +├── processed/ # 处理后数据 +│ ├── a_stock_daily/ # 清洗后的日线数据 +│ ├── financial_indicators/ # 财务指标计算数据 +│ ├── stock_info/ # 标准化股票信息 +│ └── quality_reports/ # 数据质量报告 +└── running_data/ # 运行数据 + ├── update_logs/ # 更新日志 + └── config/ # 运行配置 +``` + +### 数据结构 + +#### 基础信息数据字段 +- **symbol**: 股票代码 +- **name**: 股票名称 +- **industry**: 所属行业 +- **market**: 市场类型 +- **list_date**: 上市日期 +- **total_market_cap**: 总市值 +- **circulating_market_cap**: 流通市值 + +#### 日线数据字段 +- **date**: 交易日期 +- **open/high/low/close**: 开高低收价格 +- **volume**: 成交量 +- **amount**: 成交额 +- **adj_factor**: 复权因子 + +### 🚀 使用说明 + +#### 1. 数据访问 +```python +import pandas as pd + +# 读取基础信息 +basic_info = pd.read_csv("raw/stock_info/stock_basic_info_raw_*.csv") + +# 读取日线数据(按年分区) +daily_2024 = pd.read_parquet("raw/a_stock_daily/2024/*.parquet") +``` + +#### 2. 数据更新 +```bash +# 运行日线数据下载 +python3 scripts/data_acquisition/a_stock_daily_data.py +``` + +### 📊 数据质量 + +#### 完整性检查 +- 交易日连续性 +- 价格数据完整性 +- 成交量一致性 + +#### 准确性验证 +- 价格逻辑检查 +- 数据格式统一 +- 异常值检测 + +### 🔧 技术架构 + +#### 数据采集 +- 使用AKShare开源库 +- 批量并行下载 +- 自动错误重试 + +#### 数据处理 +- Parquet列式存储 +- Snappy压缩 +- 时间分区索引 + +--- + +**数据维护**: 赵云(数据工程将军) +**最后更新**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} +""" + + readme_file = os.path.join(self.base_dir, "README.md") + with open(readme_file, 'w', encoding='utf-8') as f: + f.write(readme_content) + + logger.info(f"README文档已保存: {readme_file}") + + return readme_content + + def run(self) -> Dict: + """运行本地日线数据完成流程""" + logger.info("开始本地日线数据完成流程") + + result = { + "timestamp": datetime.now().isoformat(), + "success": False, + "summary": {}, + "files_created": [], + "errors": [] + } + + try: + # 1. 加载已有的数据 + + stock_df = self.load_existing_data() + + if stock_df.empty: + result["errors"].append("未能加载已有数据") + return result + + # 2. 创建数据结构 + + structure = self.create_daily_data_structure() + result["data_structure"] = structure + + # 3. 生成数据摘要 + + summary = self.generate_data_summary(stock_df) + result["summary"] = summary + + # 4. 创建README文档 + + readme_path = self.create_data_readme() + result["files_created"].append(readme_path) + + # 5. 保存数据状态 + + status_file = os.path.join(self.base_dir, "data_status.json") + with open(status_file, 'w', encoding='utf-8') as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + result["files_created"].append(status_file) + result["success"] = True + + logger.info("本地日线数据完成流程成功") + + except Exception as e: + logger.error(f"本地日线数据完成流程失败: {e}") + result["errors"].append(str(e)) + + return result + + +def main(): + """主函数""" + print("=" * 70) + print("📊 本地日线数据完成") + print("=" * 70) + + print("目标: 确保日线数据本地存储结构完整就绪") + print("等待NAS就绪后,立即开始分钟数据采集") + + print() + print("开始执行...") + + # 创建完成器 + + finalizer = LocalDailyDataFinalizer() + + # 运行完成流程 + + result = finalizer.run() + + print() + print("=" * 70) + print("📋 执行结果") + print("=" * 70) + + if result.get("success", False): + print("✅ 本地日线数据存储结构完成!") + print() + print("📁 创建的目录和文件:") + for filepath in result.get("files_created", []): + print(f" • {filepath}") + + if "summary" in result: + print() + print("📊 数据摘要:") + summary = result["summary"] + if "stock_summary" in summary: + total = summary["stock_summary"].get("total_stocks", 0) + print(f" 股票总数: {total}") + + if "data_collection_info" in summary: + info = summary["data_collection_info"] + print(f" 数据源: {info.get('source', '未知')}") + print(f" 版本: {info.get('data_version', '未知')}") + print(f" 采集时间: {info.get('collection_time', '未知')}") + + print() + print("🎯 下一步:") + print(" 1. 日线数据采集(本地)") + print(" 2. 财务数据采集(本地)") + print(" 3. 分钟数据脚本开发(为NAS准备)") + + else: + print("❌ 本地日线数据存储结构不完整") + print("错误信息:") + for error in result.get("errors", []): + print(f" • {error}") + + print() + print("=" * 70) + print("🏠 本地数据存储就绪") + print("🎯 准备分钟数据脚本开发") + print("=" * 70) + + +if __name__ == "__main__": + main() \ No newline at end of file