diff --git a/data_platform/daily_all_update.py b/data_platform/daily_all_update.py new file mode 100644 index 00000000..0110d5de --- /dev/null +++ b/data_platform/daily_all_update.py @@ -0,0 +1,490 @@ +#!/usr/bin/env python3 +""" +全市场每日增量更新 - 日线 + 15分钟线 + +功能: + 1. 日线:腾讯K线API → 更新Parquet + vnpy DB + 2. 15分钟线:新浪API(800条) → 增量合并Parquet + 导入vnpy DB + +设计原则: + - 增量更新,不重复下载 + - 失败不影响其他股票 + - 进度持久化,支持断点续传 + - 限频保护 + - 日志完整 + +用法: + python3 daily_all_update.py # 全量更新(日线+15min) + python3 daily_all_update.py --skip-daily # 只更新15min + python3 daily_all_update.py --skip-15min # 只更新日线 +""" + +import argparse +import json +import os +import re +import sqlite3 +import sys +import time +import logging +import urllib.request +import urllib.error +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional, List, Tuple + +import pandas as pd + +# ======================== 配置 ======================== + +LOG_DIR = Path("/Volumes/stock/logs/daily_update") +DAILY_DIR = Path("/Volumes/stock/A股数据/日线数据/daily") +MINUTE_15_DIR = Path("/Volumes/stock/minute_kline/15min") +VNPY_DB_PATH = Path("/Volumes/stock/sanguo_vnpy/data/quant_trading.db") +ALL_STOCKS_FILE = Path("/Volumes/stock/A股数据/stock_info/stock_basic_info_raw_20260326_113530.csv") +PROGRESS_DIR = Path("/Volumes/stock/logs/daily_update/progress") + +REQUEST_INTERVAL = 0.3 +MAX_RETRIES = 3 +CONSECUTIVE_FAIL_PAUSE = 60 +MAX_CONSECUTIVE_FAILS = 10 +DB_BATCH_SIZE = 50000 + +HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"} + + +def setup_logging(): + LOG_DIR.mkdir(parents=True, exist_ok=True) + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file = LOG_DIR / f"update_{ts}.log" + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + handlers=[ + logging.FileHandler(log_file, encoding="utf-8"), + logging.StreamHandler(), + ], + ) + return logging.getLogger(__name__) + + +logger = setup_logging() + + +def _make_opener(): + return urllib.request.build_opener(urllib.request.ProxyHandler({})) + + +# ======================== 工具函数 ======================== + +def get_market_prefix(code: str) -> Tuple[str, str]: + code = re.sub(r"[^0-9]", "", code).zfill(6) + if code.startswith(("60", "68", "51")): + return "sh", code + return "sz", code + + +def get_all_codes() -> List[str]: + df = pd.read_csv(ALL_STOCKS_FILE) + for col in ["代码", "code", "股票代码"]: + if col in df.columns: + return [str(c).zfill(6) for c in df[col].tolist()] + raise ValueError(f"找不到代码列: {list(df.columns)}") + + +def nas_mounted() -> bool: + return DAILY_DIR.exists() and MINUTE_15_DIR.exists() + + +# ======================== 日线更新 ======================== + +def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: + """腾讯K线API获取日线增量""" + prefix, clean = get_market_prefix(code) + tq = f"{prefix}{clean}" + days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10 + url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days}," + + opener = _make_opener() + req = urllib.request.Request(url, headers=HEADERS) + with opener.open(req, timeout=10) as r: + raw = r.read().decode("utf-8", errors="replace") + data = json.loads(raw) + d = data.get("data") + if not isinstance(d, dict): + return None + klines = d.get(tq, {}).get("day", []) + if not klines: + return None + + df = pd.DataFrame(klines) + ncols = len(df.columns) + if ncols >= 7: + df.columns = ["date", "open", "close", "high", "low", "volume", "amount"][:ncols] + else: + df.columns = ["date", "open", "close", "high", "low", "volume"][:ncols] + if "amount" not in df.columns: + df["amount"] = 0.0 + for c in ["open", "close", "high", "low", "volume", "amount"]: + df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) + df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") + mask = (df["date"] >= start_date) & (df["date"] <= end_date) + result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]] + return result if not result.empty else None + + +def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int: + """增量写入日线Parquet""" + prefix, clean = get_market_prefix(code) + year = datetime.now().year + parquet_path = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet" + + if parquet_path.exists(): + existing = pd.read_parquet(parquet_path) + combined = pd.concat([existing, new_data], ignore_index=True) + combined = combined.drop_duplicates(subset=["date"], keep="last") + combined = combined.sort_values("date").reset_index(drop=True) + else: + parquet_path.parent.mkdir(parents=True, exist_ok=True) + combined = new_data + + tmp = parquet_path.with_suffix(".tmp") + combined.to_parquet(tmp, index=False) + tmp.rename(parquet_path) + return len(new_data) + + +def get_daily_last_date(code: str) -> str: + prefix, clean = get_market_prefix(code) + for year in range(datetime.now().year, 2009, -1): + fpath = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet" + if fpath.exists(): + try: + df = pd.read_parquet(fpath, columns=["date"]) + if not df.empty: + return str(df["date"].max())[:10] + except Exception: + pass + return "" + + +def run_daily_update(codes: List[str]) -> dict: + """日线增量更新""" + logger.info("=" * 60) + logger.info("日线增量更新开始,共 %d 只", len(codes)) + today = datetime.now().strftime("%Y-%m-%d") + + stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0} + all_db_values = [] + consecutive_fails = 0 + + for i, code in enumerate(codes): + if i > 0: + time.sleep(REQUEST_INTERVAL) + + last_date = get_daily_last_date(code) + if not last_date: + stats["skipped"] += 1 + continue + next_day = (pd.Timestamp(last_date) + timedelta(days=1)).strftime("%Y-%m-%d") + if next_day > today: + stats["skipped"] += 1 + continue + + data = None + for attempt in range(MAX_RETRIES): + try: + data = fetch_tencent_daily(code, next_day, today) + break + except Exception as e: + if attempt < MAX_RETRIES - 1: + time.sleep(1) + else: + logger.debug("日线 %s 失败: %s", code, e) + + if data is None or data.empty: + stats["skipped"] += 1 + continue + + # 校验 + if (data[["open", "high", "low", "close"]] <= 0).any().any(): + stats["failed"] += 1 + continue + + try: + n = update_daily_parquet(code, data) + stats["updated"] += 1 + stats["records"] += n + consecutive_fails = 0 + + # 收集vnpy DB数据 + prefix, clean = get_market_prefix(code) + exchange = "SSE" if prefix == "sh" else "SZSE" + for _, row in data.iterrows(): + all_db_values.append(( + clean, exchange, str(row["date"]), "d", + float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0, + float(row.get("open", 0)), float(row.get("high", 0)), + float(row.get("low", 0)), float(row.get("close", 0)), + )) + stats["db_records"] += 1 + except Exception as e: + stats["failed"] += 1 + consecutive_fails += 1 + logger.warning("日线写入失败 %s: %s", code, e) + + if consecutive_fails >= MAX_CONSECUTIVE_FAILS: + logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) + time.sleep(CONSECUTIVE_FAIL_PAUSE) + consecutive_fails = 0 + + if (i + 1) % 500 == 0: + logger.info("日线进度: %d/%d updated=%d", i + 1, len(codes), stats["updated"]) + + # 写vnpy DB + if all_db_values: + _write_vnpy_db(all_db_values, "日线") + + logger.info("日线完成: %s", json.dumps(stats, ensure_ascii=False)) + return stats + + +# ======================== 15分钟线更新 ======================== + +def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]: + """新浪15分钟K线API""" + url = ( + f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/" + f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}" + ) + opener = _make_opener() + req = urllib.request.Request(url, headers=HEADERS) + with opener.open(req, timeout=15) as r: + raw = r.read().decode("utf-8", errors="replace") + m = re.search(r"\((\[.*\])\)", raw, re.DOTALL) + if not m: + return None + data = json.loads(m.group(1)) + if not data: + return None + df = pd.DataFrame(data) + cols = ["day", "open", "high", "low", "close", "volume", "amount"] + for c in cols: + if c not in df.columns: + return None + return df[cols] + + +def update_15min_parquet(code: str) -> Tuple[str, int]: + """增量下载并合并15分钟线""" + prefix, clean = get_market_prefix(code) + symbol = f"{prefix}{clean}" + parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet" + + df_new = None + for attempt in range(MAX_RETRIES): + try: + df_new = try_sina_15min(symbol) + if df_new is not None and len(df_new) > 0: + break + except Exception: + if attempt < MAX_RETRIES - 1: + time.sleep(1) + + if df_new is None or df_new.empty: + return "failed", 0 + + # 数据校验 + for col in ["open", "high", "low", "close"]: + df_new[col] = pd.to_numeric(df_new[col], errors="coerce") + df_new["volume"] = pd.to_numeric(df_new["volume"], errors="coerce").fillna(0) + df_new["amount"] = pd.to_numeric(df_new["amount"], errors="coerce").fillna(0) + + bad_zero = (df_new[["close", "open"]] <= 0).any(axis=1) + if bad_zero.any(): + df_new = df_new[~bad_zero] + bad_ohlc = (df_new["high"] < df_new[["open", "close"]].max(axis=1)) | \ + (df_new["low"] > df_new[["open", "close"]].min(axis=1)) + if bad_ohlc.any(): + df_new = df_new[~bad_ohlc] + if df_new.empty: + return "failed", 0 + + # 转回object类型 + df_new["volume"] = df_new["volume"].astype(str) + df_new["amount"] = df_new["amount"].astype(str) + + if parquet_path.exists(): + existing = pd.read_parquet(parquet_path) + old_len = len(existing) + combined = pd.concat([existing, df_new], ignore_index=True) + combined = combined.drop_duplicates(subset=["day"], keep="last") + combined = combined.sort_values("day").reset_index(drop=True) + new_rows = len(combined) - old_len + else: + combined = df_new + new_rows = len(df_new) + + tmp = parquet_path.with_suffix(".tmp") + combined.to_parquet(tmp, index=False) + tmp.rename(parquet_path) + return "ok", new_rows + + +def run_15min_update(codes: List[str]) -> dict: + """15分钟线增量更新""" + logger.info("=" * 60) + logger.info("15分钟线增量更新开始,共 %d 只", len(codes)) + + stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0} + all_db_values = [] + consecutive_fails = 0 + + # 加载进度(支持断点续传) + progress_file = PROGRESS_DIR / "15min_progress.json" + PROGRESS_DIR.mkdir(parents=True, exist_ok=True) + done_set = set() + if progress_file.exists(): + try: + done_set = set(json.loads(progress_file.read_text()).get("done", [])) + except Exception: + pass + + todo = [c for c in codes if c not in done_set] + logger.info("待更新: %d(已完成: %d)", len(todo), len(done_set)) + + for i, code in enumerate(todo): + if i > 0: + time.sleep(REQUEST_INTERVAL) + + try: + status, new_rows = update_15min_parquet(code) + except Exception as e: + status, new_rows = "failed", 0 + logger.debug("15min %s 异常: %s", code, e) + + if status == "ok": + stats["updated"] += 1 + stats["records"] += new_rows + consecutive_fails = 0 + + # 收集vnpy DB数据 + prefix, clean = get_market_prefix(code) + parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet" + if parquet_path.exists() and new_rows > 0: + df = pd.read_parquet(parquet_path) + # 只取最新的增量部分 + if len(df) > new_rows: + df_new_part = df.iloc[-new_rows:] + else: + df_new_part = df + + exchange = "SSE" if prefix == "sh" else "SZSE" + for _, row in df_new_part.iterrows(): + all_db_values.append(( + clean, exchange, str(row["day"]), "1m", + float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0, + float(row.get("open", 0)), float(row.get("high", 0)), + float(row.get("low", 0)), float(row.get("close", 0)), + )) + stats["db_records"] += len(df_new_part) + else: + stats["failed"] += 1 + consecutive_fails += 1 + + done_set.add(code) + + if consecutive_fails >= MAX_CONSECUTIVE_FAILS: + logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) + time.sleep(CONSECUTIVE_FAIL_PAUSE) + consecutive_fails = 0 + + if (i + 1) % 500 == 0: + logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"]) + # 保存进度 + progress_file.write_text(json.dumps({"done": list(done_set), "ts": datetime.now().isoformat()})) + + # 写vnpy DB + if all_db_values: + _write_vnpy_db(all_db_values, "15min") + + # 保存最终进度 + progress_file.write_text(json.dumps({"done": list(done_set), "ts": datetime.now().isoformat()})) + logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False)) + return stats + + +# ======================== vnpy DB写入 ======================== + +def _write_vnpy_db(values: list, label: str): + """增量写入vnpy SQLite DB""" + logger.info("写入vnpy DB [%s]: %d 条记录", label, len(values)) + try: + conn = sqlite3.connect(str(VNPY_DB_PATH), timeout=120) + c = conn.cursor() + c.execute("PRAGMA journal_mode=WAL") + for j in range(0, len(values), DB_BATCH_SIZE): + c.executemany( + """INSERT OR REPLACE INTO dbbardata + (symbol,exchange,datetime,interval,volume,turnover,open_interest, + open_price,high_price,low_price,close_price) + VALUES (?,?,?,?,?,?,?,?,?,?,?)""", + values[j : j + DB_BATCH_SIZE], + ) + conn.commit() + logger.info(" DB批次 %d/%d", j // DB_BATCH_SIZE + 1, (len(values) - 1) // DB_BATCH_SIZE + 1) + + # 更新overview + c.execute( + """INSERT OR REPLACE INTO dbbaroverview (symbol,exchange,interval,count,start,end) + SELECT symbol,exchange,interval,COUNT(*),MIN(datetime),MAX(datetime) + FROM dbbardata GROUP BY symbol,exchange,interval""" + ) + conn.commit() + conn.close() + logger.info("✅ vnpy DB [%s] 写入完成", label) + except Exception as e: + logger.error("❌ vnpy DB [%s] 写入失败: %s", label, e) + + +# ======================== 主入口 ======================== + +def main(): + parser = argparse.ArgumentParser(description="全市场每日增量更新") + parser.add_argument("--skip-daily", action="store_true", help="跳过日线更新") + parser.add_argument("--skip-15min", action="store_true", help="跳过15分钟线更新") + args = parser.parse_args() + + if not nas_mounted(): + logger.error("❌ NAS未挂载,退出") + sys.exit(1) + + codes = get_all_codes() + logger.info("全市场股票数: %d", len(codes)) + logger.info("更新时间: %s", datetime.now().isoformat()) + + t_start = time.time() + report = {} + + if not args.skip_daily: + report["daily"] = run_daily_update(codes) + + if not args.skip_15min: + report["15min"] = run_15min_update(codes) + + elapsed = time.time() - t_start + report["elapsed_sec"] = round(elapsed, 1) + logger.info("=" * 60) + logger.info("全部完成,耗时 %.1f 秒", elapsed) + logger.info(json.dumps(report, ensure_ascii=False, indent=2)) + + # 写入报告 + report_file = LOG_DIR / f"report_{datetime.now().strftime('%Y%m%d')}.json" + report_file.write_text(json.dumps(report, ensure_ascii=False, indent=2)) + + return report + + +if __name__ == "__main__": + main()