# P2 需求规格文档:数据基础设施建设 **任务ID**: data-platform-p2-20260502 **节点**: pangtong_requirements **作者**: 庞统(副军师) **日期**: 2026-05-02 --- ## 一、背景 ### 1.1 P1已完成的基础 | 项 | 状态 | 详情 | |----|------|------| | vnpy DB日线数据 | ✅ | 5191只,1281万行,2010~2026-03-27 | | 回测服务可用 | ✅ | 端到端验证通过 | | 导入脚本 | ✅ | `import_vnpy_daily_fast.py`(126行,pandas向量化) | | DB路径 | ✅ | `/Volumes/stock/sanguo_vnpy/data/quant_trading.db`(1.4GB) | | 已有适配器 | ⚠️ | `vnpy_local_data_adapter.py`(路径硬编码Mac本地,仅日线) | ### 1.2 当前数据缺口 - NAS日线数据停在 **2026-03-27**,需补约 **25个交易日**(至2026-05-02) - 无增量更新机制(每次需手动全量导入) - 无数据校验(异常数据入库无拦截) - 无多源降级(akshare挂了无备用) - 无实时行情能力 - 无自动定时任务 ### 1.3 关键设计决策(P1已确认) | 决策 | 结论 | |------|------| | Source of Truth | NAS Parquet是唯一真相源 | | vnpy DB定位 | 可重建的派生缓存 | | 双写顺序 | 先Parquet(原子写入:临时文件+rename)→ 再vnpy DB(INSERT OR REPLACE幂等) | | SMB写入策略 | SQLite写本地/tmp,完成后复制到NAS(避免SMB锁库) | --- ## 二、功能需求 ### P2-1:多源降级管理器 `fallback.py` | 项 | 说明 | |-----|------| | 需求 | 统一数据获取入口,支持多数据源顺序降级 | | 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/fallback.py` | **日线降级链**: 1. akshare `stock_zh_a_hist()` → 成功则返回 2. 腾讯K线API → 成功则返回 3. 全部失败 → 抛异常 **接口设计**: ```python class FallbackManager: def get_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame def get_realtime(self, symbol: str) -> dict def get_source_used(self) -> str # 返回实际使用的数据源名称 ``` **行为要求**: - 第一个源失败自动切下一个 - 记录使用的源(写入返回数据的metadata) - 每个源的超时控制(单次请求10秒超时) - 日志记录降级事件(哪个源失败、切到哪个、耗时) **预估行数**:~150行 ### P2-2:数据校验层 `validator.py` | 项 | 说明 | |-----|------| | 需求 | 入库前校验数据质量,fatal级拒绝入库 | | 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/validator.py` | **V1规则(7条fatal)**: | 规则ID | 检查逻辑 | 级别 | |--------|---------|------| | D1 | close/open/high/low > 0 | fatal | | D2 | high ≥ max(open,close),low ≤ min(open,close) | fatal | | D3 | volume >= 0 | fatal | | D6 | 同股同日不能两条记录 | fatal | | D7 | date <= 当前日期 | fatal | | R1 | 实时价格 current > 0, prev_close > 0 | fatal | | R7 | 必须携带 source + fetched_at 字段 | fatal | **接口设计**: ```python class DataValidator: def validate(self, df: pd.DataFrame, data_type: str = "daily") -> ValidationResult class ValidationResult: passed: bool fatal_errors: List[str] # 阻断入库 warnings: List[str] # 标记但不阻断 checked_rows: int failed_rows: int ``` **行为要求**: - fatal错误 → 拒绝整批入库,返回具体失败行号和原因 - warning → 标记但允许入库(数据中附加warning字段) - 校验报告可序列化为JSON **预估行数**:~150行 ### P2-3:实时行情三源降级 `realtime.py` | 项 | 说明 | |-----|------| | 需求 | 获取实时行情,支持3个源降级 | | 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/realtime.py` | **降级链**: 1. 新浪实时接口 → 成功则返回 2. 东方财富接口 → 成功则返回 3. 腾讯实时接口 → 成功则返回 4. 全部失败 → 抛异常 **接口设计**: ```python def get_realtime_quote(symbol: str) -> dict # 返回: {symbol, name, current, prev_close, open, high, low, volume, amount, # bid1_price, ask1_price, timestamp, source, fetched_at} ``` **行为要求**: - 返回标准化的字段(不同数据源字段名不同,需统一映射) - 每个源10秒超时 - 记录实际使用的数据源 **预估行数**:~200行 ### P2-4:增量更新 `updater.py` | 项 | 说明 | |-----|------| | 需求 | 每日增量更新,Parquet+vnpy DB双写 | | 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/updater.py` | | 当前缺口 | 数据停在2026-03-27,需补约25个交易日 | **流程**: ``` 1. 扫描NAS Parquet获取每只股票最后日期 2. 对比今天,确定需要更新的日期范围 3. 调用 fallback.py 获取增量数据 4. 调用 validator.py 校验 5. 写Parquet(原子写入:临时文件+rename) 6. 写vnpy DB(INSERT OR REPLACE,复用P1的批量导入逻辑) 7. 一致性校验(Parquet条数 vs DB条数) 8. 输出更新报告 ``` **接口设计**: ```python class DailyUpdater: def update_all(self) -> UpdateReport def update_symbol(self, symbol: str) -> SymbolUpdateResult class UpdateReport: total_symbols: int updated: int skipped: int # 已是最新 failed: int new_records: int parquet_size: str db_size: str consistency_ok: bool ``` **关键约束**: - Parquet写入必须是原子的(临时文件+os.rename) - vnpy DB写入失败不影响Parquet - 复用 `import_vnpy_daily_fast.py` 的批量INSERT逻辑 - SMB锁库:DB操作先在/tmp完成再复制 **首次执行**:需补2026-03-28~2026-05-02约25天数据 **预估行数**:~200行 ### P2-5:cron定时任务 | 项 | 说明 | |-----|------| | 需求 | 每交易日15:30自动执行增量更新 | | 配置 | Mac crontab(Mac永不休眠已确认) | | 验证 | 下一个交易日检查是否自动执行 | **crontab配置**: ``` 30 15 * * 1-5 cd ~/.openclaw/sanguo_projects/sanguo_vnpy && python3 data_platform/updater.py >> data_platform/logs/update.log 2>&1 ``` **配套**: - 日志目录:`~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/logs/` - 失败通知:更新失败时写日志(后续可接入三国mail通知) --- ## 三、交付物清单 ### 代码文件(`~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/`) | 文件 | 功能 | 预估行数 | |------|------|---------| | `fallback.py` | 多源降级管理器 | ~150 | | `validator.py` | 数据校验(7条fatal) | ~150 | | `realtime.py` | 实时行情三源降级 | ~200 | | `updater.py` | 增量更新(双写) | ~200 | ### 配置文件 | 文件 | 内容 | |------|------| | crontab条目 | 每交易日15:30自动更新 | | logs目录 | 更新日志 | ### 文档 | 文件 | 内容 | |------|------| | 本需求文档 | `~/.openclaw/sanguo_projects/sanguo_vnpy/docs/data-platform/02-p2-requirements.md` | --- ## 四、假设与不确定项 | # | 不确定项 | 影响 | 验证方式 | |---|---------|------|---------| | 1 | akshare `stock_zh_a_hist()` 当前是否可用 | 降级链主源 | 赵云编码时测试 | | 2 | 腾讯K线API的请求格式(备用日线源) | 降级链备源 | 赵云编码时测试 | | 3 | 新浪/东财/腾讯实时接口的当前可用性 | 实时行情 | 赵云编码时测试 | | 4 | 增量更新数据量(25天×5191只)的耗时 | cron窗口 | 首次执行时实测 | | 5 | vnpy DB导入增量数据的SMB性能 | 更新耗时 | 首次执行时实测 | | 6 | crontab执行时NAS是否已挂载 | cron可用性 | 配置时验证 | --- ## 五、约束 1. 所有产出放到 `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/` 2. 不引新依赖(只用akshare + urllib + 已有库) 3. Parquet是唯一真相源,vnpy DB是可重建的派生缓存 4. 双写顺序:先Parquet(原子写入)→ 再vnpy DB(幂等写入) 5. SMB锁库:DB操作先在/tmp完成再复制 6. 遇阻塞用最大尝试轮数限制 7. 先输出设计方案经评审再编码 8. 复用P1已有代码(`import_vnpy_daily_fast.py`的批量INSERT逻辑) --- ## 六、成功标准 | # | 标准 | 验证方法 | |---|------|---------| | 1 | 降级管理器可用:关掉主源自动切备源 | 手动测试 | | 2 | 校验层拦截bad data:构造异常数据返回fatal | 单元测试 | | 3 | 实时行情可获取:输入股票代码返回实时报价 | 手动测试 | | 4 | 增量更新可执行:补齐25天数据 | 执行updater后检查数据日期 | | 5 | Parquet+vnpy DB一致性 | 比对条数 | | 6 | cron可触发 | 配置后下个交易日检查日志 |