Files
2026-05-02 18:44:55 +08:00

8.4 KiB
Raw Permalink Blame History

P2 需求规格文档:数据基础设施建设

任务ID: data-platform-p2-20260502 节点: pangtong_requirements 作者: 庞统(副军师) 日期: 2026-05-02


一、背景

1.1 P1已完成的基础

状态 详情
vnpy DB日线数据 5191只,1281万行,2010~2026-03-27
回测服务可用 端到端验证通过
导入脚本 import_vnpy_daily_fast.py126行,pandas向量化)
DB路径 /Volumes/stock/sanguo_vnpy/data/quant_trading.db1.4GB
已有适配器 ⚠️ vnpy_local_data_adapter.py(路径硬编码Mac本地,仅日线)

1.2 当前数据缺口

  • NAS日线数据停在 2026-03-27,需补约 25个交易日(至2026-05-02
  • 无增量更新机制(每次需手动全量导入)
  • 无数据校验(异常数据入库无拦截)
  • 无多源降级(akshare挂了无备用)
  • 无实时行情能力
  • 无自动定时任务

1.3 关键设计决策(P1已确认)

决策 结论
Source of Truth NAS Parquet是唯一真相源
vnpy DB定位 可重建的派生缓存
双写顺序 先Parquet(原子写入:临时文件+rename)→ 再vnpy DBINSERT OR REPLACE幂等)
SMB写入策略 SQLite写本地/tmp,完成后复制到NAS(避免SMB锁库)

二、功能需求

P2-1:多源降级管理器 fallback.py

说明
需求 统一数据获取入口,支持多数据源顺序降级
产出 ~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/fallback.py

日线降级链

  1. akshare stock_zh_a_hist() → 成功则返回
  2. 腾讯K线API → 成功则返回
  3. 全部失败 → 抛异常

接口设计

class FallbackManager:
    def get_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame
    def get_realtime(self, symbol: str) -> dict
    def get_source_used(self) -> str  # 返回实际使用的数据源名称

行为要求

  • 第一个源失败自动切下一个
  • 记录使用的源(写入返回数据的metadata)
  • 每个源的超时控制(单次请求10秒超时)
  • 日志记录降级事件(哪个源失败、切到哪个、耗时)

预估行数~150行

P2-2:数据校验层 validator.py

说明
需求 入库前校验数据质量,fatal级拒绝入库
产出 ~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/validator.py

V1规则(7条fatal

规则ID 检查逻辑 级别
D1 close/open/high/low > 0 fatal
D2 high ≥ max(open,close)low ≤ min(open,close) fatal
D3 volume >= 0 fatal
D6 同股同日不能两条记录 fatal
D7 date <= 当前日期 fatal
R1 实时价格 current > 0, prev_close > 0 fatal
R7 必须携带 source + fetched_at 字段 fatal

接口设计

class DataValidator:
    def validate(self, df: pd.DataFrame, data_type: str = "daily") -> ValidationResult

class ValidationResult:
    passed: bool
    fatal_errors: List[str]    # 阻断入库
    warnings: List[str]        # 标记但不阻断
    checked_rows: int
    failed_rows: int

行为要求

  • fatal错误 → 拒绝整批入库,返回具体失败行号和原因
  • warning → 标记但允许入库(数据中附加warning字段)
  • 校验报告可序列化为JSON

预估行数~150行

P2-3:实时行情三源降级 realtime.py

说明
需求 获取实时行情,支持3个源降级
产出 ~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/realtime.py

降级链

  1. 新浪实时接口 → 成功则返回
  2. 东方财富接口 → 成功则返回
  3. 腾讯实时接口 → 成功则返回
  4. 全部失败 → 抛异常

接口设计

def get_realtime_quote(symbol: str) -> dict
# 返回: {symbol, name, current, prev_close, open, high, low, volume, amount, 
#        bid1_price, ask1_price, timestamp, source, fetched_at}

行为要求

  • 返回标准化的字段(不同数据源字段名不同,需统一映射)
  • 每个源10秒超时
  • 记录实际使用的数据源

预估行数~200行

P2-4:增量更新 updater.py

说明
需求 每日增量更新,Parquet+vnpy DB双写
产出 ~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/updater.py
当前缺口 数据停在2026-03-27,需补约25个交易日

流程

1. 扫描NAS Parquet获取每只股票最后日期
2. 对比今天,确定需要更新的日期范围
3. 调用 fallback.py 获取增量数据
4. 调用 validator.py 校验
5. 写Parquet(原子写入:临时文件+rename
6. 写vnpy DBINSERT OR REPLACE,复用P1的批量导入逻辑)
7. 一致性校验(Parquet条数 vs DB条数)
8. 输出更新报告

接口设计

class DailyUpdater:
    def update_all(self) -> UpdateReport
    def update_symbol(self, symbol: str) -> SymbolUpdateResult

class UpdateReport:
    total_symbols: int
    updated: int
    skipped: int    # 已是最新
    failed: int
    new_records: int
    parquet_size: str
    db_size: str
    consistency_ok: bool

关键约束

  • Parquet写入必须是原子的(临时文件+os.rename)
  • vnpy DB写入失败不影响Parquet
  • 复用 import_vnpy_daily_fast.py 的批量INSERT逻辑
  • SMB锁库:DB操作先在/tmp完成再复制

首次执行:需补2026-03-28~2026-05-02约25天数据

预估行数~200行

P2-5cron定时任务

说明
需求 每交易日15:30自动执行增量更新
配置 Mac crontabMac永不休眠已确认)
验证 下一个交易日检查是否自动执行

