#!/usr/bin/env python3
"""
Whole-market daily incremental update: daily bars + 15-minute bars.

What it does:
1. Daily bars: Tencent K-line API -> update Parquet + vnpy DB
2. 15-minute bars: Sina API (latest 800 bars) -> merge into Parquet + import into vnpy DB

Design principles:
- Incremental: never re-download what is already stored
- A failure on one stock does not affect the others
- Progress is persisted so an interrupted run can resume (within the same day)
- Rate-limit protection
- Complete logging

Usage:
    python3 daily_all_update.py               # full update (daily + 15min)
    python3 daily_all_update.py --skip-daily  # 15-minute bars only
    python3 daily_all_update.py --skip-15min  # daily bars only
"""

import argparse
import json
import os
import re
import sqlite3
import sys
import time
import logging
import urllib.request
import urllib.error
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List, Tuple

import pandas as pd

# ======================== Configuration ========================

LOG_DIR = Path("/Volumes/stock/logs/daily_update")
DAILY_DIR = Path("/Volumes/stock/A股数据/日线数据/daily")
MINUTE_15_DIR = Path("/Volumes/stock/minute_kline/15min")
VNPY_DB_PATH = Path("/Volumes/stock/sanguo_vnpy/data/quant_trading.db")
ALL_STOCKS_FILE = Path("/Volumes/stock/A股数据/stock_info/stock_basic_info_raw_20260326_113530.csv")
PROGRESS_DIR = Path("/Volumes/stock/logs/daily_update/progress")

REQUEST_INTERVAL = 0.3       # seconds to wait between consecutive HTTP requests
MAX_RETRIES = 3              # attempts per HTTP request
CONSECUTIVE_FAIL_PAUSE = 60  # seconds to back off after a run of failures
MAX_CONSECUTIVE_FAILS = 10   # failures in a row that trigger the back-off
DB_BATCH_SIZE = 50000        # rows per executemany/commit
PROGRESS_EVERY = 500         # codes between progress logs / DB flushes / checkpoints

HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"}

# Columns of a Tencent day-kline row, in API order (amount is not always present).
_TENCENT_FIELDS = ["date", "open", "close", "high", "low", "volume", "amount"]
# Columns required from the Sina 15-minute payload.
_SINA_COLUMNS = ["day", "open", "high", "low", "close", "volume", "amount"]
# Extracts the JSON array from Sina's JSONP wrapper.
_SINA_JSONP_RE = re.compile(r"\((\[.*\])\)", re.DOTALL)


def setup_logging() -> logging.Logger:
    """Configure root logging to a timestamped file under LOG_DIR plus the console.

    This runs at import time. If LOG_DIR cannot be created or opened (for
    example because the NAS is not mounted), fall back to console-only logging
    instead of crashing, so that ``main()`` can still report the problem.
    """
    handlers: List[logging.Handler] = [logging.StreamHandler()]
    file_error: Optional[OSError] = None
    try:
        LOG_DIR.mkdir(parents=True, exist_ok=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        handlers.insert(0, logging.FileHandler(LOG_DIR / f"update_{ts}.log", encoding="utf-8"))
    except OSError as exc:
        file_error = exc
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=handlers,
    )
    log = logging.getLogger(__name__)
    if file_error is not None:
        log.warning("无法创建日志文件,仅输出到控制台: %s", file_error)
    return log


logger = setup_logging()


def _make_opener():
    """Return a urllib opener that ignores any system/environment proxy settings."""
    return urllib.request.build_opener(urllib.request.ProxyHandler({}))


# ======================== Helpers ========================

def get_market_prefix(code: str) -> Tuple[str, str]:
    """Split a stock code into (market prefix, clean 6-digit code).

    Non-digit characters are stripped and the result is zero-padded to 6
    digits. Codes starting with 60/68/51 map to "sh"; everything else to "sz".
    NOTE(review): other Shanghai ranges (e.g. 900xxx B shares) and Beijing
    exchange codes also fall through to "sz" — confirm whether the input list
    contains any.
    """
    code = re.sub(r"[^0-9]", "", code).zfill(6)
    if code.startswith(("60", "68", "51")):
        return "sh", code
    return "sz", code


