#!/usr/bin/env python3
"""
Full-market daily incremental update - daily bars + 15-minute bars.

Features:
1. Daily bars: East Money API (primary) + Tencent API (fallback)
   -> update Parquet + vnpy DB
2. 15-minute bars: Sina API (800 bars) -> incrementally merged into Parquet
   + imported into the vnpy DB

Source fallback chain:
    Daily: East Money (real amount, multi-year history) -> Tencent (amount is sometimes 0)
    15min: Sina (real amount, 800 bars) -> no fallback

Design principles:
- Incremental updates, nothing is downloaded twice
- A failure on one stock does not affect the others
- Progress is persisted so an interrupted run can resume (separate progress
  files for daily and 15min); a progress file is only reused on the day it
  was written
- Rate limiting + detection of a whole data source being unavailable
- Rotating DB backups (kept for 7 days)
- Complete logging + failure-rate alerts

Usage:
    python3 daily_all_update.py               # full update (daily + 15min)
    python3 daily_all_update.py --skip-daily  # 15min only
    python3 daily_all_update.py --skip-15min  # daily only

Changelog:
    v1.1 (2026-05-03) - changes after review by Sima Yi:
        - interval "1m" -> "15m"
        - 15min increments are appended strictly by date
          (no more drop_duplicates keep=last)
        - progress file for daily bars
        - whole-source outage detection
          (30 consecutive first-attempt failures -> abort)
        - rotating DB backups (kept for 7 days)
        - alert when the failure rate exceeds 5%
    v1.2 (2026-05-05) - East Money integration:
        - daily primary source switched from Tencent to East Money
          (real amount, multi-year history)
        - Tencent demoted to the daily fallback
        - East Money rate limit: 4 s/request + random jitter of +/-1 s
        - anti-scraping: Referer + full UA + ut parameter + JSONP
"""
import argparse
import contextlib
import glob
import json
import os
import re
import shutil
import sqlite3
import sys
import time
import logging
import urllib.request
import urllib.error
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List, Tuple

import pandas as pd

# ======================== Configuration ========================
LOG_DIR = Path("/Volumes/stock/logs/daily_update")
DAILY_DIR = Path("/Volumes/stock/A股数据/日线数据/daily")
MINUTE_15_DIR = Path("/Volumes/stock/minute_kline/15min")
VNPY_DB_PATH = Path("/Volumes/stock/sanguo_vnpy/data/quant_trading.db")
ALL_STOCKS_FILE = Path("/Volumes/stock/A股数据/stock_info/stock_basic_info_raw_20260326_113530.csv")
PROGRESS_DIR = Path("/Volumes/stock/logs/daily_update/progress")

REQUEST_INTERVAL_SINA = 0.3  # Sina: stable, 0.3 s is enough
REQUEST_INTERVAL_EM = 4.0  # East Money: strictly 3-5 s, plus random jitter
EM_JITTER = 1.0  # East Money random jitter range (+/- seconds)
MAX_RETRIES = 3
CONSECUTIVE_FAIL_PAUSE = 60
MAX_CONSECUTIVE_FAILS = 10

# Whole-source outage detection: when N consecutive stocks fail on their very
# first request, the source is considered unavailable.
GLOBAL_FAIL_THRESHOLD = 30

# Number of days of rotating DB backups to keep
DB_BACKUP_KEEP_DAYS = 7

HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"}
HEADERS_EM = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Referer": "https://quote.eastmoney.com/",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9",
}

# Column order of the daily DataFrames produced by the fetchers below.
_DAILY_COLUMNS = ["date", "open", "high", "low", "close", "volume", "amount"]

# Backup file names produced by rotate_db_backup(), e.g. quant_trading_20260503.db.bak
_BACKUP_NAME_RE = re.compile(r"^quant_trading_(\d{8})\.db\.bak$")


def setup_logging():
    """Configure root logging and return this module's logger.

    Always logs to stderr. A timestamped file handler under LOG_DIR is added
    only when the NAS volume is present (checked via DAILY_DIR, which lives on
    the same /Volumes/stock volume). Creating LOG_DIR with parents=True while
    the volume is unmounted would otherwise fail at import time (or could
    create a stray local directory under /Volumes), before main() gets the
    chance to report that the NAS is not mounted.
    """
    handlers = [logging.StreamHandler()]
    if DAILY_DIR.exists():
        LOG_DIR.mkdir(parents=True, exist_ok=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = LOG_DIR / f"update_{ts}.log"
        handlers.insert(0, logging.FileHandler(log_file, encoding="utf-8"))
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=handlers,
    )
    return logging.getLogger(__name__)


logger = setup_logging()


def _make_opener():
    """Return a urllib opener that ignores any system/environment proxy."""
    return urllib.request.build_opener(urllib.request.ProxyHandler({}))


# ======================== Helpers ========================

def get_market_prefix(code: str) -> Tuple[str, str]:
    """Return (market prefix, 6-digit code) for a stock code.

    Non-digit characters are stripped and the code is zero-padded to 6 digits.
    Codes starting with 60/68/51 map to "sh"; everything else maps to "sz".
    NOTE(review): other Shanghai prefixes (e.g. 900 B-shares, other 5xx funds)
    and Beijing-exchange codes also fall into "sz" - confirm the stock list
    contains none of them.
    """
    code = re.sub(r"[^0-9]", "", code).zfill(6)
    if code.startswith(("60", "68", "51")):
        return "sh", code
    return "sz", code


