From 48fdae9a2e4ce83a0fdf3fb835c28b966c6b3795 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sat, 2 May 2026 18:44:55 +0800 Subject: [PATCH] auto-sync: 2026-05-02 18:44:55 --- docs/data-platform/02-p2-requirements.md | 265 +++++++++++++++++++++++ 1 file changed, 265 insertions(+) create mode 100644 docs/data-platform/02-p2-requirements.md diff --git a/docs/data-platform/02-p2-requirements.md b/docs/data-platform/02-p2-requirements.md new file mode 100644 index 00000000..24e0704e --- /dev/null +++ b/docs/data-platform/02-p2-requirements.md @@ -0,0 +1,265 @@ +# P2 需求规格文档:数据基础设施建设 + +**任务ID**: data-platform-p2-20260502 +**节点**: pangtong_requirements +**作者**: 庞统(副军师) +**日期**: 2026-05-02 + +--- + +## 一、背景 + +### 1.1 P1已完成的基础 + +| 项 | 状态 | 详情 | +|----|------|------| +| vnpy DB日线数据 | ✅ | 5191只,1281万行,2010~2026-03-27 | +| 回测服务可用 | ✅ | 端到端验证通过 | +| 导入脚本 | ✅ | `import_vnpy_daily_fast.py`(126行,pandas向量化) | +| DB路径 | ✅ | `/Volumes/stock/sanguo_vnpy/data/quant_trading.db`(1.4GB) | +| 已有适配器 | ⚠️ | `vnpy_local_data_adapter.py`(路径硬编码Mac本地,仅日线) | + +### 1.2 当前数据缺口 + +- NAS日线数据停在 **2026-03-27**,需补约 **25个交易日**(至2026-05-02) +- 无增量更新机制(每次需手动全量导入) +- 无数据校验(异常数据入库无拦截) +- 无多源降级(akshare挂了无备用) +- 无实时行情能力 +- 无自动定时任务 + +### 1.3 关键设计决策(P1已确认) + +| 决策 | 结论 | +|------|------| +| Source of Truth | NAS Parquet是唯一真相源 | +| vnpy DB定位 | 可重建的派生缓存 | +| 双写顺序 | 先Parquet(原子写入:临时文件+rename)→ 再vnpy DB(INSERT OR REPLACE幂等) | +| SMB写入策略 | SQLite写本地/tmp,完成后复制到NAS(避免SMB锁库) | + +--- + +## 二、功能需求 + +### P2-1:多源降级管理器 `fallback.py` + +| 项 | 说明 | +|-----|------| +| 需求 | 统一数据获取入口,支持多数据源顺序降级 | +| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/fallback.py` | + +**日线降级链**: +1. akshare `stock_zh_a_hist()` → 成功则返回 +2. 腾讯K线API → 成功则返回 +3. 全部失败 → 抛异常 + +**接口设计**: +```python +class FallbackManager: + def get_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame + def get_realtime(self, symbol: str) -> dict + def get_source_used(self) -> str # 返回实际使用的数据源名称 +``` + +**行为要求**: +- 第一个源失败自动切下一个 +- 记录使用的源(写入返回数据的metadata) +- 每个源的超时控制(单次请求10秒超时) +- 日志记录降级事件(哪个源失败、切到哪个、耗时) + +**预估行数**:~150行 + +### P2-2:数据校验层 `validator.py` + +| 项 | 说明 | +|-----|------| +| 需求 | 入库前校验数据质量,fatal级拒绝入库 | +| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/validator.py` | + +**V1规则(7条fatal)**: + +| 规则ID | 检查逻辑 | 级别 | +|--------|---------|------| +| D1 | close/open/high/low > 0 | fatal | +| D2 | high ≥ max(open,close),low ≤ min(open,close) | fatal | +| D3 | volume >= 0 | fatal | +| D6 | 同股同日不能两条记录 | fatal | +| D7 | date <= 当前日期 | fatal | +| R1 | 实时价格 current > 0, prev_close > 0 | fatal | +| R7 | 必须携带 source + fetched_at 字段 | fatal | + +**接口设计**: +```python +class DataValidator: + def validate(self, df: pd.DataFrame, data_type: str = "daily") -> ValidationResult + +class ValidationResult: + passed: bool + fatal_errors: List[str] # 阻断入库 + warnings: List[str] # 标记但不阻断 + checked_rows: int + failed_rows: int +``` + +**行为要求**: +- fatal错误 → 拒绝整批入库,返回具体失败行号和原因 +- warning → 标记但允许入库(数据中附加warning字段) +- 校验报告可序列化为JSON + +**预估行数**:~150行 + +### P2-3:实时行情三源降级 `realtime.py` + +| 项 | 说明 | +|-----|------| +| 需求 | 获取实时行情,支持3个源降级 | +| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/realtime.py` | + +**降级链**: +1. 新浪实时接口 → 成功则返回 +2. 东方财富接口 → 成功则返回 +3. 腾讯实时接口 → 成功则返回 +4. 全部失败 → 抛异常 + +**接口设计**: +```python +def get_realtime_quote(symbol: str) -> dict +# 返回: {symbol, name, current, prev_close, open, high, low, volume, amount, +# bid1_price, ask1_price, timestamp, source, fetched_at} +``` + +**行为要求**: +- 返回标准化的字段(不同数据源字段名不同,需统一映射) +- 每个源10秒超时 +- 记录实际使用的数据源 + +**预估行数**:~200行 + +### P2-4:增量更新 `updater.py` + +| 项 | 说明 | +|-----|------| +| 需求 | 每日增量更新,Parquet+vnpy DB双写 | +| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/updater.py` | +| 当前缺口 | 数据停在2026-03-27,需补约25个交易日 | + +**流程**: +``` +1. 扫描NAS Parquet获取每只股票最后日期 +2. 对比今天,确定需要更新的日期范围 +3. 调用 fallback.py 获取增量数据 +4. 调用 validator.py 校验 +5. 写Parquet(原子写入:临时文件+rename) +6. 写vnpy DB(INSERT OR REPLACE,复用P1的批量导入逻辑) +7. 一致性校验(Parquet条数 vs DB条数) +8. 输出更新报告 +``` + +**接口设计**: +```python +class DailyUpdater: + def update_all(self) -> UpdateReport + def update_symbol(self, symbol: str) -> SymbolUpdateResult + +class UpdateReport: + total_symbols: int + updated: int + skipped: int # 已是最新 + failed: int + new_records: int + parquet_size: str + db_size: str + consistency_ok: bool +``` + +**关键约束**: +- Parquet写入必须是原子的(临时文件+os.rename) +- vnpy DB写入失败不影响Parquet +- 复用 `import_vnpy_daily_fast.py` 的批量INSERT逻辑 +- SMB锁库:DB操作先在/tmp完成再复制 + +**首次执行**:需补2026-03-28~2026-05-02约25天数据 + +**预估行数**:~200行 + +### P2-5:cron定时任务 + +| 项 | 说明 | +|-----|------| +| 需求 | 每交易日15:30自动执行增量更新 | +| 配置 | Mac crontab(Mac永不休眠已确认) | +| 验证 | 下一个交易日检查是否自动执行 | + +**crontab配置**: +``` +30 15 * * 1-5 cd ~/.openclaw/sanguo_projects/sanguo_vnpy && python3 data_platform/updater.py >> data_platform/logs/update.log 2>&1 +``` + +**配套**: +- 日志目录:`~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/logs/` +- 失败通知:更新失败时写日志(后续可接入三国mail通知) + +--- + +## 三、交付物清单 + +### 代码文件(`~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/`) + +| 文件 | 功能 | 预估行数 | +|------|------|---------| +| `fallback.py` | 多源降级管理器 | ~150 | +| `validator.py` | 数据校验(7条fatal) | ~150 | +| `realtime.py` | 实时行情三源降级 | ~200 | +| `updater.py` | 增量更新(双写) | ~200 | + +### 配置文件 + +| 文件 | 内容 | +|------|------| +| crontab条目 | 每交易日15:30自动更新 | +| logs目录 | 更新日志 | + +### 文档 + +| 文件 | 内容 | +|------|------| +| 本需求文档 | `~/.openclaw/sanguo_projects/sanguo_vnpy/docs/data-platform/02-p2-requirements.md` | + +--- + +## 四、假设与不确定项 + +| # | 不确定项 | 影响 | 验证方式 | +|---|---------|------|---------| +| 1 | akshare `stock_zh_a_hist()` 当前是否可用 | 降级链主源 | 赵云编码时测试 | +| 2 | 腾讯K线API的请求格式(备用日线源) | 降级链备源 | 赵云编码时测试 | +| 3 | 新浪/东财/腾讯实时接口的当前可用性 | 实时行情 | 赵云编码时测试 | +| 4 | 增量更新数据量(25天×5191只)的耗时 | cron窗口 | 首次执行时实测 | +| 5 | vnpy DB导入增量数据的SMB性能 | 更新耗时 | 首次执行时实测 | +| 6 | crontab执行时NAS是否已挂载 | cron可用性 | 配置时验证 | + +--- + +## 五、约束 + +1. 所有产出放到 `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/` +2. 不引新依赖(只用akshare + urllib + 已有库) +3. Parquet是唯一真相源,vnpy DB是可重建的派生缓存 +4. 双写顺序:先Parquet(原子写入)→ 再vnpy DB(幂等写入) +5. SMB锁库:DB操作先在/tmp完成再复制 +6. 遇阻塞用最大尝试轮数限制 +7. 先输出设计方案经评审再编码 +8. 复用P1已有代码(`import_vnpy_daily_fast.py`的批量INSERT逻辑) + +--- + +## 六、成功标准 + +| # | 标准 | 验证方法 | +|---|------|---------| +| 1 | 降级管理器可用:关掉主源自动切备源 | 手动测试 | +| 2 | 校验层拦截bad data:构造异常数据返回fatal | 单元测试 | +| 3 | 实时行情可获取:输入股票代码返回实时报价 | 手动测试 | +| 4 | 增量更新可执行:补齐25天数据 | 执行updater后检查数据日期 | +| 5 | Parquet+vnpy DB一致性 | 比对条数 | +| 6 | cron可触发 | 配置后下个交易日检查日志 |