def get_all_codes() -> List[str]:
    """Read the stock list CSV and return unique 6-digit codes in file order.

    The first matching column among 代码/code/股票代码 is used. The CSV is read as
    strings so leading zeros survive and blank cells cannot turn the whole
    column into floats (which would yield codes like "600000.0").

    Raises:
        ValueError: if none of the expected code columns exists.
    """
    df = pd.read_csv(ALL_STOCKS_FILE, dtype=str)
    for col in ["代码", "code", "股票代码"]:
        if col in df.columns:
            codes = (str(c).strip().zfill(6) for c in df[col].dropna())
            return list(dict.fromkeys(c for c in codes if c.strip("0") or c == "000000"))
    raise ValueError(f"找不到代码列: {list(df.columns)}")


def nas_mounted() -> bool:
    """Return True when both Parquet root directories exist."""
    return DAILY_DIR.exists() and MINUTE_15_DIR.exists()


def _merge_into_parquet(
    path: Path, fresh: pd.DataFrame, key: str, *, stringify_key: bool = False
) -> Tuple[pd.DataFrame, int]:
    """Merge ``fresh`` rows into the Parquet file at ``path`` and write it atomically.

    Existing rows are de-duplicated against the new ones on ``key`` (the new
    row wins) and the result is sorted by ``key``. When the file does not exist
    yet it is created from ``fresh`` as-is.

    Args:
        path: Target Parquet file.
        fresh: New rows; must contain column ``key``.
        key: Column used for de-duplication and ordering.
        stringify_key: Cast the existing file's ``key`` column to str first so it
            compares/deduplicates against string keys in ``fresh``.

    Returns:
        (combined frame as written, number of rows added, never negative).
    """
    if path.exists():
        existing = pd.read_parquet(path)
        if stringify_key:
            existing[key] = existing[key].astype(str)
        old_len = len(existing)
        combined = pd.concat([existing, fresh], ignore_index=True)
        combined = combined.drop_duplicates(subset=[key], keep="last")
        combined = combined.sort_values(key).reset_index(drop=True)
        added = len(combined) - old_len
    else:
        path.parent.mkdir(parents=True, exist_ok=True)
        combined = fresh
        added = len(fresh)
    tmp = path.with_suffix(".tmp")
    combined.to_parquet(tmp, index=False)
    # replace() overwrites atomically on every platform (rename() fails on
    # Windows when the target exists).
    tmp.replace(path)
    return combined, max(added, 0)


def _bars_to_db_rows(
    frame: pd.DataFrame, symbol: str, exchange: str, time_col: str, interval: str
) -> List[tuple]:
    """Convert bar rows into tuples matching the dbbardata INSERT column order.

    Order: symbol, exchange, datetime, interval, volume, turnover,
    open_interest (always 0.0), open, high, low, close.
    """
    columns = zip(
        frame[time_col], frame["volume"], frame["amount"],
        frame["open"], frame["high"], frame["low"], frame["close"],
    )
    return [
        (symbol, exchange, str(ts), interval, float(vol), float(amt), 0.0,
         float(op), float(hi), float(lo), float(cl))
        for ts, vol, amt, op, hi, lo, cl in columns
    ]


# ======================== Daily bars ========================

