# Global source-outage detection: if this many consecutive stocks fail on
# their very first request, the data source is treated as unavailable.
GLOBAL_FAIL_THRESHOLD = 30
# Number of days of rotated DB backups to keep.
DB_BACKUP_KEEP_DAYS = 7

# Backup files are named quant_trading_YYYYMMDD.db.bak.
_BACKUP_NAME_RE = re.compile(r"quant_trading_(\d{8})\.db\.bak")


# ======================== DB backup ========================

def _backup_date(path: Path) -> Optional[datetime]:
    """Return the date encoded in a backup file name, or None if there is none.

    The date is taken from the full file name rather than ``Path.stem``:
    ``stem`` only strips the final ``.bak`` suffix, which would leave
    ``20260503.db`` and make ``strptime`` fail for every backup.
    """
    match = _BACKUP_NAME_RE.fullmatch(path.name)
    if match is None:
        return None
    try:
        return datetime.strptime(match.group(1), "%Y%m%d")
    except ValueError:
        # Eight digits that do not form a valid calendar date.
        return None


def rotate_db_backup():
    """Create today's backup of the vnpy DB and delete expired backups.

    The backup is written next to ``VNPY_DB_PATH`` as
    ``quant_trading_YYYYMMDD.db.bak``. If today's backup already exists the
    function returns immediately (no copy, no cleanup). Backups whose file
    name date is older than ``DB_BACKUP_KEEP_DAYS`` days are removed after a
    successful copy. Failures are logged and never raised.
    """
    backup_dir = VNPY_DB_PATH.parent
    now = datetime.now()
    backup_file = backup_dir / f"quant_trading_{now.strftime('%Y%m%d')}.db.bak"

    # Already backed up today: nothing to do.
    if backup_file.exists():
        logger.info("DB今日已备份: %s", backup_file)
        return

    logger.info("开始DB备份: %s → %s", VNPY_DB_PATH.name, backup_file.name)
    # Copy to a temporary name first so an interrupted copy never leaves a
    # truncated file under the final name (which the check above would then
    # mistake for a finished backup).
    # NOTE(review): this is a plain file copy. The DB is switched to WAL mode
    # elsewhere in this file, so a copy taken while a "-wal" file holds
    # un-checkpointed pages may miss recent writes -- consider
    # sqlite3.Connection.backup(); confirm before changing.
    partial = backup_file.with_name(backup_file.name + ".tmp")
    try:
        shutil.copy2(str(VNPY_DB_PATH), str(partial))
        partial.replace(backup_file)
        logger.info("✅ DB备份完成 (%.1f MB)", backup_file.stat().st_size / 1024 / 1024)
    except OSError as e:
        logger.error("❌ DB备份失败: %s", e)
        try:
            partial.unlink()
        except OSError:
            pass  # nothing was written, or it cannot be removed right now
        return

    # Remove expired backups; files whose name carries no valid date are kept.
    cutoff = now - timedelta(days=DB_BACKUP_KEEP_DAYS)
    for old in backup_dir.glob("quant_trading_*.db.bak"):
        stamp = _backup_date(old)
        if stamp is None or stamp >= cutoff:
            continue
        try:
            old.unlink()
            logger.info("清理过期备份: %s", old.name)
        except OSError as e:
            logger.warning("清理过期备份失败 %s: %s", old.name, e)


# ======================== Progress files ========================

def load_progress(name: str) -> set:
    """Load the set of already-processed codes for the task called *name*.

    Reads ``PROGRESS_DIR/<name>_progress.json`` (creating ``PROGRESS_DIR`` if
    needed) and returns the entries of its ``"done"`` list. A missing or
    unreadable file yields an empty set; unreadable files are logged.

    NOTE(review): nothing in this function expires old progress, although the
    file stores a ``"ts"`` timestamp. Confirm that the caller clears progress
    between runs, otherwise a later run would skip every code.
    """
    progress_file = PROGRESS_DIR / f"{name}_progress.json"
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    if not progress_file.exists():
        return set()
    try:
        payload = json.loads(progress_file.read_text(encoding="utf-8"))
        return set(payload.get("done", []))
    except (OSError, ValueError, AttributeError, TypeError) as e:
        # Corrupt or unexpected content: start over, but say so.
        logger.warning("进度文件读取失败 %s: %s", progress_file, e)
        return set()


