diff --git a/data_platform/backfill_15min_baostock.py b/data_platform/backfill_15min_baostock.py new file mode 100644 index 00000000..f63b6b0e --- /dev/null +++ b/data_platform/backfill_15min_baostock.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python3 +""" +BaoStock 15分钟线历史回补脚本 + +功能:用BaoStock免费数据源回补全市场15分钟K线历史数据 +特点: + - BaoStock无反爬限制,0.35s/只 + - 支持1999年以来的分钟线数据 + - 增量回补:只回补现有数据之前的空白段 + - 进度断点续传 + +用法: + python3 backfill_15min_baostock.py # 全量回补到2024-01-01 + python3 backfill_15min_baostock.py --start 20200101 # 指定起始日期 + python3 backfill_15min_baostock.py --codes 000001,600000 # 指定股票 + +变更记录: + v1.0 (2026-05-05) 赵云创建 - BaoStock 15min历史回补 +""" + +import argparse +import json +import logging +import os +import shutil +import sys +import time +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Tuple + +import baostock as bs +import pandas as pd + +# ======================== 配置 ======================== + +NAS_ROOT = Path("/Volumes/stock") +MINUTE_15_DIR = NAS_ROOT / "minute_kline" / "15min" +LOG_DIR = NAS_ROOT / "logs" / "daily_update" +PROGRESS_DIR = LOG_DIR / "progress" +ALL_STOCKS_FILE = NAS_ROOT / "A股数据" / "stock_info" / "all_stocks.csv" + +# BaoStock配置 +BS_START_DATE = "2024-01-01" # 默认回补起始日 +BS_INTERVAL = 0.4 # BaoStock请求间隔(秒) +BS_MAX_RETRIES = 2 # 单只最大重试 +PROGRESS_SAVE_EVERY = 500 # 每N只保存进度 + +# ======================== 日志 ======================== + +def setup_logging(): + LOG_DIR.mkdir(parents=True, exist_ok=True) + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file = LOG_DIR / f"backfill_15min_{ts}.log" + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + handlers=[ + logging.FileHandler(log_file, encoding="utf-8"), + logging.StreamHandler(), + ], + ) + return logging.getLogger(__name__) + +logger = setup_logging() + + +# ======================== 工具函数 ======================== + +def get_all_codes() -> List[str]: + df = pd.read_csv(ALL_STOCKS_FILE) + for col in ["代码", "code", "股票代码"]: + if col in df.columns: + return [str(c).zfill(6) for c in df[col].tolist()] + raise ValueError(f"找不到代码列: {list(df.columns)}") + + +def code_to_baostock(code: str) -> Tuple[str, str]: + """6位代码 → BaoStock格式 (sz.000001) 和 parquet前缀""" + if code.startswith(("6", "68", "51")): + return f"sh.{code}", "sh" + else: + return f"sz.{code}", "sz" + + +def get_parquet_last_date(parquet_path: Path) -> str: + """获取parquet中最早一条的日期(YYYY-MM-DD)""" + if not parquet_path.exists(): + return "" + try: + df = pd.read_parquet(parquet_path, columns=["day"]) + if not df.empty: + return str(df["day"].min())[:10] + except Exception: + pass + return "" + + +def load_progress() -> set: + progress_file = PROGRESS_DIR / "backfill_15min_progress.json" + if progress_file.exists(): + try: + return set(json.loads(progress_file.read_text()).get("done", [])) + except Exception: + pass + return set() + + +def save_progress(done_set: set): + PROGRESS_DIR.mkdir(parents=True, exist_ok=True) + progress_file = PROGRESS_DIR / "backfill_15min_progress.json" + progress_file.write_text(json.dumps({ + "done": sorted(done_set), + "ts": datetime.now().isoformat(), + })) + + +# ======================== 核心逻辑 ======================== + +def fetch_bs_15min(bs_code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: + """从BaoStock获取15min数据,返回与parquet兼容的DataFrame""" + rs = bs.query_history_k_data_plus( + bs_code, + "date,time,code,open,high,low,close,volume,amount,adjustflag", + start_date=start_date, + end_date=end_date, + frequency="15", + adjustflag="3", # 不复权 + ) + + if rs.error_code != "0": + logger.debug("BaoStock %s 错误: %s %s", bs_code, rs.error_code, rs.error_msg) + return None + + rows = [] + while rs.next(): + rows.append(rs.get_row_data()) + + if not rows: + return None + + df = pd.DataFrame(rows, columns=["date", "time", "code", "open", "high", "low", + "close", "volume", "amount", "adjustflag"]) + + # 转换 time 格式: 20260428094500000 → 2026-04-28 09:45:00 + df["day"] = df["time"].apply(lambda t: f"{t[:4]}-{t[4:6]}-{t[6:8]} {t[8:10]}:{t[10:12]}:00") + + # 数值转换 + for col in ["open", "high", "low", "close"]: + df[col] = pd.to_numeric(df[col], errors="coerce") + df["volume"] = df["volume"].astype(str) + df["amount"] = df["amount"].astype(str) + + # 保留与parquet一致的列 + df = df[["day", "open", "high", "low", "close", "volume", "amount"]] + + # 过滤无效数据 + df = df.dropna(subset=["open", "high", "low", "close"]) + bad_ohlc = (df["high"] < df[["open", "close"]].max(axis=1)) | \ + (df["low"] > df[["open", "close"]].min(axis=1)) + if bad_ohlc.any(): + df = df[~bad_ohlc] + + return df if not df.empty else None + + +def backfill_one(code: str, start_date: str) -> Tuple[str, int]: + """ + 回补单只股票的15min历史数据 + 返回: (status, new_rows) + """ + bs_code, prefix = code_to_baostock(code) + parquet_path = MINUTE_15_DIR / f"{prefix}{code}_15min.parquet" + + # 确定回补结束日期 = 现有数据最早日期的前一天(避免重叠) + earliest_existing = get_parquet_last_date(parquet_path) + + if earliest_existing: + # 已有数据,回补到最早日期之前 + end_date = (pd.Timestamp(earliest_existing) - pd.Timedelta(days=1)).strftime("%Y-%m-%d") + if end_date <= start_date: + return "skipped", 0 + else: + # 全新股票,回补到最近交易日 + end_date = datetime.now().strftime("%Y-%m-%d") + + # 获取数据 + df_new = None + for attempt in range(BS_MAX_RETRIES): + try: + df_new = fetch_bs_15min(bs_code, start_date, end_date) + if df_new is not None and len(df_new) > 0: + break + except Exception as e: + logger.debug("backfill %s 重试%d: %s", code, attempt + 1, e) + time.sleep(1) + + if df_new is None or df_new.empty: + return "failed", 0 + + # 合并到parquet + try: + if parquet_path.exists(): + df_exist = pd.read_parquet(parquet_path) + df_merged = pd.concat([df_new, df_exist], ignore_index=True) + # 去重(按day字段) + df_merged = df_merged.drop_duplicates(subset=["day"], keep="last") + df_merged = df_merged.sort_values("day").reset_index(drop=True) + else: + MINUTE_15_DIR.mkdir(parents=True, exist_ok=True) + df_merged = df_new + + df_merged.to_parquet(parquet_path, index=False) + return "ok", len(df_new) + except Exception as e: + logger.error("写入 %s 失败: %s", code, e) + return "failed", 0 + + +# ======================== 主流程 ======================== + +def main(): + parser = argparse.ArgumentParser(description="BaoStock 15min历史回补") + parser.add_argument("--start", default=BS_START_DATE, help="回补起始日期 (YYYYMMDD)") + parser.add_argument("--codes", help="指定股票代码,逗号分隔") + parser.add_argument("--limit", type=int, default=0, help="限制处理数量(测试用)") + args = parser.parse_args() + + start_date = f"{args.start[:4]}-{args.start[4:6]}-{args.start[6:8]}" + + if not NAS_ROOT.exists(): + logger.error("❌ NAS未挂载") + sys.exit(1) + + # 登录BaoStock + lg = bs.login() + if lg.error_code != "0": + logger.error("❌ BaoStock登录失败: %s", lg.error_msg) + sys.exit(1) + logger.info("✅ BaoStock登录成功") + + # 获取股票列表 + if args.codes: + codes = [c.strip() for c in args.codes.split(",")] + else: + codes = get_all_codes() + + if args.limit > 0: + codes = codes[:args.limit] + + logger.info("=" * 60) + logger.info("BaoStock 15min历史回补开始") + logger.info(f" 股票数: {len(codes)}") + logger.info(f" 回补起始: {start_date}") + logger.info(f" 数据目录: {MINUTE_15_DIR}") + + # 进度恢复 + done_set = load_progress() + todo = [c for c in codes if c not in done_set] + logger.info(f" 待处理: {len(todo)}(已完成: {len(done_set)})") + + stats = {"updated": 0, "skipped": 0, "failed": 0, "rows": 0} + t_start = time.time() + + for i, code in enumerate(todo): + try: + status, new_rows = backfill_one(code, start_date) + except Exception as e: + status, new_rows = "failed", 0 + logger.debug("backfill %s 异常: %s", code, e) + + stats[status] = stats.get(status, 0) + 1 + if status == "ok": + stats["rows"] += new_rows + + done_set.add(code) + + if (i + 1) % PROGRESS_SAVE_EVERY == 0: + save_progress(done_set) + elapsed = time.time() - t_start + logger.info("进度: %d/%d updated=%d skipped=%d failed=%d rows=%d (%.0f秒)", + i + 1, len(todo), stats["updated"], stats["skipped"], + stats["failed"], stats["rows"], elapsed) + + # 频率控制 + if i < len(todo) - 1: + time.sleep(BS_INTERVAL) + + # 保存最终进度 + save_progress(done_set) + + bs.logout() + + elapsed = time.time() - t_start + logger.info("=" * 60) + logger.info("✅ 回补完成,耗时 %.1f 秒", elapsed) + logger.info("统计: %s", json.dumps(stats, ensure_ascii=False)) + + +if __name__ == "__main__": + main()