From d6709c205cf9c6880802c56e55399c903cee1fa6 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sat, 2 May 2026 22:35:26 +0800 Subject: [PATCH] auto-sync: 2026-05-02 22:35:26 --- data_platform/download_minute.py | 198 +++++++++++++------------------ 1 file changed, 84 insertions(+), 114 deletions(-) diff --git a/data_platform/download_minute.py b/data_platform/download_minute.py index eb96eade..f07b324d 100644 --- a/data_platform/download_minute.py +++ b/data_platform/download_minute.py @@ -2,22 +2,23 @@ """ 15分钟线数据下载脚本 -数据源降级链: - 1. 腾讯 mkline API(直接返回15分钟线) - 2. 腾讯 minute/query + 聚合为15分钟(仅当天数据) +数据源降级链: + 1. 新浪财经15分钟K线API(有真实amount,800条/次) + 2. 腾讯 minute/query + 聚合为15分钟(仅当天数据,amount为估算) -功能: +功能: - 支持HS300 / 全市场下载 - - 增量下载(追加新数据,不覆盖) - - 断点续传(JSON进度文件) - - 限频保护(0.3s间隔 + 重试) - - 与已有84只Parquet格式完全一致 + - 增量下载(追加新数据,不覆盖已有) + - 断点续传(JSON进度文件) + - 限频保护(0.3s间隔 + 重试) + - 与已有84只Parquet格式完全一致(7列,end-of-bar时间戳) + - 数据校验(价格>0, OHLC一致性) -用法: +用法: python3 download_minute.py --scope hs300 python3 download_minute.py --scope all python3 download_minute.py --codes 000001 600519 - python3 download_minute.py --scope hs300 --resume # 断点续传 + python3 download_minute.py --scope hs300 --resume """ import argparse @@ -33,7 +34,6 @@ from pathlib import Path from typing import Optional, List, Tuple import pandas as pd -import numpy as np logging.basicConfig( level=logging.INFO, @@ -44,90 +44,72 @@ logger = logging.getLogger(__name__) # --- 配置 --- OUTPUT_DIR = Path("/Volumes/stock/minute_kline/15min") PROGRESS_FILE = OUTPUT_DIR / "download_progress.json" -REQUEST_INTERVAL = 0.3 # 秒/请求 -MAX_RETRIES = 3 # 单只重试次数 -CONSECUTIVE_FAIL_PAUSE = 60 # 连续失败暂停秒数 -MAX_CONSECUTIVE_FAILS = 5 # 连续失败阈值 -HS300_FILE = Path("/Volumes/stock/A股数据/stock_info/hs300_constituents_latest.csv") -ALL_STOCKS_FILE = Path("/Volumes/stock/sanguo_vnpy/data/all_stocks.csv") +REQUEST_INTERVAL = 0.3 +MAX_RETRIES = 3 +CONSECUTIVE_FAIL_PAUSE = 60 +MAX_CONSECUTIVE_FAILS = 5 +HS300_FILE = Path("/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data/raw/stock_info/hs300_constituents_latest.csv") +ALL_STOCKS_FILE = Path("/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data/raw/stock_info/stock_basic_info_raw_20260326_113530.csv") -HEADERS = { - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)", - "Referer": "https://finance.qq.com", -} +HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"} -# --- HTTP 工具 --- -def fetch_url(url: str, timeout: int = 10) -> str: - req = urllib.request.Request(url, headers=HEADERS) - with urllib.request.urlopen(req, timeout=timeout) as resp: - return resp.read().decode("utf-8", errors="replace") +def _make_opener(): + """创建无代理opener,避免akshare代理污染""" + return urllib.request.build_opener(urllib.request.ProxyHandler({})) -# --- 腾讯 mkline API --- -def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]: +# --- 新浪15分钟K线API(主源) --- +def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]: """ - 腾讯mkline API,直接返回15分钟线 - - Args: - symbol: 如 "sz000001" - count: 返回条数 + 新浪财经15分钟K线API + symbol: sz000001 或 sh600519 + datalen: 返回条数(最大约800) + 返回: DataFrame(day, open, high, low, close, volume, amount) 或 None """ - url = f"http://web.ifzq.gtimg.cn/appstock/app/kline/mkline?param={symbol},m15,,{count}" + url = ( + f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/" + f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}" + ) try: - raw = fetch_url(url) - data = json.loads(raw) - # 解析结构: data -> {symbol} -> data -> day/data - stock_data = data.get("data", {}).get(symbol, {}).get("data", {}) - - # mkline返回的是 { "day": [...], "m15": [...] } - klines = stock_data.get("m15", stock_data.get("day", [])) - if not klines: + opener = _make_opener() + req = urllib.request.Request(url, headers=HEADERS) + with opener.open(req, timeout=15) as r: + raw = r.read().decode("utf-8", errors="replace") + m = re.search(r'\((\[.*\])\)', raw, re.DOTALL) + if not m: return None - - rows = [] - for line in klines: - parts = line.split() - if len(parts) >= 6: - # 格式: "YYYYMMDDHHMM open high low close volume" - dt_str = parts[0] - rows.append({ - "day": f"{dt_str[:4]}-{dt_str[4:6]}-{dt_str[6:8]} {dt_str[8:10]}:{dt_str[10:12]}:00", - "open": float(parts[1]), - "high": float(parts[2]), - "low": float(parts[3]), - "close": float(parts[4]), - "volume": str(int(float(parts[5]))), - "amount": str(round(float(parts[4]) * int(float(parts[5])), 2)), # close*volume估算 - }) - - if not rows: + data = json.loads(m.group(1)) + if not data: return None - return pd.DataFrame(rows) + df = pd.DataFrame(data) + # 确保列顺序 + cols = ["day", "open", "high", "low", "close", "volume", "amount"] + for c in cols: + if c not in df.columns: + return None + return df[cols] except Exception as e: - logger.debug("mkline failed for %s: %s", symbol, e) + logger.debug("新浪15min失败 %s: %s", symbol, e) return None -# --- 腾讯 minute/query API + 聚合 --- +# --- 腾讯 minute/query + 聚合(备源,仅当天) --- def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]: """ - 腾讯minute/query API,返回1分钟线,聚合为15分钟线 - - Args: - symbol: 如 "sz000001" - date: 如 "20260502" + 腾讯minute/query API,返回1分钟线,聚合为15分钟线 + symbol: sz000001 + date: 20260502 """ url = f"http://web.ifzq.gtimg.cn/appstock/app/minute/query?code={symbol}" try: - raw = fetch_url(url) - data = json.loads(raw) + opener = _make_opener() + req = urllib.request.Request(url, headers=HEADERS) + with opener.open(req, timeout=10) as r: + data = json.loads(r.read()) minute_data = data.get("data", {}).get(symbol, {}).get("data", {}).get("data", []) - if not minute_data: return None - - # 解析1分钟线: "HHMM price vol amount" one_min = [] for line in minute_data: parts = line.split() @@ -139,22 +121,18 @@ def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame] "vol": float(parts[2]), "amount": float(parts[3]), }) - if not one_min: return None - - df = pd.DataFrame(one_min) - return _aggregate_1m_to_15m(df) + return _aggregate_1m_to_15m(pd.DataFrame(one_min)) except Exception as e: - logger.debug("minute_query failed for %s: %s", symbol, e) + logger.debug("minute_query失败 %s: %s", symbol, e) return None def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame: - """将1分钟线聚合为15分钟线""" + """1分钟线聚合为15分钟线(end-of-bar时间戳)""" df["time"] = pd.to_datetime(df["time"]) - # 15分钟分组:按时间段切分(9:30-9:45, 9:45-10:00, ...) - # end-of-bar对齐:已有84只数据用K线结束时间(09:45, 10:00...) + # end-of-bar对齐:已有84只数据用K线结束时间(09:45, 10:00...) df["group"] = df["time"].dt.floor("15min") + pd.Timedelta(minutes=15) agg = df.groupby("group").agg( @@ -163,10 +141,10 @@ def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame: low=("price", "min"), close=("price", "last"), volume=("vol", "sum"), - amount=("amount", "last"), # 累计值取最后 + amount=("amount", "last"), ).reset_index() - result = pd.DataFrame({ + return pd.DataFrame({ "day": agg["group"].dt.strftime("%Y-%m-%d %H:%M:%S"), "open": agg["open"], "high": agg["high"], @@ -175,11 +153,10 @@ def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame: "volume": agg["volume"].astype(str), "amount": agg["amount"].astype(str), }) - return result # --- 下载主流程 --- -def get_market_prefix(code: str) -> str: +def get_market_prefix(code: str) -> Tuple[str, str]: code = re.sub(r"[^0-9]", "", code).zfill(6) if code.startswith(("60", "68", "51")): return "sh", code @@ -187,16 +164,16 @@ def get_market_prefix(code: str) -> str: def download_single(code: str) -> Tuple[Optional[pd.DataFrame], str]: - """下载单只股票15分钟线,返回(df, source)""" + """下载单只股票15分钟线,返回(df, source)""" prefix, clean = get_market_prefix(code) symbol = f"{prefix}{clean}" - # 主源:mkline - df = try_mkline(symbol) + # 主源:新浪15分钟线 + df = try_sina_15min(symbol) if df is not None and len(df) > 0: - return df, "mkline" + return df, "sina_15min" - # 备源:minute/query + 聚合 + # 备源:minute/query + 聚合 today = datetime.now().strftime("%Y%m%d") df = try_minute_query_aggregate(symbol, today) if df is not None and len(df) > 0: @@ -206,7 +183,7 @@ def download_single(code: str) -> Tuple[Optional[pd.DataFrame], str]: def download_with_increment(code: str, output_dir: Path) -> Tuple[str, int]: - """增量下载单只股票,返回(status, rows)""" + """增量下载单只股票""" prefix, clean = get_market_prefix(code) filename = f"{prefix}{clean}_15min.parquet" parquet_path = output_dir / filename @@ -216,23 +193,30 @@ def download_with_increment(code: str, output_dir: Path) -> Tuple[str, int]: return "failed", 0 # 数据校验 - for col in ['open', 'high', 'low', 'close']: - df_new[col] = pd.to_numeric(df_new[col], errors='coerce') + for col in ["open", "high", "low", "close"]: + df_new[col] = pd.to_numeric(df_new[col], errors="coerce") + df_new["volume"] = pd.to_numeric(df_new["volume"], errors="coerce").fillna(0) + df_new["amount"] = pd.to_numeric(df_new["amount"], errors="coerce").fillna(0) + # 价格>0 - bad_zero = (df_new[['close', 'open']] <= 0).any(axis=1) + bad_zero = (df_new[["close", "open"]] <= 0).any(axis=1) if bad_zero.any(): logger.warning("价格<=0 %s: %d条", code, bad_zero.sum()) df_new = df_new[~bad_zero] # OHLC一致性 - bad_ohlc = (df_new['high'] < df_new[['open', 'close']].max(axis=1)) | (df_new['low'] > df_new[['open', 'close']].min(axis=1)) + bad_ohlc = (df_new["high"] < df_new[["open", "close"]].max(axis=1)) | \ + (df_new["low"] > df_new[["open", "close"]].min(axis=1)) if bad_ohlc.any(): logger.warning("OHLC异常 %s: %d条", code, bad_ohlc.sum()) df_new = df_new[~bad_ohlc] if df_new.empty: return "failed", 0 + # 转回object类型与已有数据兼容 + df_new["volume"] = df_new["volume"].astype(str) + df_new["amount"] = df_new["amount"].astype(str) + if parquet_path.exists(): - # 增量:合并去重 existing = pd.read_parquet(parquet_path) combined = pd.concat([existing, df_new], ignore_index=True) combined = combined.drop_duplicates(subset=["day"], keep="last") @@ -264,19 +248,16 @@ def save_progress(progress: dict): def get_stock_list(scope: str) -> List[str]: if scope == "hs300": df = pd.read_csv(HS300_FILE) - # 尝试多种列名 for col in ["成分券代码", "代码", "code"]: if col in df.columns: return [str(c).zfill(6) for c in df[col].tolist()] - raise ValueError(f"HS300文件中找不到代码列,现有列: {list(df.columns)}") - + raise ValueError(f"HS300文件中找不到代码列,现有列: {list(df.columns)}") if scope == "all": df = pd.read_csv(ALL_STOCKS_FILE) for col in ["代码", "code", "股票代码"]: if col in df.columns: return [str(c).zfill(6) for c in df[col].tolist()] - raise ValueError(f"全市场文件中找不到代码列,现有列: {list(df.columns)}") - + raise ValueError(f"全市场文件中找不到代码列,现有列: {list(df.columns)}") raise ValueError(f"Unknown scope: {scope}") @@ -292,7 +273,6 @@ def main(): output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) - # 获取股票列表 if args.codes: codes = args.codes elif args.scope: @@ -300,10 +280,8 @@ def main(): else: parser.error("必须指定 --scope 或 --codes") - # 断点续传 progress = load_progress() if args.resume else {"completed": [], "failed": []} skip_set = set(progress["completed"]) - todo = [c for c in codes if c not in skip_set] logger.info("股票总数: %d, 已完成: %d, 待下载: %d", len(codes), len(skip_set), len(todo)) @@ -313,11 +291,9 @@ def main(): consecutive_fails = 0 for i, code in enumerate(todo): - # 限频 if i > 0: time.sleep(REQUEST_INTERVAL) - # 重试逻辑 status = "failed" rows = 0 for attempt in range(MAX_RETRIES): @@ -339,24 +315,18 @@ def main(): consecutive_fails += 1 progress["failed"].append(code) logger.warning("[%d/%d] %s: FAILED", i + 1, len(todo), code) - - # 连续失败保护 if consecutive_fails >= MAX_CONSECUTIVE_FAILS: - logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) + logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) time.sleep(CONSECUTIVE_FAIL_PAUSE) - consecutive_fails =0 + consecutive_fails = 0 - # 定期保存进度 if (i + 1) % 50 == 0: save_progress(progress) - # 最终保存 save_progress(progress) - elapsed = time.time() - t_start logger.info("=" * 50) logger.info("下载完成: 成功 %d, 失败 %d, 耗时 %.1f 秒", ok_count, fail_count, elapsed) - logger.info("进度文件: %s", PROGRESS_FILE) if __name__ == "__main__":