#!/usr/bin/env python3
"""Backfill historical 15-minute K-line data from BaoStock.

Purpose:
    Use the free BaoStock data source to backfill 15-minute bars for the
    whole market into one parquet file per stock.

Features (as stated by the original author):
    - BaoStock has no anti-scraping limit (about 0.35 s per stock).
    - Minute data is available from 1999 onwards.
    - Incremental: only the gap *before* the earliest stored bar is fetched.
    - Resumable: finished codes are recorded in a progress file.

Usage:
    python3 backfill_15min_baostock.py                        # from 2024-01-01
    python3 backfill_15min_baostock.py --start 20200101       # custom start
    python3 backfill_15min_baostock.py --codes 000001,600000  # chosen stocks

Changelog:
    v1.0 (2026-05-05) Zhao Yun - initial BaoStock 15min backfill
"""

import argparse
import json
import logging
import os
import shutil
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple

import pandas as pd

try:
    import baostock as bs
except ImportError:
    # Reported by main(); keeps the pure helpers importable without baostock.
    bs = None

# ======================== Configuration ========================
NAS_ROOT = Path("/Volumes/stock")
MINUTE_15_DIR = NAS_ROOT / "minute_kline" / "15min"
LOG_DIR = NAS_ROOT / "logs" / "daily_update"
PROGRESS_DIR = LOG_DIR / "progress"
PROGRESS_FILE = PROGRESS_DIR / "backfill_15min_progress.json"
ALL_STOCKS_FILE = NAS_ROOT / "A股数据" / "stock_info" / "all_stocks.csv"

# BaoStock settings
BS_START_DATE = "2024-01-01"   # default backfill start date
BS_INTERVAL = 0.4              # pause between stocks, in seconds
BS_MAX_RETRIES = 2             # fetch attempts per stock
PROGRESS_SAVE_EVERY = 500      # persist progress every N stocks

# ======================== Logging ========================
# Only the logger object is created at import time. Handlers (and the log
# directory on the NAS) are created by setup_logging(), which main() calls
# after it has verified that the NAS is mounted.
logger = logging.getLogger(__name__)