def save_progress(name: str, done_set: set):
    """Persist *done_set* as the progress of the task called *name*.

    The JSON document holds the sorted codes under ``"done"`` and the current
    local time under ``"ts"``. It is written to a temporary file and then
    renamed over the target, so an interrupted run cannot leave a
    half-written progress file.
    """
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    progress_file = PROGRESS_DIR / f"{name}_progress.json"
    payload = json.dumps({
        "done": sorted(done_set),
        "ts": datetime.now().isoformat(),
    })
    tmp = progress_file.with_name(progress_file.name + ".tmp")
    tmp.write_text(payload, encoding="utf-8")
    tmp.replace(progress_file)


# ======================== Global source-outage detection ========================

class SourceHealthMonitor:
    """Track consecutive first-request failures to detect a dead data source.

    Once ``threshold`` stocks in a row have failed on their first request,
    ``report`` returns False so the caller can stop the whole update.
    """

    def __init__(self, threshold: int = GLOBAL_FAIL_THRESHOLD):
        # Number of consecutive first-request failures that trips the monitor.
        self.threshold = threshold
        # Current streak; any success resets it to zero.
        self.consecutive_first_fail = 0

    def report(self, code: str, first_attempt_failed: bool) -> bool:
        """Record the first-request outcome for one stock.

        Args:
            code: Stock code the outcome belongs to (not used in the decision).
            first_attempt_failed: True if the first request for it failed.

        Returns:
            True while the source is still considered healthy, False once the
            failure streak has reached ``threshold``.
        """
        if not first_attempt_failed:
            self.consecutive_first_fail = 0
            return True

        self.consecutive_first_fail += 1
        if self.consecutive_first_fail >= self.threshold:
            logger.error(
                "⚠️ 全局源不可用检测触发:连续 %d 只股票首次请求失败,终止本次更新",
                self.consecutive_first_fail,
            )
            return False
        return True
