11 KiB
11 KiB
数据下载到vn.py SQLite数据库完整方案
调研人:赵云(数据护军) 日期:2026年3月21日
一、当前状况分析
1.1 当前数据源状况
| 数据源 | 状态 | 备注 |
|---|---|---|
| akshare | ✅ 已完成 | 适配最新接口,批量下载+断点续传 |
| 聚宽(jqdatasdk) | ⏸️ 待适配 | 可复用已有架构 |
| Tushare Pro | ⏸️ 待接入 | 需获取token |
| Wind (如有权限) | ⏸️ 待调研 | 商业数据源 |
1.2 当前架构状况
- ✅ 数据获取层:
akshare_fetcher.py,batch_downloader.py - ✅ 缓存管理层:
cache.py(多级缓存,内存+磁盘) - ✅ API兼容层:
data_api.py(100%聚宽API签名兼容) - ✅ 批量下载器:
batch_downloader.py(断点续传,失败重试)
二、vn.py SQLite数据库结构分析
2.1 核心数据表
DbBarData(K线数据表)
class DbBarData(Model):
"""K线数据表"""
id = AutoField() # 主键
symbol = CharField() # 股票代码
exchange = CharField() # 交易所
datetime = DateTimeField() # K线时间
interval = CharField() # 时间周期(1m/5m/1d等)
volume = FloatField() # 成交量
turnover = FloatField() # 成交额
open_interest = FloatField() # 持仓量
open_price = FloatField() # 开盘价
high_price = FloatField() # 最高价
low_price = FloatField() // 最低价
close_price = FloatField() # 收盘价
__tablename__ = "dbbardata"
DbTickData(TICK数据表)
class DbTickData(Model):
"""TICK数据表"""
id = AutoField()
symbol = CharField()
exchange = CharField()
datetime = DateTimeField()
# 基础字段
name = CharField()
volume = FloatField()
turnover = FloatField()
open_interest = FloatField()
last_price = FloatField()
last_volume = FloatField()
limit_up = FloatField()
limit_down = FloatField()
# 市场字段
open_price = FloatField()
high_price = FloatField()
low_price = FloatField()
pre_close = FloatField()
# 五档行情
bid_price_1: bid_price_2, ..., ask_price_5
bid_volume_1: bid_volume_2, ..., ask_volume_5
__tablename__ = "dbtickdata"
三、完整架构设计
3.1 整体架构图
┌─────────────────────────────────────────────────────────────────┐
│ 三军数据源层 │
│ ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│ │ akshare │ │ 聚宽(jq) │ │ TusharePro │ │
│ │ 数据获取器 │ │ 数据获取器 │ │ 数据获取器 │ │
│ └─────────────┘ └─────────────┘ └───────────────┘ │
│ △ △ △ │
│ │ │ │ │
└─────────────────┼─┼─┼──────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ 数据适配引擎层 (Data Adapter Engine) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 多源数据适配器 │ │
│ │ 数据标准化 → 字段映射 → 格式转换 → vn.py格式 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────┬───────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 缓冲存储层 │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 批量处理器 │ │
│ │ 分批处理 → 异步写入 → 事务提交 → 错误重试 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────┬───────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ vn.py SQLite数据库 │
│ 实际存储: dbbardata / dbtickdata 表 │ │
└─────────────────────────────────────────────────────────────────┘
四、实现方案
4.1 分层架构
Layer 1:数据源接入层(已完成90%)
- ✅
akshare_fetcher.py- akshare数据获取 - ⚙
jq_fetcher.py- 聚宽数据获取(复用架构) - ⚙
tushare_fetcher.py- TusharePro数据获取(待实现)
Layer 2:格式适配引擎
class DataAdapterEngine:
"""数据格式适配引擎"""
def adapt_akshare_to_vnpy(akshare_df, interval):
"""将akshare数据适配为vn.py格式"""
pass
def adapt_jq_to_vnpy(jq_df, interval):
"""将聚宽数据适配为vn.py格式"""
pass
Layer 3:批量入库引擎
class BulkDatabaseWriter:
"""批量数据入库引擎"""
def __init__(self, db_path):
self.db_path = db_path
self.connection = sqlite3.connect(db_path)
def bulk_insert_bars(self, bars: List[Dict]):
"""批量插入K线数据"""
pass
def bulk_insert_ticks(self, ticks: List[Dict]):
"""批量插入TICK数据"""
pass
五、关键技术点
5.1 数据格式映射
K线数据映射表
| akshare列名 | vn.py SQLite表列名 | 处理方式 |
|---|---|---|
date |
datetime |
转换时间格式 |
open |
open_price |
直接映射 |
high |
high_price |
直接映射 |
low |
low_price |
直接映射 |
close |
close_price |
直接映射 |
volume |
volume |
直接映射 |
money |
turnover |
单位转换(元→万元) |
5.2 性能优化策略
- 批量写入:使用
executemany代替逐条插入 - 事务控制:开启事务批量提交
- 内存管理:分批次处理大文件
- 索引优化:预先创建索引
- 连接池:复用数据库连接
六、实施方案
6.1 第一阶段:设计适配器(3月21-24日)
任务:
- 设计akshare → vn.py数据适配器
- 测试数据格式转换
- 验证写入性能
代码示例:
class AkshareToVnpyAdapter:
def convert_bar(self, akshare_df, symbol, exchange, interval):
return [DbBarData(
symbol=symbol,
exchange=exchange,
datetime=row['date'],
interval=interval,
open_price=row['open'],
high_price=row['high'],
low_price=row['low'],
close_price=row['close'],
volume=row['volume'],
turnover=row.get('money', 0) * 10000, # 万元转元
) for _, row in akshare_df.iterrows()]
6.2 第二阶段:批量入库(3月25-31日)
任务:
- 实现
BulkDatabaseWriter - 测试大批量数据写入
- 性能优化
6.3 第三阶段:数据验证(4月1-7日)
任务:
- 验证数据一致性
- 检查数据完整性
- 编写测试报告
七、风险与对策
7.1 数据格式不一致风险
风险描述:akshare、聚宽、Tushare数据格式差异
应对策略:
- 统一中间格式
- 多级校验机制
- 格式转换链
7.2 性能瓶颈风险
风险描述:大数据量写入性能问题
应对策略:
- 分批处理
- 异步写入
- 索引优化
八、预期成果
-
数据适配器模块
- akshare → vn.py 数据适配器
- 聚宽 → vn.py 数据适配器
- TusharePro → vn.py 数据适配器
-
批量入库引擎
-
测试验证套件
-
完整文档
九、结论与行动建议
9.1 核心结论
-
架构可行性:✅ 方案可行,技术路线清晰
-
技术难度:⏸️ 中等,需精细处理数据格式转换
-
研发周期:约 18 个工作日(至4月17日)
9.2 行动计划
| 阶段 | 时间 | 任务 | 成果 |
|---|---|---|---|
| 设计 | 3月21-24日 | 数据格式适配器开发 | 适配器模块 |
| 实现 | 3月25-31日 | 批量写入引擎开发 | 写入引擎 |
| 测试 | 4月1-7日 | 系统集成测试 | 测试报告 |
| 部署 | 4月8-17日 | 生产环境部署 | 完成系统 |
末将赵云,随时准备开始实施!
调研完成时间:2026年3月21日 21:50
调研方式:深入源码分析 + 架构设计
🧮