def setup_logging() -> logging.Logger:
    """Configure root logging to a timestamped file in LOG_DIR plus the console.

    Creates LOG_DIR if it does not exist.

    Returns:
        The module-level logger.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = LOG_DIR / f"backfill_15min_{ts}.log"
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=[
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    return logger


# ======================== Helpers ========================
def normalize_date(value: str) -> str:
    """Return *value* as ``YYYY-MM-DD``.

    Accepts both ``YYYYMMDD`` (the documented CLI form) and ``YYYY-MM-DD``
    (the form of BS_START_DATE), so the default and user input are handled
    by the same code path.

    Raises:
        ValueError: if *value* is not a valid calendar date in either form.
    """
    digits = value.strip().replace("-", "")
    if len(digits) != 8 or not digits.isdigit():
        raise ValueError(
            f"invalid date {value!r}: expected YYYYMMDD or YYYY-MM-DD"
        )
    # strptime also rejects impossible dates such as 20240230.
    return datetime.strptime(digits, "%Y%m%d").strftime("%Y-%m-%d")


def get_all_codes() -> List[str]:
    """Read every stock code from ALL_STOCKS_FILE as a 6-character string.

    The code column is looked up under the names "代码", "code" and
    "股票代码", in that order.

    Raises:
        ValueError: if none of the known column names is present.
    """
    # dtype=str keeps leading zeros and stops blank cells from turning the
    # column into floats (which would yield codes like "000nan").
    df = pd.read_csv(ALL_STOCKS_FILE, dtype=str)
    for col in ("代码", "code", "股票代码"):
        if col in df.columns:
            return [c.strip().zfill(6) for c in df[col].dropna()]
    raise ValueError(f"找不到代码列: {list(df.columns)}")


def code_to_baostock(code: str) -> Tuple[str, str]:
    """Map a 6-digit code to its BaoStock symbol and parquet file prefix.

    Codes starting with "6" or "51" are treated as Shanghai ("sh"); every
    other code is treated as Shenzhen ("sz").

    Returns:
        (bs_code, prefix), e.g. ("sz.000001", "sz").
    """
    prefix = "sh" if code.startswith(("6", "51")) else "sz"
    return f"{prefix}.{code}", prefix


def get_parquet_last_date(parquet_path: Path) -> str:
    """Return the EARLIEST date (``YYYY-MM-DD``) stored in a parquet file.

    Despite its name, this returns the minimum of the "day" column, which is
    what the backfill needs to know where the existing data begins.

    Returns:
        The earliest date, or "" when the file is missing, empty or
        unreadable.
    """
    if not parquet_path.exists():
        return ""
    try:
        df = pd.read_parquet(parquet_path, columns=["day"])
        if not df.empty:
            return str(df["day"].min())[:10]
    except Exception as exc:
        # Best effort: an unreadable file is reported, then treated as empty.
        logger.warning("读取 %s 失败: %s", parquet_path.name, exc)
    return ""


def load_progress() -> set:
    """Load the set of already-processed codes from PROGRESS_FILE.

    Returns:
        The stored codes, or an empty set when the file is missing or
        cannot be parsed.
    """
    if not PROGRESS_FILE.exists():
        return set()
    try:
        payload = json.loads(PROGRESS_FILE.read_text(encoding="utf-8"))
        return set(payload.get("done", []))
    except (OSError, ValueError, AttributeError, TypeError) as exc:
        logger.warning("读取进度文件失败: %s", exc)
        return set()


def save_progress(done_set: set):
    """Persist *done_set* and a timestamp to PROGRESS_FILE.

    The file is written to a temporary sibling and then swapped in with
    os.replace, so an interrupted run cannot leave a truncated file behind.
    """
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    payload = json.dumps({
        "done": sorted(done_set),
        "ts": datetime.now().isoformat(),
    })
    tmp_file = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp_file.write_text(payload, encoding="utf-8")
    os.replace(tmp_file, PROGRESS_FILE)


def _write_parquet_atomic(df: pd.DataFrame, path: Path) -> None:
    """Write *df* to *path* through a temporary file and an atomic rename."""
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        df.to_parquet(tmp_path, index=False)
        os.replace(tmp_path, path)
    finally:
        # Only still present when the write or the rename failed.
        if tmp_path.exists():
            tmp_path.unlink()


# ======================== Core logic ========================
def fetch_bs_15min(bs_code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch unadjusted 15-minute bars from BaoStock.

    Args:
        bs_code: BaoStock symbol such as "sz.000001".
        start_date: first day to fetch, ``YYYY-MM-DD``.
        end_date: last day to fetch, ``YYYY-MM-DD``.

    Returns:
        A DataFrame with columns day, open, high, low, close, volume, amount,
        or None when BaoStock reports an error or no valid row remains.
    """
    fields = "date,time,code,open,high,low,close,volume,amount,adjustflag"
    rs = bs.query_history_k_data_plus(
        bs_code,
        fields,
        start_date=start_date,
        end_date=end_date,
        frequency="15",
        adjustflag="3",  # 3 = no price adjustment
    )
    if rs.error_code != "0":
        logger.warning("BaoStock %s 错误: %s %s", bs_code, rs.error_code, rs.error_msg)
        return None

    rows = []
    while rs.error_code == "0" and rs.next():
        rows.append(rs.get_row_data())
    if rs.error_code != "0":
        # The stream broke part-way; do not hand back a partial history.
        logger.warning("BaoStock %s 错误: %s %s", bs_code, rs.error_code, rs.error_msg)
        return None
    if not rows:
        return None

    df = pd.DataFrame(rows, columns=fields.split(","))

    # Reformat the timestamp: 20260428094500000 -> 2026-04-28 09:45:00
    stamp = df["time"].astype(str)
    df["day"] = (
        stamp.str[:4] + "-" + stamp.str[4:6] + "-" + stamp.str[6:8]
        + " " + stamp.str[8:10] + ":" + stamp.str[10:12] + ":00"
    )

    for col in ("open", "high", "low", "close"):
        df[col] = pd.to_numeric(df[col], errors="coerce")
    # volume/amount stay strings; presumably this matches the existing
    # parquet files -- TODO confirm against a stored file.
    df["volume"] = df["volume"].astype(str)
    df["amount"] = df["amount"].astype(str)

    df = df[["day", "open", "high", "low", "close", "volume", "amount"]]

    # Drop rows with unparsable prices or an inconsistent OHLC relationship.
    df = df.dropna(subset=["open", "high", "low", "close"])
    body = df[["open", "close"]]
    bad_ohlc = (df["high"] < body.max(axis=1)) | (df["low"] > body.min(axis=1))
    if bad_ohlc.any():
        df = df[~bad_ohlc]

    return df if not df.empty else None


