auto-sync: 2026-03-26 11:54:33

This commit is contained in:
cfdaily
2026-03-26 11:54:33 +08:00
parent 44142b2400
commit 2f547752a7
@@ -0,0 +1,430 @@
# 🏛️ A股分钟K线数据架构设计
## 📋 设计概览
### 设计目标
1. **高效存储**:支持2亿+条记录的存储和查询
2. **快速访问**:支持分钟级数据的快速检索和聚合
3. **质量保证**:确保分钟数据的完整性和准确性
4. **扩展性**:支持未来数据增长和功能扩展
### 设计原则
1. **分区存储**:按时间和股票分区,提高查询效率
2. **列式存储**:使用Parquet格式,支持列式查询优化
3. **数据压缩**:使用高效的压缩算法减少存储空间
4. **索引优化**:建立多级索引支持快速数据定位
## 📁 存储架构设计
### 总体结构
```
minute_kline/
├── raw/ # 原始数据(保持数据源格式)
│ ├── 1minute/ # 1分钟K线原始数据
│ │ ├── 2021/ # 按年分区
│ │ │ ├── 000001_2021_1min.parquet
│ │ │ ├── 000002_2021_1min.parquet
│ │ │ └── ...
│ │ ├── 2022/
│ │ ├── 2023/
│ │ ├── 2024/
│ │ └── 2025/
│ ├── 5minute/ # 5分钟K线原始数据
│ └── 15minute/ # 15分钟K线原始数据
├── processed/ # 处理后数据(标准化格式)
│ ├── consolidated/ # 合并后的分钟数据
│ │ ├── by_stock/ # 按股票组织(便于分析单只股票)
│ │ │ ├── 000001/
│ │ │ │ ├── 2021_1min.parquet
│ │ │ │ ├── 2021_5min.parquet
│ │ │ │ └── 2021_15min.parquet
│ │ │ └── ...
│ │ └── by_date/ # 按日期组织(便于分析全市场)
│ │ ├── 20210104/ # 交易日
│ │ │ ├── 1min.parquet # 当日所有股票1分钟数据
│ │ │ ├── 5min.parquet
│ │ │ └── 15min.parquet
│ │ └── ...
│ ├── indicators/ # 分钟级技术指标
│ │ ├── volatility/ # 波动率指标(ATR,布林带等)
│ │ ├── momentum/ # 动量指标(RSIMACD等)
│ │ └── volume/ # 成交量指标(量比,资金流向等)
│ └── quality/ # 数据质量分析数据
└── running_data/ # 运行数据
├── download_logs/ # 下载日志和状态
├── update_tracker/ # 数据更新跟踪
├── config/ # 配置文件
└── stats/ # 统计数据
```
### 数据分区策略
#### 时间分区
```
- 按年分区:/year/2021/, /year/2022/, ...
- 按月分区:/year/2021/month/01/, ...(可选,用于大数据量)
- 按日分区:/year/2021/month/01/day/04/(用于按日聚合查询)
```
#### 股票分区
```
- 按股票代码:/stock/000001/, /stock/000002/, ...
- 按交易所:/exchange/SH/, /exchange/SZ/
- 按市值分组:/cap/large/, /cap/medium/, /cap/small/
```
### 存储格式选择
#### 主要格式:Parquet
- **优势**
- 列式存储,查询效率高
- 支持高效压缩
- 兼容多种大数据处理框架
- 支持复杂数据类型
- **压缩算法**Snappy
- 压缩率适中,速度快
- 适合频繁读写场景
#### 辅助格式:HDF5(可选)
- **优势**
- 支持高效随机访问
- 数据结构灵活
- **适用场景**
- 高频数据临时处理
- 内存映射访问
### 数据组织模式
#### 模式1:按股票组织(适用于股票分析)
```
stock_000001/
├── 2021/
│ ├── data_1min.parquet # 1分钟数据
│ ├── data_5min.parquet # 5分钟数据
│ └── data_15min.parquet # 15分钟数据
├── 2022/
└── ...
```
#### 模式2:按日期组织(适用于市场分析)
```
date_20210104/
├── data_1min.parquet # 当日所有股票1分钟数据
├── data_5min.parquet
├── data_15min.parquet
├── summary.json # 当日市场摘要
└── indicators/ # 当日技术指标
```
## 🛠️ 工具链设计
### 1. 数据下载工具
#### 核心功能
```python
class MinuteKlineDownloader:
"""分钟K线数据下载器"""
def download_stock_minute_data(
self,
symbol: str,
timeframe: str, # '1min', '5min', '15min'
start_date: str,
end_date: str
) -> pd.DataFrame
def download_batch_stocks(
self,
symbols: List[str],
timeframe: str,
start_date: str,
end_date: str,
batch_size: int = 50,
max_workers: int = 5
) -> Dict[str, pd.DataFrame]
def download_market_data(
self,
timeframe: str,
start_date: str,
end_date: str
) -> Dict[str, pd.DataFrame]
```
#### 下载策略
```python
class DownloadStrategy:
"""下载策略配置"""
def __init__(self):
self.timeframes = ['15min', '5min', '1min'] # 优先下载低频率
self.batch_size = 100 # 每批次股票数
self.max_workers = 8 # 最大并发数
self.request_delay = 0.2 # 请求延迟秒数
self.retry_attempts = 3 # 重试次数
self.fallback_sources = ['akshare', 'tushare', 'jqdata'] # 备用数据源
```
### 2. 数据处理工具
#### 数据清洗
```python
class MinuteDataProcessor:
"""分钟数据处理工具"""
def clean_minute_data(
self,
df: pd.DataFrame,
timeframe: str
) -> pd.DataFrame
def validate_data_quality(
self,
df: pd.DataFrame
) -> Dict[str, bool]
def fill_missing_data(
self,
df: pd.DataFrame,
timeframe: str
) -> pd.DataFrame
```
#### 数据合并
```python
class DataConsolidator:
"""数据合并工具"""
def consolidate_by_stock(
self,
symbol: str,
start_year: int,
end_year: int
) -> pd.DataFrame
def consolidate_by_date(
self,
date: str,
timeframe: str
) -> pd.DataFrame
def create_data_index(
self,
index_type: str # 'stock', 'date', 'timeframe'
) -> Dict
```
### 3. 质量检查工具
#### 完整性检查
```python
class CompletenessChecker:
"""数据完整性检查"""
def check_time_coverage(
self,
df: pd.DataFrame,
timeframe: str
) -> Dict[str, float]
def check_missing_periods(
self,
df: pd.DataFrame
) -> List[Tuple[str, str]]
def generate_completeness_report(
self,
data_dir: str
) -> Dict
```
#### 准确性验证
```python
class AccuracyValidator:
"""数据准确性验证"""
def validate_price_logic(
self,
df: pd.DataFrame
) -> List[Dict]
def validate_volume_consistency(
self,
df: pd.DataFrame
) -> Dict
def check_data_anomalies(
self,
df: pd.DataFrame
) -> List[Dict]
```
### 4. 查询优化工具
#### 数据索引
```python
class MinuteDataIndexer:
"""分钟数据索引管理"""
def create_time_index(
self,
data_dir: str
) -> Dict
def create_stock_index(
self,
data_dir: str
) -> Dict
def create_combined_index(
self,
stock_list: List[str],
date_range: Tuple[str, str]
) -> Dict
```
#### 缓存机制
```python
class DataCacheManager:
"""数据缓存管理"""
def __init__(self):
self.memory_cache = {} # 内存缓存
self.disk_cache_dir = None # 磁盘缓存目录
def get_cached_data(
self,
key: str,
cache_type: str = 'auto' # 'memory', 'disk', 'auto'
) -> Optional[pd.DataFrame]
def set_cache_data(
self,
key: str,
data: pd.DataFrame,
cache_type: str = 'auto'
)
```
## 📈 性能优化设计
### 1. 查询优化策略
#### 分区裁剪
```python
# 示例:查询特定股票在特定时间段的数据
query = {
'symbol': '000001',
'start_date': '2024-01-01',
'end_date': '2024-01-31',
'timeframe': '1min'
}
# 自动定位到最小分区
# /processed/consolidated/by_stock/000001/2024_1min.parquet
```
#### 列式读取
```python
# 只读取需要的列,减少IO
columns_to_read = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
df = pd.read_parquet(
filepath,
columns=columns_to_read
)
```
### 2. 存储优化
#### 数据压缩
```python
compression_options = {
'compression': 'snappy', # 快速压缩解压
'row_group_size': 50000, # 行组大小,影响读取性能
'use_dictionary': True # 使用字典编码,提高压缩率
}
```
#### 数据分区
```python
partition_scheme = {
'primary': 'year', # 主分区:年份
'secondary': 'month', # 二级分区:月份(可选)
'tertiary': 'day' # 三级分区:日期(可选)
}
```
### 3. 内存优化
#### 分批处理
```python
class BatchProcessor:
"""分批处理器"""
def process_in_batches(
self,
data_dir: str,
batch_size: int = 100000, # 每批记录数
process_function: Callable = None
) -> List[pd.DataFrame]
```
#### 内存映射
```python
class MemoryMapper:
"""内存映射工具"""
def create_memory_map(
self,
filepath: str,
columns: List[str] = None
) -> mmap
```
## 🚀 实施路线图
### 第一阶段:架构验证(1-2周)
**目标**: 验证架构可行性,开发基础工具链
1. **数据模型设计**: 定义分钟数据字段和格式
2. **存储结构验证**: 测试分区策略和存储格式
3. **基础工具开发**: 开发数据下载和基本处理工具
4. **小规模测试**: 采集100只股票1年数据验证
### 第二阶段:工具链完善(1-2周)
**目标**: 完善工具链,提升数据质量
1. **数据处理优化**: 开发数据清洗和质量检查工具
2. **性能测试**: 测试大数据量下的性能和稳定性
3. **质量保证**: 建立完整的数据质量保证体系
4. **文档完善**: 完善工具文档和使用说明
### 第三阶段:全量数据采集(2-4周)
**目标**: 完成全量分钟数据采集
1. **分批采集计划**: 制定详细的数据采集计划
2. **数据采集实施**: 按计划分批下载数据
3. **质量监控**: 实时监控数据采集质量和进度
4. **问题处理**: 及时处理采集过程中出现的问题
### 第四阶段:系统优化和交付(1-2周)
**目标**: 优化系统性能,完成最终交付
1. **性能调优**: 优化查询性能和存储效率
2. **质量验证**: 验证全量数据质量
3. **文档整理**: 整理完整的系统文档
4. **交付准备**: 准备最终交付物和报告
### 第五阶段:维护和更新(持续)
**目标**: 建立持续的数据更新和维护机制
1. **增量更新**: 每日/每周数据增量更新
2. **质量监控**: 持续监控数据质量
3. **系统维护**: 定期维护和优化系统
4. **用户支持**: 提供技术支持和问题解答
---
**赵云设计完成**: 分钟K线数据架构设计完成,技术方案可行,实施路线清晰。
**设计人**: 赵云(数据工程将军)
**设计时间**: 2026-03-26 12:15:00
**设计状态**: ✅ 架构设计完成,准备实施