auto-sync: 2026-05-02 18:44:55

This commit is contained in:
cfdaily
2026-05-02 18:44:55 +08:00
parent 47d6a6a1ce
commit 48fdae9a2e
+265
View File
@@ -0,0 +1,265 @@
# P2 需求规格文档:数据基础设施建设
**任务ID**: data-platform-p2-20260502
**节点**: pangtong_requirements
**作者**: 庞统(副军师)
**日期**: 2026-05-02
---
## 一、背景
### 1.1 P1已完成的基础
| 项 | 状态 | 详情 |
|----|------|------|
| vnpy DB日线数据 | ✅ | 5191只,1281万行,2010~2026-03-27 |
| 回测服务可用 | ✅ | 端到端验证通过 |
| 导入脚本 | ✅ | `import_vnpy_daily_fast.py`126行,pandas向量化) |
| DB路径 | ✅ | `/Volumes/stock/sanguo_vnpy/data/quant_trading.db`1.4GB |
| 已有适配器 | ⚠️ | `vnpy_local_data_adapter.py`(路径硬编码Mac本地,仅日线) |
### 1.2 当前数据缺口
- NAS日线数据停在 **2026-03-27**,需补约 **25个交易日**(至2026-05-02
- 无增量更新机制(每次需手动全量导入)
- 无数据校验(异常数据入库无拦截)
- 无多源降级(akshare挂了无备用)
- 无实时行情能力
- 无自动定时任务
### 1.3 关键设计决策(P1已确认)
| 决策 | 结论 |
|------|------|
| Source of Truth | NAS Parquet是唯一真相源 |
| vnpy DB定位 | 可重建的派生缓存 |
| 双写顺序 | 先Parquet(原子写入:临时文件+rename)→ 再vnpy DBINSERT OR REPLACE幂等) |
| SMB写入策略 | SQLite写本地/tmp,完成后复制到NAS(避免SMB锁库) |
---
## 二、功能需求
### P2-1:多源降级管理器 `fallback.py`
| 项 | 说明 |
|-----|------|
| 需求 | 统一数据获取入口,支持多数据源顺序降级 |
| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/fallback.py` |
**日线降级链**
1. akshare `stock_zh_a_hist()` → 成功则返回
2. 腾讯K线API → 成功则返回
3. 全部失败 → 抛异常
**接口设计**
```python
class FallbackManager:
def get_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame
def get_realtime(self, symbol: str) -> dict
def get_source_used(self) -> str # 返回实际使用的数据源名称
```
**行为要求**
- 第一个源失败自动切下一个
- 记录使用的源(写入返回数据的metadata)
- 每个源的超时控制(单次请求10秒超时)
- 日志记录降级事件(哪个源失败、切到哪个、耗时)
**预估行数**~150行
### P2-2:数据校验层 `validator.py`
| 项 | 说明 |
|-----|------|
| 需求 | 入库前校验数据质量,fatal级拒绝入库 |
| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/validator.py` |
**V1规则(7条fatal**
| 规则ID | 检查逻辑 | 级别 |
|--------|---------|------|
| D1 | close/open/high/low > 0 | fatal |
| D2 | high ≥ max(open,close)low ≤ min(open,close) | fatal |
| D3 | volume >= 0 | fatal |
| D6 | 同股同日不能两条记录 | fatal |
| D7 | date <= 当前日期 | fatal |
| R1 | 实时价格 current > 0, prev_close > 0 | fatal |
| R7 | 必须携带 source + fetched_at 字段 | fatal |
**接口设计**
```python
class DataValidator:
def validate(self, df: pd.DataFrame, data_type: str = "daily") -> ValidationResult
class ValidationResult:
passed: bool
fatal_errors: List[str] # 阻断入库
warnings: List[str] # 标记但不阻断
checked_rows: int
failed_rows: int
```
**行为要求**
- fatal错误 → 拒绝整批入库,返回具体失败行号和原因
- warning → 标记但允许入库(数据中附加warning字段)
- 校验报告可序列化为JSON
**预估行数**~150行
### P2-3:实时行情三源降级 `realtime.py`
| 项 | 说明 |
|-----|------|
| 需求 | 获取实时行情,支持3个源降级 |
| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/realtime.py` |
**降级链**
1. 新浪实时接口 → 成功则返回
2. 东方财富接口 → 成功则返回
3. 腾讯实时接口 → 成功则返回
4. 全部失败 → 抛异常
**接口设计**
```python
def get_realtime_quote(symbol: str) -> dict
# 返回: {symbol, name, current, prev_close, open, high, low, volume, amount,
# bid1_price, ask1_price, timestamp, source, fetched_at}
```
**行为要求**
- 返回标准化的字段(不同数据源字段名不同,需统一映射)
- 每个源10秒超时
- 记录实际使用的数据源
**预估行数**~200行
### P2-4:增量更新 `updater.py`
| 项 | 说明 |
|-----|------|
| 需求 | 每日增量更新,Parquet+vnpy DB双写 |
| 产出 | `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/updater.py` |
| 当前缺口 | 数据停在2026-03-27,需补约25个交易日 |
**流程**
```
1. 扫描NAS Parquet获取每只股票最后日期
2. 对比今天,确定需要更新的日期范围
3. 调用 fallback.py 获取增量数据
4. 调用 validator.py 校验
5. 写Parquet(原子写入:临时文件+rename)
6. 写vnpy DBINSERT OR REPLACE,复用P1的批量导入逻辑)
7. 一致性校验(Parquet条数 vs DB条数)
8. 输出更新报告
```
**接口设计**
```python
class DailyUpdater:
def update_all(self) -> UpdateReport
def update_symbol(self, symbol: str) -> SymbolUpdateResult
class UpdateReport:
total_symbols: int
updated: int
skipped: int # 已是最新
failed: int
new_records: int
parquet_size: str
db_size: str
consistency_ok: bool
```
**关键约束**
- Parquet写入必须是原子的(临时文件+os.rename)
- vnpy DB写入失败不影响Parquet
- 复用 `import_vnpy_daily_fast.py` 的批量INSERT逻辑
- SMB锁库:DB操作先在/tmp完成再复制
**首次执行**:需补2026-03-28~2026-05-02约25天数据
**预估行数**~200行
### P2-5cron定时任务
| 项 | 说明 |
|-----|------|
| 需求 | 每交易日15:30自动执行增量更新 |
| 配置 | Mac crontabMac永不休眠已确认) |
| 验证 | 下一个交易日检查是否自动执行 |
**crontab配置**
```
30 15 * * 1-5 cd ~/.openclaw/sanguo_projects/sanguo_vnpy && python3 data_platform/updater.py >> data_platform/logs/update.log 2>&1
```
**配套**
- 日志目录:`~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/logs/`
- 失败通知:更新失败时写日志(后续可接入三国mail通知)
---
## 三、交付物清单
### 代码文件(`~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/`
| 文件 | 功能 | 预估行数 |
|------|------|---------|
| `fallback.py` | 多源降级管理器 | ~150 |
| `validator.py` | 数据校验(7条fatal | ~150 |
| `realtime.py` | 实时行情三源降级 | ~200 |
| `updater.py` | 增量更新(双写) | ~200 |
### 配置文件
| 文件 | 内容 |
|------|------|
| crontab条目 | 每交易日15:30自动更新 |
| logs目录 | 更新日志 |
### 文档
| 文件 | 内容 |
|------|------|
| 本需求文档 | `~/.openclaw/sanguo_projects/sanguo_vnpy/docs/data-platform/02-p2-requirements.md` |
---
## 四、假设与不确定项
| # | 不确定项 | 影响 | 验证方式 |
|---|---------|------|---------|
| 1 | akshare `stock_zh_a_hist()` 当前是否可用 | 降级链主源 | 赵云编码时测试 |
| 2 | 腾讯K线API的请求格式(备用日线源) | 降级链备源 | 赵云编码时测试 |
| 3 | 新浪/东财/腾讯实时接口的当前可用性 | 实时行情 | 赵云编码时测试 |
| 4 | 增量更新数据量(25天×5191只)的耗时 | cron窗口 | 首次执行时实测 |
| 5 | vnpy DB导入增量数据的SMB性能 | 更新耗时 | 首次执行时实测 |
| 6 | crontab执行时NAS是否已挂载 | cron可用性 | 配置时验证 |
---
## 五、约束
1. 所有产出放到 `~/.openclaw/sanguo_projects/sanguo_vnpy/data_platform/`
2. 不引新依赖(只用akshare + urllib + 已有库)
3. Parquet是唯一真相源,vnpy DB是可重建的派生缓存
4. 双写顺序:先Parquet(原子写入)→ 再vnpy DB(幂等写入)
5. SMB锁库:DB操作先在/tmp完成再复制
6. 遇阻塞用最大尝试轮数限制
7. 先输出设计方案经评审再编码
8. 复用P1已有代码(`import_vnpy_daily_fast.py`的批量INSERT逻辑)
---
## 六、成功标准
| # | 标准 | 验证方法 |
|---|------|---------|
| 1 | 降级管理器可用:关掉主源自动切备源 | 手动测试 |
| 2 | 校验层拦截bad data:构造异常数据返回fatal | 单元测试 |
| 3 | 实时行情可获取:输入股票代码返回实时报价 | 手动测试 |
| 4 | 增量更新可执行:补齐25天数据 | 执行updater后检查数据日期 |
| 5 | Parquet+vnpy DB一致性 | 比对条数 |
| 6 | cron可触发 | 配置后下个交易日检查日志 |