def get_all_codes() -> List[str]:
    """Read the full stock list from ALL_STOCKS_FILE as 6-digit code strings.

    The CSV is read with dtype=str so codes keep their leading zeros and are
    never turned into floats by pandas (which would make "1.0".zfill(6) ==
    "0001.0"). Blank/NaN entries are dropped and duplicates are removed while
    keeping the file order.

    Raises:
        ValueError: if none of the known code column names is present.
    """
    df = pd.read_csv(ALL_STOCKS_FILE, dtype=str)
    for col in ["代码", "code", "股票代码"]:
        if col in df.columns:
            raw = df[col].dropna().astype(str).str.strip()
            return list(dict.fromkeys(c.zfill(6) for c in raw if c))
    raise ValueError(f"找不到代码列: {list(df.columns)}")


def nas_mounted() -> bool:
    """True when both the daily and 15-minute data directories exist."""
    return DAILY_DIR.exists() and MINUTE_15_DIR.exists()


# ======================== DB backup ========================

def parse_backup_date(name: str) -> Optional[datetime]:
    """Return the date encoded in a backup file name, or None if it is not a backup name.

    >>> parse_backup_date("quant_trading_20260503.db.bak").strftime("%Y%m%d")
    '20260503'
    """
    m = _BACKUP_NAME_RE.match(name)
    if not m:
        return None
    try:
        return datetime.strptime(m.group(1), "%Y%m%d")
    except ValueError:
        return None


def rotate_db_backup():
    """Back up the vnpy DB once per day and delete backups older than DB_BACKUP_KEEP_DAYS.

    The backup uses SQLite's online backup API instead of a raw file copy:
    if the DB is in WAL mode, a plain copy of the main file can miss committed
    pages that still live in the "-wal" file. The copy is written to
    "<backup>.tmp" and renamed only on success, so an interrupted backup is
    never mistaken for today's completed backup on the next run.
    """
    backup_dir = VNPY_DB_PATH.parent
    today = datetime.now().strftime("%Y%m%d")
    backup_file = backup_dir / f"quant_trading_{today}.db.bak"

    # Already backed up today: nothing to do.
    if backup_file.exists():
        logger.info("DB今日已备份: %s", backup_file)
        return
    # sqlite3.connect() would silently create an empty DB for a missing path.
    if not VNPY_DB_PATH.exists():
        logger.error("❌ DB备份失败: %s", f"文件不存在 {VNPY_DB_PATH}")
        return

    logger.info("开始DB备份: %s → %s", VNPY_DB_PATH.name, backup_file.name)
    tmp_file = backup_file.with_name(backup_file.name + ".tmp")
    try:
        with contextlib.suppress(FileNotFoundError):
            tmp_file.unlink()  # leftover from an interrupted earlier attempt
        with contextlib.closing(sqlite3.connect(str(VNPY_DB_PATH), timeout=120)) as src:
            with contextlib.closing(sqlite3.connect(str(tmp_file))) as dst:
                src.backup(dst)
        tmp_file.replace(backup_file)
        logger.info("✅ DB备份完成 (%.1f MB)", backup_file.stat().st_size / 1024 / 1024)
    except Exception as e:
        logger.error("❌ DB备份失败: %s", e)
        with contextlib.suppress(OSError):
            tmp_file.unlink()
        return

    # Remove expired backups. The date comes from the full file name:
    # Path.stem only strips ".bak", so it cannot be used to isolate the date.
    cutoff = datetime.now() - timedelta(days=DB_BACKUP_KEEP_DAYS)
    for f in backup_dir.glob("quant_trading_*.db.bak"):
        file_date = parse_backup_date(f.name)
        if file_date is None or file_date >= cutoff:
            continue
        try:
            f.unlink()
            logger.info("清理过期备份: %s", f.name)
        except OSError as e:
            logger.warning("清理过期备份失败 %s: %s", f.name, e)


# ======================== Progress files ========================

def progress_is_current(payload, today: str) -> bool:
    """True if a loaded progress payload was written on ``today`` ("YYYY-MM-DD").

    The write date is taken from the first 10 characters of the "ts"
    ISO timestamp that save_progress() stores.
    """
    if not isinstance(payload, dict):
        return False
    return str(payload.get("ts", ""))[:10] == today


