auto-sync: 2026-05-02 11:04:04

This commit is contained in:
cfdaily
2026-05-02 11:04:04 +08:00
parent d7926313a9
commit 04c0f4891f
+311
View File
@@ -0,0 +1,311 @@
# 需求规格文档:本地数据源体系建设
**任务ID**: data-platform-20260502
**节点**: pangtong_requirements
**作者**: 庞统(副军师)
**日期**: 2026-05-02
---
## 一、项目背景与核心问题
### 1.1 现状
| 资产 | 状态 | 位置 |
|------|------|------|
| NAS日线Parquet | ✅ 2010-2026年全市场,按年分目录 | `/Volumes/stock/A股数据/日线数据/daily/{year}/sh{code}_daily.parquet` |
| NAS分钟线Parquet | ⚠️ 仅84只15分钟线 | `/Volumes/stock/minute_kline/15min/sz{code}_15min.parquet` |
| vnpy quant_trading.db | ❌ **空库(8KB0张表)** | `/Volumes/stock/sanguo_vnpy/data/quant_trading.db` |
| 回测服务 | ✅ 运行中(http://192.168.2.154:8088 | Docker容器 |
| 本地数据适配器 | ⚠️ 已有但路径硬编码Mac本地 | `vnpy_local_data_adapter.py`(指向`/Users/chufeng/nas/stock/...` |
### 1.2 核心问题
**vnpy回测服务的数据库是空的**,回测引擎 `engine.load_data()` 从数据库读取数据 → 无数据 → 所有回测任务必然失败。
回测服务executor.py关键代码(L171-175):
```python
engine.load_data() # 从vnpy SQLite数据库加载
```
如果没有数据,直接抛出 `ValueError("无法加载历史数据")`
### 1.3 目标
打通 **NAS Parquet → vnpy SQLite DB → 回测引擎** 的数据通路,让回测服务可以正常执行回测任务。
---
## 二、功能需求
### P1:打通vnpy数据通路
#### P1-1:确认Docker volume映射路径
| 项 | 说明 |
|-----|------|
| 需求 | 确认Mac写入的文件,Docker容器内能读到 |
| 输入 | NAS目录结构、Docker容器配置 |
| 输出 | 明确的映射关系文档:Mac路径 ↔ 容器内路径 |
| 验证 | 在Mac写入测试文件,容器内能读到;反之亦然 |
**关键证据**
- 回测服务配置 `base_dir = "/app/backtest_jobs"`
- 数据目录 `data_dir = settings.base_dir.replace("backtest_jobs", "data")``/app/data`
- quant_trading.db 位于 `/Volumes/stock/sanguo_vnpy/data/`
- 需确认Docker容器启动时是否挂载了 `/Volumes/stock/sanguo_vnpy/data``/app/data`
#### P1-2:编写vnpy DB导入脚本
| 项 | 说明 |
|-----|------|
| 需求 | 将NAS日线Parquet数据批量导入vnpy SQLite数据库 |
| 输入 | `/Volumes/stock/A股数据/日线数据/daily/{year}/sh{code}_daily.parquet` |
| 输出 | quant_trading.db 中有完整的日线bar数据 |
| 验证 | 回测引擎 `load_data()` 能读出数据 |
| 约束 | 幂等操作(INSERT OR REPLACE),可重复执行 |
**vnpy DB Schema要求**(待姜维确认):
- vnpy 4.x的BacktestingEngine通过 `MainEngine` + `BaseDataManager` 加载数据
- 数据表名和字段名由vnpy内部定义
- 必须先搞清楚vnpy 4.x期望的数据库结构,再写导入脚本
**Parquet字段**
```
date, open, high, low, close, volume, amount, outstanding_share, turnover, year
```
**导入脚本功能要求**
1. 扫描 `/Volumes/stock/A股数据/日线数据/daily/` 下所有年份目录
2. 每个Parquet文件解析股票代码(从文件名提取,如 `sh600000``600000.SSE`
3. 转换为vnpy DB格式并批量写入
4. 支持增量导入(只导入新增数据)
5. 支持断点续传(中断后可继续)
6. 记录导入日志(成功/失败数、耗时)
#### P1-3:全量导入日线
| 项 | 说明 |
|-----|------|
| 需求 | 运行导入脚本,将全市场2010-2026年日线数据全部导入 |
| 输入 | P1-2的导入脚本 + NAS日线Parquet |
| 输出 | quant_trading.db 填满日线数据 |
| 验证 | 统计导入记录数,抽查几只股票确认数据完整 |
| 风险 | 导入耗时长(预估2-4小时),需支持断点续传 |
#### P1-4:验证回测服务可用
| 项 | 说明 |
|-----|------|
| 需求 | 提交一个简单回测任务,确认回测引擎能加载数据并完成回测 |
| 输入 | 回测服务API + 简单策略代码 |
| 输出 | 回测成功返回统计结果 |
| 验证 | total_trades > 0 或 total_days > 0 |
---
### P2:数据基础设施
#### P2-1:多源降级管理器 `fallback.py`
| 项 | 说明 |
|-----|------|
| 需求 | 统一数据获取入口,支持多数据源顺序降级 |
| 降级链(日线) | akshare `stock_zh_a_hist` → 腾讯K线API |
| 降级链(实时) | 新浪实时 → 东方财富 → 腾讯 |
| 接口 | `get_daily(symbol, start, end)` / `get_realtime(symbol)` |
| 行为 | 第一个源失败自动切下一个,记录使用的源 |
| 产出 | ~150行 |
#### P2-2:数据校验层 `validator.py`
| 项 | 说明 |
|-----|------|
| 需求 | 入库前校验数据质量,fatal级拒绝入库 |
| V1规则(7条fatal | D1: close/open/high/low > 0D2: OHLC一致性(high≥max(open,close), low≤min(open,close))D3: volume ≥ 0D6: 同股同日不重复;D7: date ≤ 今天;R1: 实时价格 > 0R7: 必须携带source+fetched_at |
| 接口 | `validate(df) → (passed: bool, errors: List[str])` |
| 产出 | ~150行 |
#### P2-3:实时行情三源降级 `realtime.py`
| 项 | 说明 |
|-----|------|
| 需求 | 获取实时行情,支持3个源降级 |
| 降级链 | 新浪实时 → 东方财富 → 腾讯 |
| 接口 | `get_realtime_quote(symbol) → dict` |
| 产出 | ~200行 |
#### P2-4:增量更新 `updater.py`
| 项 | 说明 |
|-----|------|
| 需求 | 每日增量更新,Parquet+vnpy DB双写 |
| 流程 | 1.获取最新日期 2.拉取增量数据 3.校验 4.写Parquet(原子:临时文件+rename 5.写vnpy DBINSERT OR REPLACE幂等) 6.一致性校验 |
| 约束 | Parquet是真相源;vnpy DB失败不影响Parquet |
| 接口 | `update_daily() → UpdateResult` |
| 产出 | ~150行 |
#### P2-5cron定时任务
| 项 | 说明 |
|-----|------|
| 需求 | 每交易日15:30自动执行增量更新 |
| 配置 | Mac crontabMac已确认永不休眠) |
| 验证 | 下一个交易日检查是否自动执行 |
---
### P3:分钟线数据
#### P3-1P0限频验证
| 项 | 说明 |
|-----|------|
| 需求 | 验证腾讯API限频阈值 |
| 测试1 | 100只股票15分钟线连续下载,是否成功 |
| 测试2 | 连续1小时请求,记录每分钟成功次数、封禁恢复时间 |
| 输出 | 限频验证报告(每分钟最大请求数、封禁时长、恢复策略) |
| 决策 | 报告决定P3-2/P3-3的实现策略(分批间隔、每批数量) |
#### P3-2/P3-3:分钟线全量下载
| 项 | 说明 |
|-----|------|
| 需求 | 下载HS300/全市场15分钟线 |
| 前置 | P3-1限频验证通过 |
| 数据源 | 腾讯mkline API(唯一可用源,akshare分钟线已失效) |
| 存储路径 | `/Volumes/stock/minute_kline/15min/` |
| 约束 | 15分钟线优先,1分钟线暂缓 |
#### P3-4:分钟线导入vnpy DB
| 项 | 说明 |
|-----|------|
| 需求 | 将分钟线Parquet导入vnpy DB |
| 前置 | P1-2已确认vnpy DB Schema + 分钟线Parquet已下载 |
| 不确定项 | vnpy 4.x如何区分不同周期(15min vs 1min)的分钟线 |
---
### P4:配套skill与自动化
#### P4-1/P4-2:更新skill文档
更新 `data-acquisition``quant-backtest` SKILL.md,补充vnpy数据通路说明。
#### P4-3:全量校验脚本
关羽设计的V2规则(14条),用于定期全量扫描。
#### P4-4:周维护cron
每周校验Parquet与vnpy DB一致性。
---
## 三、交付物清单
### 代码文件(放到 `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/`
| 文件 | 功能 | 阶段 | 预估行数 |
|------|------|------|---------|
| `import_vnpy.py` | Parquet → vnpy DB 导入 | P1 | ~200 |
| `fallback.py` | 多源降级管理器 | P2 | ~150 |
| `validator.py` | 数据校验(V1 7条fatal | P2 | ~150 |
| `realtime.py` | 实时行情三源降级 | P2 | ~200 |
| `updater.py` | 增量更新(双写) | P2 | ~150 |
| `validate_full.py` | 全量校验(V2 14条) | P4 | ~100 |
### 文档文件
| 文件 | 内容 | 位置 |
|------|------|------|
| 需求规格文档 | 本文档 | `docs/data-platform/01-requirements.md` |
| 设计方案文档 | 接口设计、数据流、Schema映射 | `docs/data-platform/02-design.md` |
| 验证报告 | 限频验证、导入验证、回测验证 | `docs/data-platform/reports/` |
### 配置文件
| 文件 | 内容 |
|------|------|
| crontab配置 | 每日15:30增量更新 |
| vnpy DB路径映射 | Mac ↔ Docker |
---
## 四、假设与不确定项
| # | 假设/不确定项 | 影响范围 | 验证人 | 验证时机 |
|---|-------------|---------|--------|---------|
| 1 | **Docker volume映射**Mac写入NAS的文件Docker容器能读到 | P1全部 | 姜维 | P1开始前 |
| 2 | **vnpy 4.x DB Schema**:回测引擎load_data()期望的表结构和字段 | P1-2, P3-4 | 姜维 | P1开始前 |
| 3 | **vnpy分钟线周期区分**:vnpy如何存储/区分不同粒度分钟线 | P3-4 | 姜维 | P3开始前 |
| 4 | **腾讯API限频**:连续请求的频率上限和封禁恢复时间 | P3全部 | 赵云 | P3开始前 |
| 5 | **全量导入耗时**:5500只×17年数据的导入时间 | P1-3 | 张飞/赵云 | P1-3执行时 |
| 6 | **SQLite并发**:cron写入+回测读取是否冲突 | P2-5 | 姜维 | P2-5配置时 |
| 7 | NAS存储空间充足(1.5TB可用,只需28GB) | 全局 | 已确认 | - |
| 8 | Mac永不休眠(cron可靠执行) | P2-5 | 已确认 | - |
| 9 | 不引新依赖(只用akshare+urllib+已有库) | 全局 | 约束 | - |
**关键阻塞项**#1和#2如果不明确,P1无法开始。**建议姜维先验证这两项。**
---
## 五、约束
1. 所有产出放到 `~/.openclaw/sanguo_projects/sanguo_vnpy/` 目录下
2. 不引新依赖(只用akshare + urllib + 已有的库)
3. 不改Docker/NAS配置,数据通过volume映射
4. Parquet是唯一真相源,vnpy DB是可重建的派生缓存
5. 双写顺序:先Parquet(原子写入)→ 再vnpy DB(幂等写入)
6. 腾讯API是唯一可用的分钟线源
7. 15分钟线优先,1分钟线暂缓
8. 不确定项遇到阻塞时,用最大尝试轮数限制,不无限重试
9. 每个阶段先输出需求和设计方案,经评审再编码
---
## 六、成功标准
| # | 标准 | 验证方法 |
|---|------|---------|
| 1 | vnpy DB有全市场日线数据 | `SELECT count(*) FROM ...` > 0 |
| 2 | 回测服务能完成一次完整回测 | 提交回测任务返回成功 |
| 3 | 增量更新可自动执行 | crontab触发后日志显示成功 |
| 4 | 数据校验拦截bad data | 构造异常数据,校验返回fatal |
| 5 | 多源降级正常工作 | 关掉主源,自动切到备用源 |
| 6 | 分钟线P0验证有结论 | 限频报告有明确数字 |
---
## 七、数据流架构
```
Layer 1: 远程数据源
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ akshare │ │ 新浪实时 │ │ 腾讯API │
│ (日线主源) │ │ (实时主源) │ │ (分钟线唯一源)│
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└────────┬────────┴────────┬────────┘
│ fallback.py │
│ 降级管理 │
▼ │
Layer 2: 校验层 │ │
validator.py │
(7条fatal规则) │
│ │
▼ ▼
Layer 3: NAS持久层 (唯一真相源)
/Volumes/stock/A股数据/日线数据/daily/{year}/{code}_daily.parquet
/Volumes/stock/minute_kline/15min/{code}_15min.parquet
│ import_vnpy.py / updater.py
Layer 4: vnpy SQLite DB (派生缓存)
/Volumes/stock/sanguo_vnpy/data/quant_trading.db
│ engine.load_data()
Layer 5: 回测引擎
BacktestingEngine → 回测结果
```