def _parse_tencent_klines(klines: list, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Turn raw Tencent day-kline rows into a normalized frame within [start, end].

    Only the first 7 fields of each row are used (date, open, close, high, low,
    volume, amount); any extra trailing fields are ignored rather than causing
    a column-count mismatch. Non-numeric cells become 0.

    Returns:
        Frame with columns date/open/high/low/close/volume/amount (date as
        "YYYY-MM-DD" strings), or None when there are no usable rows in range.
    """
    if not klines:
        return None
    df = pd.DataFrame(klines).iloc[:, : len(_TENCENT_FIELDS)].copy()
    if len(df.columns) < 6:
        return None
    df.columns = _TENCENT_FIELDS[: len(df.columns)]
    if "amount" not in df.columns:
        df["amount"] = 0.0
    for col in ("open", "close", "high", "low", "volume", "amount"):
        df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
    # Normalize to strings so the range filter is a plain string comparison.
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d").astype(str)
    mask = (df["date"] >= start_date) & (df["date"] <= end_date)
    result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]]
    return result if not result.empty else None


def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars for ``code`` between start_date and end_date from Tencent.

    Dates are "YYYY-MM-DD" strings. Returns None when the response carries no
    bars for this symbol. Network/JSON errors propagate to the caller.
    """
    prefix, clean = get_market_prefix(code)
    tq = f"{prefix}{clean}"
    days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10
    url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"
    req = urllib.request.Request(url, headers=HEADERS)
    with _make_opener().open(req, timeout=10) as resp:
        raw = resp.read().decode("utf-8", errors="replace")
    payload = json.loads(raw)
    data = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(data, dict):
        return None
    stock = data.get(tq)
    if not isinstance(stock, dict):
        return None
    return _parse_tencent_klines(stock.get("day") or [], start_date, end_date)


def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int:
    """Merge new daily bars into the per-year Parquet files for ``code``.

    Each row goes into the file of its own year (DAILY_DIR/<year>/...), so a run
    spanning New Year does not put last year's bars into this year's file.
    Dates are stored as strings to avoid clashing with datetime.date values in
    existing files.

    Returns:
        Number of rows passed in.
    """
    prefix, clean = get_market_prefix(code)
    new_data = new_data.copy()
    new_data["date"] = new_data["date"].astype(str)
    for year, chunk in new_data.groupby(new_data["date"].str[:4]):
        path = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet"
        _merge_into_parquet(path, chunk, "date", stringify_key=True)
    return len(new_data)


def get_daily_last_date(code: str) -> str:
    """Return the latest stored daily date ("YYYY-MM-DD") for ``code``, or "".

    Year directories are scanned from the current year down to 2010; the first
    readable, non-empty file decides. Unreadable files are logged and skipped.
    """
    prefix, clean = get_market_prefix(code)
    name = f"{prefix}{clean}_daily.parquet"
    for year in range(datetime.now().year, 2009, -1):
        fpath = DAILY_DIR / str(year) / name
        if not fpath.exists():
            continue
        try:
            dates = pd.read_parquet(fpath, columns=["date"])["date"]
            if not dates.empty:
                return str(dates.max())[:10]
        except Exception as e:
            logger.debug("读取 %s 失败: %s", fpath, e)
    return ""


def _update_one_daily(code: str, start: str, end: str) -> Tuple[str, List[tuple]]:
    """Fetch, validate and store one code's new daily bars.

    Returns:
        (status, db_rows). Status is one of "updated", "no_data", "invalid",
        "fetch_error" (every retry raised) or "write_error". db_rows is only
        non-empty for "updated".
    """
    data = None
    for attempt in range(MAX_RETRIES):
        try:
            data = fetch_tencent_daily(code, start, end)
            break
        except Exception as e:
            if attempt < MAX_RETRIES - 1:
                time.sleep(1)
            else:
                logger.debug("日线 %s 失败: %s", code, e)
                return "fetch_error", []
    if data is None or data.empty:
        return "no_data", []
    # Reject the batch when any price is non-positive.
    if (data[["open", "high", "low", "close"]] <= 0).any().any():
        return "invalid", []
    try:
        update_daily_parquet(code, data)
    except Exception as e:
        logger.warning("日线写入失败 %s: %s", code, e)
        return "write_error", []
    prefix, clean = get_market_prefix(code)
    exchange = "SSE" if prefix == "sh" else "SZSE"
    # NOTE(review): datetime is written as "YYYY-MM-DD" without a time part;
    # confirm this matches how other writers store rows in dbbardata, otherwise
    # INSERT OR REPLACE will not collide with existing rows for the same day.
    return "updated", _bars_to_db_rows(data, clean, exchange, "date", "d")


def _flush_pending_db(pending: List[tuple], label: str) -> List[tuple]:
    """Write buffered rows to the vnpy DB without refreshing the overview.

    Returns an empty list on success; on failure returns ``pending`` unchanged
    so the rows are retried by the next flush / final write.
    """
    if pending and _write_vnpy_db(pending, label, update_overview=False):
        return []
    return pending


def run_daily_update(codes: List[str]) -> dict:
    """Incrementally update daily bars for ``codes`` (Parquet, then vnpy DB).

    Codes without any existing Parquet history, or already up to date, are
    skipped without a network request. Failed requests count towards the
    consecutive-failure back-off. Buffered DB rows are flushed every
    PROGRESS_EVERY codes so a crash loses at most one interval of DB rows.

    Returns:
        Stats dict: updated/skipped/failed/records/db_records.
    """
    logger.info("=" * 60)
    logger.info("日线增量更新开始,共 %d 只", len(codes))
    today = datetime.now().strftime("%Y-%m-%d")
    stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
    pending_db: List[tuple] = []
    consecutive_fails = 0
    requested = False

    for i, code in enumerate(codes):
        last_date = get_daily_last_date(code)
        next_day = (
            (pd.Timestamp(last_date) + timedelta(days=1)).strftime("%Y-%m-%d") if last_date else ""
        )
        if not next_day or next_day > today:
            stats["skipped"] += 1
        else:
            # Throttle only between real requests, not for skipped codes.
            if requested:
                time.sleep(REQUEST_INTERVAL)
            requested = True
            status, rows = _update_one_daily(code, next_day, today)
            if status == "updated":
                stats["updated"] += 1
                stats["records"] += len(rows)
                stats["db_records"] += len(rows)
                pending_db.extend(rows)
                consecutive_fails = 0
            elif status == "no_data":
                stats["skipped"] += 1
                consecutive_fails = 0
            elif status == "invalid":
                stats["failed"] += 1
            else:  # fetch_error / write_error
                stats["failed"] += 1
                consecutive_fails += 1

        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
            time.sleep(CONSECUTIVE_FAIL_PAUSE)
            consecutive_fails = 0

        if (i + 1) % PROGRESS_EVERY == 0:
            logger.info("日线进度: %d/%d updated=%d", i + 1, len(codes), stats["updated"])
            pending_db = _flush_pending_db(pending_db, "日线")

    # Final write also refreshes dbbaroverview (even if everything was flushed already).
    if stats["db_records"]:
        _write_vnpy_db(pending_db, "日线")

    logger.info("日线完成: %s", json.dumps(stats, ensure_ascii=False))
    return stats


# ======================== 15-minute bars ========================

def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]:
    """Fetch the latest ``datalen`` 15-minute bars for ``symbol`` (e.g. "sh600000").

    Returns:
        Frame with columns day/open/high/low/close/volume/amount holding the raw
        API values (the caller converts them with pd.to_numeric), or None when
        the JSONP payload is missing/empty or lacks one of those columns.
    """
    url = (
        f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/"
        f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}"
    )
    req = urllib.request.Request(url, headers=HEADERS)
    with _make_opener().open(req, timeout=15) as resp:
        raw = resp.read().decode("utf-8", errors="replace")
    match = _SINA_JSONP_RE.search(raw)
    if not match:
        return None
    records = json.loads(match.group(1))
    if not records:
        return None
    df = pd.DataFrame(records)
    if any(col not in df.columns for col in _SINA_COLUMNS):
        return None
    return df[_SINA_COLUMNS].copy()


