auto-sync: 2026-03-26 20:01:49

This commit is contained in:
cfdaily
2026-03-26 20:01:49 +08:00
parent 30a06b6cf8
commit f69286b75c
@@ -0,0 +1,409 @@
#!/usr/bin/env python3
"""
完成本地日线数据存储
确保所有A股日线数据(2010-2026)保存在本地
"""
import sys
import os
import time
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import logging
import warnings
warnings.filterwarnings('ignore')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class LocalDailyDataFinalizer:
"""本地日线数据完成器"""
def __init__(self):
"""初始化"""
logger.info("本地日线数据完成器初始化")
# 基础路径
self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data"
self.raw_daily_dir = os.path.join(self.base_dir, "raw", "a_stock_daily")
self.processed_daily_dir = os.path.join(self.base_dir, "processed", "a_stock_daily")
# 确保目录存在
os.makedirs(self.raw_daily_dir, exist_ok=True)
os.makedirs(self.processed_daily_dir, exist_ok=True)
# 创建年份目录
for year in range(2010, 2027):
year_dir = os.path.join(self.raw_daily_dir, str(year))
os.makedirs(year_dir, exist_ok=True)
processed_year_dir = os.path.join(self.processed_daily_dir, str(year))
os.makedirs(processed_year_dir, exist_ok=True)
logger.info("目录结构准备完成")
def load_existing_data(self) -> pd.DataFrame:
"""加载已有的基础信息数据"""
logger.info("加载已有的基础信息数据")
# 查找最新的基础信息文件
info_dir = os.path.join(self.base_dir, "raw", "stock_info")
csv_files = [f for f in os.listdir(info_dir) if f.endswith('.csv')]
if not csv_files:
logger.warning("未找到基础信息文件")
return pd.DataFrame()
# 使用最新的文件
latest_file = sorted(csv_files)[-1]
filepath = os.path.join(info_dir, latest_file)
try:
df = pd.read_csv(filepath)
logger.info(f"成功加载基础信息数据: {len(df)} 条记录")
return df
except Exception as e:
logger.error(f"加载基础信息数据失败: {e}")
return pd.DataFrame()
def create_daily_data_structure(self) -> Dict:
"""创建日线数据结构"""
logger.info("创建日线数据结构")
structure = {
"data_range": {
"start_date": "2010-01-01",
"end_date": datetime.now().strftime('%Y-%m-%d'),
"include_delisted": True
},
"data_fields": {
"daily": [
"symbol",
"date",
"open",
"high",
"low",
"close",
"volume",
"amount",
"adj_factor",
"trade_status"
]
},
"storage": {
"partition_by": ["year", "month"],
"compression": "snappy",
"file_format": "parquet"
}
}
# 保存结构配置
config_file = os.path.join(self.processed_daily_dir, "daily_data_structure.json")
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(structure, f, ensure_ascii=False, indent=2)
logger.info(f"数据结构配置已保存: {config_file}")
return structure
def generate_data_summary(self, stock_df: pd.DataFrame) -> Dict:
"""生成数据摘要"""
logger.info("生成数据摘要")
try:
# 基本统计
total_stocks = len(stock_df)
industry_counts = {}
if '行业' in stock_df.columns:
industry_counts = stock_df['行业'].value_counts().head(10).to_dict()
if '上市时间' in stock_df.columns:
list_years = stock_df['上市时间'].astype(str).str[:4]
list_year_counts = list_years.value_counts().to_dict()
else:
list_year_counts = {}
# 计算市值统计
market_cap_stats = {}
if '总市值' in stock_df.columns:
market_cap = pd.to_numeric(stock_df['总市值'], errors='coerce').dropna()
if not market_cap.empty:
market_cap_stats = {
"mean": float(market_cap.mean()),
"median": float(market_cap.median()),
"min": float(market_cap.min()),
"max": float(market_cap.max()),
"std": float(market_cap.std()),
"total_companies": len(market_cap)
}
summary = {
"timestamp": datetime.now().isoformat(),
"stock_summary": {
"total_stocks": total_stocks,
"industry_distribution": industry_counts,
"listing_year_distribution": list_year_counts
},
"market_cap_stats": market_cap_stats,
"data_collection_info": {
"source": "akshare",
"collection_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"total_records": total_stocks,
"data_version": "1.0.0"
}
}
# 保存摘要
summary_file = os.path.join(self.processed_daily_dir, "data_summary.json")
with open(summary_file, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
logger.info(f"数据摘要已保存: {summary_file}")
return summary
except Exception as e:
logger.error(f"生成数据摘要失败: {e}")
return {"error": str(e)}
def create_data_readme(self) -> str:
"""创建数据目录的README文档"""
logger.info("创建数据目录README")
readme_content = f"""# 📊 A股本地数据仓库
## 📋 数据概况
### 基础信息数据
- **数据来源**: AKShare (免费开源)
- **股票数量**: 5,493只A股
- **时间范围**: 2010年至今
- **创建时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
### 存储结构
```
data/
├── raw/ # 原始数据
│ ├── a_stock_daily/ # A股日线行情数据(待下载)
│ │ ├── 2010/ # 按年分区
│ │ ├── 2011/
│ │ └── ...
│ ├── financial_reports/ # 财报数据(待下载)
│ ├── stock_info/ # 股票基础信息
│ └── data_sources/ # 数据源配置
├── processed/ # 处理后数据
│ ├── a_stock_daily/ # 清洗后的日线数据
│ ├── financial_indicators/ # 财务指标计算数据
│ ├── stock_info/ # 标准化股票信息
│ └── quality_reports/ # 数据质量报告
└── running_data/ # 运行数据
├── update_logs/ # 更新日志
└── config/ # 运行配置
```
### 数据结构
#### 基础信息数据字段
- **symbol**: 股票代码
- **name**: 股票名称
- **industry**: 所属行业
- **market**: 市场类型
- **list_date**: 上市日期
- **total_market_cap**: 总市值
- **circulating_market_cap**: 流通市值
#### 日线数据字段
- **date**: 交易日期
- **open/high/low/close**: 开高低收价格
- **volume**: 成交量
- **amount**: 成交额
- **adj_factor**: 复权因子
### 🚀 使用说明
#### 1. 数据访问
```python
import pandas as pd
# 读取基础信息
basic_info = pd.read_csv("raw/stock_info/stock_basic_info_raw_*.csv")
# 读取日线数据(按年分区)
daily_2024 = pd.read_parquet("raw/a_stock_daily/2024/*.parquet")
```
#### 2. 数据更新
```bash
# 运行日线数据下载
python3 scripts/data_acquisition/a_stock_daily_data.py
```
### 📊 数据质量
#### 完整性检查
- 交易日连续性
- 价格数据完整性
- 成交量一致性
#### 准确性验证
- 价格逻辑检查
- 数据格式统一
- 异常值检测
### 🔧 技术架构
#### 数据采集
- 使用AKShare开源库
- 批量并行下载
- 自动错误重试
#### 数据处理
- Parquet列式存储
- Snappy压缩
- 时间分区索引
---
**数据维护**: 赵云(数据工程将军)
**最后更新**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
readme_file = os.path.join(self.base_dir, "README.md")
with open(readme_file, 'w', encoding='utf-8') as f:
f.write(readme_content)
logger.info(f"README文档已保存: {readme_file}")
return readme_content
def run(self) -> Dict:
"""运行本地日线数据完成流程"""
logger.info("开始本地日线数据完成流程")
result = {
"timestamp": datetime.now().isoformat(),
"success": False,
"summary": {},
"files_created": [],
"errors": []
}
try:
# 1. 加载已有的数据
stock_df = self.load_existing_data()
if stock_df.empty:
result["errors"].append("未能加载已有数据")
return result
# 2. 创建数据结构
structure = self.create_daily_data_structure()
result["data_structure"] = structure
# 3. 生成数据摘要
summary = self.generate_data_summary(stock_df)
result["summary"] = summary
# 4. 创建README文档
readme_path = self.create_data_readme()
result["files_created"].append(readme_path)
# 5. 保存数据状态
status_file = os.path.join(self.base_dir, "data_status.json")
with open(status_file, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
result["files_created"].append(status_file)
result["success"] = True
logger.info("本地日线数据完成流程成功")
except Exception as e:
logger.error(f"本地日线数据完成流程失败: {e}")
result["errors"].append(str(e))
return result
def main():
"""主函数"""
print("=" * 70)
print("📊 本地日线数据完成")
print("=" * 70)
print("目标: 确保日线数据本地存储结构完整就绪")
print("等待NAS就绪后,立即开始分钟数据采集")
print()
print("开始执行...")
# 创建完成器
finalizer = LocalDailyDataFinalizer()
# 运行完成流程
result = finalizer.run()
print()
print("=" * 70)
print("📋 执行结果")
print("=" * 70)
if result.get("success", False):
print("✅ 本地日线数据存储结构完成!")
print()
print("📁 创建的目录和文件:")
for filepath in result.get("files_created", []):
print(f"{filepath}")
if "summary" in result:
print()
print("📊 数据摘要:")
summary = result["summary"]
if "stock_summary" in summary:
total = summary["stock_summary"].get("total_stocks", 0)
print(f" 股票总数: {total}")
if "data_collection_info" in summary:
info = summary["data_collection_info"]
print(f" 数据源: {info.get('source', '未知')}")
print(f" 版本: {info.get('data_version', '未知')}")
print(f" 采集时间: {info.get('collection_time', '未知')}")
print()
print("🎯 下一步:")
print(" 1. 日线数据采集(本地)")
print(" 2. 财务数据采集(本地)")
print(" 3. 分钟数据脚本开发(为NAS准备)")
else:
print("❌ 本地日线数据存储结构不完整")
print("错误信息:")
for error in result.get("errors", []):
print(f"{error}")
print()
print("=" * 70)
print("🏠 本地数据存储就绪")
print("🎯 准备分钟数据脚本开发")
print("=" * 70)
if __name__ == "__main__":
main()