def backfill_one(code: str, start_date: str) -> Tuple[str, int]:
    """Backfill the 15-minute history of one stock.

    Args:
        code: 6-digit stock code.
        start_date: first day to backfill, ``YYYY-MM-DD``.

    Returns:
        (status, new_rows) where status is "ok", "skipped" (stored data
        already reaches back to start_date) or "failed" (no data fetched, or
        the merge/write failed).
    """
    bs_code, prefix = code_to_baostock(code)
    parquet_path = MINUTE_15_DIR / f"{prefix}{code}_15min.parquet"

    # Fetch up to the day before the earliest stored bar to avoid overlap.
    earliest_existing = get_parquet_last_date(parquet_path)
    if earliest_existing:
        day_before = pd.Timestamp(earliest_existing) - pd.Timedelta(days=1)
        end_date = day_before.strftime("%Y-%m-%d")
        # Both are zero-padded ISO dates, so string order is date order.
        if end_date <= start_date:
            return "skipped", 0
    else:
        # No stored data yet: fetch everything up to today.
        end_date = datetime.now().strftime("%Y-%m-%d")

    df_new = None
    for attempt in range(1, BS_MAX_RETRIES + 1):
        try:
            df_new = fetch_bs_15min(bs_code, start_date, end_date)
        except Exception as e:
            logger.warning("backfill %s 重试%d: %s", code, attempt, e)
        if df_new is not None and not df_new.empty:
            break
        if attempt < BS_MAX_RETRIES:
            time.sleep(1)  # back off only when another attempt follows

    if df_new is None or df_new.empty:
        return "failed", 0

    try:
        if parquet_path.exists():
            df_exist = pd.read_parquet(parquet_path)
            df_merged = pd.concat([df_new, df_exist], ignore_index=True)
            # On a duplicate "day" the already-stored row (listed last) wins.
            df_merged = df_merged.drop_duplicates(subset=["day"], keep="last")
            df_merged = df_merged.sort_values("day").reset_index(drop=True)
        else:
            MINUTE_15_DIR.mkdir(parents=True, exist_ok=True)
            df_merged = df_new
        _write_parquet_atomic(df_merged, parquet_path)
        return "ok", len(df_new)
    except Exception as e:
        logger.error("写入 %s 失败: %s", code, e)
        return "failed", 0


def _run_backfill(codes: List[str], start_date: str) -> dict:
    """Process every pending code and return the run statistics."""
    done_set = load_progress()
    todo = [c for c in codes if c not in done_set]
    logger.info(" 待处理: %d(已完成: %d)", len(todo), len(done_set))

    stats = {"updated": 0, "skipped": 0, "failed": 0, "rows": 0}
    t_start = time.time()
    try:
        for i, code in enumerate(todo, start=1):
            try:
                status, new_rows = backfill_one(code, start_date)
            except Exception as e:
                status, new_rows = "failed", 0
                logger.warning("backfill %s 异常: %s", code, e)

            if status == "ok":
                stats["updated"] += 1
                stats["rows"] += new_rows
            else:
                stats[status] = stats.get(status, 0) + 1

            # Failed codes stay out of the progress file so that a resumed
            # run retries them instead of silently leaving a gap.
            if status != "failed":
                done_set.add(code)

            if i % PROGRESS_SAVE_EVERY == 0:
                save_progress(done_set)
                logger.info(
                    "进度: %d/%d updated=%d skipped=%d failed=%d rows=%d (%.0f秒)",
                    i, len(todo), stats["updated"], stats["skipped"],
                    stats["failed"], stats["rows"], time.time() - t_start,
                )

            # Rate limiting between stocks (not needed after the last one).
            if i < len(todo):
                time.sleep(BS_INTERVAL)
    finally:
        # Runs on Ctrl-C and on errors too, so finished work is not lost.
        save_progress(done_set)
    return stats


# ======================== Entry point ========================
def main():
    """Parse the CLI arguments, log in to BaoStock and run the backfill."""
    parser = argparse.ArgumentParser(description="BaoStock 15min历史回补")
    parser.add_argument("--start", type=normalize_date, default=BS_START_DATE,
                        help="回补起始日期 (YYYYMMDD)")
    parser.add_argument("--codes", help="指定股票代码,逗号分隔")
    parser.add_argument("--limit", type=int, default=0, help="限制处理数量(测试用)")
    args = parser.parse_args()
    # argparse applies type= to the string default as well.
    start_date = args.start

    if not NAS_ROOT.exists():
        # Logging is not configured yet; the message still reaches stderr
        # through logging's last-resort handler.
        logger.error("❌ NAS未挂载")
        sys.exit(1)
    setup_logging()

    if bs is None:
        logger.error("❌ 未安装 baostock")
        sys.exit(1)

    lg = bs.login()
    if lg.error_code != "0":
        logger.error("❌ BaoStock登录失败: %s", lg.error_msg)
        sys.exit(1)
    logger.info("✅ BaoStock登录成功")

    t_start = time.time()
    try:
        if args.codes:
            codes = [c.strip() for c in args.codes.split(",")]
        else:
            codes = get_all_codes()
        if args.limit > 0:
            codes = codes[:args.limit]

        logger.info("=" * 60)
        logger.info("BaoStock 15min历史回补开始")
        logger.info(" 股票数: %d", len(codes))
        logger.info(" 回补起始: %s", start_date)
        logger.info(" 数据目录: %s", MINUTE_15_DIR)

        stats = _run_backfill(codes, start_date)
    finally:
        bs.logout()

    elapsed = time.time() - t_start
    logger.info("=" * 60)
    logger.info("✅ 回补完成,耗时 %.1f 秒", elapsed)
    logger.info("统计: %s", json.dumps(stats, ensure_ascii=False))


if __name__ == "__main__":
    main()