def _clean_15min_frame(raw: pd.DataFrame) -> pd.DataFrame:
    """Validate raw 15-minute bars and normalize their types.

    Drops rows whose OHLC cannot be parsed, whose open/close is <= 0, or whose
    high/low does not enclose open and close. OHLC become floats; volume and
    amount are parsed (missing -> 0) and then stored back as strings.
    """
    frame = raw.copy()
    ohlc = ["open", "high", "low", "close"]
    for col in ohlc:
        frame[col] = pd.to_numeric(frame[col], errors="coerce")
    for col in ("volume", "amount"):
        frame[col] = pd.to_numeric(frame[col], errors="coerce").fillna(0)
    body_top = frame[["open", "close"]].max(axis=1)
    body_bottom = frame[["open", "close"]].min(axis=1)
    # NaN fails every comparison, so notna() is required to drop unparsable rows.
    valid = (
        frame[ohlc].notna().all(axis=1)
        & (frame[["close", "open"]] > 0).all(axis=1)
        & (frame["high"] >= body_top)
        & (frame["low"] <= body_bottom)
    )
    frame = frame.loc[valid].copy()
    # Stored as strings (object dtype); presumably matches the existing files'
    # schema for these columns — the original code converted them back likewise.
    frame["volume"] = frame["volume"].astype(str)
    frame["amount"] = frame["amount"].astype(str)
    return frame


