Files
sanguo_vnpy/docs/data-platform/01-requirements.md
T
2026-05-02 11:04:04 +08:00

12 KiB
Raw Blame History

需求规格文档:本地数据源体系建设

任务ID: data-platform-20260502 节点: pangtong_requirements 作者: 庞统(副军师) 日期: 2026-05-02


一、项目背景与核心问题

1.1 现状

资产 状态 位置
NAS日线Parquet 2010-2026年全市场,按年分目录 /Volumes/stock/A股数据/日线数据/daily/{year}/sh{code}_daily.parquet
NAS分钟线Parquet ⚠️ 仅84只15分钟线 /Volumes/stock/minute_kline/15min/sz{code}_15min.parquet
vnpy quant_trading.db 空库(8KB0张表) /Volumes/stock/sanguo_vnpy/data/quant_trading.db
回测服务 运行中(http://192.168.2.154:8088 Docker容器
本地数据适配器 ⚠️ 已有但路径硬编码Mac本地 vnpy_local_data_adapter.py(指向/Users/chufeng/nas/stock/...

1.2 核心问题

vnpy回测服务的数据库是空的,回测引擎 engine.load_data() 从数据库读取数据 → 无数据 → 所有回测任务必然失败。

回测服务executor.py关键代码(L171-175):

engine.load_data()  # 从vnpy SQLite数据库加载

如果没有数据,直接抛出 ValueError("无法加载历史数据")

1.3 目标

打通 NAS Parquet → vnpy SQLite DB → 回测引擎 的数据通路,让回测服务可以正常执行回测任务。


二、功能需求

P1:打通vnpy数据通路

P1-1:确认Docker volume映射路径

说明
需求 确认Mac写入的文件,Docker容器内能读到
输入 NAS目录结构、Docker容器配置
输出 明确的映射关系文档:Mac路径 ↔ 容器内路径
验证 在Mac写入测试文件,容器内能读到;反之亦然

关键证据

  • 回测服务配置 base_dir = "/app/backtest_jobs"
  • 数据目录 data_dir = settings.base_dir.replace("backtest_jobs", "data")/app/data
  • quant_trading.db 位于 /Volumes/stock/sanguo_vnpy/data/
  • 需确认Docker容器启动时是否挂载了 /Volumes/stock/sanguo_vnpy/data/app/data

P1-2:编写vnpy DB导入脚本

说明
需求 将NAS日线Parquet数据批量导入vnpy SQLite数据库
输入 /Volumes/stock/A股数据/日线数据/daily/{year}/sh{code}_daily.parquet
输出 quant_trading.db 中有完整的日线bar数据
验证 回测引擎 load_data() 能读出数据
约束 幂等操作(INSERT OR REPLACE),可重复执行

vnpy DB Schema要求(待姜维确认):

  • vnpy 4.x的BacktestingEngine通过 MainEngine + BaseDataManager 加载数据
  • 数据表名和字段名由vnpy内部定义
  • 必须先搞清楚vnpy 4.x期望的数据库结构,再写导入脚本

Parquet字段

date, open, high, low, close, volume, amount, outstanding_share, turnover, year

导入脚本功能要求

  1. 扫描 /Volumes/stock/A股数据/日线数据/daily/ 下所有年份目录
  2. 每个Parquet文件解析股票代码(从文件名提取,如 sh600000600000.SSE
  3. 转换为vnpy DB格式并批量写入
  4. 支持增量导入(只导入新增数据)
  5. 支持断点续传(中断后可继续)
  6. 记录导入日志(成功/失败数、耗时)

P1-3:全量导入日线

说明
需求 运行导入脚本,将全市场2010-2026年日线数据全部导入
输入 P1-2的导入脚本 + NAS日线Parquet
输出 quant_trading.db 填满日线数据
验证 统计导入记录数,抽查几只股票确认数据完整
风险 导入耗时长(预估2-4小时),需支持断点续传

P1-4:验证回测服务可用

说明
需求 提交一个简单回测任务,确认回测引擎能加载数据并完成回测
输入 回测服务API + 简单策略代码
输出 回测成功返回统计结果
验证 total_trades > 0 或 total_days > 0

P2:数据基础设施

P2-1:多源降级管理器 fallback.py

说明
需求 统一数据获取入口,支持多数据源顺序降级
降级链(日线) akshare stock_zh_a_hist → 腾讯K线API
降级链(实时) 新浪实时 → 东方财富 → 腾讯
接口 get_daily(symbol, start, end) / get_realtime(symbol)
行为 第一个源失败自动切下一个,记录使用的源
产出 ~150行

P2-2:数据校验层 validator.py

说明
需求 入库前校验数据质量,fatal级拒绝入库
V1规则(7条fatal D1: close/open/high/low > 0D2: OHLC一致性(high≥max(open,close), low≤min(open,close))D3: volume ≥ 0D6: 同股同日不重复;D7: date ≤ 今天;R1: 实时价格 > 0R7: 必须携带source+fetched_at
接口 validate(df) → (passed: bool, errors: List[str])
产出 ~150行

P2-3:实时行情三源降级 realtime.py

说明
需求 获取实时行情,支持3个源降级
降级链 新浪实时 → 东方财富 → 腾讯
接口 get_realtime_quote(symbol) → dict
产出 ~200行

P2-4:增量更新 updater.py

说明
需求 每日增量更新,Parquet+vnpy DB双写
流程 1.获取最新日期 2.拉取增量数据 3.校验 4.写Parquet(原子:临时文件+rename 5.写vnpy DBINSERT OR REPLACE幂等) 6.一致性校验
约束 Parquet是真相源;vnpy DB失败不影响Parquet
接口 update_daily() → UpdateResult
产出 ~150行

P2-5cron定时任务

说明
需求 每交易日15:30自动执行增量更新
配置 Mac crontabMac已确认永不休眠)
验证 下一个交易日检查是否自动执行

P3:分钟线数据

P3-1P0限频验证

说明
需求 验证腾讯API限频阈值
测试1 100只股票15分钟线连续下载,是否成功
测试2 连续1小时请求,记录每分钟成功次数、封禁恢复时间
输出 限频验证报告(每分钟最大请求数、封禁时长、恢复策略)
决策 报告决定P3-2/P3-3的实现策略(分批间隔、每批数量)

P3-2/P3-3:分钟线全量下载

说明
需求 下载HS300/全市场15分钟线
前置 P3-1限频验证通过
数据源 腾讯mkline API(唯一可用源,akshare分钟线已失效)
存储路径 /Volumes/stock/minute_kline/15min/
约束 15分钟线优先,1分钟线暂缓

P3-4:分钟线导入vnpy DB

说明
需求 将分钟线Parquet导入vnpy DB
前置 P1-2已确认vnpy DB Schema + 分钟线Parquet已下载
不确定项 vnpy 4.x如何区分不同周期(15min vs 1min)的分钟线

P4:配套skill与自动化

P4-1/P4-2:更新skill文档

更新 data-acquisitionquant-backtest SKILL.md,补充vnpy数据通路说明。

P4-3:全量校验脚本

关羽设计的V2规则(14条),用于定期全量扫描。

P4-4:周维护cron

每周校验Parquet与vnpy DB一致性。


三、交付物清单

代码文件(放到 ~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/

文件 功能 阶段 预估行数
import_vnpy.py Parquet → vnpy DB 导入 P1 ~200
fallback.py 多源降级管理器 P2 ~150
validator.py 数据校验(V1 7条fatal P2 ~150
realtime.py 实时行情三源降级 P2 ~200
updater.py 增量更新(双写) P2 ~150
validate_full.py 全量校验(V2 14条) P4 ~100

文档文件

文件 内容 位置
需求规格文档 本文档 docs/data-platform/01-requirements.md
设计方案文档 接口设计、数据流、Schema映射 docs/data-platform/02-design.md
验证报告 限频验证、导入验证、回测验证 docs/data-platform/reports/

配置文件

文件 内容
crontab配置 每日15:30增量更新
vnpy DB路径映射 Mac ↔ Docker

四、假设与不确定项

# 假设/不确定项 影响范围 验证人 验证时机
1 Docker volume映射Mac写入NAS的文件Docker容器能读到 P1全部 姜维 P1开始前
2 vnpy 4.x DB Schema:回测引擎load_data()期望的表结构和字段 P1-2, P3-4 姜维 P1开始前
3 vnpy分钟线周期区分vnpy如何存储/区分不同粒度分钟线 P3-4 姜维 P3开始前
4 腾讯API限频:连续请求的频率上限和封禁恢复时间 P3全部 赵云 P3开始前
5 全量导入耗时5500只×17年数据的导入时间 P1-3 张飞/赵云 P1-3执行时
6 SQLite并发cron写入+回测读取是否冲突 P2-5 姜维 P2-5配置时
7 NAS存储空间充足(1.5TB可用,只需28GB) 全局 已确认 -
8 Mac永不休眠(cron可靠执行) P2-5 已确认 -
9 不引新依赖(只用akshare+urllib+已有库) 全局 约束 -

关键阻塞项#1和#2如果不明确,P1无法开始。建议姜维先验证这两项。


五、约束

  1. 所有产出放到 ~/.openclaw/sanguo_projects/sanguo_vnpy/ 目录下
  2. 不引新依赖(只用akshare + urllib + 已有的库)
  3. 不改Docker/NAS配置,数据通过volume映射
  4. Parquet是唯一真相源,vnpy DB是可重建的派生缓存
  5. 双写顺序:先Parquet(原子写入)→ 再vnpy DB(幂等写入)
  6. 腾讯API是唯一可用的分钟线源
  7. 15分钟线优先,1分钟线暂缓
  8. 不确定项遇到阻塞时,用最大尝试轮数限制,不无限重试
  9. 每个阶段先输出需求和设计方案,经评审再编码

六、成功标准

# 标准 验证方法
1 vnpy DB有全市场日线数据 SELECT count(*) FROM ... > 0
2 回测服务能完成一次完整回测 提交回测任务返回成功
3 增量更新可自动执行 crontab触发后日志显示成功
4 数据校验拦截bad data 构造异常数据,校验返回fatal
5 多源降级正常工作 关掉主源,自动切到备用源
6 分钟线P0验证有结论 限频报告有明确数字

七、数据流架构

Layer 1: 远程数据源
  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
  │   akshare    │  │  新浪实时     │  │  腾讯API     │
  │  (日线主源)   │  │ (实时主源)    │  │ (分钟线唯一源)│
  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘
         │                 │                 │
         └────────┬────────┴────────┬────────┘
                  │ fallback.py     │
                  │ 降级管理         │
                  ▼                 │
Layer 2: 校验层    │                 │
         validator.py               │
         (7条fatal规则)             │
                  │                 │
                  ▼                 ▼
Layer 3: NAS持久层 (唯一真相源)
  /Volumes/stock/A股数据/日线数据/daily/{year}/{code}_daily.parquet
  /Volumes/stock/minute_kline/15min/{code}_15min.parquet
                  │
                  │ import_vnpy.py / updater.py
                  ▼
Layer 4: vnpy SQLite DB (派生缓存)
  /Volumes/stock/sanguo_vnpy/data/quant_trading.db
                  │
                  │ engine.load_data()
                  ▼
Layer 5: 回测引擎
  BacktestingEngine → 回测结果