#!/usr/bin/env python3
"""Data validation layer - V1 fatal rules for daily bars and realtime quotes."""

from datetime import datetime
from typing import List, Tuple

import pandas as pd


def _to_float(value) -> float:
    """Coerce *value* to ``float``; return NaN when it is missing or non-numeric.

    NaN compares false against every threshold, so callers can express
    "must be > 0" as ``not value > 0`` without a separate null check.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return float("nan")


def _comparable_date(raw) -> str:
    """Return a ``YYYY-MM-DD`` key for *raw* suitable for string comparison.

    Values whose first 10 characters are already ISO formatted are returned
    unchanged.  Other formats (e.g. ``20240102``) are parsed with
    ``pd.to_datetime``.  Anything that cannot be parsed falls back to the raw
    10-character prefix, so unparseable values are compared as plain strings.
    """
    text = str(raw)
    prefix = text[:10]
    if len(prefix) == 10 and prefix[4] == "-" and prefix[7] == "-":
        return prefix
    if not text:
        return prefix
    try:
        parsed = pd.to_datetime(text, errors="coerce")
    except (TypeError, ValueError, OverflowError):
        return prefix
    if pd.isna(parsed):
        return prefix
    return parsed.strftime("%Y-%m-%d")


class ValidationResult:
    """Outcome of one validation run: pass flag, messages and row counters."""

    def __init__(self):
        self.passed = True
        self.fatal_errors: List[str] = []
        self.warnings: List[str] = []
        self.checked_rows = 0
        self.failed_rows = 0

    def __repr__(self):
        return (f"ValidationResult(passed={self.passed}, "
                f"fatal={len(self.fatal_errors)}, warnings={len(self.warnings)}, "
                f"rows={self.checked_rows}, failed={self.failed_rows})")

    def to_dict(self):
        """Return the result as a plain dict."""
        return {
            "passed": self.passed,
            "fatal_errors": self.fatal_errors,
            "warnings": self.warnings,
            "checked_rows": self.checked_rows,
            "failed_rows": self.failed_rows,
        }


class DataValidator:
    """Data validator applying the V1 fatal rules.

    Daily rules visible here: D1 (prices > 0), D2 (OHLC consistency),
    D3 (volume >= 0), D6 (no duplicate dates), D7 (no future dates).
    Realtime-dict rules: R1 (prices > 0), R7 (source / fetched_at present).
    """

    def validate(self, df: pd.DataFrame, data_type: str = "daily") -> ValidationResult:
        """Validate a DataFrame of ``data_type`` ("daily" or "realtime").

        A ``None`` or empty frame fails immediately.  Any other ``data_type``
        value runs no rule checks and returns a passing result.
        """
        result = ValidationResult()
        if df is None or df.empty:
            result.fatal_errors.append("数据为空")
            result.passed = False
            return result

        result.checked_rows = len(df)
        if data_type == "daily":
            self._validate_daily(df, result)
        elif data_type == "realtime":
            # NOTE(review): _validate_realtime is not defined in the visible
            # part of this file - presumably it lives further down; confirm.
            self._validate_realtime(df, result)
        return result

    def validate_realtime_dict(self, data: dict) -> ValidationResult:
        """Validate a single realtime quote given as a dict.

        A ``None`` or empty payload is treated as a quote with every field
        missing, so all rules report instead of raising.
        """
        result = ValidationResult()
        result.checked_rows = 1
        quote = data or {}
        errors = []

        # R1: prices must be > 0 (missing / non-numeric values also fail).
        if not _to_float(quote.get("current", 0)) > 0:
            errors.append("R1: current价格<=0")
        if not _to_float(quote.get("prev_close", 0)) > 0:
            errors.append("R1: prev_close<=0")

        # R7: the quote must carry source and fetched_at.
        if not quote.get("source"):
            errors.append("R7: 缺少source字段")
        if not quote.get("fetched_at"):
            errors.append("R7: 缺少fetched_at字段")

        if errors:
            result.fatal_errors = errors
            result.passed = False
            result.failed_rows = 1
        return result

    def _validate_daily(self, df: pd.DataFrame, result: ValidationResult):
        """Apply the daily rules to ``df``, recording failures in ``result``.

        Per-row rules (D1, D2, D3, D7) increment ``failed_rows`` once per
        offending row; the frame-level rule D6 only adds a fatal error.
        """
        today = datetime.now().strftime("%Y-%m-%d")

        for idx, row in df.iterrows():
            row_errors = []

            # D1: prices must be > 0; only the first offending column is reported.
            for col in ("close", "open", "high", "low"):
                if not _to_float(row.get(col, 0)) > 0:
                    row_errors.append(f"D1: {col}<=0 (row {idx})")
                    break

            # D2: OHLC consistency - high/low must bracket open and close.
            # NaN high/low compare false here; D1 has already reported them.
            open_, high, low, close = (
                _to_float(row.get(col, 0))
                for col in ("open", "high", "low", "close")
            )
            if open_ > 0 and close > 0:
                if high < max(open_, close) or low > min(open_, close):
                    row_errors.append(
                        f"D2: OHLC不一致 (row {idx}, o={open_} h={high} l={low} c={close})"
                    )

            # D3: volume >= 0 (a missing volume is not an error).
            if _to_float(row.get("volume", 0)) < 0:
                row_errors.append(f"D3: volume<0 (row {idx})")

            # D7: no future dates. Compare on a normalised ISO key so that
            # non-ISO formats such as "20240102" are not misreported.
            raw_date = row.get("date", row.get("datetime", ""))
            dt = str(raw_date)[:10]
            if _comparable_date(raw_date) > today:
                row_errors.append(f"D7: 未来日期 {dt} (row {idx})")

            if row_errors:
                result.fatal_errors.extend(row_errors)
                result.failed_rows += 1

        # D6: dates must be unique (checked once, after all rows).
        date_col = "date" if "date" in df.columns else "datetime"
        if date_col in df.columns:
            dup_count = int(df.duplicated(subset=[date_col], keep=False).sum())
            if dup_count:
                result.fatal_errors.append(f"D6: {dup_count}条重复日期")

        if result.fatal_errors:
            result.passed = False