diff --git a/data_platform/download_minute.py b/data_platform/download_minute.py index 26bc61fd..846f8919 100644 --- a/data_platform/download_minute.py +++ b/data_platform/download_minute.py @@ -2,18 +2,18 @@ """ 15分钟线数据下载脚本 -数据源降级链: - 1. 腾讯 mkline API(直接返回15分钟线) - 2. 腾讯 minute/query + 聚合为15分钟(仅当天数据) +数据源降级链: + 1. 腾讯 mkline API(直接返回15分钟线) + 2. 腾讯 minute/query + 聚合为15分钟(仅当天数据) -功能: +功能: - 支持HS300 / 全市场下载 - - 增量下载(追加新数据,不覆盖) - - 断点续传(JSON进度文件) - - 限频保护(0.3s间隔 + 重试) + - 增量下载(追加新数据,不覆盖) + - 断点续传(JSON进度文件) + - 限频保护(0.3s间隔 + 重试) - 与已有84只Parquet格式完全一致 -用法: +用法: python3 download_minute.py --scope hs300 python3 download_minute.py --scope all python3 download_minute.py --codes 000001 600519 @@ -67,8 +67,8 @@ def fetch_url(url: str, timeout: int = 10) -> str: # --- 腾讯 mkline API --- def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]: """ - 腾讯mkline API,直接返回15分钟线 - + 腾讯mkline API,直接返回15分钟线 + Args: symbol: 如 "sz000001" count: 返回条数 @@ -79,12 +79,12 @@ def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]: data = json.loads(raw) # 解析结构: data -> {symbol} -> data -> day/data stock_data = data.get("data", {}).get(symbol, {}).get("data", {}) - + # mkline返回的是 { "day": [...], "m15": [...] } klines = stock_data.get("m15", stock_data.get("day", [])) if not klines: return None - + rows = [] for line in klines: parts = line.split() @@ -100,7 +100,7 @@ def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]: "volume": str(int(float(parts[5]))), "amount": str(round(float(parts[4]) * int(float(parts[5])), 2)), # close*volume估算 }) - + if not rows: return None return pd.DataFrame(rows) @@ -112,8 +112,8 @@ def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]: # --- 腾讯 minute/query API + 聚合 --- def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]: """ - 腾讯minute/query API,返回1分钟线,聚合为15分钟线 - + 腾讯minute/query API,返回1分钟线,聚合为15分钟线 + Args: symbol: 如 "sz000001" date: 如 "20260502" @@ -123,10 +123,10 @@ def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame] raw = fetch_url(url) data = json.loads(raw) minute_data = data.get("data", {}).get(symbol, {}).get("data", {}).get("data", []) - + if not minute_data: return None - + # 解析1分钟线: "HHMM price vol amount" one_min = [] for line in minute_data: @@ -139,10 +139,10 @@ def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame] "vol": float(parts[2]), "amount": float(parts[3]), }) - + if not one_min: return None - + df = pd.DataFrame(one_min) return _aggregate_1m_to_15m(df) except Exception as e: @@ -153,10 +153,10 @@ def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame] def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame: """将1分钟线聚合为15分钟线""" df["time"] = pd.to_datetime(df["time"]) - # 15分钟分组:按时间段切分(9:30-9:45, 9:45-10:00, ...) - # end-of-bar对齐:已有84只数据用K线结束时间(09:45, 10:00...) + # 15分钟分组:按时间段切分(9:30-9:45, 9:45-10:00, ...) + # end-of-bar对齐:已有84只数据用K线结束时间(09:45, 10:00...) df["group"] = df["time"].dt.floor("15min") + pd.Timedelta(minutes=15) - + agg = df.groupby("group").agg( open=("price", "first"), high=("price", "max"), @@ -165,7 +165,7 @@ def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame: volume=("vol", "sum"), amount=("amount", "last"), # 累计值取最后 ).reset_index() - + result = pd.DataFrame({ "day": agg["group"].dt.strftime("%Y-%m-%d %H:%M:%S"), "open": agg["open"], @@ -187,48 +187,64 @@ def get_market_prefix(code: str) -> str: def download_single(code: str) -> Tuple[Optional[pd.DataFrame], str]: - """下载单只股票15分钟线,返回(df, source)""" + """下载单只股票15分钟线,返回(df, source)""" prefix, clean = get_market_prefix(code) symbol = f"{prefix}{clean}" - - # 主源:mkline + + # 主源:mkline df = try_mkline(symbol) if df is not None and len(df) > 0: return df, "mkline" - - # 备源:minute/query + 聚合 + + # 备源:minute/query + 聚合 today = datetime.now().strftime("%Y%m%d") df = try_minute_query_aggregate(symbol, today) if df is not None and len(df) > 0: return df, "minute_query" - + return None, "failed" def download_with_increment(code: str, output_dir: Path) -> Tuple[str, int]: - """增量下载单只股票,返回(status, rows)""" + """增量下载单只股票,返回(status, rows)""" prefix, clean = get_market_prefix(code) filename = f"{prefix}{clean}_15min.parquet" parquet_path = output_dir / filename - + df_new, source = download_single(code) if df_new is None: return "failed", 0 - + + # 数据校验 + for col in ['open', 'high', 'low', 'close']: + df_new[col] = pd.to_numeric(df_new[col], errors='coerce') + # 价格>0 + bad_zero = (df_new[['close', 'open']] <= 0).any(axis=1) + if bad_zero.any(): + logger.warning("价格<=0 %s: %d条", code, bad_zero.sum()) + df_new = df_new[~bad_zero] + # OHLC一致性 + bad_ohlc = (df_new['high'] < df_new[['open', 'close']].max(axis=1)) | (df_new['low'] > df_new[['open', 'close']].min(axis=1)) + if bad_ohlc.any(): + logger.warning("OHLC异常 %s: %d条", code, bad_ohlc.sum()) + df_new = df_new[~bad_ohlc] + if df_new.empty: + return "failed", 0 + if parquet_path.exists(): - # 增量:合并去重 + # 增量:合并去重 existing = pd.read_parquet(parquet_path) combined = pd.concat([existing, df_new], ignore_index=True) combined = combined.drop_duplicates(subset=["day"], keep="last") combined = combined.sort_values("day").reset_index(drop=True) else: combined = df_new - + # 原子写入 tmp_path = parquet_path.with_suffix(".tmp") combined.to_parquet(tmp_path, index=False) tmp_path.rename(parquet_path) - + return f"ok({source})", len(df_new) @@ -252,15 +268,15 @@ def get_stock_list(scope: str) -> List[str]: for col in ["成分券代码", "代码", "code"]: if col in df.columns: return [str(c).zfill(6) for c in df[col].tolist()] - raise ValueError(f"HS300文件中找不到代码列,现有列: {list(df.columns)}") - + raise ValueError(f"HS300文件中找不到代码列,现有列: {list(df.columns)}") + if scope == "all": df = pd.read_csv(ALL_STOCKS_FILE) for col in ["代码", "code", "股票代码"]: if col in df.columns: return [str(c).zfill(6) for c in df[col].tolist()] - raise ValueError(f"全市场文件中找不到代码列,现有列: {list(df.columns)}") - + raise ValueError(f"全市场文件中找不到代码列,现有列: {list(df.columns)}") + raise ValueError(f"Unknown scope: {scope}") @@ -272,10 +288,10 @@ def main(): parser.add_argument("--resume", action="store_true", help="断点续传") parser.add_argument("--output-dir", default=str(OUTPUT_DIR), help="输出目录") args = parser.parse_args() - + output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) - + # 获取股票列表 if args.codes: codes = args.codes @@ -283,24 +299,24 @@ def main(): codes = get_stock_list(args.scope) else: parser.error("必须指定 --scope 或 --codes") - + # 断点续传 progress = load_progress() if args.resume else {"completed": [], "failed": []} skip_set = set(progress["completed"]) - + todo = [c for c in codes if c not in skip_set] logger.info("股票总数: %d, 已完成: %d, 待下载: %d", len(codes), len(skip_set), len(todo)) - + t_start = time.time() ok_count = 0 fail_count = 0 consecutive_fails = 0 - + for i, code in enumerate(todo): # 限频 if i > 0: time.sleep(REQUEST_INTERVAL) - + # 重试逻辑 status = "failed" rows = 0 @@ -312,7 +328,7 @@ def main(): except Exception as e: logger.warning(" 重试 %d/%d: %s", attempt + 1, MAX_RETRIES, e) time.sleep(1) - + if status != "failed": ok_count += 1 consecutive_fails = 0 @@ -323,20 +339,20 @@ def main(): consecutive_fails += 1 progress["failed"].append(code) logger.warning("[%d/%d] %s: FAILED", i + 1, len(todo), code) - + # 连续失败保护 if consecutive_fails >= MAX_CONSECUTIVE_FAILS: - logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) + logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) time.sleep(CONSECUTIVE_FAIL_PAUSE) consecutive_fails =0 - + # 定期保存进度 if (i + 1) % 50 == 0: save_progress(progress) - + # 最终保存 save_progress(progress) - + elapsed = time.time() - t_start logger.info("=" * 50) logger.info("下载完成: 成功 %d, 失败 %d, 耗时 %.1f 秒", ok_count, fail_count, elapsed)