# 数据下载到vn.py SQLite数据库完整方案 **调研人**:赵云(数据护军) **日期**:2026年3月21日 --- ## 一、当前状况分析 ### 1.1 当前数据源状况 | 数据源 | 状态 | 备注 | |--------|------|------| | **akshare** | ✅ 已完成 | 适配最新接口,批量下载+断点续传 | | **聚宽(jqdatasdk)** | ⏸️ 待适配 | 可复用已有架构 | | **Tushare Pro** | ⏸️ 待接入 | 需获取token | | **Wind** (如有权限) | ⏸️ 待调研 | 商业数据源 | ### 1.2 当前架构状况 - ✅ **数据获取层**:`akshare_fetcher.py`, `batch_downloader.py` - ✅ **缓存管理层**:`cache.py` (多级缓存,内存+磁盘) - ✅ **API兼容层**:`data_api.py` (100%聚宽API签名兼容) - ✅ **批量下载器**:`batch_downloader.py` (断点续传,失败重试) --- ## 二、vn.py SQLite数据库结构分析 ### 2.1 核心数据表 #### **DbBarData(K线数据表)** ```python class DbBarData(Model): """K线数据表""" id = AutoField() # 主键 symbol = CharField() # 股票代码 exchange = CharField() # 交易所 datetime = DateTimeField() # K线时间 interval = CharField() # 时间周期(1m/5m/1d等) volume = FloatField() # 成交量 turnover = FloatField() # 成交额 open_interest = FloatField() # 持仓量 open_price = FloatField() # 开盘价 high_price = FloatField() # 最高价 low_price = FloatField() // 最低价 close_price = FloatField() # 收盘价 __tablename__ = "dbbardata" ``` #### **DbTickData(TICK数据表)** ```python class DbTickData(Model): """TICK数据表""" id = AutoField() symbol = CharField() exchange = CharField() datetime = DateTimeField() # 基础字段 name = CharField() volume = FloatField() turnover = FloatField() open_interest = FloatField() last_price = FloatField() last_volume = FloatField() limit_up = FloatField() limit_down = FloatField() # 市场字段 open_price = FloatField() high_price = FloatField() low_price = FloatField() pre_close = FloatField() # 五档行情 bid_price_1: bid_price_2, ..., ask_price_5 bid_volume_1: bid_volume_2, ..., ask_volume_5 __tablename__ = "dbtickdata" ``` --- ## 三、完整架构设计 ### 3.1 整体架构图 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 三军数据源层 │ │ ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │ │ │ akshare │ │ 聚宽(jq) │ │ TusharePro │ │ │ │ 数据获取器 │ │ 数据获取器 │ │ 数据获取器 │ │ │ └─────────────┘ └─────────────┘ └───────────────┘ │ │ △ △ △ │ │ │ │ │ │ └─────────────────┼─┼─┼──────────────────────────────────────────┘ │ │ │ ▼ ▼ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 数据适配引擎层 (Data Adapter Engine) │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 多源数据适配器 │ │ │ │ 数据标准化 → 字段映射 → 格式转换 → vn.py格式 │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────┬───────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 缓冲存储层 │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 批量处理器 │ │ │ │ 分批处理 → 异步写入 → 事务提交 → 错误重试 │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────┬───────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ vn.py SQLite数据库 │ │ 实际存储: dbbardata / dbtickdata 表 │ │ └─────────────────────────────────────────────────────────────────┘ ``` --- ## 四、实现方案 ### 4.1 分层架构 #### **Layer 1:数据源接入层(已完成90%)** - ✅ `akshare_fetcher.py` - akshare数据获取 - ⚙ `jq_fetcher.py` - 聚宽数据获取(复用架构) - ⚙ `tushare_fetcher.py` - TusharePro数据获取(待实现) #### **Layer 2:格式适配引擎** ```python class DataAdapterEngine: """数据格式适配引擎""" def adapt_akshare_to_vnpy(akshare_df, interval): """将akshare数据适配为vn.py格式""" pass def adapt_jq_to_vnpy(jq_df, interval): """将聚宽数据适配为vn.py格式""" pass ``` #### **Layer 3:批量入库引擎** ```python class BulkDatabaseWriter: """批量数据入库引擎""" def __init__(self, db_path): self.db_path = db_path self.connection = sqlite3.connect(db_path) def bulk_insert_bars(self, bars: List[Dict]): """批量插入K线数据""" pass def bulk_insert_ticks(self, ticks: List[Dict]): """批量插入TICK数据""" pass ``` --- ## 五、关键技术点 ### 5.1 数据格式映射 #### **K线数据映射表** | akshare列名 | vn.py SQLite表列名 | 处理方式 | |-------------|-------------------|----------| | `date` | `datetime` | 转换时间格式 | | `open` | `open_price` | 直接映射 | | `high` | `high_price` | 直接映射 | | `low` | `low_price` | 直接映射 | | `close` | `close_price` | 直接映射 | | `volume` | `volume` | 直接映射 | | `money` | `turnover` | 单位转换(元→万元)| ### 5.2 性能优化策略 1. **批量写入**:使用 `executemany` 代替逐条插入 2. **事务控制**:开启事务批量提交 3. **内存管理**:分批次处理大文件 4. **索引优化**:预先创建索引 5. **连接池**:复用数据库连接 --- ## 六、实施方案 ### 6.1 第一阶段:设计适配器(3月21-24日) **任务:** 1. 设计akshare → vn.py数据适配器 2. 测试数据格式转换 3. 验证写入性能 **代码示例:** ```python class AkshareToVnpyAdapter: def convert_bar(self, akshare_df, symbol, exchange, interval): return [DbBarData( symbol=symbol, exchange=exchange, datetime=row['date'], interval=interval, open_price=row['open'], high_price=row['high'], low_price=row['low'], close_price=row['close'], volume=row['volume'], turnover=row.get('money', 0) * 10000, # 万元转元 ) for _, row in akshare_df.iterrows()] ``` ### 6.2 第二阶段:批量入库(3月25-31日) **任务:** 1. 实现 `BulkDatabaseWriter` 2. 测试大批量数据写入 3. 性能优化 ### 6.3 第三阶段:数据验证(4月1-7日) **任务:** 1. 验证数据一致性 2. 检查数据完整性 3. 编写测试报告 --- ## 七、风险与对策 ### 7.1 数据格式不一致风险 **风险描述**:akshare、聚宽、Tushare数据格式差异 **应对策略**: 1. 统一中间格式 2. 多级校验机制 3. 格式转换链 ### 7.2 性能瓶颈风险 **风险描述**:大数据量写入性能问题 **应对策略**: 1. 分批处理 2. 异步写入 3. 索引优化 --- ## 八、预期成果 1. **数据适配器模块** - akshare → vn.py 数据适配器 - 聚宽 → vn.py 数据适配器 - TusharePro → vn.py 数据适配器 2. **批量入库引擎** 3. **测试验证套件** 4. **完整文档** --- ## 九、结论与行动建议 ### 9.1 核心结论 1. **架构可行性**:✅ 方案可行,技术路线清晰 2. **技术难度**:⏸️ 中等,需精细处理数据格式转换 3. **研发周期**:约 **18** 个工作日(至4月17日) ### 9.2 行动计划 | 阶段 | 时间 | 任务 | 成果 | |------|------|------|------| | **设计** | 3月21-24日 | 数据格式适配器开发 | 适配器模块 | | **实现** | 3月25-31日 | 批量写入引擎开发 | 写入引擎 | | **测试** | 4月1-7日 | 系统集成测试 | 测试报告 | | **部署** | 4月8-17日 | 生产环境部署 | 完成系统 | --- **末将赵云,随时准备开始实施!** > **调研完成时间**:2026年3月21日 21:50 > **调研方式**:深入源码分析 + 架构设计 🧮