304 lines
9.8 KiB
Python
304 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
BaoStock 15分钟线历史回补脚本 (v1.1)

功能:用BaoStock免费数据源全量重建全市场15分钟K线历史数据

策略:
- 全量重建:BaoStock获取完整历史,完全替换旧parquet
- 旧文件备份到 backup_sina/ 目录(已有备份不会被覆盖)
- 增量模式:已有BaoStock重建过的文件自动跳过(检查 .baostock 标记文件)

BaoStock特点:
- 无反爬限制,0.35s/只
- 不复权数据(adjustflag=3)
- 分钟线从1999年起,我们取2024-01-01起

用法:
    python3 backfill_15min_baostock.py # 全量回补
    python3 backfill_15min_baostock.py --start 20200101 # 指定起始日期
    python3 backfill_15min_baostock.py --limit 10 # 只处理前10只(测试)
    python3 backfill_15min_baostock.py --codes 600000,000001 # 只处理指定股票
    python3 backfill_15min_baostock.py --force # 强制重建(覆盖已有BaoStock数据)

变更记录:
    v1.0 (2026-05-05) 赵云创建
    v1.1 (2026-05-05) 修复:旧新浪数据含后复权值,改为全量重建模式
"""
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
import os
|
||
import shutil
|
||
import sys
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import List, Optional, Tuple
|
||
|
||
import baostock as bs
|
||
import pandas as pd
|
||
|
||
# ======================== Configuration ========================

# ---- Filesystem layout: everything lives under the NAS mount point ----
NAS_ROOT: Path = Path("/Volumes/stock")
MINUTE_15_DIR: Path = NAS_ROOT.joinpath("minute_kline", "15min")
# Where the previous (Sina-sourced) parquet files are copied before rebuild.
BACKUP_DIR: Path = MINUTE_15_DIR.joinpath("backup_sina")
LOG_DIR: Path = NAS_ROOT.joinpath("logs", "daily_update")
PROGRESS_DIR: Path = LOG_DIR.joinpath("progress")
ALL_STOCKS_FILE: Path = NAS_ROOT.joinpath("A股数据", "stock_info", "all_stocks.csv")

# ---- BaoStock request settings ----
BS_START_DATE: str = "2024-01-01"  # default first date requested (YYYY-MM-DD)
BS_INTERVAL: float = 0.4           # pause between consecutive stocks, in seconds
BS_MAX_RETRIES: int = 2            # fetch attempts per stock
PROGRESS_SAVE_EVERY: int = 500     # flush the progress file every N stocks
|
||
|
||
# ======================== 日志 ========================
|
||
|
||
def setup_logging() -> logging.Logger:
    """Configure root logging to the console and, when possible, a timestamped file.

    The file handler writes ``LOG_DIR/backfill_15min_<YYYYmmdd_HHMMSS>.log``.
    It is only added when ``NAS_ROOT`` already exists. Calling
    ``LOG_DIR.mkdir(parents=True)`` on an unmounted NAS could otherwise create
    ``NAS_ROOT`` as an ordinary local directory. Any later
    ``NAS_ROOT.exists()`` mount check would then pass, and data could end up
    on the local disk.

    Returns:
        The logger for this module.
    """
    handlers: List[logging.Handler] = [logging.StreamHandler()]
    if NAS_ROOT.exists():
        LOG_DIR.mkdir(parents=True, exist_ok=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = LOG_DIR / f"backfill_15min_{ts}.log"
        # File handler goes first, ahead of the console handler.
        handlers.insert(0, logging.FileHandler(log_file, encoding="utf-8"))

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=handlers,
    )
    return logging.getLogger(__name__)


# Configured at import time so every function in this module can log.
logger = setup_logging()
|
||
|
||
|
||
# ======================== 工具函数 ========================
|
||
|
||
def get_all_codes(path: Optional[Path] = None) -> List[str]:
    """Return the stock codes listed in the all-stocks CSV as 6-character strings.

    The first column found among ``代码`` / ``code`` / ``股票代码`` is used.

    Args:
        path: CSV file to read. Defaults to ``ALL_STOCKS_FILE``.

    Returns:
        Codes zero-padded to 6 characters, in file order. Blank cells are skipped.

    Raises:
        ValueError: if none of the expected code columns is present.
    """
    src = ALL_STOCKS_FILE if path is None else path
    # dtype=str preserves leading zeros. It also stops a column that contains
    # blanks from being parsed as float, which would turn 1 into "0001.0".
    df = pd.read_csv(src, dtype=str)
    for col in ("代码", "code", "股票代码"):
        if col in df.columns:
            values = df[col].dropna().str.strip()
            return [c.zfill(6) for c in values if c]
    raise ValueError(f"找不到代码列: {list(df.columns)}")
|
||
|
||
|
||
def code_to_baostock(code: str) -> Tuple[str, str]:
    """Map a 6-digit code to ``(BaoStock symbol, parquet file prefix)``.

    Shanghai (``sh``) covers:
    - main board and STAR shares (6xxxxx);
    - all Shanghai fund/ETF ranges (5xxxxx: 50/51/52/56/58);
    - Shanghai B shares (900xxx).

    Everything else maps to Shenzhen (``sz``). That includes 0/3xxxxx shares,
    15/16/18xxxx funds and 200xxx B shares.
    NOTE(review): Beijing-exchange codes (4/8/92xxxx) also fall through to
    ``sz``; BaoStock is not believed to serve them — confirm.
    """
    # "6" already covers "68"; "5" covers every Shanghai fund range,
    # not just "51".
    exchange = "sh" if code.startswith(("5", "6", "900")) else "sz"
    return f"{exchange}.{code}", exchange
|
||
|
||
|
||
def is_backfilled(parquet_path: Path) -> bool:
    """Tell whether *parquet_path* has already been rebuilt from BaoStock.

    A rebuild leaves a hidden marker file next to the parquet file, named
    ``.<stem>.baostock``. Only the marker's existence is checked; the parquet
    file itself is not inspected.
    """
    marker_name = f".{parquet_path.stem}.baostock"
    return parquet_path.parent.joinpath(marker_name).exists()
|
||
|
||
|
||
def load_progress() -> set:
    """Load the set of codes recorded as done by a previous run.

    Returns:
        The ``"done"`` list from the progress JSON as a set. The set is empty
        when the file is missing. It is also empty when the file cannot be
        read or parsed, or does not have the ``{"done": [...]}`` shape; in that
        case a warning is logged instead of failing silently.
    """
    progress_file = PROGRESS_DIR / "backfill_15min_progress.json"
    if not progress_file.exists():
        return set()
    try:
        data = json.loads(progress_file.read_text(encoding="utf-8"))
        # AttributeError: top-level JSON is not an object.
        # TypeError: "done" is not an iterable of hashable values.
        return set(data.get("done", []))
    except (OSError, ValueError, AttributeError, TypeError) as e:
        logger.warning("进度文件 %s 读取失败,忽略: %s", progress_file, e)
        return set()
|
||
|
||
|
||
def save_progress(done_set: set):
    """Persist *done_set* (sorted) and the current timestamp to the progress JSON.

    The JSON is first written to a temporary sibling file. It is then moved
    over the real file with ``os.replace``, so an interrupted write cannot
    leave a truncated progress file behind.
    """
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    progress_file = PROGRESS_DIR / "backfill_15min_progress.json"
    tmp_file = progress_file.with_name(progress_file.name + ".tmp")
    payload = json.dumps({
        "done": sorted(done_set),
        "ts": datetime.now().isoformat(),
    }, ensure_ascii=False)
    tmp_file.write_text(payload, encoding="utf-8")
    os.replace(tmp_file, progress_file)
|
||
|
||
|
||
# ======================== 核心逻辑 ========================
|
||
|
||
def fetch_bs_15min(bs_code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch unadjusted (adjustflag="3") 15-minute bars for one symbol from BaoStock.

    Args:
        bs_code: BaoStock symbol, e.g. ``sh.600000``.
        start_date: first date, ``YYYY-MM-DD``.
        end_date: last date, ``YYYY-MM-DD``.

    Returns:
        A DataFrame with these columns, or ``None``:
        - ``day``: string ``"YYYY-MM-DD HH:MM:00"``;
        - ``open/high/low/close``: numeric;
        - ``volume/amount``: string.

        ``None`` is returned when:
        - the query fails, including an error reported while iterating results;
        - no rows come back;
        - every row is dropped as invalid.
    """
    fields = ["date", "time", "code", "open", "high", "low",
              "close", "volume", "amount", "adjustflag"]
    rs = bs.query_history_k_data_plus(
        bs_code,
        ",".join(fields),
        start_date=start_date,
        end_date=end_date,
        frequency="15",
        adjustflag="3",  # 不复权 / unadjusted
    )

    if rs.error_code != "0":
        logger.debug("BaoStock %s 错误: %s %s", bs_code, rs.error_code, rs.error_msg)
        return None

    rows = []
    # Re-check error_code on every step, as BaoStock's own examples do. The
    # caller replaces the existing parquet file with whatever is returned.
    # An error partway through iteration must therefore not be mistaken for
    # a complete but shorter history.
    while rs.error_code == "0" and rs.next():
        rows.append(rs.get_row_data())
    if rs.error_code != "0":
        logger.warning("BaoStock %s 读取中断: %s %s", bs_code, rs.error_code, rs.error_msg)
        return None

    if not rows:
        return None

    df = pd.DataFrame(rows, columns=fields)

    # time format: 20260428094500000 -> 2026-04-28 09:45:00
    df["day"] = df["time"].apply(lambda t: f"{t[:4]}-{t[4:6]}-{t[6:8]} {t[8:10]}:{t[10:12]}:00")

    # Prices become numeric (unparseable -> NaN). volume/amount stay strings,
    # matching the existing parquet schema.
    for col in ["open", "high", "low", "close"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    df["volume"] = df["volume"].astype(str)
    df["amount"] = df["amount"].astype(str)

    # Keep only the columns stored in the parquet files.
    df = df[["day", "open", "high", "low", "close", "volume", "amount"]]

    # Drop rows with missing prices, or where high/low do not bracket open/close.
    df = df.dropna(subset=["open", "high", "low", "close"])
    body_max = df[["open", "close"]].max(axis=1)
    body_min = df[["open", "close"]].min(axis=1)
    bad_ohlc = (df["high"] < body_max) | (df["low"] > body_min)
    if bad_ohlc.any():
        df = df[~bad_ohlc]

    return df if not df.empty else None
|
||
|
||
|
||
def backfill_one(code: str, start_date: str, end_date: str, force: bool = False) -> Tuple[str, int]:
    """Fully rebuild one stock's 15-minute parquet file from BaoStock.

    Args:
        code: 6-digit stock code.
        start_date: first date passed to BaoStock, ``YYYY-MM-DD``.
        end_date: last date passed to BaoStock, ``YYYY-MM-DD``.
        force: rebuild even if the ``.baostock`` marker file already exists.

    Returns:
        ``(status, total_rows)``. ``status`` is one of:
        - ``"ok"``: the file was rewritten;
        - ``"skipped"``: the marker is present and ``force`` is false;
        - ``"failed"``: no data after all retries, or the write failed.

        ``total_rows`` is the number of rows written, and 0 unless status is ``"ok"``.

    Raises:
        OSError: if backing up the existing file fails. The existing file is
            left untouched in that case.
    """
    bs_code, prefix = code_to_baostock(code)
    parquet_path = MINUTE_15_DIR / f"{prefix}{code}_15min.parquet"

    # Already rebuilt from BaoStock (marker present): skip unless forced.
    if not force and is_backfilled(parquet_path):
        return "skipped", 0

    # Fetch from BaoStock, retrying on exceptions and on empty results.
    df_new = None
    for attempt in range(1, BS_MAX_RETRIES + 1):
        try:
            df_new = fetch_bs_15min(bs_code, start_date, end_date)
        except Exception as e:
            logger.debug("backfill %s 重试%d: %s", code, attempt, e)
            if attempt < BS_MAX_RETRIES:
                time.sleep(1)  # brief back-off; pointless after the last attempt
            continue
        if df_new is not None and len(df_new) > 0:
            break

    if df_new is None or df_new.empty:
        return "failed", 0

    # Back up the pre-existing (old Sina) file once; an existing backup is never overwritten.
    if parquet_path.exists():
        BACKUP_DIR.mkdir(parents=True, exist_ok=True)
        backup_path = BACKUP_DIR / parquet_path.name
        if not backup_path.exists():
            shutil.copy2(str(parquet_path), str(backup_path))

    # Write to a temp file and swap it in atomically. An interrupted or failed
    # write then leaves the previous parquet file intact, not half-written.
    tmp_path = parquet_path.with_name(parquet_path.name + ".tmp")
    try:
        df_new = df_new.sort_values("day").reset_index(drop=True)
        df_new.to_parquet(tmp_path, index=False)
        os.replace(tmp_path, parquet_path)
        # Marker file: lets later runs skip this stock (see is_backfilled).
        marker = parquet_path.parent / f".{parquet_path.stem}.baostock"
        marker.write_text(datetime.now().isoformat())
        return "ok", len(df_new)
    except Exception as e:
        logger.error("写入 %s 失败: %s", code, e)
        try:
            tmp_path.unlink()
        except OSError:
            pass  # best-effort cleanup: temp file may not exist or the NAS may be gone
        return "failed", 0
|
||
|
||
|
||
# ======================== 主流程 ========================
|
||
|
||
def _normalize_date(value: str) -> str:
    """Return *value*, given as ``YYYYMMDD`` or ``YYYY-MM-DD``, as ``YYYY-MM-DD``.

    Raises:
        ValueError: if *value* is not a valid calendar date in either form.
    """
    return datetime.strptime(value.replace("-", ""), "%Y%m%d").strftime("%Y-%m-%d")


def main():
    """Command-line entry point: rebuild 15-minute history for the selected stocks.

    Steps:
    1. Parse arguments and check that ``NAS_ROOT`` exists.
    2. Log in to BaoStock.
    3. Build the work list. Codes already in the progress file are skipped
       unless ``--force`` is given.
    4. Rebuild each code with ``backfill_one``, saving progress every
       ``PROGRESS_SAVE_EVERY`` codes.

    Progress is saved and the BaoStock session closed when the run ends,
    including on an exception or Ctrl-C.
    """
    parser = argparse.ArgumentParser(description="BaoStock 15min历史回补(全量重建)")
    parser.add_argument("--start", default=BS_START_DATE,
                        help="回补起始日期 (YYYYMMDD 或 YYYY-MM-DD)")
    parser.add_argument("--end", default="", help="结束日期 (默认今天)")
    parser.add_argument("--codes", help="指定股票代码,逗号分隔")
    parser.add_argument("--limit", type=int, default=0, help="限制处理数量(测试用)")
    parser.add_argument("--force", action="store_true", help="强制重建(覆盖已有BaoStock数据)")
    args = parser.parse_args()

    # Accept both the documented CLI form (YYYYMMDD) and the form of the
    # BS_START_DATE default (YYYY-MM-DD). BaoStock expects YYYY-MM-DD.
    try:
        start_date = _normalize_date(args.start)
        end_date = (_normalize_date(args.end) if args.end
                    else datetime.now().strftime("%Y-%m-%d"))
    except ValueError as e:
        parser.error(f"日期格式错误: {e}")

    if not NAS_ROOT.exists():
        logger.error("❌ NAS未挂载")
        sys.exit(1)

    # 登录BaoStock / log in
    lg = bs.login()
    if lg.error_code != "0":
        logger.error("❌ BaoStock登录失败: %s", lg.error_msg)
        sys.exit(1)
    logger.info("✅ BaoStock登录成功")

    done_set = load_progress()
    try:
        # Build the stock list.
        if args.codes:
            # Ignore empty entries (e.g. a trailing comma) and restore leading zeros.
            codes = [c.strip().zfill(6) for c in args.codes.split(",") if c.strip()]
        else:
            codes = get_all_codes()
        if args.limit > 0:
            codes = codes[:args.limit]

        logger.info("=" * 60)
        logger.info("BaoStock 15min全量重建开始")
        logger.info(" 股票数: %d", len(codes))
        logger.info(" 日期范围: %s ~ %s", start_date, end_date)
        logger.info(" 数据目录: %s", MINUTE_15_DIR)
        logger.info(" 旧数据备份: %s", BACKUP_DIR)

        # --force reprocesses everything; otherwise resume after the previous run.
        todo = codes if args.force else [c for c in codes if c not in done_set]
        logger.info(" 待处理: %d(已完成: %d)", len(todo), len(done_set))

        stats = {"ok": 0, "skipped": 0, "failed": 0, "rows": 0}
        t_start = time.time()

        for i, code in enumerate(todo):
            try:
                status, total_rows = backfill_one(code, start_date, end_date, args.force)
            except Exception as e:  # one bad stock must not abort the whole run
                status, total_rows = "failed", 0
                logger.warning("backfill %s 异常: %s", code, e)

            stats[status] = stats.get(status, 0) + 1
            if status == "ok":
                stats["rows"] += total_rows

            # Only codes that now have BaoStock data count as done. Failed
            # codes stay out of the progress file so the next run retries them.
            if status in ("ok", "skipped"):
                done_set.add(code)

            if (i + 1) % PROGRESS_SAVE_EVERY == 0:
                save_progress(done_set)
                elapsed = time.time() - t_start
                logger.info("进度: %d/%d ok=%d skipped=%d failed=%d rows=%d (%.0f秒)",
                            i + 1, len(todo), stats["ok"], stats["skipped"],
                            stats["failed"], stats["rows"], elapsed)

            # Rate limiting between stocks.
            if i < len(todo) - 1:
                time.sleep(BS_INTERVAL)
    finally:
        # Persist progress and close the session even on error / Ctrl-C.
        try:
            save_progress(done_set)
        finally:
            bs.logout()

    elapsed = time.time() - t_start
    logger.info("=" * 60)
    logger.info("✅ 回补完成,耗时 %.1f 秒", elapsed)
    logger.info("统计: %s", json.dumps(stats, ensure_ascii=False))


if __name__ == "__main__":
    main()
|