#!/usr/bin/env python3
# Reconstructed from the "auto-sync: 2026-05-02 21:25:52" patch (commit 6ad71f54),
# which adds this module as data_platform/download_minute.py.
"""
Download 15-minute K-line (candlestick) data.

Data-source fallback chain:
  1. Tencent mkline API (returns 15-minute bars directly)
  2. Tencent minute/query + aggregation into 15-minute bars (current day only)

Features:
  - HS300 or whole-market download
  - Incremental download (new rows are merged into the existing file)
  - Resumable runs (JSON progress file)
  - Rate limiting (0.3s between stocks, plus retries)
  - Output intended to match the format of the 84 existing Parquet files

Usage:
  python3 download_minute.py --scope hs300
  python3 download_minute.py --scope all
  python3 download_minute.py --codes 000001 600519
  python3 download_minute.py --scope hs300 --resume   # resume a previous run
"""

import argparse
import json
import re
import sys
import time
import logging
import urllib.request
import urllib.error
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Tuple

import pandas as pd
import numpy as np

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)

# --- Configuration ---
OUTPUT_DIR = Path("/Volumes/stock/minute_kline/15min")
PROGRESS_FILE = OUTPUT_DIR / "download_progress.json"
REQUEST_INTERVAL = 0.3  # seconds between stocks
MAX_RETRIES = 3  # attempts per stock
CONSECUTIVE_FAIL_PAUSE = 60  # seconds to pause after a failure streak
MAX_CONSECUTIVE_FAILS = 5  # failure-streak threshold that triggers the pause
HS300_FILE = Path("/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data/raw/stock_info/hs300_constituents_latest.csv")
ALL_STOCKS_FILE = Path("/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data/raw/stock_info/stock_basic_info_raw_20260326_113530.csv")

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Referer": "https://finance.qq.com",
}


# --- HTTP helper ---
def fetch_url(url: str, timeout: int = 10) -> str:
    """Fetch *url* with the module-level HEADERS and return the body as text.

    The body is decoded as UTF-8; undecodable bytes are replaced rather than
    raising. Network errors from ``urllib`` propagate to the caller.
    """
    req = urllib.request.Request(url, headers=HEADERS)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace")


# --- Tencent mkline API ---
def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]:
    """Fetch 15-minute bars for *symbol* from the Tencent mkline endpoint.

    Args:
        symbol: Exchange-prefixed code, e.g. "sz000001".
        count: Number of bars requested.

    Returns:
        A DataFrame with columns day/open/high/low/close/volume/amount
        (volume and amount as strings, amount always "0.0" because this
        parser does not read an amount field), or None when the request
        fails or yields no parsable rows. Any exception is logged at DEBUG
        level and turned into None so the caller can fall back.

    NOTE(review): the response shape assumed here -- space-separated strings
    under data[symbol]["data"]["m15"], ordered open/high/low/close -- is not
    verifiable from this file. Confirm against a live response; if entries
    are lists or use a different column order this parser needs adjusting.
    """
    url = f"http://web.ifzq.gtimg.cn/appstock/app/kline/mkline?param={symbol},m15,,{count}"
    try:
        payload = json.loads(fetch_url(url))
        # Expected nesting: data -> {symbol} -> data -> m15 (or day)
        stock_data = payload.get("data", {}).get(symbol, {}).get("data", {})
        klines = stock_data.get("m15", stock_data.get("day", []))
        if not klines:
            return None

        rows = []
        for line in klines:
            parts = line.split()
            if len(parts) < 6:
                continue
            # Assumed layout: "YYYYMMDDHHMM open high low close volume"
            stamp = parts[0]
            rows.append({
                "day": f"{stamp[:4]}-{stamp[4:6]}-{stamp[6:8]} {stamp[8:10]}:{stamp[10:12]}:00",
                "open": float(parts[1]),
                "high": float(parts[2]),
                "low": float(parts[3]),
                "close": float(parts[4]),
                "volume": str(int(float(parts[5]))),
                "amount": str(0.0),  # no amount is parsed from mkline rows
            })

        if not rows:
            return None
        return pd.DataFrame(rows)
    except Exception as e:
        # Deliberate best-effort: any failure means "try the next source".
        logger.debug("mkline failed for %s: %s", symbol, e)
        return None