-129,7 +229,6 @@ def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[p for c in ["open", "close", "high", "low", "volume", "amount"]: df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") - # 确保date为字符串比较 df["date"] = df["date"].astype(str) mask = (df["date"] >= start_date) & (df["date"] <= end_date) result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]] @@ -142,7 +241,6 @@ def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int: year = datetime.now().year parquet_path = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet" - # 确保date为字符串类型,避免和已有parquet的datetime.date冲突 new_data = new_data.copy() new_data["date"] = new_data["date"].astype(str) @@ -186,38 +284,67 @@ def run_daily_update(codes: List[str]) -> dict: stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0} all_db_values = [] consecutive_fails = 0 + source_aborted = False + + # 进度文件(硬伤1修复) + done_set = load_progress("daily") + todo = [c for c in codes if c not in done_set] + logger.info("待更新: %d(已完成: %d)", len(todo), len(done_set)) + + # 全局源健康监控(硬伤2修复) + health = SourceHealthMonitor() + + for i, code in enumerate(todo): + if source_aborted: + break - for i, code in enumerate(codes): if i > 0: time.sleep(REQUEST_INTERVAL) last_date = get_daily_last_date(code) if not last_date: stats["skipped"] += 1 + done_set.add(code) continue next_day = (pd.Timestamp(last_date) + timedelta(days=1)).strftime("%Y-%m-%d") if next_day > today: stats["skipped"] += 1 + done_set.add(code) continue data = None + first_attempt_failed = False for attempt in range(MAX_RETRIES): try: data = fetch_tencent_daily(code, next_day, today) - break + if data is not None: + break + if attempt == 0: + first_attempt_failed = True except Exception as e: + if attempt == 0: + first_attempt_failed = True if attempt < MAX_RETRIES - 1: time.sleep(1) else: logger.debug("日线 %s 失败: %s", code, e) 
def get_15min_last_date(parquet_path: Path) -> str:
    """Return the largest ``day`` value stored in a 15-minute Parquet file.

    Args:
        parquet_path: Location of the per-symbol 15-minute Parquet file.

    Returns:
        ``str()`` of the maximum non-null value in the ``day`` column, or an
        empty string when the file does not exist, cannot be read, or holds
        no non-null ``day`` values.
    """
    if not parquet_path.exists():
        return ""
    try:
        # Only the timestamp column is needed; skip the OHLCV columns.
        days = pd.read_parquet(parquet_path, columns=["day"])["day"].dropna()
        if days.empty:
            # Without dropna() an all-null column would produce "nan", which
            # compares greater than any timestamp string.
            return ""
        return str(days.max())
    except Exception as e:
        # pandas and the Parquet engine raise many unrelated exception types
        # here, so the catch stays broad -- but the failure is now logged.
        # NOTE(review): returning "" for an existing but unreadable file makes
        # it look like there is no history; confirm the caller does not then
        # overwrite the file.
        logger.warning("15min Parquet读取失败 %s: %s", parquet_path, e)
        return ""
update_15min_parquet(code: str) -> Tuple[str, int]: time.sleep(1) if df_new is None or df_new.empty: - return "failed", 0 + return "failed", 0, [] # 数据校验 for col in ["open", "high", "low", "close"]: @@ -317,27 +469,62 @@ def update_15min_parquet(code: str) -> Tuple[str, int]: if bad_ohlc.any(): df_new = df_new[~bad_ohlc] if df_new.empty: - return "failed", 0 + return "failed", 0, [] # 转回object类型 df_new["volume"] = df_new["volume"].astype(str) df_new["amount"] = df_new["amount"].astype(str) + df_new["day"] = df_new["day"].astype(str) - if parquet_path.exists(): + # 获取已有数据最后日期 + last_date = get_15min_last_date(parquet_path) + + if last_date: + # 严格追加:只保留 day > last_date 的新数据(阻塞项2修复) + df_increment = df_new[df_new["day"] > last_date].copy() + + if df_increment.empty: + # 没有新数据,不需要写入 + return "ok", 0, [] + + # 缺口检测:API返回的最早数据比last_date晚超过1根K线(15min) + api_first = df_new["day"].min() + expected_next = str(pd.Timestamp(last_date) + pd.Timedelta(minutes=15)) + if api_first > expected_next: + logger.warning( + "⚠️ 15min数据缺口 %s: last=%s, api_first=%s (expected=%s)", + code, last_date, api_first, expected_next, + ) + + # 合并 existing = pd.read_parquet(parquet_path) - old_len = len(existing) - combined = pd.concat([existing, df_new], ignore_index=True) - combined = combined.drop_duplicates(subset=["day"], keep="last") + existing["day"] = existing["day"].astype(str) + combined = pd.concat([existing, df_increment], ignore_index=True) combined = combined.sort_values("day").reset_index(drop=True) - new_rows = len(combined) - old_len + new_rows = len(df_increment) else: - combined = df_new + # 无已有数据,直接写入全部 + combined = df_new.sort_values("day").reset_index(drop=True) + df_increment = df_new new_rows = len(df_new) + # 原子写入 tmp = parquet_path.with_suffix(".tmp") combined.to_parquet(tmp, index=False) tmp.rename(parquet_path) - return "ok", new_rows + + # 构建DB写入数据(interval="15m",阻塞项1修复) + exchange = "SSE" if prefix == "sh" else "SZSE" + db_values = [] + for _, row in 
df_increment.iterrows(): + db_values.append(( + clean, exchange, str(row["day"]), "15m", + float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0, + float(row.get("open", 0)), float(row.get("high", 0)), + float(row.get("low", 0)), float(row.get("close", 0)), + )) + + return "ok", new_rows, db_values def run_15min_update(codes: List[str]) -> dict: @@ -348,55 +535,43 @@ def run_15min_update(codes: List[str]) -> dict: stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0} all_db_values = [] consecutive_fails = 0 + source_aborted = False - # 加载进度(支持断点续传) - progress_file = PROGRESS_DIR / "15min_progress.json" - PROGRESS_DIR.mkdir(parents=True, exist_ok=True) - done_set = set() - if progress_file.exists(): - try: - done_set = set(json.loads(progress_file.read_text()).get("done", [])) - except Exception: - pass - + # 进度文件 + done_set = load_progress("15min") todo = [c for c in codes if c not in done_set] logger.info("待更新: %d(已完成: %d)", len(todo), len(done_set)) + # 全局源健康监控 + health = SourceHealthMonitor() + for i, code in enumerate(todo): + if source_aborted: + break + if i > 0: time.sleep(REQUEST_INTERVAL) try: - status, new_rows = update_15min_parquet(code) + status, new_rows, db_values = update_15min_parquet(code) + first_attempt_failed = (status == "failed") except Exception as e: - status, new_rows = "failed", 0 + status, new_rows, db_values = "failed", 0, [] + first_attempt_failed = True logger.debug("15min %s 异常: %s", code, e) + # 全局源不可用检测 + if not health.report(code, first_attempt_failed): + source_aborted = True + logger.error("❌ 新浪15min源不可用,终止15min更新") + break + if status == "ok": stats["updated"] += 1 stats["records"] += new_rows + stats["db_records"] += len(db_values) + all_db_values.extend(db_values) consecutive_fails = 0 - - # 收集vnpy DB数据 - prefix, clean = get_market_prefix(code) - parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet" - if parquet_path.exists() and new_rows > 0: - df = pd.read_parquet(parquet_path) - 
# 只取最新的增量部分 - if len(df) > new_rows: - df_new_part = df.iloc[-new_rows:] - else: - df_new_part = df - - exchange = "SSE" if prefix == "sh" else "SZSE" - for _, row in df_new_part.iterrows(): - all_db_values.append(( - clean, exchange, str(row["day"]), "1m", - float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0, - float(row.get("open", 0)), float(row.get("high", 0)), - float(row.get("low", 0)), float(row.get("close", 0)), - )) - stats["db_records"] += len(df_new_part) else: stats["failed"] += 1 consecutive_fails += 1 @@ -410,15 +585,18 @@ def run_15min_update(codes: List[str]) -> dict: if (i + 1) % 500 == 0: logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"]) - # 保存进度 - progress_file.write_text(json.dumps({"done": list(done_set), "ts": datetime.now().isoformat()})) + save_progress("15min", done_set) # 写vnpy DB if all_db_values: _write_vnpy_db(all_db_values, "15min") # 保存最终进度 - progress_file.write_text(json.dumps({"done": list(done_set), "ts": datetime.now().isoformat()})) + save_progress("15min", done_set) + + if source_aborted: + stats["source_aborted"] = True + logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False)) return stats @@ -449,7 +627,7 @@ def _write_vnpy_db(values: list, label: str): conn = sqlite3.connect(str(VNPY_DB_PATH), timeout=120) c = conn.cursor() c.execute("PRAGMA journal_mode=WAL") - c.execute(f"ATTACH DATABASE ? AS tmpdb", (local_tmp,)) + c.execute("ATTACH DATABASE ? 
# ======================== Alerts and reporting ========================

def check_failure_rate(stats: dict, label: str, max_rate: float = 0.05) -> bool:
    """Decide whether an update run should raise an alert.

    An alert is raised when the share of failed items exceeds *max_rate*, or
    when the run was cut short because the data source was unavailable. Both
    conditions are always evaluated, so a source abort is reported even if no
    item was counted at all.

    Args:
        stats: Run statistics; reads the ``updated``, ``failed``, ``skipped``
            and ``source_aborted`` keys, each defaulting to 0 / falsy.
        label: Name of the run, used in the log messages.
        max_rate: Failure ratio (failed / total) above which to alert.

    Returns:
        True if the run is abnormal, False otherwise.
    """
    failed = stats.get("failed", 0)
    total = stats.get("updated", 0) + failed + stats.get("skipped", 0)
    abnormal = False

    # Guard the division: an empty run has no failure rate.
    if total > 0:
        rate = failed / total
        if rate > max_rate:
            logger.error(
                "❌ 数据更新异常 [%s]:失败率 %.1f%% (%d/%d)",
                label, rate * 100, failed, total,
            )
            abnormal = True

    if stats.get("source_aborted"):
        logger.error("❌ [%s] 源不可用导致终止", label)
        abnormal = True

    return abnormal