Files
2026-05-02 18:44:55 +08:00

266 lines
8.4 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# P2 需求规格文档:数据基础设施建设
**任务ID**: data-platform-p2-20260502
**节点**: pangtong_requirements
**作者**: 庞统(副军师)
**日期**: 2026-05-02
---
## 一、背景
### 1.1 P1已完成的基础
| 项 | 状态 | 详情 |
|----|------|------|
| vnpy DB日线数据 | ✅ | 5191只,1281万行,2010~2026-03-27 |
| 回测服务可用 | ✅ | 端到端验证通过 |
| 导入脚本 | ✅ | `import_vnpy_daily_fast.py`126行,pandas向量化) |
| DB路径 | ✅ | `/Volumes/stock/sanguo_vnpy/data/quant_trading.db`1.4GB |
| 已有适配器 | ⚠️ | `vnpy_local_data_adapter.py`(路径硬编码Mac本地,仅日线) |
### 1.2 当前数据缺口
- NAS日线数据停在 **2026-03-27**,需补约 **25个交易日**(至2026-05-02
- 无增量更新机制(每次需手动全量导入)
- 无数据校验(异常数据入库无拦截)
- 无多源降级(akshare挂了无备用)
- 无实时行情能力
- 无自动定时任务
### 1.3 关键设计决策(P1已确认)
| 决策 | 结论 |
|------|------|
| Source of Truth | NAS Parquet是唯一真相源 |
| vnpy DB定位 | 可重建的派生缓存 |
| 双写顺序 | 先Parquet(原子写入:临时文件+rename)→ 再vnpy DBINSERT OR REPLACE幂等) |
| SMB写入策略 | SQLite写本地/tmp,完成后复制到NAS(避免SMB锁库) |
---
## 二、功能需求
### P2-1:多源降级管理器 `fallback.py`
| 项 | 说明 |
|-----|------|
| 需求 | 统一数据获取入口,支持多数据源顺序降级 |
| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/fallback.py` |
**日线降级链**
1. akshare `stock_zh_a_hist()` → 成功则返回
2. 腾讯K线API → 成功则返回
3. 全部失败 → 抛异常
**接口设计**
```python
class FallbackManager:
def get_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame
def get_realtime(self, symbol: str) -> dict
def get_source_used(self) -> str # 返回实际使用的数据源名称
```
**行为要求**
- 第一个源失败自动切下一个
- 记录使用的源(写入返回数据的metadata)
- 每个源的超时控制(单次请求10秒超时)
- 日志记录降级事件(哪个源失败、切到哪个、耗时)
**预估行数**~150行
### P2-2:数据校验层 `validator.py`
| 项 | 说明 |
|-----|------|
| 需求 | 入库前校验数据质量,fatal级拒绝入库 |
| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/validator.py` |
**V1规则(7条fatal**
| 规则ID | 检查逻辑 | 级别 |
|--------|---------|------|
| D1 | close/open/high/low > 0 | fatal |
| D2 | high ≥ max(open,close)low ≤ min(open,close) | fatal |
| D3 | volume >= 0 | fatal |
| D6 | 同股同日不能两条记录 | fatal |
| D7 | date <= 当前日期 | fatal |
| R1 | 实时价格 current > 0, prev_close > 0 | fatal |
| R7 | 必须携带 source + fetched_at 字段 | fatal |
**接口设计**
```python
class DataValidator:
def validate(self, df: pd.DataFrame, data_type: str = "daily") -> ValidationResult
class ValidationResult:
passed: bool
fatal_errors: List[str] # 阻断入库
warnings: List[str] # 标记但不阻断
checked_rows: int
failed_rows: int
```
**行为要求**
- fatal错误 → 拒绝整批入库,返回具体失败行号和原因
- warning → 标记但允许入库(数据中附加warning字段)
- 校验报告可序列化为JSON
**预估行数**~150行
### P2-3:实时行情三源降级 `realtime.py`
| 项 | 说明 |
|-----|------|
| 需求 | 获取实时行情,支持3个源降级 |
| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/realtime.py` |
**降级链**
1. 新浪实时接口 → 成功则返回
2. 东方财富接口 → 成功则返回
3. 腾讯实时接口 → 成功则返回
4. 全部失败 → 抛异常
**接口设计**
```python
def get_realtime_quote(symbol: str) -> dict
# 返回: {symbol, name, current, prev_close, open, high, low, volume, amount,
# bid1_price, ask1_price, timestamp, source, fetched_at}
```
**行为要求**
- 返回标准化的字段(不同数据源字段名不同,需统一映射)
- 每个源10秒超时
- 记录实际使用的数据源
**预估行数**~200行
### P2-4:增量更新 `updater.py`
| 项 | 说明 |
|-----|------|
| 需求 | 每日增量更新,Parquet+vnpy DB双写 |
| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/updater.py` |
| 当前缺口 | 数据停在2026-03-27,需补约25个交易日 |
**流程**
```
1. 扫描NAS Parquet获取每只股票最后日期
2. 对比今天,确定需要更新的日期范围
3. 调用 fallback.py 获取增量数据
4. 调用 validator.py 校验
5. 写Parquet(原子写入:临时文件+rename)
6. 写vnpy DBINSERT OR REPLACE,复用P1的批量导入逻辑)
7. 一致性校验(Parquet条数 vs DB条数)
8. 输出更新报告
```
**接口设计**
```python
class DailyUpdater:
def update_all(self) -> UpdateReport
def update_symbol(self, symbol: str) -> SymbolUpdateResult
class UpdateReport:
total_symbols: int
updated: int
skipped: int # 已是最新
failed: int
new_records: int
parquet_size: str
db_size: str
consistency_ok: bool
```
**关键约束**
- Parquet写入必须是原子的(临时文件+os.rename)
- vnpy DB写入失败不影响Parquet
- 复用 `import_vnpy_daily_fast.py` 的批量INSERT逻辑
- SMB锁库:DB操作先在/tmp完成再复制
**首次执行**:需补2026-03-28~2026-05-02约25天数据
**预估行数**~200行
### P2-5cron定时任务
| 项 | 说明 |
|-----|------|
| 需求 | 每交易日15:30自动执行增量更新 |
| 配置 | Mac crontabMac永不休眠已确认) |
| 验证 | 下一个交易日检查是否自动执行 |
**crontab配置**
```
30 15 * * 1-5 cd ~/.openclaw/sanguo_projects/sanguo_vnpy && python3 data_platform/updater.py >> data_platform/logs/update.log 2>&1
```
**配套**
- 日志目录:`~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/logs/`
- 失败通知:更新失败时写日志(后续可接入三国mail通知)
---
## 三、交付物清单
### 代码文件(`~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/`
| 文件 | 功能 | 预估行数 |
|------|------|---------|
| `fallback.py` | 多源降级管理器 | ~150 |
| `validator.py` | 数据校验(7条fatal | ~150 |
| `realtime.py` | 实时行情三源降级 | ~200 |
| `updater.py` | 增量更新(双写) | ~200 |
### 配置文件
| 文件 | 内容 |
|------|------|
| crontab条目 | 每交易日15:30自动更新 |
| logs目录 | 更新日志 |
### 文档
| 文件 | 内容 |
|------|------|
| 本需求文档 | `~/.openclaw/sanguo_projects/sanguo_vnpy/docs/data-platform/02-p2-requirements.md` |
---
## 四、假设与不确定项
| # | 不确定项 | 影响 | 验证方式 |
|---|---------|------|---------|
| 1 | akshare `stock_zh_a_hist()` 当前是否可用 | 降级链主源 | 赵云编码时测试 |
| 2 | 腾讯K线API的请求格式(备用日线源) | 降级链备源 | 赵云编码时测试 |
| 3 | 新浪/东财/腾讯实时接口的当前可用性 | 实时行情 | 赵云编码时测试 |
| 4 | 增量更新数据量(25天×5191只)的耗时 | cron窗口 | 首次执行时实测 |
| 5 | vnpy DB导入增量数据的SMB性能 | 更新耗时 | 首次执行时实测 |
| 6 | crontab执行时NAS是否已挂载 | cron可用性 | 配置时验证 |
---
## 五、约束
1. 所有产出放到 `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/`
2. 不引新依赖(只用akshare + urllib + 已有库)
3. Parquet是唯一真相源,vnpy DB是可重建的派生缓存
4. 双写顺序:先Parquet(原子写入)→ 再vnpy DB(幂等写入)
5. SMB锁库:DB操作先在/tmp完成再复制
6. 遇阻塞用最大尝试轮数限制
7. 先输出设计方案经评审再编码
8. 复用P1已有代码(`import_vnpy_daily_fast.py`的批量INSERT逻辑)
---
## 六、成功标准
| # | 标准 | 验证方法 |
|---|------|---------|
| 1 | 降级管理器可用:关掉主源自动切备源 | 手动测试 |
| 2 | 校验层拦截bad data:构造异常数据返回fatal | 单元测试 |
| 3 | 实时行情可获取:输入股票代码返回实时报价 | 手动测试 |
| 4 | 增量更新可执行:补齐25天数据 | 执行updater后检查数据日期 |
| 5 | Parquet+vnpy DB一致性 | 比对条数 |
| 6 | cron可触发 | 配置后下个交易日检查日志 |