diff --git a/data_platform/daily_all_update.py b/data_platform/daily_all_update.py index e06c05ec..e802af55 100644 --- a/data_platform/daily_all_update.py +++ b/data_platform/daily_all_update.py @@ -1,22 +1,22 @@ #!/usr/bin/env python3 """ -全市场每日增量更新 - 日线 + 15分钟线 +全市场每日增量更新 - 日线 + 15分钟线 (v2.0) 功能: - 1. 日线:东方财富API(主) + 腾讯API(备) → 更新Parquet + vnpy DB - 2. 15分钟线:新浪API(800条) → 增量合并Parquet + 导入vnpy DB + 1. 日线:多源fallback → 更新Parquet + 本地vnpy DB + 2. 15分钟线:多源fallback → 增量合并Parquet + 本地vnpy DB + 3. 本地DB构建完成后mv原子重命名到NAS -数据源降级链: - 日线: 东方财富(amount真实,可拿多年) → 腾讯(amount有时0) - 15min: 新浪(amount真实,800条) → 无备源 +数据源降级链(按质量排序,BaoStock T+1延迟已考虑): + 日线增量(当天实时):东方财富(实时,4s限频) → BaoStock(T+1,无反爬) → 腾讯(amount有时0) + 15min增量(当天实时):东方财富(实时7周) → BaoStock(T+1,无反爬) → 新浪(已挂,保留) 设计原则: + - 多源fallback:按质量排序,成功即用,失败试下一个 - 增量更新,不重复下载 - - 失败不影响其他股票 - - 进度持久化,支持断点续传(日线+15min均有进度文件) - - 限频保护 + 全局源不可用检测 + - vnpy DB本地构建 → mv原子重命名到NAS(避免SMB锁) + - 失败率检测(滑动窗口100只,>80%失败则终止) - DB轮转备份(保留7天) - - 日志完整 + 失败率告警 用法: python3 daily_all_update.py # 全量更新(日线+15min) @@ -24,22 +24,21 @@ python3 daily_all_update.py --skip-15min # 只更新日线 变更记录: - v1.1 (2026-05-03) - 司马懿评审后修改: - - interval "1m" → "15m" - - 15min增量改为严格按日期追加(不再用drop_duplicates keep=last) - - 日线增加进度文件 - - 全局源不可用检测(连续30只首次失败→终止) - - DB轮转备份(保留7天) - - 失败率>5%告警 - v1.2 (2026-05-05) - 东方财富集成: - - 日线主源从腾讯切换为东方财富(amount真实,可拿多年历史) - - 腾讯降为日线备源 - - 东方财富限频:4s/请求 + 随机抖动±1s - - 反爬:Referer + 完整UA + ut参数 + JSONP + v1.0 (2026-05-03) - 初始版本 + v1.1 (2026-05-03) - 司马懿评审:interval→15m, 严格增量, 进度文件, 源检测, DB备份 + v1.2 (2026-05-05) - 东方财富集成:日线主源切换东方财富 + v2.0 (2026-05-06) - 重大架构变更(司马懿+姜维评审通过): + - BaoStock替代所有主源,多源fallback机制 + - vnpy DB写入改为本地构建+mv原子重命名(解决SMB锁) + - interval统一1m(vnpy 4.x Interval.MINUTE硬约束) + - 日线跨年写入修复 + - 进度文件加日期 + - overview增量更新(不做全表聚合) + - 失败率检测替代固定次数暂停 + - 东方财富当天实时+BaoStock T+1补全 """ import argparse -import glob import json import os import re @@ -48,33 +47,53 @@ import sqlite3 import sys import time import logging +import random import urllib.request import urllib.error from datetime import datetime, timedelta from pathlib import Path -from typing import Optional, List, Tuple +from typing import Optional, List, Tuple, Callable +from collections import deque import pandas as pd +try: + import baostock as bs + HAS_BAOSTOCK = True +except ImportError: + HAS_BAOSTOCK = False + +try: + import requests as _requests + HAS_REQUESTS = True +except ImportError: + HAS_REQUESTS = False + # ======================== 配置 ======================== LOG_DIR = Path("/Volumes/stock/logs/daily_update") DAILY_DIR = Path("/Volumes/stock/A股数据/日线数据/daily") MINUTE_15_DIR = Path("/Volumes/stock/minute_kline/15min") VNPY_DB_PATH = Path("/Volumes/stock/sanguo_vnpy/data/quant_trading.db") +LOCAL_DB_PATH = Path("/tmp/quant_trading_new.db") ALL_STOCKS_FILE = Path("/Volumes/stock/A股数据/stock_info/stock_basic_info_raw_20260326_113530.csv") PROGRESS_DIR = Path("/Volumes/stock/logs/daily_update/progress") -REQUEST_INTERVAL_SINA = 0.3 # 新浪:稳定,0.3s够 -REQUEST_INTERVAL_EM = 4.0 # 东方财富:严格3-5s,加随机抖动 +REQUEST_INTERVAL_EM = 4.0 # 东方财富:4s + 随机抖动±1s +REQUEST_INTERVAL_SINA = 0.3 # 新浪(已挂,保留) +REQUEST_INTERVAL_BS = 0.0 # BaoStock:无需限频 EM_JITTER = 1.0 # 东方财富随机抖动范围(±秒) MAX_RETRIES = 3 -CONSECUTIVE_FAIL_PAUSE = 60 -MAX_CONSECUTIVE_FAILS = 10 -# 全局源不可用检测:连续N只股票首次请求就失败,判定源不可用 -GLOBAL_FAIL_THRESHOLD = 30 -# DB轮转备份数 +# 失败率检测:滑动窗口 +GLOBAL_FAIL_WINDOW = 100 # 最近N只 +GLOBAL_FAIL_THRESHOLD = 0.8 # 失败率阈值 +# DB DB_BACKUP_KEEP_DAYS = 7 +BATCH_SIZE = 50000 +# vnpy interval +# interval='1m' — vnpy 4.x Interval.MINUTE硬约束,实际存储15分钟线 +INTERVAL_MINUTE = "1m" +INTERVAL_DAILY = "d" HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"} HEADERS_EM = { @@ -89,7 +108,6 @@ def setup_logging(): LOG_DIR.mkdir(parents=True, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") log_file = LOG_DIR / f"update_{ts}.log" - logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", @@ -111,12 +129,19 @@ def _make_opener(): # ======================== 工具函数 ======================== def get_market_prefix(code: str) -> Tuple[str, str]: + """返回 (prefix, clean_code):sh/sz + 6位数字""" code = re.sub(r"[^0-9]", "", code).zfill(6) if code.startswith(("60", "68", "51")): return "sh", code return "sz", code +def code_to_baostock(code: str) -> str: + """纯6位代码转BaoStock格式:sh.600000""" + prefix, clean = get_market_prefix(code) + return f"{prefix}.{clean}" + + def get_all_codes() -> List[str]: df = pd.read_csv(ALL_STOCKS_FILE) for col in ["代码", "code", "股票代码"]: @@ -132,12 +157,11 @@ def nas_mounted() -> bool: # ======================== DB备份 ======================== def rotate_db_backup(): - """轮转备份vnpy DB,保留最近N天""" + """轮转备份NAS vnpy DB,保留最近N天""" backup_dir = VNPY_DB_PATH.parent today = datetime.now().strftime("%Y%m%d") backup_file = backup_dir / f"quant_trading_{today}.db.bak" - # 如果今天已备份过,跳过 if backup_file.exists(): logger.info("DB今日已备份: %s", backup_file) return @@ -150,12 +174,10 @@ def rotate_db_backup(): logger.error("❌ DB备份失败: %s", e) return - # 清理过期备份 cutoff = datetime.now() - timedelta(days=DB_BACKUP_KEEP_DAYS) for f in backup_dir.glob("quant_trading_*.db.bak"): try: - # 从文件名提取日期 - date_str = f.stem.split("_")[-1] # quant_trading_20260503.db.bak → 20260503 + date_str = f.stem.split("_")[-1] file_date = datetime.strptime(date_str, "%Y%m%d") if file_date < cutoff: f.unlink() @@ -167,8 +189,9 @@ def rotate_db_backup(): # ======================== 进度文件 ======================== def load_progress(name: str) -> set: - """加载进度文件,返回已完成的代码集合""" - progress_file = PROGRESS_DIR / f"{name}_progress.json" + """加载进度文件(v2.0:带日期)""" + today = datetime.now().strftime("%Y%m%d") + progress_file = PROGRESS_DIR / f"{name}_{today}_progress.json" PROGRESS_DIR.mkdir(parents=True, exist_ok=True) if progress_file.exists(): try: @@ -179,97 +202,121 @@ def load_progress(name: str) -> set: def save_progress(name: str, done_set: set): - """保存进度文件""" - progress_file = PROGRESS_DIR / f"{name}_progress.json" + """保存进度文件(v2.0:带日期)""" + today = datetime.now().strftime("%Y%m%d") + progress_file = PROGRESS_DIR / f"{name}_{today}_progress.json" progress_file.write_text(json.dumps({ "done": sorted(list(done_set)), "ts": datetime.now().isoformat(), })) -# ======================== 全局源不可用检测 ======================== +# ======================== 失败率检测(v2.0) ======================== class SourceHealthMonitor: - """监控数据源健康状态,连续N只首次请求失败则判定源不可用""" + """滑动窗口失败率检测:最近N只中失败率>T则判定源不可用""" - def __init__(self, threshold: int = GLOBAL_FAIL_THRESHOLD): + def __init__(self, window: int = GLOBAL_FAIL_WINDOW, threshold: float = GLOBAL_FAIL_THRESHOLD): + self.window = window self.threshold = threshold - self.consecutive_first_fail = 0 + self.history = deque(maxlen=window) - def report(self, code: str, first_attempt_failed: bool) -> bool: + def report(self, code: str, failed: bool) -> bool: """ - 报告单只股票的首次请求结果 - 返回True表示源仍健康,False表示源不可用应终止 + 报告单只结果,返回True=源健康,False=源不可用应终止 """ - if first_attempt_failed: - self.consecutive_first_fail += 1 - if self.consecutive_first_fail >= self.threshold: + self.history.append(1 if failed else 0) + if len(self.history) >= 20: # 至少20只才判断 + fail_rate = sum(self.history) / len(self.history) + if fail_rate >= self.threshold: logger.error( - "⚠️ 全局源不可用检测触发:连续 %d 只股票首次请求失败,终止本次更新", - self.consecutive_first_fail, + "⚠️ 源不可用检测触发:最近%d只失败率 %.0f%%,终止更新", + len(self.history), fail_rate * 100, ) return False - else: - self.consecutive_first_fail = 0 return True -# ======================== 日线更新 ======================== +# ======================== 数据源:BaoStock ======================== -def get_em_secid(code: str) -> str: - """获取东方财富secid: 1.沪市 0.深市""" +def fetch_baostock_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: + """BaoStock日线:全量历史,无反爬,amount真实,T+1延迟""" + if not HAS_BAOSTOCK: + return None + bs_code = code_to_baostock(code) + try: + rs = bs.query_history_k_data_plus( + bs_code, + "date,open,high,low,close,volume,amount", + start_date=start_date.replace("-", ""), + end_date=end_date.replace("-", ""), + frequency="d", + adjustflag="2", + ) + rows = [] + while (rs.error_code == "0") and rs.next(): + rows.append(rs.get_row_data()) + if not rows: + return None + df = pd.DataFrame(rows, columns=["date", "open", "high", "low", "close", "volume", "amount"]) + for c in ["open", "high", "low", "close", "volume", "amount"]: + df[c] = pd.to_numeric(df[c], errors="coerce") + df = df.dropna(subset=["close"]) + if df.empty: + return None + return df + except Exception as e: + logger.debug("BaoStock日线失败 %s: %s", code, e) + return None + + +def fetch_baostock_15min(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: + """BaoStock 15min:全量历史,无反爬,amount真实,T+1延迟""" + if not HAS_BAOSTOCK: + return None + bs_code = code_to_baostock(code) + try: + rs = bs.query_history_k_data_plus( + bs_code, + "date,time,open,high,low,close,volume,amount", + start_date=start_date.replace("-", ""), + end_date=end_date.replace("-", ""), + frequency="15", + adjustflag="2", + ) + rows = [] + while (rs.error_code == "0") and rs.next(): + rows.append(rs.get_row_data()) + if not rows: + return None + # BaoStock返回: [date, time(YYYYMMDDHHMMSSSSS), open, high, low, close, volume, amount] + df = pd.DataFrame(rows) + # 构造day列:YYYY-MM-DD HH:MM:SS + df.columns = ["date", "time", "open", "high", "low", "close", "volume", "amount"][:len(df.columns)] + df["day"] = df["time"].apply(lambda t: f"{t[:4]}-{t[4:6]}-{t[6:8]} {t[8:10]}:{t[10:12]}:00") + for c in ["open", "high", "low", "close", "volume", "amount"]: + df[c] = pd.to_numeric(df[c], errors="coerce") + df = df.dropna(subset=["close"]) + if df.empty: + return None + return df[["day", "open", "high", "low", "close", "volume", "amount"]] + except Exception as e: + logger.debug("BaoStock 15min失败 %s: %s", code, e) + return None + + +# ======================== 数据源:东方财富 ======================== + +def _get_em_secid(code: str) -> str: if code.startswith(("60", "68", "51")): return f"1.{code}" return f"0.{code}" -def fetch_eastmoney_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: - """东方财富K线API获取日线(主源) - - 特点: - - 一次请求可拿多年数据(实测4年1046条) - - amount有真实值 - - 反爬严格:必须Referer+UA+ut,限频3-5s/请求 - """ - import requests as _requests - import random - - secid = get_em_secid(code) - ts = str(int(time.time() * 1000)) - url = ( - f"https://push2his.eastmoney.com/api/qt/stock/kline/get?" - f"secid={secid}&klt=101&fqt=1&" - f"beg={start_date.replace('-', '')}&end={end_date.replace('-', '')}&" - f"fields1=f1,f2,f3,f4,f5,f6,f7,f8&" - f"fields2=f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61&" - f"ut=b2884a393a59ad64002292a3e90d46a5&" - f"lmt=10000&" - f"cb=jQuery_em_{ts}&_={ts}" - ) - - session = _requests.Session() - session.trust_env = False # 绕过系统代理 - - try: - r = session.get(url, headers=HEADERS_EM, timeout=15, verify=False) - if r.status_code != 200: - return None - # 解析JSONP - text = r.text - start_idx = text.index("(") + 1 - end_idx = text.rindex(")") - data = json.loads(text[start_idx:end_idx]) - except Exception as e: - logger.debug("东方财富日线失败 %s: %s", code, e) - return None - - if data.get("rc") != 0: - return None - klines = data.get("data", {}).get("klines", []) +def _parse_em_klines(klines: list) -> Optional[pd.DataFrame]: + """解析东方财富K线数据(日线和15min通用)""" if not klines: return None - - # 解析: date,open,close,high,low,volume,amount,amplitude,pct_change,change,turnover_rate rows = [] for line in klines: parts = line.split(",") @@ -284,80 +331,232 @@ def fetch_eastmoney_daily(code: str, start_date: str, end_date: str) -> Optional "volume": float(parts[5]), "amount": float(parts[6]), }) - if not rows: return None - df = pd.DataFrame(rows) - # 东方财富日期格式可能含时间,标准化 - df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") - df["date"] = df["date"].astype(str) - mask = (df["date"] >= start_date) & (df["date"] <= end_date) - result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]] - return result if not result.empty else None + return pd.DataFrame(rows) +def fetch_eastmoney_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: + """东方财富日线:当天实时,amount真实,4s限频""" + if not HAS_REQUESTS: + return None + secid = _get_em_secid(code) + ts = str(int(time.time() * 1000)) + url = ( + f"https://push2his.eastmoney.com/api/qt/stock/kline/get?" + f"secid={secid}&klt=101&fqt=1&" + f"beg={start_date.replace('-', '')}&end={end_date.replace('-', '')}&" + f"fields1=f1,f2,f3,f4,f5,f6,f7,f8&" + f"fields2=f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61&" + f"ut=b2884a393a59ad64002292a3e90d46a5&lmt=10000&" + f"cb=jQuery_em_{ts}&_={ts}" + ) + session = _requests.Session() + session.trust_env = False + try: + r = session.get(url, headers=HEADERS_EM, timeout=15, verify=False) + if r.status_code != 200: + return None + text = r.text + data = json.loads(text[text.index("(") + 1:text.rindex(")")]) + if data.get("rc") != 0: + return None + klines = data.get("data", {}).get("klines", []) + df = _parse_em_klines(klines) + if df is None: + return None + df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") + mask = (df["date"] >= start_date) & (df["date"] <= end_date) + result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]] + return result if not result.empty else None + except Exception as e: + logger.debug("东方财富日线失败 %s: %s", code, e) + return None + + +def fetch_eastmoney_15min(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: + """东方财富15min:当天实时,约7周历史,4s限频""" + if not HAS_REQUESTS: + return None + secid = _get_em_secid(code) + ts = str(int(time.time() * 1000)) + url = ( + f"https://push2his.eastmoney.com/api/qt/stock/kline/get?" + f"secid={secid}&klt=15&fqt=1&" + f"beg={start_date.replace('-', '')}&end={end_date.replace('-', '')}&" + f"fields1=f1,f2,f3,f4,f5,f6,f7,f8&" + f"fields2=f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61&" + f"ut=b2884a393a59ad64002292a3e90d46a5&lmt=100000&" + f"cb=jQuery_em_{ts}&_={ts}" + ) + session = _requests.Session() + session.trust_env = False + try: + r = session.get(url, headers=HEADERS_EM, timeout=15, verify=False) + if r.status_code != 200: + return None + text = r.text + data = json.loads(text[text.index("(") + 1:text.rindex(")")]) + if data.get("rc") != 0: + return None + klines = data.get("data", {}).get("klines", []) + if not klines: + return None + rows = [] + for line in klines: + parts = line.split(",") + if len(parts) < 7: + continue + rows.append({ + "day": parts[0], + "open": float(parts[1]), + "close": float(parts[2]), + "high": float(parts[3]), + "low": float(parts[4]), + "volume": float(parts[5]), + "amount": float(parts[6]), + }) + if not rows: + return None + df = pd.DataFrame(rows) + # 转day格式:东方财富返回 "2026-04-30 15:00" 或 "2026-04-30" + df["day"] = df["day"].apply(lambda d: d if " " in str(d) else f"{d} 00:00:00") + # 补全秒 + df["day"] = df["day"].apply(lambda d: d if d.count(":") == 2 else d + ":00") + for c in ["open", "high", "low", "close", "volume", "amount"]: + df[c] = pd.to_numeric(df[c], errors="coerce") + df = df.dropna(subset=["close"]) + if df.empty: + return None + return df[["day", "open", "high", "low", "close", "volume", "amount"]] + except Exception as e: + logger.debug("东方财富15min失败 %s: %s", code, e) + return None + + +# ======================== 数据源:腾讯 ======================== + def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: - """腾讯K线API获取日线增量""" + """腾讯日线:amount有时为0""" prefix, clean = get_market_prefix(code) tq = f"{prefix}{clean}" days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10 url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days}," - opener = _make_opener() - req = urllib.request.Request(url, headers=HEADERS) - with opener.open(req, timeout=10) as r: - raw = r.read().decode("utf-8", errors="replace") - data = json.loads(raw) - d = data.get("data") - if not isinstance(d, dict): - return None - klines = d.get(tq, {}).get("day", []) - if not klines: + try: + req = urllib.request.Request(url, headers=HEADERS) + with opener.open(req, timeout=10) as r: + raw = r.read().decode("utf-8", errors="replace") + data = json.loads(raw) + d = data.get("data") + if not isinstance(d, dict): + return None + klines = d.get(tq, {}).get("day", []) + if not klines: + return None + df = pd.DataFrame(klines) + ncols = len(df.columns) + if ncols >= 7: + df.columns = ["date", "open", "close", "high", "low", "volume", "amount"][:ncols] + else: + df.columns = ["date", "open", "close", "high", "low", "volume"][:ncols] + df["amount"] = 0.0 + if "amount" not in df.columns: + df["amount"] = 0.0 + for c in ["open", "close", "high", "low", "volume", "amount"]: + df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) + df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") + mask = (df["date"] >= start_date) & (df["date"] <= end_date) + result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]] + return result if not result.empty else None + except Exception as e: + logger.debug("腾讯日线失败 %s: %s", code, e) return None - df = pd.DataFrame(klines) - ncols = len(df.columns) - if ncols >= 7: - df.columns = ["date", "open", "close", "high", "low", "volume", "amount"][:ncols] - else: - df.columns = ["date", "open", "close", "high", "low", "volume"][:ncols] - if "amount" not in df.columns: - df["amount"] = 0.0 - for c in ["open", "close", "high", "low", "volume", "amount"]: - df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) - df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") - df["date"] = df["date"].astype(str) - mask = (df["date"] >= start_date) & (df["date"] <= end_date) - result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]] - return result if not result.empty else None + +# ======================== 数据源:新浪(已挂,保留代码) ======================== + +def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]: + """新浪15分钟K线API(当前已挂,保留作为fallback)""" + url = ( + f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/" + f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}" + ) + opener = _make_opener() + try: + req = urllib.request.Request(url, headers=HEADERS) + with opener.open(req, timeout=15) as r: + raw = r.read().decode("utf-8", errors="replace") + m = re.search(r"\((\[.*\])\)", raw, re.DOTALL) + if not m: + return None + data = json.loads(m.group(1)) + if not data: + return None + df = pd.DataFrame(data) + cols = ["day", "open", "high", "low", "close", "volume", "amount"] + for c in cols: + if c not in df.columns: + return None + return df[cols] + except Exception: + return None -def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int: - """增量写入日线Parquet""" - prefix, clean = get_market_prefix(code) - year = datetime.now().year - parquet_path = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet" +# ======================== Fallback机制 ======================== - new_data = new_data.copy() - new_data["date"] = new_data["date"].astype(str) +def fetch_with_fallback( + sources: List[Tuple[str, Callable, float]], + code: str, + start_date: str, + end_date: str, + is_daily: bool = True, +) -> Tuple[Optional[pd.DataFrame], str]: + """ + 多源fallback获取数据 + sources: [(name, fetch_fn, interval_seconds), ...] + 返回: (DataFrame或None, 使用的源名) + """ + for name, fetch_fn, interval in sources: + if interval > 0: + jitter = (hash(code + name) % 200 - 100) / 100.0 * (interval * 0.1) + time.sleep(max(0, interval + jitter)) + data = None + for attempt in range(MAX_RETRIES): + try: + if is_daily: + data = fetch_fn(code, start_date, end_date) + else: + data = fetch_fn(code, start_date, end_date) + if data is not None and len(data) > 0: + return data, name + except Exception: + pass + if attempt < MAX_RETRIES - 1: + time.sleep(1) + return None, "" - if parquet_path.exists(): - existing = pd.read_parquet(parquet_path) - existing["date"] = existing["date"].astype(str) - combined = pd.concat([existing, new_data], ignore_index=True) - combined = combined.drop_duplicates(subset=["date"], keep="last") - combined = combined.sort_values("date").reset_index(drop=True) - else: - parquet_path.parent.mkdir(parents=True, exist_ok=True) - combined = new_data - tmp = parquet_path.with_suffix(".tmp") - combined.to_parquet(tmp, index=False) - tmp.rename(parquet_path) - return len(new_data) +# 日线数据源(当天实时优先) +SOURCES_DAILY = [ + ("eastmoney", fetch_eastmoney_daily, REQUEST_INTERVAL_EM), + ("baostock", fetch_baostock_daily, REQUEST_INTERVAL_BS), + ("tencent", fetch_tencent_daily, 0), +] +# 15min数据源(当天实时优先) +SOURCES_15MIN = [ + ("eastmoney", fetch_eastmoney_15min, REQUEST_INTERVAL_EM), + ("baostock", fetch_baostock_15min, REQUEST_INTERVAL_BS), + ("sina", lambda code, s, e: try_sina_15min(f"{get_market_prefix(code)[0]}{get_market_prefix(code)[1]}"), + REQUEST_INTERVAL_SINA), +] + + +# ======================== 日线更新 ======================== def get_daily_last_date(code: str) -> str: + """获取日线Parquet中最后日期""" prefix, clean = get_market_prefix(code) for year in range(datetime.now().year, 2009, -1): fpath = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet" @@ -365,14 +564,42 @@ def get_daily_last_date(code: str) -> str: try: df = pd.read_parquet(fpath, columns=["date"]) if not df.empty: - val = df["date"].max() - return str(val)[:10] + return str(df["date"].max())[:10] except Exception: pass return "" -def run_daily_update(codes: List[str]) -> dict: +def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int: + """增量写入日线Parquet(v2.0:按数据日期分年目录)""" + prefix, clean = get_market_prefix(code) + new_data = new_data.copy() + new_data["date"] = new_data["date"].astype(str) + + total_new = 0 + for yr in new_data["date"].str[:4].unique(): + year_data = new_data[new_data["date"].str[:4] == yr].copy() + parquet_path = DAILY_DIR / str(yr) / f"{prefix}{clean}_daily.parquet" + + if parquet_path.exists(): + existing = pd.read_parquet(parquet_path) + existing["date"] = existing["date"].astype(str) + combined = pd.concat([existing, year_data], ignore_index=True) + combined = combined.drop_duplicates(subset=["date"], keep="last") + combined = combined.sort_values("date").reset_index(drop=True) + else: + parquet_path.parent.mkdir(parents=True, exist_ok=True) + combined = year_data + + tmp = parquet_path.with_suffix(".tmp") + combined.to_parquet(tmp, index=False) + tmp.rename(parquet_path) + total_new += len(year_data) + + return total_new + + +def run_daily_update(codes: List[str], local_conn: sqlite3.Connection) -> dict: """日线增量更新""" logger.info("=" * 60) logger.info("日线增量更新开始,共 %d 只", len(codes)) @@ -380,26 +607,14 @@ def run_daily_update(codes: List[str]) -> dict: stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0} all_db_values = [] - consecutive_fails = 0 - source_aborted = False - # 进度文件(硬伤1修复) done_set = load_progress("daily") todo = [c for c in codes if c not in done_set] logger.info("待更新: %d(已完成: %d)", len(todo), len(done_set)) - # 全局源健康监控(硬伤2修复) health = SourceHealthMonitor() for i, code in enumerate(todo): - if source_aborted: - break - - if i > 0: - # 东方财富严格限频:基础间隔4s + 伪随机抖动±1s - jitter = (hash(code) % 200 - 100) / 100.0 * EM_JITTER - time.sleep(REQUEST_INTERVAL_EM + jitter) - last_date = get_daily_last_date(code) if not last_date: stats["skipped"] += 1 @@ -411,62 +626,27 @@ def run_daily_update(codes: List[str]) -> dict: done_set.add(code) continue - data = None - source_used = "" - first_attempt_failed = False + data, source = fetch_with_fallback(SOURCES_DAILY, code, next_day, today, is_daily=True) + failed = (data is None or data.empty) - # 主源:东方财富(amount真实,可拿多年历史) - for attempt in range(MAX_RETRIES): - try: - data = fetch_eastmoney_daily(code, next_day, today) - if data is not None: - source_used = "eastmoney" - break - if attempt == 0: - first_attempt_failed = True - except Exception: - if attempt == 0: - first_attempt_failed = True - if attempt < MAX_RETRIES - 1: - time.sleep(2) - - # 备源:腾讯(amount有时为0) - if data is None: - first_attempt_failed_backup = False - for attempt in range(MAX_RETRIES): - try: - data = fetch_tencent_daily(code, next_day, today) - if data is not None: - source_used = "tencent" - break - if attempt == 0: - first_attempt_failed_backup = True - except Exception: - if attempt == 0: - first_attempt_failed_backup = True - if attempt < MAX_RETRIES - 1: - time.sleep(1) - # 只有两个源都首次失败才算全局失败 - if first_attempt_failed and first_attempt_failed_backup: - first_attempt_failed = True - else: - first_attempt_failed = False - - # 全局源不可用检测 - if not health.report(code, first_attempt_failed and data is None): - source_aborted = True - logger.error("❌ 日线源均不可用(东方财富+腾讯),终止日线更新") + if not health.report(code, failed): + logger.error("❌ 日线所有源不可用,终止日线更新") + stats["source_aborted"] = True break - if data is None or data.empty: - stats["skipped"] += 1 + if failed: + stats["failed"] += 1 done_set.add(code) + if (i + 1) % 500 == 0: + logger.info("日线进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"]) + save_progress("daily", done_set) continue # 校验 - if (data[["open", "high", "low", "close"]] <= 0).any().any(): + for c in ["open", "high", "low", "close"]: + data[c] = pd.to_numeric(data[c], errors="coerce") + if (data[["close", "open"]] <= 0).any().any(): stats["failed"] += 1 - consecutive_fails += 1 done_set.add(code) continue @@ -474,77 +654,44 @@ def run_daily_update(codes: List[str]) -> dict: n = update_daily_parquet(code, data) stats["updated"] += 1 stats["records"] += n - consecutive_fails = 0 # 收集vnpy DB数据 prefix, clean = get_market_prefix(code) exchange = "SSE" if prefix == "sh" else "SZSE" for _, row in data.iterrows(): all_db_values.append(( - clean, exchange, str(row["date"]), "d", + clean, exchange, str(row["date"]), INTERVAL_DAILY, float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0, float(row.get("open", 0)), float(row.get("high", 0)), float(row.get("low", 0)), float(row.get("close", 0)), )) - stats["db_records"] += 1 + stats["db_records"] += len(data) except Exception as e: stats["failed"] += 1 - consecutive_fails += 1 logger.warning("日线写入失败 %s: %s", code, e) done_set.add(code) - if consecutive_fails >= MAX_CONSECUTIVE_FAILS: - logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) - time.sleep(CONSECUTIVE_FAIL_PAUSE) - consecutive_fails = 0 - if (i + 1) % 500 == 0: - logger.info("日线进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"]) + logger.info("日线进度: %d/%d updated=%d failed=%d src=%s", + i + 1, len(todo), stats["updated"], stats["failed"], source) save_progress("daily", done_set) - # 写vnpy DB + # 写入本地DB if all_db_values: - _write_vnpy_db(all_db_values, "日线") + _write_local_db(local_conn, all_db_values, "日线") - # 保存最终进度 save_progress("daily", done_set) - - if source_aborted: - stats["source_aborted"] = True - + if stats.get("source_aborted"): + pass logger.info("日线完成: %s", json.dumps(stats, ensure_ascii=False)) return stats # ======================== 15分钟线更新 ======================== -def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]: - """新浪15分钟K线API""" - url = ( - f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/" - f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}" - ) - opener = _make_opener() - req = urllib.request.Request(url, headers=HEADERS) - with opener.open(req, timeout=15) as r: - raw = r.read().decode("utf-8", errors="replace") - m = re.search(r"\((\[.*\])\)", raw, re.DOTALL) - if not m: - return None - data = json.loads(m.group(1)) - if not data: - return None - df = pd.DataFrame(data) - cols = ["day", "open", "high", "low", "close", "volume", "amount"] - for c in cols: - if c not in df.columns: - return None - return df[cols] - - def get_15min_last_date(parquet_path: Path) -> str: - """获取15min Parquet中最后一条的时间戳""" + """获取15min Parquet中最后一条时间戳""" if not parquet_path.exists(): return "" try: @@ -556,185 +703,188 @@ def get_15min_last_date(parquet_path: Path) -> str: return "" -def update_15min_parquet(code: str) -> Tuple[str, int, list]: - """ - 增量下载并合并15分钟线(阻塞项2修复:严格按日期追加) - 返回: (status, new_rows, db_values) - """ +def fetch_15min_with_fallback(code: str, start_date: str, end_date: str) -> Tuple[Optional[pd.DataFrame], str]: + """15min多源fallback(特殊处理新浪接口不同)""" prefix, clean = get_market_prefix(code) symbol = f"{prefix}{clean}" - parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet" - df_new = None + # 源1:东方财富 + jitter = (hash(code) % 200 - 100) / 100.0 * EM_JITTER + time.sleep(max(0, REQUEST_INTERVAL_EM + jitter)) for attempt in range(MAX_RETRIES): try: - df_new = try_sina_15min(symbol) - if df_new is not None and len(df_new) > 0: - break + data = fetch_eastmoney_15min(code, start_date, end_date) + if data is not None and len(data) > 0: + return data, "eastmoney" except Exception: - if attempt < MAX_RETRIES - 1: - time.sleep(1) + pass + if attempt < MAX_RETRIES - 1: + time.sleep(1) - if df_new is None or df_new.empty: - return "failed", 0, [] + # 源2:BaoStock + for attempt in range(MAX_RETRIES): + try: + data = fetch_baostock_15min(code, start_date, end_date) + if data is not None and len(data) > 0: + return data, "baostock" + except Exception: + pass + if attempt < MAX_RETRIES - 1: + time.sleep(0.5) - # 数据校验 - for col in ["open", "high", "low", "close"]: - df_new[col] = pd.to_numeric(df_new[col], errors="coerce") - df_new["volume"] = pd.to_numeric(df_new["volume"], errors="coerce").fillna(0) - df_new["amount"] = pd.to_numeric(df_new["amount"], errors="coerce").fillna(0) + # 源3:新浪(已挂,保留) + time.sleep(REQUEST_INTERVAL_SINA) + for attempt in range(MAX_RETRIES): + try: + data = try_sina_15min(symbol) + if data is not None and len(data) > 0: + return data, "sina" + except Exception: + pass + if attempt < MAX_RETRIES - 1: + time.sleep(0.5) - bad_zero = (df_new[["close", "open"]] <= 0).any(axis=1) - if bad_zero.any(): - df_new = df_new[~bad_zero] - bad_ohlc = (df_new["high"] < df_new[["open", "close"]].max(axis=1)) | \ - (df_new["low"] > df_new[["open", "close"]].min(axis=1)) - if bad_ohlc.any(): - df_new = df_new[~bad_ohlc] - if df_new.empty: - return "failed", 0, [] - - # 转回object类型 - df_new["volume"] = df_new["volume"].astype(str) - df_new["amount"] = df_new["amount"].astype(str) - df_new["day"] = df_new["day"].astype(str) - - # 获取已有数据最后日期 - last_date = get_15min_last_date(parquet_path) - - if last_date: - # 严格追加:只保留 day > last_date 的新数据(阻塞项2修复) - df_increment = df_new[df_new["day"] > last_date].copy() - - if df_increment.empty: - # 没有新数据,不需要写入 - return "ok", 0, [] - - # 缺口检测:API返回的最早数据比last_date晚超过1根K线(15min) - api_first = df_new["day"].min() - expected_next = str(pd.Timestamp(last_date) + pd.Timedelta(minutes=15)) - if api_first > expected_next: - logger.warning( - "⚠️ 15min数据缺口 %s: last=%s, api_first=%s (expected=%s)", - code, last_date, api_first, expected_next, - ) - - # 合并 - existing = pd.read_parquet(parquet_path) - existing["day"] = existing["day"].astype(str) - combined = pd.concat([existing, df_increment], ignore_index=True) - combined = combined.sort_values("day").reset_index(drop=True) - new_rows = len(df_increment) - else: - # 无已有数据,直接写入全部 - combined = df_new.sort_values("day").reset_index(drop=True) - df_increment = df_new - new_rows = len(df_new) - - # 原子写入 - tmp = parquet_path.with_suffix(".tmp") - combined.to_parquet(tmp, index=False) - tmp.rename(parquet_path) - - # 构建DB写入数据(interval="15m",阻塞项1修复) - exchange = "SSE" if prefix == "sh" else "SZSE" - db_values = [] - for _, row in df_increment.iterrows(): - db_values.append(( - clean, exchange, str(row["day"]), "15m", - float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0, - float(row.get("open", 0)), float(row.get("high", 0)), - float(row.get("low", 0)), float(row.get("close", 0)), - )) - - return "ok", new_rows, db_values + return None, "" -def run_15min_update(codes: List[str]) -> dict: +def run_15min_update(codes: List[str], local_conn: sqlite3.Connection) -> dict: """15分钟线增量更新""" logger.info("=" * 60) logger.info("15分钟线增量更新开始,共 %d 只", len(codes)) stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0} all_db_values = [] - consecutive_fails = 0 - source_aborted = False - # 进度文件 done_set = load_progress("15min") todo = [c for c in codes if c not in done_set] logger.info("待更新: %d(已完成: %d)", len(todo), len(done_set)) - # 全局源健康监控 health = SourceHealthMonitor() for i, code in enumerate(todo): - if source_aborted: - break + prefix, clean = get_market_prefix(code) + parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet" - if i > 0: - time.sleep(REQUEST_INTERVAL_SINA) - - try: - status, new_rows, db_values = update_15min_parquet(code) - first_attempt_failed = (status == "failed") - except Exception as e: - status, new_rows, db_values = "failed", 0, [] - first_attempt_failed = True - logger.debug("15min %s 异常: %s", code, e) - - # 全局源不可用检测 - if not health.report(code, first_attempt_failed): - source_aborted = True - logger.error("❌ 新浪15min源不可用,终止15min更新") - break - - if status == "ok": - stats["updated"] += 1 - stats["records"] += new_rows - stats["db_records"] += len(db_values) - all_db_values.extend(db_values) - consecutive_fails = 0 + # 获取最后日期,决定增量范围 + last_date = get_15min_last_date(parquet_path) + if last_date: + next_dt = (pd.Timestamp(last_date) + timedelta(minutes=15)).strftime("%Y-%m-%d") else: + next_dt = "2024-01-02" + today = datetime.now().strftime("%Y-%m-%d") + + if next_dt > today: + stats["skipped"] += 1 + done_set.add(code) + continue + + # 多源fallback + df_new, source = fetch_15min_with_fallback(code, next_dt, today) + failed = (df_new is None or (hasattr(df_new, 'empty') and df_new.empty)) + + if not health.report(code, failed): + logger.error("❌ 15min所有源不可用,终止15min更新") + stats["source_aborted"] = True + break + + if failed: stats["failed"] += 1 - consecutive_fails += 1 + done_set.add(code) + if (i + 1) % 500 == 0: + logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"]) + save_progress("15min", done_set) + continue + + # 数据校验 + for col in ["open", "high", "low", "close"]: + df_new[col] = pd.to_numeric(df_new[col], errors="coerce") + df_new["volume"] = pd.to_numeric(df_new["volume"], errors="coerce").fillna(0) + df_new["amount"] = pd.to_numeric(df_new["amount"], errors="coerce").fillna(0) + df_new["day"] = df_new["day"].astype(str) + + bad = (df_new[["close", "open"]] <= 0).any(axis=1) + if bad.any(): + df_new = df_new[~bad] + if df_new.empty: + stats["failed"] += 1 + done_set.add(code) + continue + + # 增量合并 + if last_date: + df_increment = df_new[df_new["day"] > last_date].copy() + if df_increment.empty: + stats["skipped"] += 1 + done_set.add(code) + continue + + existing = pd.read_parquet(parquet_path) + existing["day"] = existing["day"].astype(str) + combined = pd.concat([existing, df_increment], ignore_index=True) + combined = combined.sort_values("day").reset_index(drop=True) + new_rows = len(df_increment) + else: + df_increment = df_new + combined = df_new.sort_values("day").reset_index(drop=True) + new_rows = len(df_new) + + # 原子写入Parquet + tmp = parquet_path.with_suffix(".tmp") + combined.to_parquet(tmp, index=False) + tmp.rename(parquet_path) + + stats["updated"] += 1 + stats["records"] += new_rows + + # 收集vnpy DB数据 + # interval='1m' — vnpy 4.x Interval.MINUTE硬约束,实际存储15分钟线 + exchange = "SSE" if prefix == "sh" else "SZSE" + for _, row in df_increment.iterrows(): + all_db_values.append(( + clean, exchange, str(row["day"]), + INTERVAL_MINUTE, # interval='1m' — vnpy 4.x Interval.MINUTE硬约束,实际存储15分钟线 + float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0, + float(row.get("open", 0)), float(row.get("high", 0)), + float(row.get("low", 0)), float(row.get("close", 0)), + )) + stats["db_records"] += len(df_increment) done_set.add(code) - if consecutive_fails >= MAX_CONSECUTIVE_FAILS: - logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) - time.sleep(CONSECUTIVE_FAIL_PAUSE) - consecutive_fails = 0 - if (i + 1) % 500 == 0: - logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"]) + logger.info("15min进度: %d/%d updated=%d failed=%d src=%s", + i + 1, len(todo), stats["updated"], stats["failed"], source) save_progress("15min", done_set) - # 写vnpy DB + # 写入本地DB if all_db_values: - _write_vnpy_db(all_db_values, "15min") + _write_local_db(local_conn, all_db_values, "15min") - # 保存最终进度 save_progress("15min", done_set) - - if source_aborted: - stats["source_aborted"] = True - logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False)) return stats -# ======================== vnpy DB写入 ======================== +# ======================== 本地vnpy DB写入(v2.0) ======================== -def _write_vnpy_db(values: list, label: str): - """增量写入vnpy SQLite DB - 本地临时DB写入后ATTACH导入NAS DB""" - logger.info("写入vnpy DB [%s]: %d 条记录", label, len(values)) - local_tmp = f"/tmp/vnpy_update_{label}_{int(time.time())}.db" - try: - # 1. 本地临时DB写入增量 - conn = sqlite3.connect(local_tmp, timeout=30) +def init_local_db() -> sqlite3.Connection: + """初始化本地vnpy DB:从NAS复制或创建新的""" + local_path = str(LOCAL_DB_PATH) + + if LOCAL_DB_PATH.exists(): + logger.info("使用已有本地DB: %s", local_path) + conn = sqlite3.connect(local_path, timeout=30) + return conn + + # 从NAS复制 + if VNPY_DB_PATH.exists(): + logger.info("从NAS复制DB到本地: %s → %s", VNPY_DB_PATH, local_path) + shutil.copy2(str(VNPY_DB_PATH), local_path) + conn = sqlite3.connect(local_path, timeout=30) + else: + logger.info("创建新本地DB: %s", local_path) + conn = sqlite3.connect(local_path, timeout=30) c = conn.cursor() - c.execute("PRAGMA journal_mode=WAL") c.execute("""CREATE TABLE IF NOT EXISTS dbbardata ( id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT NOT NULL, @@ -750,56 +900,112 @@ def _write_vnpy_db(values: list, label: str): close_price REAL, UNIQUE(symbol, exchange, datetime, interval) )""") + c.execute("""CREATE TABLE IF NOT EXISTS dbbaroverview ( + symbol TEXT NOT NULL, + exchange TEXT NOT NULL, + interval TEXT NOT NULL, + count INTEGER, + start TEXT, + end TEXT, + UNIQUE(symbol, exchange, interval) + )""") + conn.commit() + + c = conn.cursor() + c.execute("PRAGMA journal_mode=WAL") + c.execute("PRAGMA synchronous=NORMAL") + conn.commit() + return conn + + +def _write_local_db(conn: sqlite3.Connection, values: list, label: str): + """批量写入本地vnpy DB""" + logger.info("写入本地DB [%s]: %d 条记录", label, len(values)) + c = conn.cursor() + for i in range(0, len(values), BATCH_SIZE): c.executemany( """INSERT OR REPLACE INTO dbbardata (symbol,exchange,datetime,interval,volume,turnover,open_interest, open_price,high_price,low_price,close_price) VALUES (?,?,?,?,?,?,?,?,?,?,?)""", - values, + values[i:i + BATCH_SIZE], ) - conn.commit() - conn.close() - logger.info(" 本地写入完成: %s (%d rows)", local_tmp, len(values)) + conn.commit() + logger.info("✅ 本地DB [%s] 写入完成: %d条", label, len(values)) - # 2. ATTACH到NAS DB,批量导入 - conn = sqlite3.connect(str(VNPY_DB_PATH), timeout=120) - c = conn.cursor() - c.execute("PRAGMA journal_mode=WAL") - c.execute("ATTACH DATABASE ? AS tmpdb", (local_tmp,)) - c.execute("INSERT OR REPLACE INTO main.dbbardata SELECT * FROM tmpdb.dbbardata") - conn.commit() - c.execute("DETACH DATABASE tmpdb") + # 增量更新overview(v2.0:只更新本次涉及的symbol) + _update_overview_incremental(conn, values) - # 更新overview + +def _update_overview_incremental(conn: sqlite3.Connection, values: list): + """增量更新overview:只更新本次涉及的(symbol, exchange, interval)""" + c = conn.cursor() + affected = set((v[0], v[1], v[3]) for v in values) # symbol, exchange, interval + for sym, exc, ivl in affected: c.execute( - """INSERT OR REPLACE INTO dbbaroverview (symbol,exchange,interval,count,start,end) - SELECT symbol,exchange,interval,COUNT(*),MIN(datetime),MAX(datetime) - FROM dbbardata GROUP BY symbol,exchange,interval""" + """INSERT OR REPLACE INTO dbbaroverview (symbol, exchange, interval, count, start, end) + SELECT ?, ?, ?, + COUNT(*), + MIN(datetime), + MAX(datetime) + FROM dbbardata + WHERE symbol=? AND exchange=? AND interval=?""", + (sym, exc, ivl, sym, exc, ivl), ) - conn.commit() - conn.close() - logger.info("✅ vnpy DB [%s] 写入完成", label) + conn.commit() + logger.info(" overview增量更新: %d 组", len(affected)) + + +def sync_db_to_nas(): + """本地DB → NAS mv原子重命名(v2.0)""" + if not LOCAL_DB_PATH.exists(): + logger.warning("本地DB不存在,跳过同步") + return + + local_size = LOCAL_DB_PATH.stat().st_size / 1024 / 1024 + logger.info("同步DB到NAS: %.1f MB", local_size) + + nas_path = str(VNPY_DB_PATH) + new_path = nas_path + ".new" + old_path = nas_path + ".old" + + # 1. 复制到NAS .new文件 + logger.info(" 复制到 %s", new_path) + shutil.copy2(str(LOCAL_DB_PATH), new_path) + + # 2. 原子重命名:old → old备份,current → old,new → current + # 在NAS同一文件系统上rename是原子的 + try: + if os.path.exists(old_path): + os.remove(old_path) + if os.path.exists(nas_path): + os.rename(nas_path, old_path) + os.rename(new_path, nas_path) + logger.info("✅ DB同步完成(mv原子重命名)") + + # 清理旧文件 + if os.path.exists(old_path): + os.remove(old_path) + logger.info(" 清理旧文件: %s", old_path) except Exception as e: - logger.error("❌ vnpy DB [%s] 写入失败: %s", label, e) - finally: - if os.path.exists(local_tmp): - os.remove(local_tmp) + logger.error("❌ DB同步失败: %s,尝试恢复", e) + # 恢复:把old改回来 + if os.path.exists(old_path) and not os.path.exists(nas_path): + os.rename(old_path, nas_path) + logger.info(" 已恢复旧DB") # ======================== 告警与报告 ======================== def check_failure_rate(stats: dict, label: str) -> bool: - """检查失败率,返回True表示异常""" + """检查失败率""" total = stats.get("updated", 0) + stats.get("failed", 0) + stats.get("skipped", 0) failed = stats.get("failed", 0) if total == 0: return False rate = failed / total if rate > 0.05: - logger.error( - "❌ 数据更新异常 [%s]:失败率 %.1f%% (%d/%d)", - label, rate * 100, failed, total, - ) + logger.error("❌ [%s] 失败率 %.1f%% (%d/%d)", label, rate * 100, failed, total) return True if stats.get("source_aborted"): logger.error("❌ [%s] 源不可用导致终止", label) @@ -810,9 +1016,10 @@ def check_failure_rate(stats: dict, label: str) -> bool: # ======================== 主入口 ======================== def main(): - parser = argparse.ArgumentParser(description="全市场每日增量更新") + parser = argparse.ArgumentParser(description="全市场每日增量更新 v2.0") parser.add_argument("--skip-daily", action="store_true", help="跳过日线更新") parser.add_argument("--skip-15min", action="store_true", help="跳过15分钟线更新") + parser.add_argument("--fresh-db", action="store_true", help="强制从NAS重新复制DB") args = parser.parse_args() if not nas_mounted(): @@ -822,23 +1029,49 @@ def main(): codes = get_all_codes() logger.info("全市场股票数: %d", len(codes)) logger.info("更新时间: %s", datetime.now().isoformat()) + logger.info("版本: v2.0 (多源fallback + 本地DB构建)") + + # 如果强制刷新或本地DB不存在,先删掉旧的 + if args.fresh_db and LOCAL_DB_PATH.exists(): + LOCAL_DB_PATH.unlink() + logger.info("已删除旧本地DB(--fresh-db)") + + # BaoStock login(如果可用) + if HAS_BAOSTOCK: + lg = bs.login() + logger.info("BaoStock login: %s", lg.error_msg) t_start = time.time() report = {} has_alert = False - # DB备份(硬伤3修复) + # DB备份 rotate_db_backup() - if not args.skip_daily: - report["daily"] = run_daily_update(codes) - if check_failure_rate(report["daily"], "日线"): - has_alert = True + # 初始化本地DB + local_conn = init_local_db() - if not args.skip_15min: - report["15min"] = run_15min_update(codes) - if check_failure_rate(report["15min"], "15min"): - has_alert = True + try: + if not args.skip_daily: + report["daily"] = run_daily_update(codes, local_conn) + if check_failure_rate(report["daily"], "日线"): + has_alert = True + + if not args.skip_15min: + report["15min"] = run_15min_update(codes, local_conn) + if check_failure_rate(report["15min"], "15min"): + has_alert = True + finally: + # 关闭本地DB连接 + local_conn.close() + + # 同步DB到NAS + sync_db_to_nas() + + # BaoStock logout + if HAS_BAOSTOCK: + bs.logout() + logger.info("BaoStock logout") elapsed = time.time() - t_start report["elapsed_sec"] = round(elapsed, 1) @@ -851,7 +1084,7 @@ def main(): logger.info("✅ 全部完成,耗时 %.1f 秒", elapsed) logger.info(json.dumps(report, ensure_ascii=False, indent=2)) - # 写入报告 + # 报告文件 report_file = LOG_DIR / f"report_{datetime.now().strftime('%Y%m%d')}.json" report_file.write_text(json.dumps(report, ensure_ascii=False, indent=2))