def _sync_15min(code: str) -> Tuple[str, int, Optional[pd.DataFrame]]:
    """Download, validate and merge one code's 15-minute bars.

    Returns:
        ("ok", rows_added, newest rows_added rows of the merged file) on success,
        ("failed", 0, None) when no valid data could be obtained.
    """
    prefix, clean = get_market_prefix(code)
    symbol = f"{prefix}{clean}"
    parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet"

    df_new = None
    for attempt in range(MAX_RETRIES):
        try:
            df_new = try_sina_15min(symbol)
            if df_new is not None and len(df_new) > 0:
                break
        except Exception as e:
            logger.debug("15min %s 第%d次请求失败: %s", code, attempt + 1, e)
            if attempt < MAX_RETRIES - 1:
                time.sleep(1)
    if df_new is None or df_new.empty:
        return "failed", 0, None

    cleaned = _clean_15min_frame(df_new)
    if cleaned.empty:
        return "failed", 0, None

    combined, added = _merge_into_parquet(parquet_path, cleaned, "day")
    # The file is sorted by "day", so genuinely new bars are the last `added` rows
    # (assuming no gap is being back-filled in the middle).
    fresh = combined.iloc[-added:] if added else combined.iloc[0:0]
    return "ok", added, fresh


def update_15min_parquet(code: str) -> Tuple[str, int]:
    """Download and merge 15-minute bars for ``code``.

    Returns:
        ("ok", rows_added) or ("failed", 0).
    """
    status, added, _ = _sync_15min(code)
    return status, added


def _load_done_codes(progress_file: Path, today: str) -> set:
    """Return the codes checkpointed as done *today*, or an empty set.

    A checkpoint whose "ts" date differs from ``today`` belongs to an earlier
    run and is ignored; otherwise every day after the first would find all
    codes already "done" and update nothing. Unreadable files are ignored.
    """
    if not progress_file.exists():
        return set()
    try:
        payload = json.loads(progress_file.read_text())
    except (OSError, ValueError) as e:
        logger.warning("进度文件读取失败,忽略: %s", e)
        return set()
    if not isinstance(payload, dict) or str(payload.get("ts", ""))[:10] != today:
        return set()
    done = payload.get("done", [])
    if not isinstance(done, list):
        return set()
    return {str(c) for c in done}


def _save_progress(progress_file: Path, done: set) -> None:
    """Atomically write the checkpoint (done codes + current timestamp)."""
    payload = json.dumps({"done": sorted(done), "ts": datetime.now().isoformat()})
    tmp = progress_file.with_suffix(".tmp")
    tmp.write_text(payload)
    tmp.replace(progress_file)


def run_15min_update(codes: List[str]) -> dict:
    """Incrementally update 15-minute bars for ``codes`` with same-day resume.

    Codes completed earlier today (per the checkpoint file) are skipped; only
    successful codes are checkpointed, so failures are retried on resume. Every
    PROGRESS_EVERY codes the buffered DB rows are flushed *before* the
    checkpoint is saved, so checkpointed codes never lose their DB rows.

    Returns:
        Stats dict: updated/skipped/failed/records/db_records.
    """
    logger.info("=" * 60)
    logger.info("15分钟线增量更新开始,共 %d 只", len(codes))
    stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
    pending_db: List[tuple] = []
    consecutive_fails = 0

    progress_file = PROGRESS_DIR / "15min_progress.json"
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    done_set = _load_done_codes(progress_file, datetime.now().strftime("%Y-%m-%d"))

    todo = [c for c in codes if c not in done_set]
    logger.info("待更新: %d(已完成: %d)", len(todo), len(done_set))

    for i, code in enumerate(todo):
        if i > 0:
            time.sleep(REQUEST_INTERVAL)
        try:
            status, new_rows, fresh = _sync_15min(code)
        except Exception as e:
            status, new_rows, fresh = "failed", 0, None
            logger.debug("15min %s 异常: %s", code, e)

        if status == "ok":
            stats["updated"] += 1
            stats["records"] += new_rows
            consecutive_fails = 0
            done_set.add(code)
            if fresh is not None and not fresh.empty:
                prefix, clean = get_market_prefix(code)
                exchange = "SSE" if prefix == "sh" else "SZSE"
                # NOTE(review): 15-minute bars are stored under interval "1m",
                # presumably because vnpy's Interval enum has no 15-minute member —
                # confirm; these rows share keys with any genuine 1-minute bars.
                rows = _bars_to_db_rows(fresh, clean, exchange, "day", "1m")
                pending_db.extend(rows)
                stats["db_records"] += len(rows)
        else:
            stats["failed"] += 1
            consecutive_fails += 1

        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
            time.sleep(CONSECUTIVE_FAIL_PAUSE)
            consecutive_fails = 0

        if (i + 1) % PROGRESS_EVERY == 0:
            logger.info("15min进度: %d/%d updated=%d failed=%d",
                        i + 1, len(todo), stats["updated"], stats["failed"])
            pending_db = _flush_pending_db(pending_db, "15min")
            _save_progress(progress_file, done_set)

    # Final write also refreshes dbbaroverview (even if everything was flushed already).
    if stats["db_records"]:
        _write_vnpy_db(pending_db, "15min")

    _save_progress(progress_file, done_set)

    logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False))
    return stats


