From 21933cc2bba0ee03714dba2dabc53f5090d18f6f Mon Sep 17 00:00:00 2001 From: cfdaily Date: Tue, 5 May 2026 11:25:41 +0800 Subject: [PATCH] auto-sync: 2026-05-05 11:25:41 --- data_platform/backfill_15min_baostock.py | 132 ++++++++++++----------- 1 file changed, 69 insertions(+), 63 deletions(-) diff --git a/data_platform/backfill_15min_baostock.py b/data_platform/backfill_15min_baostock.py index f63b6b0e..4317883b 100644 --- a/data_platform/backfill_15min_baostock.py +++ b/data_platform/backfill_15min_baostock.py @@ -1,21 +1,27 @@ #!/usr/bin/env python3 """ -BaoStock 15分钟线历史回补脚本 +BaoStock 15分钟线历史回补脚本 (v1.1) -功能:用BaoStock免费数据源回补全市场15分钟K线历史数据 -特点: - - BaoStock无反爬限制,0.35s/只 - - 支持1999年以来的分钟线数据 - - 增量回补:只回补现有数据之前的空白段 - - 进度断点续传 +功能:用BaoStock免费数据源全量重建全市场15分钟K线历史数据 +策略: + - 全量重建:BaoStock获取完整历史,完全替换旧parquet + - 旧文件备份到 backup/ 目录 + - 增量模式:已有BaoStock重建过的文件自动跳过(检查标记) + +BaoStock特点: + - 无反爬限制,0.35s/只 + - 不复权数据(adjustflag=3) + - 分钟线从1999年起,我们取2024-01-01起 用法: - python3 backfill_15min_baostock.py # 全量回补到2024-01-01 + python3 backfill_15min_baostock.py # 全量回补 python3 backfill_15min_baostock.py --start 20200101 # 指定起始日期 - python3 backfill_15min_baostock.py --codes 000001,600000 # 指定股票 + python3 backfill_15min_baostock.py --limit 10 # 只处理前10只(测试) + python3 backfill_15min_baostock.py --force # 强制重建(覆盖已有BaoStock数据) 变更记录: - v1.0 (2026-05-05) 赵云创建 - BaoStock 15min历史回补 + v1.0 (2026-05-05) 赵云创建 + v1.1 (2026-05-05) 修复:旧新浪数据含后复权值,改为全量重建模式 """ import argparse @@ -36,15 +42,16 @@ import pandas as pd NAS_ROOT = Path("/Volumes/stock") MINUTE_15_DIR = NAS_ROOT / "minute_kline" / "15min" +BACKUP_DIR = MINUTE_15_DIR / "backup_sina" # 旧新浪数据备份 LOG_DIR = NAS_ROOT / "logs" / "daily_update" PROGRESS_DIR = LOG_DIR / "progress" ALL_STOCKS_FILE = NAS_ROOT / "A股数据" / "stock_info" / "all_stocks.csv" # BaoStock配置 -BS_START_DATE = "2024-01-01" # 默认回补起始日 -BS_INTERVAL = 0.4 # BaoStock请求间隔(秒) -BS_MAX_RETRIES = 2 # 单只最大重试 -PROGRESS_SAVE_EVERY = 500 # 每N只保存进度 +BS_START_DATE = "2024-01-01" +BS_INTERVAL = 0.4 # 请求间隔秒 +BS_MAX_RETRIES = 2 +PROGRESS_SAVE_EVERY = 500 # ======================== 日志 ======================== @@ -77,24 +84,22 @@ def get_all_codes() -> List[str]: def code_to_baostock(code: str) -> Tuple[str, str]: - """6位代码 → BaoStock格式 (sz.000001) 和 parquet前缀""" + """6位代码 → (BaoStock格式, parquet前缀)""" if code.startswith(("6", "68", "51")): return f"sh.{code}", "sh" else: return f"sz.{code}", "sz" -def get_parquet_last_date(parquet_path: Path) -> str: - """获取parquet中最早一条的日期(YYYY-MM-DD)""" +def is_backfilled(parquet_path: Path) -> bool: + """检查文件是否已经被BaoStock回补过(通过元数据标记)""" if not parquet_path.exists(): - return "" + return False try: - df = pd.read_parquet(parquet_path, columns=["day"]) - if not df.empty: - return str(df["day"].min())[:10] + df = pd.read_parquet(parquet_path) + return df.attrs.get("source") == "baostock" except Exception: - pass - return "" + return False def load_progress() -> set: @@ -113,13 +118,13 @@ def save_progress(done_set: set): progress_file.write_text(json.dumps({ "done": sorted(done_set), "ts": datetime.now().isoformat(), - })) + }, ensure_ascii=False)) # ======================== 核心逻辑 ======================== def fetch_bs_15min(bs_code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: - """从BaoStock获取15min数据,返回与parquet兼容的DataFrame""" + """从BaoStock获取15min不复权数据""" rs = bs.query_history_k_data_plus( bs_code, "date,time,code,open,high,low,close,volume,amount,adjustflag", @@ -143,7 +148,7 @@ def fetch_bs_15min(bs_code: str, start_date: str, end_date: str) -> Optional[pd. df = pd.DataFrame(rows, columns=["date", "time", "code", "open", "high", "low", "close", "volume", "amount", "adjustflag"]) - # 转换 time 格式: 20260428094500000 → 2026-04-28 09:45:00 + # 转换时间格式: 20260428094500000 → 2026-04-28 09:45:00 df["day"] = df["time"].apply(lambda t: f"{t[:4]}-{t[4:6]}-{t[6:8]} {t[8:10]}:{t[10:12]}:00") # 数值转换 @@ -162,30 +167,25 @@ def fetch_bs_15min(bs_code: str, start_date: str, end_date: str) -> Optional[pd. if bad_ohlc.any(): df = df[~bad_ohlc] + # 标记数据来源 + df.attrs["source"] = "baostock" + return df if not df.empty else None -def backfill_one(code: str, start_date: str) -> Tuple[str, int]: +def backfill_one(code: str, start_date: str, end_date: str, force: bool = False) -> Tuple[str, int]: """ - 回补单只股票的15min历史数据 - 返回: (status, new_rows) + 全量重建单只股票的15min历史 + 返回: (status, total_rows) """ bs_code, prefix = code_to_baostock(code) parquet_path = MINUTE_15_DIR / f"{prefix}{code}_15min.parquet" - # 确定回补结束日期 = 现有数据最早日期的前一天(避免重叠) - earliest_existing = get_parquet_last_date(parquet_path) + # 已回补过的跳过 + if not force and is_backfilled(parquet_path): + return "skipped", 0 - if earliest_existing: - # 已有数据,回补到最早日期之前 - end_date = (pd.Timestamp(earliest_existing) - pd.Timedelta(days=1)).strftime("%Y-%m-%d") - if end_date <= start_date: - return "skipped", 0 - else: - # 全新股票,回补到最近交易日 - end_date = datetime.now().strftime("%Y-%m-%d") - - # 获取数据 + # 获取BaoStock数据 df_new = None for attempt in range(BS_MAX_RETRIES): try: @@ -199,19 +199,18 @@ def backfill_one(code: str, start_date: str) -> Tuple[str, int]: if df_new is None or df_new.empty: return "failed", 0 - # 合并到parquet - try: - if parquet_path.exists(): - df_exist = pd.read_parquet(parquet_path) - df_merged = pd.concat([df_new, df_exist], ignore_index=True) - # 去重(按day字段) - df_merged = df_merged.drop_duplicates(subset=["day"], keep="last") - df_merged = df_merged.sort_values("day").reset_index(drop=True) - else: - MINUTE_15_DIR.mkdir(parents=True, exist_ok=True) - df_merged = df_new + # 备份旧文件 + if parquet_path.exists(): + BACKUP_DIR.mkdir(parents=True, exist_ok=True) + backup_path = BACKUP_DIR / parquet_path.name + if not backup_path.exists(): # 不覆盖已有备份 + shutil.copy2(str(parquet_path), str(backup_path)) - df_merged.to_parquet(parquet_path, index=False) + # 写入新文件 + try: + df_new = df_new.sort_values("day").reset_index(drop=True) + # 保存时把attrs写入parquet metadata + df_new.to_parquet(parquet_path, index=False) return "ok", len(df_new) except Exception as e: logger.error("写入 %s 失败: %s", code, e) @@ -221,13 +220,16 @@ def backfill_one(code: str, start_date: str) -> Tuple[str, int]: # ======================== 主流程 ======================== def main(): - parser = argparse.ArgumentParser(description="BaoStock 15min历史回补") + parser = argparse.ArgumentParser(description="BaoStock 15min历史回补(全量重建)") parser.add_argument("--start", default=BS_START_DATE, help="回补起始日期 (YYYYMMDD)") + parser.add_argument("--end", default="", help="结束日期 (默认今天)") parser.add_argument("--codes", help="指定股票代码,逗号分隔") parser.add_argument("--limit", type=int, default=0, help="限制处理数量(测试用)") + parser.add_argument("--force", action="store_true", help="强制重建(覆盖已有BaoStock数据)") args = parser.parse_args() start_date = f"{args.start[:4]}-{args.start[4:6]}-{args.start[6:8]}" + end_date = f"{args.end[:4]}-{args.end[4:6]}-{args.end[6:8]}" if args.end else datetime.now().strftime("%Y-%m-%d") if not NAS_ROOT.exists(): logger.error("❌ NAS未挂载") @@ -250,37 +252,41 @@ def main(): codes = codes[:args.limit] logger.info("=" * 60) - logger.info("BaoStock 15min历史回补开始") + logger.info("BaoStock 15min全量重建开始") logger.info(f" 股票数: {len(codes)}") - logger.info(f" 回补起始: {start_date}") + logger.info(f" 日期范围: {start_date} ~ {end_date}") logger.info(f" 数据目录: {MINUTE_15_DIR}") + logger.info(f" 旧数据备份: {BACKUP_DIR}") # 进度恢复 done_set = load_progress() - todo = [c for c in codes if c not in done_set] + if args.force: + todo = codes # 强制模式不跳过 + else: + todo = [c for c in codes if c not in done_set] logger.info(f" 待处理: {len(todo)}(已完成: {len(done_set)})") - stats = {"updated": 0, "skipped": 0, "failed": 0, "rows": 0} + stats = {"ok": 0, "skipped": 0, "failed": 0, "rows": 0} t_start = time.time() for i, code in enumerate(todo): try: - status, new_rows = backfill_one(code, start_date) + status, total_rows = backfill_one(code, start_date, end_date, args.force) except Exception as e: - status, new_rows = "failed", 0 + status, total_rows = "failed", 0 logger.debug("backfill %s 异常: %s", code, e) stats[status] = stats.get(status, 0) + 1 if status == "ok": - stats["rows"] += new_rows + stats["rows"] += total_rows done_set.add(code) if (i + 1) % PROGRESS_SAVE_EVERY == 0: save_progress(done_set) elapsed = time.time() - t_start - logger.info("进度: %d/%d updated=%d skipped=%d failed=%d rows=%d (%.0f秒)", - i + 1, len(todo), stats["updated"], stats["skipped"], + logger.info("进度: %d/%d ok=%d skipped=%d failed=%d rows=%d (%.0f秒)", + i + 1, len(todo), stats["ok"], stats["skipped"], stats["failed"], stats["rows"], elapsed) # 频率控制