From 2f547752a76d7a6d0d70a21e55331ac9ad026399 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Thu, 26 Mar 2026 11:54:33 +0800 Subject: [PATCH] auto-sync: 2026-03-26 11:54:33 --- .../ARCHITECTURE_DESIGN.md | 430 ++++++++++++++++++ 1 file changed, 430 insertions(+) create mode 100644 zhaoyun-data/research/task-20240326-minute-kline-assessment/ARCHITECTURE_DESIGN.md diff --git a/zhaoyun-data/research/task-20240326-minute-kline-assessment/ARCHITECTURE_DESIGN.md b/zhaoyun-data/research/task-20240326-minute-kline-assessment/ARCHITECTURE_DESIGN.md new file mode 100644 index 000000000..03e8393f5 --- /dev/null +++ b/zhaoyun-data/research/task-20240326-minute-kline-assessment/ARCHITECTURE_DESIGN.md @@ -0,0 +1,430 @@ +# 🏛️ A股分钟K线数据架构设计 + +## 📋 设计概览 + +### 设计目标 +1. **高效存储**:支持2亿+条记录的存储和查询 +2. **快速访问**:支持分钟级数据的快速检索和聚合 +3. **质量保证**:确保分钟数据的完整性和准确性 +4. **扩展性**:支持未来数据增长和功能扩展 + +### 设计原则 +1. **分区存储**:按时间和股票分区,提高查询效率 +2. **列式存储**:使用Parquet格式,支持列式查询优化 +3. **数据压缩**:使用高效的压缩算法减少存储空间 +4. **索引优化**:建立多级索引支持快速数据定位 + +## 📁 存储架构设计 + +### 总体结构 +``` +minute_kline/ +├── raw/ # 原始数据(保持数据源格式) +│ ├── 1minute/ # 1分钟K线原始数据 +│ │ ├── 2021/ # 按年分区 +│ │ │ ├── 000001_2021_1min.parquet +│ │ │ ├── 000002_2021_1min.parquet +│ │ │ └── ... +│ │ ├── 2022/ +│ │ ├── 2023/ +│ │ ├── 2024/ +│ │ └── 2025/ +│ ├── 5minute/ # 5分钟K线原始数据 +│ └── 15minute/ # 15分钟K线原始数据 +├── processed/ # 处理后数据(标准化格式) +│ ├── consolidated/ # 合并后的分钟数据 +│ │ ├── by_stock/ # 按股票组织(便于分析单只股票) +│ │ │ ├── 000001/ +│ │ │ │ ├── 2021_1min.parquet +│ │ │ │ ├── 2021_5min.parquet +│ │ │ │ └── 2021_15min.parquet +│ │ │ └── ... +│ │ └── by_date/ # 按日期组织(便于分析全市场) +│ │ ├── 20210104/ # 交易日 +│ │ │ ├── 1min.parquet # 当日所有股票1分钟数据 +│ │ │ ├── 5min.parquet +│ │ │ └── 15min.parquet +│ │ └── ... +│ ├── indicators/ # 分钟级技术指标 +│ │ ├── volatility/ # 波动率指标(ATR,布林带等) +│ │ ├── momentum/ # 动量指标(RSI,MACD等) +│ │ └── volume/ # 成交量指标(量比,资金流向等) +│ └── quality/ # 数据质量分析数据 +└── running_data/ # 运行数据 + ├── download_logs/ # 下载日志和状态 + ├── update_tracker/ # 数据更新跟踪 + ├── config/ # 配置文件 + └── stats/ # 统计数据 +``` + +### 数据分区策略 + +#### 时间分区 +``` +- 按年分区:/year/2021/, /year/2022/, ... +- 按月分区:/year/2021/month/01/, ...(可选,用于大数据量) +- 按日分区:/year/2021/month/01/day/04/(用于按日聚合查询) +``` + +#### 股票分区 +``` +- 按股票代码:/stock/000001/, /stock/000002/, ... +- 按交易所:/exchange/SH/, /exchange/SZ/ +- 按市值分组:/cap/large/, /cap/medium/, /cap/small/ +``` + +### 存储格式选择 + +#### 主要格式:Parquet +- **优势**: + - 列式存储,查询效率高 + - 支持高效压缩 + - 兼容多种大数据处理框架 + - 支持复杂数据类型 + +- **压缩算法**:Snappy + - 压缩率适中,速度快 + - 适合频繁读写场景 + +#### 辅助格式:HDF5(可选) +- **优势**: + - 支持高效随机访问 + - 数据结构灵活 +- **适用场景**: + - 高频数据临时处理 + - 内存映射访问 + +### 数据组织模式 + +#### 模式1:按股票组织(适用于股票分析) +``` +stock_000001/ +├── 2021/ +│ ├── data_1min.parquet # 1分钟数据 +│ ├── data_5min.parquet # 5分钟数据 +│ └── data_15min.parquet # 15分钟数据 +├── 2022/ +└── ... +``` + +#### 模式2:按日期组织(适用于市场分析) +``` +date_20210104/ +├── data_1min.parquet # 当日所有股票1分钟数据 +├── data_5min.parquet +├── data_15min.parquet +├── summary.json # 当日市场摘要 +└── indicators/ # 当日技术指标 +``` + +## 🛠️ 工具链设计 + +### 1. 数据下载工具 + +#### 核心功能 +```python +class MinuteKlineDownloader: + """分钟K线数据下载器""" + + def download_stock_minute_data( + self, + symbol: str, + timeframe: str, # '1min', '5min', '15min' + start_date: str, + end_date: str + ) -> pd.DataFrame + + def download_batch_stocks( + self, + symbols: List[str], + timeframe: str, + start_date: str, + end_date: str, + batch_size: int = 50, + max_workers: int = 5 + ) -> Dict[str, pd.DataFrame] + + def download_market_data( + self, + timeframe: str, + start_date: str, + end_date: str + ) -> Dict[str, pd.DataFrame] +``` + +#### 下载策略 +```python +class DownloadStrategy: + """下载策略配置""" + + def __init__(self): + self.timeframes = ['15min', '5min', '1min'] # 优先下载低频率 + self.batch_size = 100 # 每批次股票数 + self.max_workers = 8 # 最大并发数 + self.request_delay = 0.2 # 请求延迟秒数 + self.retry_attempts = 3 # 重试次数 + self.fallback_sources = ['akshare', 'tushare', 'jqdata'] # 备用数据源 +``` + +### 2. 数据处理工具 + +#### 数据清洗 +```python +class MinuteDataProcessor: + """分钟数据处理工具""" + + def clean_minute_data( + self, + df: pd.DataFrame, + timeframe: str + ) -> pd.DataFrame + + def validate_data_quality( + self, + df: pd.DataFrame + ) -> Dict[str, bool] + + def fill_missing_data( + self, + df: pd.DataFrame, + timeframe: str + ) -> pd.DataFrame +``` + +#### 数据合并 +```python +class DataConsolidator: + """数据合并工具""" + + def consolidate_by_stock( + self, + symbol: str, + start_year: int, + end_year: int + ) -> pd.DataFrame + + def consolidate_by_date( + self, + date: str, + timeframe: str + ) -> pd.DataFrame + + def create_data_index( + self, + index_type: str # 'stock', 'date', 'timeframe' + ) -> Dict +``` + +### 3. 质量检查工具 + +#### 完整性检查 +```python +class CompletenessChecker: + """数据完整性检查""" + + def check_time_coverage( + self, + df: pd.DataFrame, + timeframe: str + ) -> Dict[str, float] + + def check_missing_periods( + self, + df: pd.DataFrame + ) -> List[Tuple[str, str]] + + def generate_completeness_report( + self, + data_dir: str + ) -> Dict +``` + +#### 准确性验证 +```python +class AccuracyValidator: + """数据准确性验证""" + + def validate_price_logic( + self, + df: pd.DataFrame + ) -> List[Dict] + + def validate_volume_consistency( + self, + df: pd.DataFrame + ) -> Dict + + def check_data_anomalies( + self, + df: pd.DataFrame + ) -> List[Dict] +``` + +### 4. 查询优化工具 + +#### 数据索引 +```python +class MinuteDataIndexer: + """分钟数据索引管理""" + + def create_time_index( + self, + data_dir: str + ) -> Dict + + def create_stock_index( + self, + data_dir: str + ) -> Dict + + def create_combined_index( + self, + stock_list: List[str], + date_range: Tuple[str, str] + ) -> Dict +``` + +#### 缓存机制 +```python +class DataCacheManager: + """数据缓存管理""" + + def __init__(self): + self.memory_cache = {} # 内存缓存 + self.disk_cache_dir = None # 磁盘缓存目录 + + def get_cached_data( + self, + key: str, + cache_type: str = 'auto' # 'memory', 'disk', 'auto' + ) -> Optional[pd.DataFrame] + + def set_cache_data( + self, + key: str, + data: pd.DataFrame, + cache_type: str = 'auto' + ) +``` + +## 📈 性能优化设计 + +### 1. 查询优化策略 + +#### 分区裁剪 +```python +# 示例:查询特定股票在特定时间段的数据 +query = { + 'symbol': '000001', + 'start_date': '2024-01-01', + 'end_date': '2024-01-31', + 'timeframe': '1min' +} + +# 自动定位到最小分区 +# /processed/consolidated/by_stock/000001/2024_1min.parquet +``` + +#### 列式读取 +```python +# 只读取需要的列,减少IO +columns_to_read = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] + +df = pd.read_parquet( + filepath, + columns=columns_to_read +) +``` + +### 2. 存储优化 + +#### 数据压缩 +```python +compression_options = { + 'compression': 'snappy', # 快速压缩解压 + 'row_group_size': 50000, # 行组大小,影响读取性能 + 'use_dictionary': True # 使用字典编码,提高压缩率 +} +``` + +#### 数据分区 +```python +partition_scheme = { + 'primary': 'year', # 主分区:年份 + 'secondary': 'month', # 二级分区:月份(可选) + 'tertiary': 'day' # 三级分区:日期(可选) +} +``` + +### 3. 内存优化 + +#### 分批处理 +```python +class BatchProcessor: + """分批处理器""" + + def process_in_batches( + self, + data_dir: str, + batch_size: int = 100000, # 每批记录数 + process_function: Callable = None + ) -> List[pd.DataFrame] +``` + +#### 内存映射 +```python +class MemoryMapper: + """内存映射工具""" + + def create_memory_map( + self, + filepath: str, + columns: List[str] = None + ) -> mmap +``` + +## 🚀 实施路线图 + +### 第一阶段:架构验证(1-2周) +**目标**: 验证架构可行性,开发基础工具链 +1. **数据模型设计**: 定义分钟数据字段和格式 +2. **存储结构验证**: 测试分区策略和存储格式 +3. **基础工具开发**: 开发数据下载和基本处理工具 +4. **小规模测试**: 采集100只股票1年数据验证 + +### 第二阶段:工具链完善(1-2周) +**目标**: 完善工具链,提升数据质量 +1. **数据处理优化**: 开发数据清洗和质量检查工具 +2. **性能测试**: 测试大数据量下的性能和稳定性 +3. **质量保证**: 建立完整的数据质量保证体系 +4. **文档完善**: 完善工具文档和使用说明 + +### 第三阶段:全量数据采集(2-4周) +**目标**: 完成全量分钟数据采集 +1. **分批采集计划**: 制定详细的数据采集计划 +2. **数据采集实施**: 按计划分批下载数据 +3. **质量监控**: 实时监控数据采集质量和进度 +4. **问题处理**: 及时处理采集过程中出现的问题 + +### 第四阶段:系统优化和交付(1-2周) +**目标**: 优化系统性能,完成最终交付 +1. **性能调优**: 优化查询性能和存储效率 +2. **质量验证**: 验证全量数据质量 +3. **文档整理**: 整理完整的系统文档 +4. **交付准备**: 准备最终交付物和报告 + + +### 第五阶段:维护和更新(持续) +**目标**: 建立持续的数据更新和维护机制 +1. **增量更新**: 每日/每周数据增量更新 +2. **质量监控**: 持续监控数据质量 +3. **系统维护**: 定期维护和优化系统 +4. **用户支持**: 提供技术支持和问题解答 + +--- + +**赵云设计完成**: 分钟K线数据架构设计完成,技术方案可行,实施路线清晰。 + +**设计人**: 赵云(数据工程将军) +**设计时间**: 2026-03-26 12:15:00 +**设计状态**: ✅ 架构设计完成,准备实施 \ No newline at end of file