# ======================== vnpy DB writes ========================

def _write_vnpy_db(values: list, label: str, update_overview: bool = True) -> bool:
    """Upsert bar tuples into the vnpy SQLite table dbbardata.

    Rows are written with INSERT OR REPLACE in batches of DB_BATCH_SIZE, each
    batch committed separately. When ``update_overview`` is True the whole
    dbbaroverview table is recomputed from dbbardata afterwards (a full-table
    aggregation, so intermediate flushes skip it). Errors are logged, not
    raised; the connection is always closed.

    Returns:
        True on success, False if any step failed.
    """
    logger.info("写入vnpy DB [%s]: %d 条记录", label, len(values))
    conn = None
    try:
        conn = sqlite3.connect(str(VNPY_DB_PATH), timeout=120)
        cur = conn.cursor()
        cur.execute("PRAGMA journal_mode=WAL")
        total_batches = (len(values) - 1) // DB_BATCH_SIZE + 1
        for j in range(0, len(values), DB_BATCH_SIZE):
            cur.executemany(
                """INSERT OR REPLACE INTO dbbardata
                   (symbol,exchange,datetime,interval,volume,turnover,open_interest,
                    open_price,high_price,low_price,close_price)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?)""",
                values[j : j + DB_BATCH_SIZE],
            )
            conn.commit()
            logger.info(" DB批次 %d/%d", j // DB_BATCH_SIZE + 1, total_batches)

        if update_overview:
            cur.execute(
                """INSERT OR REPLACE INTO dbbaroverview (symbol,exchange,interval,count,start,end)
                   SELECT symbol,exchange,interval,COUNT(*),MIN(datetime),MAX(datetime)
                   FROM dbbardata GROUP BY symbol,exchange,interval"""
            )
            conn.commit()
    except Exception as e:
        # Best-effort: a DB problem must not abort the Parquet update run.
        logger.error("❌ vnpy DB [%s] 写入失败: %s", label, e)
        return False
    finally:
        if conn is not None:
            conn.close()
    logger.info("✅ vnpy DB [%s] 写入完成", label)
    return True


# ======================== Entry point ========================

def main():
    """Parse CLI flags, run the selected updates and write a JSON report.

    Exits with status 1 when the NAS data directories are missing.
    """
    parser = argparse.ArgumentParser(description="全市场每日增量更新")
    parser.add_argument("--skip-daily", action="store_true", help="跳过日线更新")
    parser.add_argument("--skip-15min", action="store_true", help="跳过15分钟线更新")
    args = parser.parse_args()

    if not nas_mounted():
        logger.error("❌ NAS未挂载,退出")
        sys.exit(1)

    codes = get_all_codes()
    logger.info("全市场股票数: %d", len(codes))
    logger.info("更新时间: %s", datetime.now().isoformat())

    t_start = time.time()
    report = {}

    if not args.skip_daily:
        report["daily"] = run_daily_update(codes)

    if not args.skip_15min:
        report["15min"] = run_15min_update(codes)

    elapsed = time.time() - t_start
    report["elapsed_sec"] = round(elapsed, 1)

    logger.info("=" * 60)
    logger.info("全部完成,耗时 %.1f 秒", elapsed)
    logger.info(json.dumps(report, ensure_ascii=False, indent=2))

    # Write the report (LOG_DIR may be missing if logging fell back to console-only).
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    report_file = LOG_DIR / f"report_{datetime.now().strftime('%Y%m%d')}.json"
    report_file.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")

    return report


if __name__ == "__main__":
    main()