def load_progress(name: str) -> set:
    """Load the set of codes already completed by today's run of stage ``name``.

    A progress file written on an earlier day is ignored. Otherwise every
    later day's run would find all codes already "done" and update nothing,
    because nothing in this script clears the file. A missing or unreadable
    file also yields an empty set.
    """
    progress_file = PROGRESS_DIR / f"{name}_progress.json"
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    if not progress_file.exists():
        return set()
    try:
        payload = json.loads(progress_file.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        logger.warning("进度文件无法读取,忽略 %s: %s", progress_file, e)
        return set()
    if not progress_is_current(payload, datetime.now().strftime("%Y-%m-%d")):
        logger.info("进度文件非今日,重新开始: %s", progress_file)
        return set()
    return set(payload.get("done", []))


def save_progress(name: str, done_set: set):
    """Atomically persist the completed-code set for stage ``name``.

    The JSON is written to a temporary file and then renamed over the
    progress file, so a crash mid-write cannot leave a truncated file.
    """
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    progress_file = PROGRESS_DIR / f"{name}_progress.json"
    tmp = progress_file.with_name(progress_file.name + ".tmp")
    tmp.write_text(
        json.dumps({
            "done": sorted(done_set),
            "ts": datetime.now().isoformat(),
        }),
        encoding="utf-8",
    )
    tmp.replace(progress_file)


# ======================== Whole-source outage detection ========================

class SourceHealthMonitor:
    """Track consecutive first-request failures; after ``threshold`` in a row the source is deemed down."""

    def __init__(self, threshold: int = GLOBAL_FAIL_THRESHOLD):
        self.threshold = threshold
        self.consecutive_first_fail = 0

    def report(self, code: str, first_attempt_failed: bool) -> bool:
        """Record one stock's first-request outcome.

        Returns True while the source is still considered healthy, False once
        the number of consecutive failures reaches the threshold (the caller
        should then abort). Any success resets the counter.
        """
        if first_attempt_failed:
            self.consecutive_first_fail += 1
            if self.consecutive_first_fail >= self.threshold:
                logger.error(
                    "⚠️ 全局源不可用检测触发:连续 %d 只股票首次请求失败,终止本次更新",
                    self.consecutive_first_fail,
                )
                return False
        else:
            self.consecutive_first_fail = 0
        return True


# ======================== Daily bars ========================

def get_em_secid(code: str) -> str:
    """Return the East Money secid: "1.<code>" for Shanghai, "0.<code>" otherwise.

    Uses get_market_prefix() so both helpers agree on the market and the
    code is normalized (e.g. "sh600000" -> "1.600000").
    """
    prefix, clean = get_market_prefix(code)
    return f"{'1' if prefix == 'sh' else '0'}.{clean}"


def parse_eastmoney_klines(klines: list, start_date: str, end_date: str) -> pd.DataFrame:
    """Parse East Money kline strings into a daily DataFrame limited to [start_date, end_date].

    Each line is "date,open,close,high,low,volume,amount,...". Lines with
    fewer than 7 fields or non-numeric values (e.g. "-") are skipped
    individually instead of failing the whole batch. The result always has
    the _DAILY_COLUMNS columns and may be empty.
    """
    rows = []
    for line in klines:
        parts = line.split(",")
        if len(parts) < 7:
            continue
        try:
            rows.append({
                "date": parts[0],
                "open": float(parts[1]),
                "close": float(parts[2]),
                "high": float(parts[3]),
                "low": float(parts[4]),
                "volume": float(parts[5]),
                "amount": float(parts[6]),
            })
        except ValueError:
            continue
    if not rows:
        return pd.DataFrame(columns=_DAILY_COLUMNS)

    df = pd.DataFrame(rows)
    # The date field may carry a time component; normalize to YYYY-MM-DD.
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
    mask = (df["date"] >= start_date) & (df["date"] <= end_date)
    return df.loc[mask, _DAILY_COLUMNS].reset_index(drop=True)


def fetch_eastmoney_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars from the East Money K-line API (primary source).

    Characteristics (from the original notes):
    - one request can return several years of data (about 1046 bars for 4 years in testing)
    - "amount" carries real values
    - strict anti-scraping: Referer + UA + "ut" are required, 3-5 s per request

    Returns:
        None when the request/response is unusable (network error, non-200,
        bad JSONP, rc != 0, or no "data" object). Otherwise a DataFrame with
        _DAILY_COLUMNS restricted to [start_date, end_date]. That frame is
        empty when the source answered but has no bars in the range (e.g.
        nothing new since the last stored date), so callers can tell
        "no new data" apart from "source failed".
    """
    import requests as _requests

    secid = get_em_secid(code)
    ts = str(int(time.time() * 1000))
    # NOTE(review): fqt=1 asks East Money for adjusted prices, while the Tencent
    # fallback URL carries no adjustment parameter - confirm both match the
    # series already stored in Parquet.
    url = (
        f"https://push2his.eastmoney.com/api/qt/stock/kline/get?"
        f"secid={secid}&klt=101&fqt=1&"
        f"beg={start_date.replace('-', '')}&end={end_date.replace('-', '')}&"
        f"fields1=f1,f2,f3,f4,f5,f6,f7,f8&"
        f"fields2=f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61&"
        f"ut=b2884a393a59ad64002292a3e90d46a5&"
        f"lmt=10000&"
        f"cb=jQuery_em_{ts}&_={ts}"
    )
    try:
        # Context-managed session so its connection pool is released on every call.
        with _requests.Session() as session:
            session.trust_env = False  # bypass the system proxy
            # NOTE(review): TLS verification is disabled (kept as-is; the reason is
            # not visible here). A tampered response could inject bad prices -
            # re-enable verification if the environment allows it.
            r = session.get(url, headers=HEADERS_EM, timeout=15, verify=False)
        if r.status_code != 200:
            return None
        # Strip the JSONP wrapper: jQuery_em_<ts>( ... )
        text = r.text
        data = json.loads(text[text.index("(") + 1:text.rindex(")")])
    except Exception as e:
        logger.debug("东方财富日线失败 %s: %s", code, e)
        return None

    if not isinstance(data, dict) or data.get("rc") != 0:
        return None
    payload = data.get("data")
    if not isinstance(payload, dict):
        # "data" can be null (presumably for codes the API does not know);
        # treat that as unusable rather than raising AttributeError.
        return None
    return parse_eastmoney_klines(payload.get("klines") or [], start_date, end_date)


def parse_tencent_klines(klines: list, start_date: str, end_date: str) -> pd.DataFrame:
    """Parse Tencent "day" rows into a daily DataFrame limited to [start_date, end_date].

    Rows are [date, open, close, high, low, volume, (amount), ...]. Only the
    first 7 columns are used so extra trailing fields cannot break the column
    assignment. Non-scalar cells become NaN. Missing or non-numeric values
    become 0 (including "amount" when the source omits it). The result always
    has the _DAILY_COLUMNS columns and may be empty.
    """
    if not klines:
        return pd.DataFrame(columns=_DAILY_COLUMNS)

    df = pd.DataFrame(klines).iloc[:, :7].copy()
    df.columns = ["date", "open", "close", "high", "low", "volume", "amount"][: len(df.columns)]
    for c in ["open", "close", "high", "low", "volume", "amount"]:
        if c not in df.columns:
            df[c] = 0.0
        scalars = df[c].map(lambda v: v if isinstance(v, (int, float, str)) else None)
        df[c] = pd.to_numeric(scalars, errors="coerce").fillna(0)
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
    mask = (df["date"] >= start_date) & (df["date"] <= end_date)
    return df.loc[mask, _DAILY_COLUMNS].reset_index(drop=True)


def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars from the Tencent K-line API (fallback source).

    Returns None when the response has no usable "data" entry for the code.
    Otherwise returns a DataFrame with _DAILY_COLUMNS restricted to
    [start_date, end_date], which is empty when no bars fall in the range.
    Network and JSON errors propagate to the caller.
    """
    prefix, clean = get_market_prefix(code)
    tq = f"{prefix}{clean}"
    # Request enough rows to cover the calendar span plus a small margin.
    days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10
    url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"
    opener = _make_opener()
    req = urllib.request.Request(url, headers=HEADERS)
    with opener.open(req, timeout=10) as r:
        raw = r.read().decode("utf-8", errors="replace")
    data = json.loads(raw)

    d = data.get("data") if isinstance(data, dict) else None
    if not isinstance(d, dict):
        return None
    entry = d.get(tq)
    if not isinstance(entry, dict):
        return None
    return parse_tencent_klines(entry.get("day") or [], start_date, end_date)


def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int:
    """Merge daily bars into the per-year Parquet files of ``code``.

    Rows go to DAILY_DIR/<year>/<prefix><code>_daily.parquet according to the
    year of their own "date", not the current year. An increment that spans
    New Year therefore lands in the correct partitions. Within a file, rows
    are de-duplicated on "date" (incoming rows win) and sorted by date. Each
    file is written to a temporary file first and then swapped in.

    Returns:
        The number of incoming rows, len(new_data).
    """
    prefix, clean = get_market_prefix(code)
    new_data = new_data.copy()
    new_data["date"] = new_data["date"].astype(str)

    for year, chunk in new_data.groupby(new_data["date"].str[:4], sort=True):
        parquet_path = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet"
        if parquet_path.exists():
            existing = pd.read_parquet(parquet_path)
            existing["date"] = existing["date"].astype(str)
            combined = pd.concat([existing, chunk], ignore_index=True)
            combined = combined.drop_duplicates(subset=["date"], keep="last")
            combined = combined.sort_values("date").reset_index(drop=True)
        else:
            parquet_path.parent.mkdir(parents=True, exist_ok=True)
            combined = chunk.reset_index(drop=True)

        tmp = parquet_path.with_suffix(".tmp")
        combined.to_parquet(tmp, index=False)
        tmp.replace(parquet_path)
    return len(new_data)


def get_daily_last_date(code: str) -> str:
    """Return the latest stored daily date ("YYYY-MM-DD") for ``code``, or "" if none.

    Year folders are scanned from the current year back to 2010. The first
    readable, non-empty file wins.
    """
    prefix, clean = get_market_prefix(code)
    for year in range(datetime.now().year, 2009, -1):
        fpath = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet"
        if not fpath.exists():
            continue
        try:
            df = pd.read_parquet(fpath, columns=["date"])
        except Exception as e:
            logger.debug("读取日线最后日期失败 %s: %s", fpath, e)
            continue
        if not df.empty:
            return str(df["date"].max())[:10]
    return ""
def _fetch_daily_with_fallback(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars for ``code``: East Money first, then Tencent as the fallback.

    Each source gets up to MAX_RETRIES attempts, with a pause between failed
    attempts (East Money's pause honours its per-request rate limit). The
    first non-None result is returned unchanged. It may be an empty
    DataFrame when a source reports no bars in the range. Returns None only
    when every attempt of both sources raised or returned None.
    """
    sources = (
        ("eastmoney", fetch_eastmoney_daily, REQUEST_INTERVAL_EM),
        ("tencent", fetch_tencent_daily, 1.0),
    )
    for source_name, fetch, retry_pause in sources:
        for attempt in range(MAX_RETRIES):
            try:
                data = fetch(code, start_date, end_date)
            except Exception as e:
                logger.debug("日线源 %s 请求异常 %s (第%d次): %s", source_name, code, attempt + 1, e)
                data = None
            if data is not None:
                return data
            if attempt < MAX_RETRIES - 1:
                time.sleep(retry_pause)
    return None


def _daily_db_rows(code: str, data: pd.DataFrame) -> list:
    """Convert daily bars into vnpy dbbardata value tuples (interval "d").

    NOTE(review): the datetime stored here is the date-only string from the
    fetchers ("YYYY-MM-DD"), whereas 15-minute rows carry a time - confirm this
    matches how the vnpy DB is read.
    """
    prefix, clean = get_market_prefix(code)
    exchange = "SSE" if prefix == "sh" else "SZSE"
    return [
        (
            clean, exchange, str(row["date"]), "d",
            float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
            float(row.get("open", 0)), float(row.get("high", 0)),
            float(row.get("low", 0)), float(row.get("close", 0)),
        )
        for _, row in data.iterrows()
    ]


def _flush_checkpoint(progress_name: str, db_label: str, done_set: set, db_values: list) -> None:
    """Push buffered bars into the vnpy DB, then persist the progress file.

    The DB write happens first. Otherwise a crash right after the progress
    save could leave codes marked done whose bars existed only in memory. A
    resumed run skips done codes, so those bars would never reach the DB.
    ``db_values`` is cleared only when the write succeeds. On failure the rows
    stay buffered and are retried at the next checkpoint.
    """
    if db_values and _write_vnpy_db(db_values, db_label):
        db_values.clear()
    save_progress(progress_name, done_set)


def run_daily_update(codes: List[str]) -> dict:
    """Incrementally update daily bars for ``codes`` (Parquet + vnpy DB).

    Codes already completed today (per the "daily" progress file) are skipped.
    For each remaining code, bars from the day after its last stored date up
    to today are fetched and merged.

    Stats semantics:
        updated - new bars were written
        skipped - nothing to fetch (no local history, already current, only
                  weekend days since the last bar) or the source returned no
                  bars in the range
        failed  - both sources failed, the bars failed validation, or the
                  write failed

    Returns:
        The stats dict; "source_aborted" is set when the whole-source outage
        detector stopped the run.
    """
    import random

    logger.info("=" * 60)
    logger.info("日线增量更新开始,共 %d 只", len(codes))

    today = datetime.now().strftime("%Y-%m-%d")
    stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
    pending_db_values = []
    consecutive_fails = 0
    source_aborted = False
    made_request = False

    # Resume support: skip codes already completed by today's run.
    done_set = load_progress("daily")
    todo = [c for c in codes if c not in done_set]
    logger.info("待更新: %d(已完成: %d)", len(todo), len(done_set))

    health = SourceHealthMonitor()

    for i, code in enumerate(todo):
        # Checkpoint at the top of the loop so it fires every 500 codes even
        # when the previous code left through a `continue` path.
        if i > 0 and i % 500 == 0:
            logger.info("日线进度: %d/%d updated=%d failed=%d",
                        i, len(todo), stats["updated"], stats["failed"])
            _flush_checkpoint("daily", "日线", done_set, pending_db_values)

        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
            time.sleep(CONSECUTIVE_FAIL_PAUSE)
            consecutive_fails = 0

        last_date = get_daily_last_date(code)
        if not last_date:
            stats["skipped"] += 1
            done_set.add(code)
            continue

        next_day = (pd.Timestamp(last_date) + timedelta(days=1)).strftime("%Y-%m-%d")
        # Nothing to fetch when already current or when only weekend days have
        # passed. Skipping avoids a rate-limited request, and keeps "no new
        # bars" from being counted as a source failure by the health monitor.
        if next_day > today or pd.bdate_range(next_day, today).empty:
            stats["skipped"] += 1
            done_set.add(code)
            continue

        # Rate limit only between real requests: base interval + random jitter.
        if made_request:
            time.sleep(max(0.0, REQUEST_INTERVAL_EM + random.uniform(-EM_JITTER, EM_JITTER)))
        made_request = True

        data = _fetch_daily_with_fallback(code, next_day, today)

        # Whole-source outage detection: None means every attempt of both sources failed.
        if not health.report(code, data is None):
            source_aborted = True
            logger.error("❌ 日线源均不可用(东方财富+腾讯),终止日线更新")
            break

        if data is None:
            # Download failure: count it so the failure-rate alert can see it.
            stats["failed"] += 1
            consecutive_fails += 1
            done_set.add(code)
            continue
        if data.empty:
            # Source answered, but there are no bars in the range.
            stats["skipped"] += 1
            done_set.add(code)
            continue

        # Validation: every OHLC price must be positive.
        if (data[["open", "high", "low", "close"]] <= 0).any().any():
            stats["failed"] += 1
            consecutive_fails += 1
            done_set.add(code)
            continue

        try:
            n = update_daily_parquet(code, data)
            rows = _daily_db_rows(code, data)
        except Exception as e:
            stats["failed"] += 1
            consecutive_fails += 1
            logger.warning("日线写入失败 %s: %s", code, e)
        else:
            stats["updated"] += 1
            stats["records"] += n
            stats["db_records"] += len(rows)
            pending_db_values.extend(rows)
            consecutive_fails = 0
        done_set.add(code)

    # Final DB write + progress save.
    _flush_checkpoint("daily", "日线", done_set, pending_db_values)

    if source_aborted:
        stats["source_aborted"] = True
    logger.info("日线完成: %s", json.dumps(stats, ensure_ascii=False))
    return stats


# ======================== 15-minute bars ========================

def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]:
    """Fetch the latest ``datalen`` 15-minute bars for ``symbol`` (e.g. "sh600000") from Sina.

    Returns a copy with columns day/open/high/low/close/volume/amount (values
    as returned by the API). Returns None if the JSONP payload is missing or
    empty, or lacks any of those columns. Network errors propagate.
    """
    url = (
        f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/"
        f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}"
    )
    opener = _make_opener()
    req = urllib.request.Request(url, headers=HEADERS)
    with opener.open(req, timeout=15) as r:
        raw = r.read().decode("utf-8", errors="replace")

    m = re.search(r"\((\[.*\])\)", raw, re.DOTALL)
    if not m:
        return None
    data = json.loads(m.group(1))
    if not data:
        return None

    df = pd.DataFrame(data)
    cols = ["day", "open", "high", "low", "close", "volume", "amount"]
    if any(c not in df.columns for c in cols):
        return None
    return df[cols].copy()


def get_15min_last_date(parquet_path: Path) -> str:
    """Return the latest "day" value stored in a 15-minute Parquet file, or "" if none/unreadable."""
    if not parquet_path.exists():
        return ""
    try:
        df = pd.read_parquet(parquet_path, columns=["day"])
    except Exception as e:
        logger.debug("读取15min最后时间失败 %s: %s", parquet_path, e)
        return ""
    return "" if df.empty else str(df["day"].max())


def update_15min_parquet(code: str) -> Tuple[str, int, list]:
    """Download the latest 15-minute bars for ``code`` and append the new ones to its Parquet file.

    Only bars strictly later than the last stored "day" are appended; stored
    bars are never rewritten. An existing file is read directly, so if it
    cannot be read the exception propagates and the file is left untouched.
    An unreadable file is never mistaken for "no history" and overwritten
    with just the downloaded window.

    Returns:
        (status, new_rows, db_values): status is "ok" or "failed"; new_rows is
        the number of appended bars; db_values are vnpy dbbardata tuples
        (interval "15m") for exactly those bars.
    """
    prefix, clean = get_market_prefix(code)
    symbol = f"{prefix}{clean}"
    parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet"

    df_new = None
    for attempt in range(MAX_RETRIES):
        try:
            df_new = try_sina_15min(symbol)
        except Exception as e:
            logger.debug("新浪15min请求异常 %s (第%d次): %s", code, attempt + 1, e)
            df_new = None
        if df_new is not None and not df_new.empty:
            break
        # Pause before retrying, whether the attempt raised or returned nothing.
        if attempt < MAX_RETRIES - 1:
            time.sleep(1)
    if df_new is None or df_new.empty:
        return "failed", 0, []

    # Validation: numeric OHLC, positive open/close, high/low enclosing the body.
    df_new = df_new.copy()
    for col in ["open", "high", "low", "close"]:
        df_new[col] = pd.to_numeric(df_new[col], errors="coerce")
    df_new["volume"] = pd.to_numeric(df_new["volume"], errors="coerce").fillna(0)
    df_new["amount"] = pd.to_numeric(df_new["amount"], errors="coerce").fillna(0)

    body_max = df_new[["open", "close"]].max(axis=1)
    body_min = df_new[["open", "close"]].min(axis=1)
    bad = (
        df_new[["open", "high", "low", "close"]].isna().any(axis=1)  # unparseable prices
        | (df_new[["close", "open"]] <= 0).any(axis=1)
        | (df_new["high"] < body_max)
        | (df_new["low"] > body_min)
    )
    df_new = df_new[~bad].copy()
    if df_new.empty:
        return "failed", 0, []

    # Store volume/amount/day as strings (object dtype), as before -
    # presumably to match the schema of the existing Parquet files.
    df_new["volume"] = df_new["volume"].astype(str)
    df_new["amount"] = df_new["amount"].astype(str)
    df_new["day"] = df_new["day"].astype(str)

    existing = None
    last_date = ""
    if parquet_path.exists():
        existing = pd.read_parquet(parquet_path)
        existing["day"] = existing["day"].astype(str)
        if not existing.empty:
            last_date = str(existing["day"].max())

    if last_date:
        # Strict append: keep only bars after the last stored one.
        df_increment = df_new[df_new["day"] > last_date].copy()
        if df_increment.empty:
            return "ok", 0, []

        # Gap check: the API window should reach back to the bar after last_date.
        api_first = df_new["day"].min()
        expected_next = str(pd.Timestamp(last_date) + pd.Timedelta(minutes=15))
        if api_first > expected_next:
            logger.warning(
                "⚠️ 15min数据缺口 %s: last=%s, api_first=%s (expected=%s)",
                code, last_date, api_first, expected_next,
            )
        combined = pd.concat([existing, df_increment], ignore_index=True)
        combined = combined.sort_values("day").reset_index(drop=True)
    else:
        # No stored bars yet: write the whole downloaded window.
        df_increment = df_new
        combined = df_new.sort_values("day").reset_index(drop=True)

    # Atomic write: temp file, then swap in.
    tmp = parquet_path.with_suffix(".tmp")
    combined.to_parquet(tmp, index=False)
    tmp.replace(parquet_path)

    # NOTE(review): confirm the vnpy consumer recognises the "15m" interval string.
    exchange = "SSE" if prefix == "sh" else "SZSE"
    db_values = [
        (
            clean, exchange, str(row["day"]), "15m",
            float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
            float(row.get("open", 0)), float(row.get("high", 0)),
            float(row.get("low", 0)), float(row.get("close", 0)),
        )
        for _, row in df_increment.iterrows()
    ]
    return "ok", len(df_increment), db_values


def run_15min_update(codes: List[str]) -> dict:
    """Incrementally update 15-minute bars for ``codes`` (Parquet + vnpy DB).

    Codes already completed today (per the "15min" progress file) are
    skipped. Every 500 codes the buffered DB rows are written before the
    progress file is saved.

    Returns:
        The stats dict; "source_aborted" is set when the whole-source outage
        detector stopped the run.
    """
    logger.info("=" * 60)
    logger.info("15分钟线增量更新开始,共 %d 只", len(codes))

    stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
    pending_db_values = []
    consecutive_fails = 0
    source_aborted = False

    done_set = load_progress("15min")
    todo = [c for c in codes if c not in done_set]
    logger.info("待更新: %d(已完成: %d)", len(todo), len(done_set))

    health = SourceHealthMonitor()

    for i, code in enumerate(todo):
        if i > 0:
            time.sleep(REQUEST_INTERVAL_SINA)

        try:
            status, new_rows, db_values = update_15min_parquet(code)
        except Exception as e:
            status, new_rows, db_values = "failed", 0, []
            logger.debug("15min %s 异常: %s", code, e)

        if not health.report(code, status == "failed"):
            source_aborted = True
            logger.error("❌ 新浪15min源不可用,终止15min更新")
            break

        if status == "ok":
            stats["updated"] += 1
            stats["records"] += new_rows
            stats["db_records"] += len(db_values)
            pending_db_values.extend(db_values)
            consecutive_fails = 0
        else:
            stats["failed"] += 1
            consecutive_fails += 1
        done_set.add(code)

        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
            time.sleep(CONSECUTIVE_FAIL_PAUSE)
            consecutive_fails = 0

        if (i + 1) % 500 == 0:
            logger.info("15min进度: %d/%d updated=%d failed=%d",
                        i + 1, len(todo), stats["updated"], stats["failed"])
            _flush_checkpoint("15min", "15min", done_set, pending_db_values)

    # Final DB write + progress save.
    _flush_checkpoint("15min", "15min", done_set, pending_db_values)

    if source_aborted:
        stats["source_aborted"] = True
    logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False))
    return stats


# ======================== vnpy DB write ========================

def _write_vnpy_db(values: list, label: str) -> bool:
    """Upsert bar tuples into the vnpy SQLite DB on the NAS.

    The rows are first staged in a local temporary SQLite file. That file is
    then ATTACHed to the NAS DB and copied in one transaction with
    INSERT OR REPLACE. dbbaroverview is refreshed in the same transaction,
    for the (symbol, exchange, interval) keys in this batch only.

    Args:
        values: 11-tuples in the order (symbol, exchange, datetime, interval,
            volume, turnover, open_interest, open_price, high_price,
            low_price, close_price).
        label: name used in log messages.

    Returns:
        True on success, False if any step failed (the error is logged).
    """
    import contextlib
    import tempfile

    logger.info("写入vnpy DB [%s]: %d 条记录", label, len(values))
    columns = (
        "symbol,exchange,datetime,interval,volume,turnover,open_interest,"
        "open_price,high_price,low_price,close_price"
    )
    fd, local_tmp = tempfile.mkstemp(prefix="vnpy_update_", suffix=".db")
    os.close(fd)  # sqlite3 opens the (empty) file itself
    try:
        # 1. Stage the increment in a local temporary DB.
        with contextlib.closing(sqlite3.connect(local_tmp, timeout=30)) as conn:
            conn.execute("""CREATE TABLE IF NOT EXISTS dbbardata (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL, exchange TEXT NOT NULL,
                datetime TEXT NOT NULL, interval TEXT NOT NULL,
                volume REAL DEFAULT 0, turnover REAL DEFAULT 0,
                open_interest REAL DEFAULT 0,
                open_price REAL, high_price REAL, low_price REAL, close_price REAL,
                UNIQUE(symbol, exchange, datetime, interval)
            )""")
            conn.executemany(
                f"INSERT OR REPLACE INTO dbbardata ({columns}) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
                values,
            )
            conn.commit()
        logger.info("  本地写入完成: %s (%d rows)", local_tmp, len(values))

        # 2. ATTACH the staging DB to the NAS DB and import.
        # No "PRAGMA journal_mode=WAL" here: this DB lives on the NAS, and
        # SQLite documents that WAL mode does not work over a network filesystem.
        with contextlib.closing(sqlite3.connect(str(VNPY_DB_PATH), timeout=120)) as conn:
            conn.execute("ATTACH DATABASE ? AS tmpdb", (local_tmp,))
            # Explicit column list, never SELECT *. Otherwise the staging table's
            # own AUTOINCREMENT ids (1, 2, 3, ...) would be inserted as
            # main.dbbardata.id, and INSERT OR REPLACE would overwrite
            # unrelated bars that happen to have those ids.
            conn.execute(
                f"INSERT OR REPLACE INTO main.dbbardata ({columns}) "
                f"SELECT {columns} FROM tmpdb.dbbardata"
            )
            # Refresh the overview only for keys touched by this batch; the
            # other keys' rows were not changed by this write.
            conn.execute(
                """INSERT OR REPLACE INTO main.dbbaroverview
                       (symbol,exchange,interval,count,start,end)
                   SELECT b.symbol, b.exchange, b.interval,
                          COUNT(*), MIN(b.datetime), MAX(b.datetime)
                   FROM main.dbbardata AS b
                   JOIN (SELECT DISTINCT symbol, exchange, interval
                         FROM tmpdb.dbbardata) AS k
                     ON b.symbol = k.symbol
                    AND b.exchange = k.exchange
                    AND b.interval = k.interval
                   GROUP BY b.symbol, b.exchange, b.interval"""
            )
            conn.commit()
        logger.info("✅ vnpy DB [%s] 写入完成", label)
        return True
    except Exception as e:
        logger.error("❌ vnpy DB [%s] 写入失败: %s", label, e)
        return False
    finally:
        for path in (local_tmp, local_tmp + "-journal"):
            with contextlib.suppress(OSError):
                os.remove(path)


# ======================== Alerts and report ========================

def check_failure_rate(stats: dict, label: str) -> bool:
    """Log alerts for a stage's stats and return True if anything is abnormal.

    Two conditions are checked and logged independently: a failure rate
    above 5% of processed codes (updated + failed + skipped), and the stage
    having been aborted because its source was unavailable.
    """
    alert = False
    total = stats.get("updated", 0) + stats.get("failed", 0) + stats.get("skipped", 0)
    failed = stats.get("failed", 0)
    if total > 0:
        rate = failed / total
        if rate > 0.05:
            logger.error(
                "❌ 数据更新异常 [%s]:失败率 %.1f%% (%d/%d)",
                label, rate * 100, failed, total,
            )
            alert = True
    if stats.get("source_aborted"):
        logger.error("❌ [%s] 源不可用导致终止", label)
        alert = True
    return alert


# ======================== Entry point ========================

def main():
    """Parse CLI flags, back up the DB, run the selected updates and write a JSON report."""
    parser = argparse.ArgumentParser(description="全市场每日增量更新")
    parser.add_argument("--skip-daily", action="store_true", help="跳过日线更新")
    parser.add_argument("--skip-15min", action="store_true", help="跳过15分钟线更新")
    args = parser.parse_args()

    if not nas_mounted():
        logger.error("❌ NAS未挂载,退出")
        sys.exit(1)

    codes = get_all_codes()
    logger.info("全市场股票数: %d", len(codes))
    logger.info("更新时间: %s", datetime.now().isoformat())

    t_start = time.time()
    report = {}
    has_alert = False

    # Daily rotating DB backup before any writes.
    rotate_db_backup()

    if not args.skip_daily:
        report["daily"] = run_daily_update(codes)
        if check_failure_rate(report["daily"], "日线"):
            has_alert = True

    if not args.skip_15min:
        report["15min"] = run_15min_update(codes)
        if check_failure_rate(report["15min"], "15min"):
            has_alert = True

    elapsed = time.time() - t_start
    report["elapsed_sec"] = round(elapsed, 1)
    report["has_alert"] = has_alert

    logger.info("=" * 60)
    if has_alert:
        logger.error("⚠️ 本次更新存在异常,请检查日志")
    else:
        logger.info("✅ 全部完成,耗时 %.1f 秒", elapsed)
    logger.info(json.dumps(report, ensure_ascii=False, indent=2))

    # The report contains non-ASCII text (ensure_ascii=False): write it as UTF-8
    # explicitly instead of relying on the locale encoding.
    report_file = LOG_DIR / f"report_{datetime.now().strftime('%Y%m%d')}.json"
    report_file.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")

    return report


if __name__ == "__main__":
    main()