crontab配置

30 15 * * 1-5 cd ~/.openclaw/sanguo_projects/sanguo_vnpy && python3 data_platform/updater.py >> data_platform/logs/update.log 2>&1

配套

  • 日志目录:~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/logs/
  • 失败通知:更新失败时写日志(后续可接入三国mail通知)

三、交付物清单

代码文件(~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/

文件 功能 预估行数
fallback.py 多源降级管理器 ~150
validator.py 数据校验(7条fatal ~150
realtime.py 实时行情三源降级 ~200
updater.py 增量更新(双写) ~200

配置文件

文件 内容
crontab条目 每交易日15:30自动更新
logs目录 更新日志

文档

文件 内容
本需求文档 ~/.openclaw/sanguo_projects/sanguo_vnpy/docs/data-platform/02-p2-requirements.md

四、假设与不确定项

# 不确定项 影响 验证方式
1 akshare stock_zh_a_hist() 当前是否可用 降级链主源 赵云编码时测试
2 腾讯K线API的请求格式(备用日线源) 降级链备源 赵云编码时测试
3 新浪/东财/腾讯实时接口的当前可用性 实时行情 赵云编码时测试
4 增量更新数据量(25天×5191只)的耗时 cron窗口 首次执行时实测
5 vnpy DB导入增量数据的SMB性能 更新耗时 首次执行时实测
6 crontab执行时NAS是否已挂载 cron可用性 配置时验证

五、约束

  1. 所有产出放到 ~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/
  2. 不引新依赖(只用akshare + urllib + 已有库)
  3. Parquet是唯一真相源,vnpy DB是可重建的派生缓存
  4. 双写顺序:先Parquet(原子写入)→ 再vnpy DB(幂等写入)
  5. SMB锁库:DB操作先在/tmp完成再复制
  6. 遇阻塞用最大尝试轮数限制
  7. 先输出设计方案经评审再编码
  8. 复用P1已有代码(import_vnpy_daily_fast.py的批量INSERT逻辑)

六、成功标准

# 标准 验证方法
1 降级管理器可用:关掉主源自动切备源 手动测试
2 校验层拦截bad data:构造异常数据返回fatal 单元测试
3 实时行情可获取:输入股票代码返回实时报价 手动测试
4 增量更新可执行:补齐25天数据 执行updater后检查数据日期
5 Parquet+vnpy DB一致性 比对条数
6 cron可触发 配置后下个交易日检查日志