#!/usr/bin/env python3
"""
BaoStock 15-minute K-line history backfill script (v1.1).

Purpose: rebuild the 15-minute K-line history of the whole market from the
free BaoStock data source.

Strategy:
  - Full rebuild: fetch the complete history from BaoStock and fully replace
    the old parquet file.
  - Old files are backed up to a backup/ directory.
  - Incremental mode: files already rebuilt from BaoStock are skipped
    automatically (detected through a marker file).

BaoStock characteristics:
  - No anti-scraping limits, about 0.35 s per stock.
  - Unadjusted prices (adjustflag=3).
  - Minute bars are available from 1999; this script fetches from
    2024-01-01 by default.

Usage:
  python3 backfill_15min_baostock.py                   # full backfill
  python3 backfill_15min_baostock.py --start 20200101  # custom start date
  python3 backfill_15min_baostock.py --limit 10        # first 10 stocks only (testing)
  python3 backfill_15min_baostock.py --force           # force rebuild (overwrite existing BaoStock data)

Changelog:
  v1.0 (2026-05-05) created by Zhao Yun
  v1.1 (2026-05-05) fix: the old Sina data contained backward-adjusted
                    prices, so the script switched to full-rebuild mode
"""

import argparse
import json
import logging
import os
import shutil
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Set, Tuple

import baostock as bs
import pandas as pd

# ======================== Configuration ========================
NAS_ROOT = Path("/Volumes/stock")
MINUTE_15_DIR = NAS_ROOT / "minute_kline" / "15min"
BACKUP_DIR = MINUTE_15_DIR / "backup_sina"  # backup of the old Sina data
LOG_DIR = NAS_ROOT / "logs" / "daily_update"
PROGRESS_DIR = LOG_DIR / "progress"
PROGRESS_FILE = PROGRESS_DIR / "backfill_15min_progress.json"
ALL_STOCKS_FILE = NAS_ROOT / "A股数据" / "stock_info" / "stock_basic_info_raw_20260326_113530.csv"

# BaoStock settings
BS_START_DATE = "2024-01-01"
BS_INTERVAL = 0.4  # seconds to wait between requests
BS_MAX_RETRIES = 2
PROGRESS_SAVE_EVERY = 500


# ======================== Logging ========================
def setup_logging() -> Tuple[logging.Logger, Path]:
    """Configure root logging and return ``(logger, log_file)``.

    The log file is written under /tmp rather than on the NAS so that a slow
    or stalled SMB share cannot block logging.
    """
    # Only touch the NAS when it is actually mounted: creating LOG_DIR on an
    # unmounted volume would either fail at import time or create a local
    # directory tree that makes the later "NAS mounted?" check pass wrongly.
    if NAS_ROOT.exists():
        LOG_DIR.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = Path(f"/tmp/backfill_15min_{ts}.log")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=[
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__), log_file


logger, LOG_FILE = setup_logging()


# ======================== Helpers ========================
def get_all_codes() -> List[str]:
    """Return every stock code listed in ALL_STOCKS_FILE, zero-padded to 6 chars.

    Raises:
        ValueError: if none of the known code columns is present in the CSV.
    """
    # Read as text so leading zeros survive and a column containing missing
    # values is not converted to floats such as "1.0".
    df = pd.read_csv(ALL_STOCKS_FILE, dtype=str)
    for col in ["代码", "code", "股票代码"]:
        if col in df.columns:
            return [str(c).strip().zfill(6) for c in df[col].dropna().tolist()]
    raise ValueError(f"找不到代码列: {list(df.columns)}")


def code_to_baostock(code: str) -> Tuple[str, str]:
    """Map a 6-digit code to ``(BaoStock code, parquet file prefix)``.

    Codes starting with "6" (which includes the 68xxxx range) or "51" are
    treated as Shanghai ("sh"); everything else as Shenzhen ("sz").
    """
    if code.startswith(("6", "51")):
        return f"sh.{code}", "sh"
    return f"sz.{code}", "sz"


def _marker_path(parquet_path: Path) -> Path:
    """Return the hidden marker file that flags *parquet_path* as BaoStock-built."""
    return parquet_path.parent / f".{parquet_path.stem}.baostock"


def is_backfilled(parquet_path: Path) -> bool:
    """Return True if the file has already been rebuilt from BaoStock."""
    return _marker_path(parquet_path).exists()


def _normalize_date(value: str) -> str:
    """Return *value* as ``YYYY-MM-DD``.

    Accepts both ``YYYYMMDD`` and ``YYYY-MM-DD`` so that the command-line
    format and the BS_START_DATE default are handled by the same code path.

    Raises:
        ValueError: if *value* is not a valid date in either format.
    """
    text = value.strip()
    if len(text) == 8 and text.isdigit():
        text = f"{text[:4]}-{text[4:6]}-{text[6:]}"
    try:
        return datetime.strptime(text, "%Y-%m-%d").strftime("%Y-%m-%d")
    except ValueError:
        raise ValueError(
            f"日期格式无效: {value!r}(应为 YYYYMMDD 或 YYYY-MM-DD)"
        ) from None


