#!/usr/bin/env python3
"""Data validation layer - V1, 7 fatal rules."""
from datetime import date, datetime
from typing import List, Optional, Tuple

import pandas as pd

# Column order in which rule D1 inspects prices; only the first failing
# column of a row is reported.
_PRICE_COLUMNS = ("close", "open", "high", "low")

# (strptime format, number of leading characters to parse) for the date
# spellings accepted by rule D7.
_DATE_FORMATS = (("%Y-%m-%d", 10), ("%Y/%m/%d", 10), ("%Y%m%d", 8))


def _to_float(value) -> float:
    """Return *value* as a float, or NaN when it is missing or non-numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return float("nan")


def _is_blank(value) -> bool:
    """Return True when *value* is None, NaN, NA, NaT or otherwise falsy."""
    if value is None or value is pd.NA or value is pd.NaT:
        return True
    if isinstance(value, float) and value != value:  # NaN never equals itself
        return True
    return not value


def _parse_date(value) -> Optional[date]:
    """Parse the leading date of *value*; return None when it is unparseable.

    Only the leading characters of ``str(value)`` are examined, so full
    timestamps such as ``2024-01-01 09:30:00`` are accepted.
    """
    text = str(value).strip()
    for fmt, width in _DATE_FORMATS:
        try:
            return datetime.strptime(text[:width], fmt).date()
        except ValueError:
            continue
    return None


def _realtime_rule_errors(record) -> List[str]:
    """Return the R1/R7 violations of one realtime quote.

    *record* is any object with a dict-like ``get`` (a dict or a DataFrame
    row).
    """
    errors = []
    # R1: prices must be strictly positive (NaN / non-numeric counts as bad).
    if not _to_float(record.get("current", 0)) > 0:
        errors.append("R1: current价格<=0")
    if not _to_float(record.get("prev_close", 0)) > 0:
        errors.append("R1: prev_close<=0")
    # R7: the quote must carry source and fetched_at.
    if _is_blank(record.get("source")):
        errors.append("R7: 缺少source字段")
    if _is_blank(record.get("fetched_at")):
        errors.append("R7: 缺少fetched_at字段")
    return errors


class ValidationResult:
    """Outcome of one validation run.

    Attributes:
        passed: False as soon as at least one fatal error was recorded.
        fatal_errors: Messages of the violated fatal rules.
        warnings: Non-fatal remarks.
        checked_rows: Number of rows (or records) examined.
        failed_rows: Number of rows with at least one row-level fatal error.
    """

    def __init__(self):
        self.passed = True
        self.fatal_errors: List[str] = []
        self.warnings: List[str] = []
        self.checked_rows = 0
        self.failed_rows = 0

    def __repr__(self):
        return (f"ValidationResult(passed={self.passed}, "
                f"fatal={len(self.fatal_errors)}, warnings={len(self.warnings)}, "
                f"rows={self.checked_rows}, failed={self.failed_rows})")

    def to_dict(self):
        """Return the result as a plain dict."""
        return {
            "passed": self.passed,
            "fatal_errors": self.fatal_errors,
            "warnings": self.warnings,
            "checked_rows": self.checked_rows,
            "failed_rows": self.failed_rows,
        }


class DataValidator:
    """Data validator - V1, 7 fatal rules."""

    def validate(self, df: pd.DataFrame, data_type: str = "daily") -> ValidationResult:
        """Validate a DataFrame of ``"daily"`` or ``"realtime"`` rows.

        A None or empty frame fails immediately. Any other *data_type* runs
        no rule: the result still passes, but a warning is recorded so the
        unchecked data does not go unnoticed.
        """
        result = ValidationResult()
        if df is None or df.empty:
            result.fatal_errors.append("数据为空")
            result.passed = False
            return result
        result.checked_rows = len(df)

        if data_type == "daily":
            self._validate_daily(df, result)
        elif data_type == "realtime":
            self._validate_realtime(df, result)
        else:
            result.warnings.append(f"未知data_type: {data_type}, 未执行规则校验")
        return result

    def validate_realtime_dict(self, data: dict) -> ValidationResult:
        """Validate a single realtime quote given as a dict.

        None or an empty dict is treated as a quote with every field missing.
        """
        result = ValidationResult()
        result.checked_rows = 1
        errors = _realtime_rule_errors(data or {})
        if errors:
            result.fatal_errors = errors
            result.passed = False
            result.failed_rows = 1
        return result

    def _validate_realtime(self, df: pd.DataFrame, result: ValidationResult):
        """Apply the realtime rules (R1, R7) to every row of *df*."""
        for idx, row in df.iterrows():
            row_errors = [f"{msg} (row {idx})" for msg in _realtime_rule_errors(row)]
            if row_errors:
                result.fatal_errors.extend(row_errors)
                result.failed_rows += 1

        if result.fatal_errors:
            result.passed = False

    def _validate_daily(self, df: pd.DataFrame, result: ValidationResult):
        """Apply the daily rules (D1, D2, D3, D6, D7) to *df*.

        Row-level violations are appended to ``result.fatal_errors`` and
        counted in ``result.failed_rows``; the duplicate-date rule D6 is
        frame-level and does not change ``failed_rows``.
        """
        today = datetime.now().date()

        for idx, row in df.iterrows():
            row_errors = []
            # Coerce once; missing or non-numeric cells become NaN instead of
            # raising inside the rule checks.
            prices = {col: _to_float(row.get(col, 0)) for col in _PRICE_COLUMNS}

            # D1: prices must be > 0 (only the first offending column is reported).
            for col in _PRICE_COLUMNS:
                if not prices[col] > 0:
                    row_errors.append(f"D1: {col}<=0 (row {idx})")
                    break

            # D2: OHLC consistency, checked only when open and close are valid.
            o, h, l, c = prices["open"], prices["high"], prices["low"], prices["close"]
            if o > 0 and c > 0:
                if h < max(o, c) or l > min(o, c):
                    row_errors.append(f"D2: OHLC不一致 (row {idx}, o={o} h={h} l={l} c={c})")

            # D3: volume >= 0 (a missing volume is tolerated).
            if _to_float(row.get("volume", 0)) < 0:
                row_errors.append(f"D3: volume<0 (row {idx})")

            # D7: no future dates. Compare parsed dates, not raw strings, so
            # compact (20240101) or slash-separated dates are not misjudged.
            # Rows whose date cannot be parsed are skipped by this rule.
            day = _parse_date(row.get("date", row.get("datetime", "")))
            if day is not None and day > today:
                row_errors.append(f"D7: 未来日期 {day.isoformat()} (row {idx})")

            if row_errors:
                result.fatal_errors.extend(row_errors)
                result.failed_rows += 1

        # D6: dates must be unique (checked after all rows).
        date_col = "date" if "date" in df.columns else "datetime"
        if date_col in df.columns:
            dupe_count = int(df.duplicated(subset=[date_col], keep=False).sum())
            if dupe_count:
                result.fatal_errors.append(f"D6: {dupe_count}条重复日期")

        if result.fatal_errors:
            result.passed = False