# --- Tencent minute/query API + aggregation ---
def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]:
    """Fetch 1-minute data from Tencent minute/query and aggregate to 15 minutes.

    Args:
        symbol: Exchange-prefixed code, e.g. "sz000001".
        date: Trading date as "YYYYMMDD"; used to build the timestamps,
            since each returned row only carries "HHMM".

    Returns:
        A 15-minute DataFrame (see ``_aggregate_1m_to_15m``), or None when
        the request fails or yields no parsable rows.

    NOTE(review): *date* is stamped onto whatever the endpoint returns; if the
    endpoint serves the previous session on non-trading days the timestamps
    will be wrong -- confirm against the API.
    """
    url = f"http://web.ifzq.gtimg.cn/appstock/app/minute/query?code={symbol}"
    try:
        payload = json.loads(fetch_url(url))
        minute_data = payload.get("data", {}).get(symbol, {}).get("data", {}).get("data", [])
        if not minute_data:
            return None

        # Assumed row layout: "HHMM price vol amount"
        one_min = []
        for line in minute_data:
            parts = line.split()
            if len(parts) < 4:
                continue
            hhmm = parts[0]
            one_min.append({
                "time": f"{date[:4]}-{date[4:6]}-{date[6:8]} {hhmm[:2]}:{hhmm[2:]}:00",
                "price": float(parts[1]),
                "vol": float(parts[2]),
                "amount": float(parts[3]),
            })

        if not one_min:
            return None
        return _aggregate_1m_to_15m(pd.DataFrame(one_min))
    except Exception as e:
        # Deliberate best-effort: any failure means "this source has no data".
        logger.debug("minute_query failed for %s: %s", symbol, e)
        return None  # was a bare `None` expression; make the return explicit