def load_progress() -> Set[str]:
    """Return the set of codes recorded as done in the progress file.

    A missing or unreadable progress file yields an empty set, so the run
    simply starts from the beginning.
    """
    if not PROGRESS_FILE.exists():
        return set()
    try:
        payload = json.loads(PROGRESS_FILE.read_text(encoding="utf-8"))
        return set(payload.get("done", []))
    except (OSError, ValueError, AttributeError, TypeError) as e:
        # Best effort: a corrupt progress file must not abort the run, but it
        # should not go unnoticed either.
        logger.warning("进度文件读取失败,将从头开始: %s", e)
        return set()


def save_progress(done_set: set) -> None:
    """Persist *done_set* (plus a timestamp) to the progress file as JSON."""
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    payload = {
        "done": sorted(done_set),
        "ts": datetime.now().isoformat(),
    }
    PROGRESS_FILE.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")


# ======================== Core logic ========================
def fetch_bs_15min(bs_code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch unadjusted 15-minute bars for *bs_code* from BaoStock.

    Args:
        bs_code: BaoStock-style code such as ``sh.600000``.
        start_date: first day to fetch, ``YYYY-MM-DD``.
        end_date: last day to fetch, ``YYYY-MM-DD``.

    Returns:
        A DataFrame with columns ``day, open, high, low, close, volume,
        amount`` (prices numeric, volume/amount as strings), or None when
        BaoStock reports an error or no valid rows remain.
    """
    fields = ["date", "time", "code", "open", "high", "low", "close",
              "volume", "amount", "adjustflag"]
    rs = bs.query_history_k_data_plus(
        bs_code,
        ",".join(fields),
        start_date=start_date,
        end_date=end_date,
        frequency="15",
        adjustflag="3",  # unadjusted prices
    )
    if rs.error_code != "0":
        logger.debug("BaoStock %s 错误: %s %s", bs_code, rs.error_code, rs.error_msg)
        return None

    rows = []
    while rs.next():
        rows.append(rs.get_row_data())
    if not rows:
        return None

    df = pd.DataFrame(rows, columns=fields)

    # Reformat the timestamp: 20260428094500000 -> 2026-04-28 09:45:00
    ts = df["time"].astype(str)
    df["day"] = (
        ts.str[:4] + "-" + ts.str[4:6] + "-" + ts.str[6:8]
        + " " + ts.str[8:10] + ":" + ts.str[10:12] + ":00"
    )

    # Prices become numeric; unparsable values turn into NaN and are dropped below.
    for col in ["open", "high", "low", "close"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    df["volume"] = df["volume"].astype(str)
    df["amount"] = df["amount"].astype(str)

    # Keep only the columns used by the parquet files.
    df = df[["day", "open", "high", "low", "close", "volume", "amount"]]

    # Drop rows with missing prices or an inconsistent OHLC relationship
    # (high below the candle body or low above it).
    df = df.dropna(subset=["open", "high", "low", "close"])
    body_top = df[["open", "close"]].max(axis=1)
    body_bottom = df[["open", "close"]].min(axis=1)
    df = df[(df["high"] >= body_top) & (df["low"] <= body_bottom)]

    return df if not df.empty else None


def backfill_one(code: str, start_date: str, end_date: str,
                 force: bool = False) -> Tuple[str, int]:
    """Rebuild the full 15-minute history of a single stock.

    Args:
        code: 6-digit stock code.
        start_date: first day to fetch, ``YYYY-MM-DD``.
        end_date: last day to fetch, ``YYYY-MM-DD``.
        force: rebuild even if the file is already marked as BaoStock-built.

    Returns:
        ``(status, total_rows)`` where status is ``"ok"``, ``"skipped"`` or
        ``"failed"``; total_rows is 0 unless status is ``"ok"``.
    """
    bs_code, prefix = code_to_baostock(code)
    parquet_path = MINUTE_15_DIR / f"{prefix}{code}_15min.parquet"

    already_backfilled = is_backfilled(parquet_path)
    if not force and already_backfilled:
        return "skipped", 0

    # Fetch from BaoStock, retrying on exceptions.
    df_new = None
    for attempt in range(BS_MAX_RETRIES):
        try:
            df_new = fetch_bs_15min(bs_code, start_date, end_date)
        except Exception as e:
            logger.debug("backfill %s 重试%d: %s", code, attempt + 1, e)
            # No point in waiting once the last attempt has failed.
            if attempt < BS_MAX_RETRIES - 1:
                time.sleep(1)
            continue
        if df_new is not None and not df_new.empty:
            break

    if df_new is None or df_new.empty:
        return "failed", 0

    # Back up the old file, but only when it is not already BaoStock data:
    # BACKUP_DIR is meant to preserve the original Sina files, and an existing
    # backup is never overwritten.
    if parquet_path.exists() and not already_backfilled:
        BACKUP_DIR.mkdir(parents=True, exist_ok=True)
        backup_path = BACKUP_DIR / parquet_path.name
        if not backup_path.exists():
            shutil.copy2(str(parquet_path), str(backup_path))

    # Write to a temporary file first and move it into place afterwards, so an
    # interrupted write cannot leave a truncated parquet file behind.
    tmp_path = parquet_path.with_name(parquet_path.name + ".tmp")
    try:
        df_new = df_new.sort_values("day").reset_index(drop=True)
        df_new.to_parquet(tmp_path, index=False)
        tmp_path.replace(parquet_path)
        _marker_path(parquet_path).write_text(datetime.now().isoformat())
        return "ok", len(df_new)
    except Exception as e:
        logger.error("写入 %s 失败: %s", code, e)
        try:
            if tmp_path.exists():
                tmp_path.unlink()
        except OSError:
            pass  # best-effort cleanup; the failure itself is already logged
        return "failed", 0


# ======================== Main flow ========================
def main() -> None:
    """Parse the command line and run the backfill over the selected stocks."""
    parser = argparse.ArgumentParser(description="BaoStock 15min历史回补(全量重建)")
    parser.add_argument("--start", default=BS_START_DATE, help="回补起始日期 (YYYYMMDD)")
    parser.add_argument("--end", default="", help="结束日期 (默认今天)")
    parser.add_argument("--codes", help="指定股票代码,逗号分隔")
    parser.add_argument("--limit", type=int, default=0, help="限制处理数量(测试用)")
    parser.add_argument("--force", action="store_true", help="强制重建(覆盖已有BaoStock数据)")
    args = parser.parse_args()

    # Both YYYYMMDD (command line) and YYYY-MM-DD (the BS_START_DATE default)
    # are accepted; blindly slicing the default used to yield "2024--0-1-".
    try:
        start_date = _normalize_date(args.start)
        if args.end:
            end_date = _normalize_date(args.end)
        else:
            end_date = datetime.now().strftime("%Y-%m-%d")
    except ValueError as exc:
        parser.error(str(exc))

    if not NAS_ROOT.exists():
        logger.error("❌ NAS未挂载")
        sys.exit(1)

    # Log in to BaoStock.
    lg = bs.login()
    if lg.error_code != "0":
        logger.error("❌ BaoStock登录失败: %s", lg.error_msg)
        sys.exit(1)
    logger.info("✅ BaoStock登录成功")

    try:
        # Build the list of stocks to process.
        if args.codes:
            codes = [c.strip() for c in args.codes.split(",") if c.strip()]
        else:
            codes = get_all_codes()
        if args.limit > 0:
            codes = codes[:args.limit]

        logger.info("=" * 60)
        logger.info("BaoStock 15min全量重建开始")
        logger.info(" 股票数: %d", len(codes))
        logger.info(" 日期范围: %s ~ %s", start_date, end_date)
        logger.info(" 数据目录: %s", MINUTE_15_DIR)
        logger.info(" 旧数据备份: %s", BACKUP_DIR)
        logger.info(" 日志文件: %s", LOG_FILE)

        # Resume from saved progress; force mode processes everything again.
        done_set = load_progress()
        if args.force:
            todo = codes
        else:
            todo = [c for c in codes if c not in done_set]
        logger.info(" 待处理: %d(已完成: %d)", len(todo), len(done_set))

        stats = {"ok": 0, "skipped": 0, "failed": 0, "rows": 0}
        t_start = time.time()
        last_index = len(todo) - 1

        try:
            for i, code in enumerate(todo):
                try:
                    status, total_rows = backfill_one(code, start_date, end_date, args.force)
                except Exception as e:
                    status, total_rows = "failed", 0
                    # WARNING rather than DEBUG: the configured level is INFO,
                    # so DEBUG would hide unexpected per-stock failures.
                    logger.warning("backfill %s 异常: %s", code, e)

                stats[status] = stats.get(status, 0) + 1
                if status == "ok":
                    stats["rows"] += total_rows
                    done_set.add(code)

                if (i + 1) % PROGRESS_SAVE_EVERY == 0:
                    save_progress(done_set)
                    elapsed = time.time() - t_start
                    logger.info("进度: %d/%d ok=%d skipped=%d failed=%d rows=%d (%.0f秒)",
                                i + 1, len(todo), stats["ok"], stats["skipped"],
                                stats["failed"], stats["rows"], elapsed)

                # Rate limiting; skipped stocks made no request, so no wait.
                if status != "skipped" and i < last_index:
                    time.sleep(BS_INTERVAL)
        finally:
            # Persist progress even when the run is interrupted.
            save_progress(done_set)
    finally:
        bs.logout()

    elapsed = time.time() - t_start
    logger.info("=" * 60)
    logger.info("✅ 回补完成,耗时 %.1f 秒", elapsed)
    logger.info("统计: %s", json.dumps(stats, ensure_ascii=False))


if __name__ == "__main__":
    main()