Files
sanguo_quant_live/data-engineering/README.md
T
cfdaily 420813a690 feat(data-engineering): 完成akshare→vn.py数据适配器系统
- 实现核心数据适配器(akshare_vnpy_adapter.py)
  * 数据库初始化(vn.py DbBarData表)
  * 股票列表获取(全市场A股)
  * 单只/全市场K线数据下载
  * akshare→vn.py格式自动转换
  * 批量插入优化(executemany)
  * 数据完整性验证

- 实现批量下载器(batch_downloader.py)
  * 断点续传支持(JSON进度文件)
  * 失败重试机制
  * 进度实时保存
  * 测试模式支持

- 实现测试脚本(test_adapter.py)
  * 单元测试覆盖所有核心功能
  * 完整流程验证

- 完善文档
  * README.md - 完整使用文档
  * IMPLEMENTATION_REPORT.md - 实施详情报告
  * VALIDATION_REPORT.md - 验证报告
  * VALIDATION_REPORT_TEMPLATE.md - 验证报告模板

作者: 赵云(数据护军)
日期: 2026-03-24
2026-03-24 12:42:56 +08:00

330 lines
6.6 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# akshare → vn.py 数据适配器系统
## 项目概述
本项目实现了从 akshare 数据源获取A股历史数据,并批量写入 vn.py SQLite 数据库的完整解决方案。
**作者**: 赵云(数据护军)
**完成日期**: 2026-03-24
---
## 功能特性
### 1. 数据适配器 (`akshare_vnpy_adapter.py`)
- ✅ 自动初始化 vn.py 数据库表结构
- ✅ 获取全市场A股股票列表
- ✅ 下载单只/全市场历史K线数据
- ✅ 数据格式自动转换(akshare → vn.py
- ✅ 批量插入优化(使用 executemany
- ✅ 数据完整性验证
- ✅ 支持日期范围筛选
- ✅ 支持复权类型选择(不复权/前复权/后复权)
### 2. 批量下载器 (`batch_downloader.py`)
- ✅ 断点续传支持(保存进度到JSON文件)
- ✅ 失败重试机制
- ✅ 进度实时保存
- ✅ 统计信息跟踪
- ✅ 测试模式(可限制下载数量)
### 3. 测试脚本 (`test_adapter.py`)
- ✅ 单元测试
- ✅ 完整流程验证
- ✅ 数据完整性验证
---
## 数据库结构
### DbBarData 表(K线数据)
| 字段 | 类型 | 说明 |
|------|------|------|
| id | INTEGER | 主键(自增) |
| symbol | TEXT | 股票代码 |
| exchange | TEXT | 交易所(SH/SZ/BJ |
| datetime | TEXT | K线时间 |
| interval | TEXT | 周期(1d/1w/1m等) |
| open_price | REAL | 开盘价 |
| high_price | REAL | 最高价 |
| low_price | REAL | 最低价 |
| close_price | REAL | 收盘价 |
| volume | REAL | 成交量 |
| turnover | REAL | 成交额(元) |
| open_interest | REAL | 持仓量 |
### DbTickData 表(TICK数据)
包含完整五档行情数据(预留)
---
## 使用方法
### 1. 基本用法
```python
from akshare_vnpy_adapter import AkshareToVnpyAdapter
# 创建适配器
adapter = AkshareToVnpyAdapter('database.db')
try:
# 初始化数据库
adapter.initialize_database()
# 下载单只股票
inserted = adapter.download_and_insert_stock_daily(
code='600519', # 茅台
start_date='20240101',
end_date='20241231'
)
print(f"插入 {inserted} 条K线")
# 验证数据完整性
integrity = adapter.verify_data_integrity()
print(integrity)
finally:
adapter.close()
```
### 2. 批量下载全市场数据
```python
from batch_downloader import BatchDownloader
downloader = BatchDownloader(
db_path='database.db',
progress_file='download_progress.json'
)
try:
# 批量下载
stats = downloader.download(
start_date='20240101', # 开始日期
max_stocks=None, # None=全部,可设置如100测试
resume=True, # 断点续传
retry_failed=True # 重试失败的
)
# 验证数据
integrity = downloader.verify()
finally:
downloader.close()
```
### 3. 运行测试
```bash
# 运行单元测试
python3 test_adapter.py
# 运行完整下载(测试模式:50只股票)
python3 batch_downloader.py
# 修改配置后运行完整下载(全市场)
# 编辑 batch_downloader.py 中的 config
python3 batch_downloader.py
```
---
## 数据格式映射
### akshare → vn.py 字段映射
| akshare | vn.py | 说明 |
|---------|-------|------|
| date | datetime | 日期时间 |
| open | open_price | 开盘价 |
| high | high_price | 最高价 |
| low | low_price | 最低价 |
| close | close_price | 收盘价 |
| volume | volume | 成交量 |
| money | turnover | 成交额 |
| - | open_interest | 持仓量(默认0 |
### 交易所映射
| 股票代码前缀 | 交易所 |
|-------------|--------|
| 6xxxxx | SH(上交所) |
| 0xxxxx | SZ(深交所) |
| 3xxxxx | SZ(深交所) |
| 8xxxxx | BJ(北交所) |
---
## 性能优化
### 1. 批量写入
- 使用 `executemany` 代替逐条插入
- 默认批量大小:1000 条/批
### 2. 事务控制
- 每个批次在一个事务中完成
- 自动提交或回滚
### 3. 索引优化
- `(symbol, exchange, interval, datetime)` 联合索引
- `datetime` 单独索引
### 4. 连接管理
- 复用数据库连接
- 自动关闭
---
## 断点续传
进度保存在 `download_progress.json` 文件中:
```json
{
"last_code": "600519",
"completed": ["000001", "000002", "600000", ...],
"failed": ["600123", "600456", ...],
"start_time": "2026-03-24T12:00:00",
"stats": {
"total": 5000,
"success": 3000,
"failed": 5,
"total_bars": 1500000
}
}
```
---
## 数据完整性验证
验证结果示例:
```python
{
"total_bars": 150.5,
"total_stocks": 3000,
"min_date": "2024-01-01 09:30:00",
"max_date": "2026-03-23 15:00:00",
"low_count_samples": 0,
"has_duplicates": false,
"duplicates_count": 0,
"status": "OK"
}
```
---
## 配置文件
### batch_downloader.py 配置
```python
config = {
'db_path': '/path/to/database.db',
'progress_file': '/path/to/download_progress.json',
'start_date': '20240101', # 开始日期
'max_stocks': None, # None=全部,测试时可设置
'resume': True, # 断点续传
'retry_failed': True # 重试失败的
}
```
---
## 日志文件
- `akshare_vnpy_adapter.log` - 适配器日志
- `batch_downloader.log` - 批量下载日志
---
## 错误处理
### 1. 网络错误
自动重试(akshare内置重试机制)
### 2. 数据库错误
- 重复数据自动忽略(UNIQUE约束)
- 事务回滚保证一致性
### 3. 格式转换错误
- 记录错误日志
- 跳过错误数据,继续处理
---
## 已知限制
1. **网络依赖**: 需要稳定网络连接访问 akshare API
2. **数据频率**: akshare有访问频率限制,批量下载需要控制并发
3. **数据范围**: 历史数据可能有限(新股上市时间短)
4. **TICK数据**: 当前只实现了K线数据,TICK数据待扩展
---
## 下一步计划
1. ✅ akshare 数据适配器 - **已完成**
2. ⏸️ 聚宽(jqdatasdk)适配器 - 待开发
3. ⏸️ Tushare Pro 适配器 - 待开发
4. ⏸️ Wind 适配器 - 待调研
5. ⏸️ TICK数据支持 - 待扩展
6. ⏸️ 分钟K线支持 - 待扩展
---
## 性能指标(预期)
- **K线数据**: 5000只股票 × 500交易日 = 250万条
- **数据库大小**: 约 200-300 MB
- **下载时间**: 约 2-4 小时(网络依赖)
- **写入速度**: 约 5000-10000 条/秒
---
## 技术栈
- Python 3.8+
- akshare(数据源)
- SQLite(存储)
- pandas(数据处理)
- tqdm(进度条显示)
---
## 许可证
MIT License
---
## 贡献
欢迎提交 Issue 和 Pull Request
---
## 联系方式
作者:赵云(数据护军)
项目:三国之量化交易
仓库:sanguo_quant_live
---
*"数据为兵,策略为将,风控为帅" — 赵云*