auto-sync: 2026-05-02 18:46:12

This commit is contained in:
cfdaily
2026-05-02 18:46:12 +08:00
parent 48fdae9a2e
commit 72e3a06e9c
+110
View File
@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""数据校验层 - V1 7条fatal规则"""
import pandas as pd
from datetime import datetime
from typing import List, Tuple
class ValidationResult:
def __init__(self):
self.passed = True
self.fatal_errors: List[str] = []
self.warnings: List[str] = []
self.checked_rows = 0
self.failed_rows = 0
def __repr__(self):
return (f"ValidationResult(passed={self.passed}, "
f"fatal={len(self.fatal_errors)}, warnings={len(self.warnings)}, "
f"rows={self.checked_rows}, failed={self.failed_rows})")
def to_dict(self):
return {
"passed": self.passed,
"fatal_errors": self.fatal_errors,
"warnings": self.warnings,
"checked_rows": self.checked_rows,
"failed_rows": self.failed_rows,
}
class DataValidator:
"""数据校验器 - V1 7条fatal规则"""
def validate(self, df: pd.DataFrame, data_type: str = "daily") -> ValidationResult:
result = ValidationResult()
if df is None or df.empty:
result.fatal_errors.append("数据为空")
result.passed = False
return result
result.checked_rows = len(df)
if data_type == "daily":
self._validate_daily(df, result)
elif data_type == "realtime":
self._validate_realtime(df, result)
return result
def validate_realtime_dict(self, data: dict) -> ValidationResult:
"""校验单条实时行情"""
result = ValidationResult()
result.checked_rows = 1
errors = []
# R1: 价格>0
if not data or data.get("current", 0) <= 0:
errors.append("R1: current价格<=0")
if data.get("prev_close", 0) <= 0:
errors.append("R1: prev_close<=0")
# R7: 必须携带source和fetched_at
if not data.get("source"):
errors.append("R7: 缺少source字段")
if not data.get("fetched_at"):
errors.append("R7: 缺少fetched_at字段")
if errors:
result.fatal_errors = errors
result.passed = False
result.failed_rows = 1
return result
def _validate_daily(self, df: pd.DataFrame, result: ValidationResult):
today = datetime.now().strftime("%Y-%m-%d")
for idx, row in df.iterrows():
row_errors = []
# D1: 价格>0
for col in ["close", "open", "high", "low"]:
val = row.get(col, 0)
if pd.isna(val) or float(val) <= 0:
row_errors.append(f"D1: {col}<=0 (row {idx})")
break
# D2: OHLC一致性
o, h, l, c = float(row.get("open", 0)), float(row.get("high", 0)), \
float(row.get("low", 0)), float(row.get("close", 0))
if o > 0 and c > 0:
if h < max(o, c) or l > min(o, c):
row_errors.append(f"D2: OHLC不一致 (row {idx}, o={o} h={h} l={l} c={c})")
# D3: volume >= 0
vol = row.get("volume", 0)
if pd.notna(vol) and float(vol) < 0:
row_errors.append(f"D3: volume<0 (row {idx})")
# D7: 非未来日期
dt = str(row.get("date", row.get("datetime", "")))[:10]
if dt > today:
row_errors.append(f"D7: 未来日期 {dt} (row {idx})")
if row_errors:
result.fatal_errors.extend(row_errors)
result.failed_rows += 1
# D6: 日期不重复 (check after all rows)
date_col = "date" if "date" in df.columns else "datetime"
if date_col in df.columns:
dupes = df[df.duplicated(subset=[date_col], keep=False)]
if not dupes.empty and len(df) > 1:
result.fatal_errors.append(f"D6: {len(dupes)}条重复日期")
if result.fatal_errors:
result.passed = False