def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate 1-minute rows (time/price/vol/amount) into 15-minute bars.

    Rows are bucketed by flooring ``time`` to 15 minutes, so each bar is
    labelled with the START of its bucket (09:30, 09:45, ...). Per bucket:
    open/close are the first/last price, high/low the max/min price, volume
    the sum of ``vol`` and amount the last ``amount`` value (the original
    author treats amount as a cumulative figure).

    The input frame is not modified. ``volume`` and ``amount`` are returned
    as strings.

    NOTE(review): whether mkline bars use the same start-of-bucket labelling
    is not visible here; verify before mixing both sources in one file.
    """
    stamped = df.assign(time=pd.to_datetime(df["time"]))
    stamped = stamped.assign(group=stamped["time"].dt.floor("15min"))

    agg = stamped.groupby("group").agg(
        open=("price", "first"),
        high=("price", "max"),
        low=("price", "min"),
        close=("price", "last"),
        volume=("vol", "sum"),
        amount=("amount", "last"),  # treated as cumulative: keep the last value
    ).reset_index()

    return pd.DataFrame({
        "day": agg["group"].dt.strftime("%Y-%m-%d %H:%M:%S"),
        "open": agg["open"],
        "high": agg["high"],
        "low": agg["low"],
        "close": agg["close"],
        "volume": agg["volume"].astype(str),
        "amount": agg["amount"].astype(str),
    })


# --- Download flow ---
def get_market_prefix(code: str) -> Tuple[str, str]:
    """Return ``(prefix, clean_code)`` for a stock code.

    Non-digit characters are stripped and the result is left-padded with
    zeros to 6 digits. Codes starting with 60, 68 or 51 map to "sh";
    everything else maps to "sz".
    """
    code = re.sub(r"[^0-9]", "", code).zfill(6)
    if code.startswith(("60", "68", "51")):
        return "sh", code
    return "sz", code


def download_single(code: str) -> Tuple[Optional[pd.DataFrame], str]:
    """Download 15-minute bars for one stock.

    Tries mkline first, then minute/query aggregation stamped with today's
    local date.

    Returns:
        ``(df, source)`` where source is "mkline" or "minute_query", or
        ``(None, "failed")`` when both sources return nothing.
    """
    prefix, clean = get_market_prefix(code)
    symbol = f"{prefix}{clean}"

    # Primary source: mkline
    df = try_mkline(symbol)
    if df is not None and len(df) > 0:
        return df, "mkline"

    # Fallback source: minute/query + aggregation
    today = datetime.now().strftime("%Y%m%d")
    df = try_minute_query_aggregate(symbol, today)
    if df is not None and len(df) > 0:
        return df, "minute_query"

    return None, "failed"


def download_with_increment(code: str, output_dir: Path) -> Tuple[str, int]:
    """Download one stock and merge the result into its Parquet file.

    When ``<prefix><code>_15min.parquet`` already exists in *output_dir*, the
    new rows are appended, de-duplicated on ``day`` (the newly downloaded row
    wins) and sorted by ``day``. The result is written to a ``.tmp`` sibling
    and then moved over the target.

    Returns:
        ``("failed", 0)`` when nothing was downloaded, otherwise
        ``("ok(<source>)", n)`` where n is the number of downloaded rows
        (not the size of the merged file).
    """
    prefix, clean = get_market_prefix(code)
    parquet_path = output_dir / f"{prefix}{clean}_15min.parquet"

    df_new, source = download_single(code)
    if df_new is None:
        return "failed", 0

    if parquet_path.exists():
        # Incremental: merge, keeping the freshly downloaded row on conflicts.
        existing = pd.read_parquet(parquet_path)
        combined = pd.concat([existing, df_new], ignore_index=True)
        combined = combined.drop_duplicates(subset=["day"], keep="last")
        combined = combined.sort_values("day").reset_index(drop=True)
    else:
        combined = df_new

    # Write to a temp file first, then swap it in. Path.replace overwrites an
    # existing target on every platform, unlike Path.rename.
    tmp_path = parquet_path.with_suffix(".tmp")
    combined.to_parquet(tmp_path, index=False)
    tmp_path.replace(parquet_path)

    return f"ok({source})", len(df_new)


# --- Resume support ---
def load_progress(path: Optional[Path] = None) -> dict:
    """Load the progress record, or return an empty one if the file is absent.

    Args:
        path: Progress file to read; defaults to ``PROGRESS_FILE``.

    Returns:
        A dict that always contains the "completed" and "failed" lists.
    """
    target = PROGRESS_FILE if path is None else Path(path)
    if not target.exists():
        return {"completed": [], "failed": [], "last_update": ""}
    progress = json.loads(target.read_text(encoding="utf-8"))
    # Tolerate older or hand-edited files that lack one of the lists.
    progress.setdefault("completed", [])
    progress.setdefault("failed", [])
    return progress


def save_progress(progress: dict, path: Optional[Path] = None):
    """Stamp *progress* with the current time and write it as JSON.

    Args:
        progress: Progress record; its "last_update" key is overwritten.
        path: Progress file to write; defaults to ``PROGRESS_FILE``. Missing
            parent directories are created.
    """
    target = PROGRESS_FILE if path is None else Path(path)
    progress["last_update"] = datetime.now().isoformat()
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(
        json.dumps(progress, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


# --- Stock lists ---
def _read_code_column(csv_path: Path, candidates: List[str], label: str) -> List[str]:
    """Return zero-padded codes from the first candidate column found in the CSV."""
    df = pd.read_csv(csv_path)
    for col in candidates:
        if col in df.columns:
            return [str(c).zfill(6) for c in df[col].tolist()]
    raise ValueError(f"{label}文件中找不到代码列,现有列: {list(df.columns)}")


def get_stock_list(scope: str) -> List[str]:
    """Return the stock codes for *scope* ("hs300" or "all").

    Raises:
        ValueError: for an unknown scope, or when the CSV has none of the
            expected code columns.
    """
    if scope == "hs300":
        # Try several possible column names.
        return _read_code_column(HS300_FILE, ["成分券代码", "代码", "code"], "HS300")
    if scope == "all":
        return _read_code_column(ALL_STOCKS_FILE, ["代码", "code", "股票代码"], "全市场")
    raise ValueError(f"Unknown scope: {scope}")


# --- Entry point ---
def main():
    """Parse CLI arguments and download every requested stock.

    Progress is kept in ``download_progress.json`` inside the output
    directory, saved every 50 stocks and again when the loop ends for any
    reason (including Ctrl-C), so ``--resume`` can pick up where the run
    stopped.
    """
    parser = argparse.ArgumentParser(description="15分钟线数据下载")
    parser.add_argument("--scope", choices=["hs300", "all"], help="下载范围")
    parser.add_argument("--codes", nargs="+", help="指定股票代码")
    parser.add_argument("--resume", action="store_true", help="断点续传")
    parser.add_argument("--output-dir", default=str(OUTPUT_DIR), help="输出目录")
    args = parser.parse_args()

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Keep the progress file next to the data it describes. With the default
    # --output-dir this is exactly PROGRESS_FILE.
    progress_file = output_dir / PROGRESS_FILE.name

    # Resolve the stock list.
    if args.codes:
        codes = args.codes
    elif args.scope:
        codes = get_stock_list(args.scope)
    else:
        parser.error("必须指定 --scope 或 --codes")

    # Resume support.
    progress = load_progress(progress_file) if args.resume else {"completed": [], "failed": []}
    skip_set = set(progress["completed"])
    failed_set = set(progress["failed"])

    todo = [c for c in codes if c not in skip_set]
    logger.info("股票总数: %d, 已完成: %d, 待下载: %d", len(codes), len(skip_set), len(todo))

    t_start = time.time()
    ok_count = 0
    fail_count = 0
    consecutive_fails = 0

    try:
        for i, code in enumerate(todo):
            # Rate limit between stocks.
            if i > 0:
                time.sleep(REQUEST_INTERVAL)

            # Retry loop: stop at the first non-failed status.
            status = "failed"
            rows = 0
            for attempt in range(1, MAX_RETRIES + 1):
                try:
                    status, rows = download_with_increment(code, output_dir)
                except Exception as e:
                    logger.warning(" 重试 %d/%d: %s", attempt, MAX_RETRIES, e)
                if status != "failed":
                    break
                if attempt < MAX_RETRIES:
                    # Back off only when another attempt will follow.
                    time.sleep(1)

            if status != "failed":
                ok_count += 1
                consecutive_fails = 0
                if code not in skip_set:
                    skip_set.add(code)
                    progress["completed"].append(code)
                if code in failed_set:
                    # A stock that failed in an earlier run has now succeeded.
                    failed_set.discard(code)
                    progress["failed"] = [c for c in progress["failed"] if c != code]
                logger.info("[%d/%d] %s: %s (%d rows)", i + 1, len(todo), code, status, rows)
            else:
                fail_count += 1
                consecutive_fails += 1
                if code not in failed_set:
                    failed_set.add(code)
                    progress["failed"].append(code)
                logger.warning("[%d/%d] %s: FAILED", i + 1, len(todo), code)

            # Failure-streak protection.
            if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
                logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
                time.sleep(CONSECUTIVE_FAIL_PAUSE)
                consecutive_fails = 0

            # Periodic checkpoint.
            if (i + 1) % 50 == 0:
                save_progress(progress, progress_file)
    finally:
        # Final save; also runs on interruption so finished work is not lost.
        save_progress(progress, progress_file)

    elapsed = time.time() - t_start
    logger.info("=" * 50)
    logger.info("下载完成: 成功 %d, 失败 %d, 耗时 %.1f 秒", ok_count, fail_count, elapsed)
    logger.info("进度文件: %s", progress_file)


if __name__ == "__main__":
    main()