auto-sync: 2026-05-02 21:25:52

This commit is contained in:
cfdaily
2026-05-02 21:25:52 +08:00
parent b4face2258
commit 6ad71f54ec
+346
View File
@@ -0,0 +1,346 @@
#!/usr/bin/env python3
"""
15分钟线数据下载脚本
数据源降级链:
1. 腾讯 mkline API(直接返回15分钟线)
2. 腾讯 minute/query + 聚合为15分钟(仅当天数据)
功能:
- 支持HS300 / 全市场下载
- 增量下载(追加新数据,不覆盖)
- 断点续传(JSON进度文件)
- 限频保护(0.3s间隔 + 重试)
- 与已有84只Parquet格式完全一致
用法:
python3 download_minute.py --scope hs300
python3 download_minute.py --scope all
python3 download_minute.py --codes 000001 600519
python3 download_minute.py --scope hs300 --resume # 断点续传
"""
import argparse
import json
import re
import sys
import time
import logging
import urllib.request
import urllib.error
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Tuple
import pandas as pd
import numpy as np
# Root logger: INFO level, one timestamped line per record.
_LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Configuration ---

# Directory receiving one Parquet file per symbol, plus the progress file.
OUTPUT_DIR = Path("/Volumes/stock/minute_kline/15min")
PROGRESS_FILE = OUTPUT_DIR.joinpath("download_progress.json")

# Throttling / retry policy used by the download loop.
REQUEST_INTERVAL = 0.3  # seconds to wait between two symbols
MAX_RETRIES = 3  # attempts per symbol
MAX_CONSECUTIVE_FAILS = 5  # failures in a row that trigger a long pause
CONSECUTIVE_FAIL_PAUSE = 60  # length of that pause, in seconds

# CSV files listing the stock codes for each --scope value.
HS300_FILE = Path(
    "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data/raw/stock_info/hs300_constituents_latest.csv"
)
ALL_STOCKS_FILE = Path(
    "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data/raw/stock_info/stock_basic_info_raw_20260326_113530.csv"
)

# Headers attached to every outgoing HTTP request.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Referer": "https://finance.qq.com",
}
# --- HTTP 工具 ---
def fetch_url(url: str, timeout: int = 10) -> str:
    """Request *url* with the module-level ``HEADERS`` and return the body as text.

    Args:
        url: Address to fetch.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        The response body decoded as UTF-8; bytes that cannot be decoded are
        replaced instead of raising.
    """
    request = urllib.request.Request(url, headers=HEADERS)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
    return body.decode("utf-8", errors="replace")
# --- 腾讯 mkline API ---
def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]:
    """Fetch 15-minute bars for *symbol* from the Tencent mkline endpoint.

    Args:
        symbol: Exchange-prefixed code such as ``"sz000001"``.
        count: Number of bars requested from the endpoint.

    Returns:
        A DataFrame with columns ``day, open, high, low, close, volume,
        amount``. ``volume`` and ``amount`` are stored as strings and
        ``amount`` is always ``"0.0"`` because this endpoint does not supply
        it. ``None`` is returned when the request fails, the payload cannot
        be parsed, or no usable bar is found (best-effort: errors are logged
        at DEBUG level, never raised).
    """
    url = f"http://web.ifzq.gtimg.cn/appstock/app/kline/mkline?param={symbol},m15,,{count}"
    try:
        payload = json.loads(fetch_url(url))
        node = payload.get("data", {}).get(symbol, {})

        # NOTE(review): public clients of this endpoint read the bars at
        # data[symbol]["m15"]; the previous code only looked one level deeper,
        # under data[symbol]["data"]. Both layouts are accepted here --
        # confirm against a live response.
        klines = []
        for holder in (node.get("data"), node):
            if isinstance(holder, dict):
                klines = holder.get("m15") or holder.get("day") or []
                if klines:
                    break
        if not klines:
            return None

        rows = []
        for line in klines:
            # A record may be a JSON array or whitespace-separated text.
            parts = line.split() if isinstance(line, str) else list(line)
            if len(parts) < 6:
                continue
            stamp = str(parts[0])
            # Only "YYYYMMDDHHMM" stamps are intraday bars; anything else
            # (e.g. daily rows from the "day" fallback) would be sliced into
            # a nonsensical timestamp below.
            if len(stamp) != 12 or not stamp.isdigit():
                continue
            # NOTE(review): fields are read as
            # [time, open, close, high, low, volume, ...], the order used by
            # Tencent K-line responses -- confirm against a live response.
            rows.append({
                "day": f"{stamp[:4]}-{stamp[4:6]}-{stamp[6:8]} {stamp[8:10]}:{stamp[10:12]}:00",
                "open": float(parts[1]),
                "high": float(parts[3]),
                "low": float(parts[4]),
                "close": float(parts[2]),
                "volume": str(int(float(parts[5]))),
                "amount": str(0.0),  # mkline does not return the turnover amount
            })
        if not rows:
            return None
        return pd.DataFrame(rows)
    except Exception as e:
        # Deliberately broad: this source is one link of a fallback chain.
        logger.debug("mkline failed for %s: %s", symbol, e)
        return None
# --- 腾讯 minute/query API + 聚合 ---
def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]:
    """Fetch 1-minute data from Tencent minute/query and fold it into 15-minute bars.

    Args:
        symbol: Exchange-prefixed code such as ``"sz000001"``.
        date: Trading date as ``"YYYYMMDD"``. It is used to build the bar
            timestamps only when the response does not carry its own
            8-digit ``date`` field.

    Returns:
        The DataFrame produced by ``_aggregate_1m_to_15m``, or ``None`` when
        the request fails, the payload cannot be parsed, or it contains no
        usable minute row (best-effort: errors are logged at DEBUG level,
        never raised).
    """
    url = f"http://web.ifzq.gtimg.cn/appstock/app/minute/query?code={symbol}"
    try:
        payload = json.loads(fetch_url(url))
        node = payload.get("data", {}).get(symbol, {}).get("data", {})
        minute_data = node.get("data", [])
        if not minute_data:
            return None

        # The endpoint takes no date parameter, so on a non-trading day the
        # rows belong to an earlier session than the caller's date. Prefer
        # the date reported by the payload itself when it has one.
        # NOTE(review): assumes the sibling "date" key is "YYYYMMDD" -- confirm.
        served = str(node.get("date", ""))
        if len(served) == 8 and served.isdigit():
            date = served
        day = f"{date[:4]}-{date[4:6]}-{date[6:8]}"

        # Each row is parsed as "HHMM price vol amount".
        one_min = []
        for line in minute_data:
            parts = line.split()
            if len(parts) < 4:
                continue
            hhmm = parts[0]
            # Skip a malformed time instead of letting it poison the whole batch.
            if len(hhmm) != 4 or not hhmm.isdigit():
                continue
            one_min.append({
                "time": f"{day} {hhmm[:2]}:{hhmm[2:]}:00",
                "price": float(parts[1]),
                "vol": float(parts[2]),
                "amount": float(parts[3]),
            })
        if not one_min:
            return None
        return _aggregate_1m_to_15m(pd.DataFrame(one_min))
    except Exception as e:
        # Deliberately broad: this source is the last link of a fallback chain.
        logger.debug("minute_query failed for %s: %s", symbol, e)
        return None
def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame:
"""将1分钟线聚合为15分钟线"""
df["time"] = pd.to_datetime(df["time"])
# 15分钟分组:按时间段切分(9:30-9:45, 9:45-10:00, ...
df["group"] = df["time"].dt.floor("15min")
agg = df.groupby("group").agg(
open=("price", "first"),
high=("price", "max"),
low=("price", "min"),
close=("price", "last"),
volume=("vol", "sum"),
amount=("amount", "last"), # 累计值取最后
).reset_index()
result = pd.DataFrame({
"day": agg["group"].dt.strftime("%Y-%m-%d %H:%M:%S"),
"open": agg["open"],
"high": agg["high"],
"low": agg["low"],
"close": agg["close"],
"volume": agg["volume"].astype(str),
"amount": agg["amount"].astype(str),
})
return result
# --- 下载主流程 ---
def get_market_prefix(code: str) -> Tuple[str, str]:
    """Split a raw stock code into its exchange prefix and 6-digit code.

    Every non-digit character is dropped and the remainder is left-padded
    with zeros to six digits.

    Args:
        code: Stock code, optionally decorated (e.g. ``"600519.SH"``).

    Returns:
        ``(prefix, digits)`` where ``prefix`` is ``"sh"`` for codes starting
        with 60, 68 or 51 and ``"sz"`` for everything else.
    """
    digits = re.sub(r"[^0-9]", "", code).zfill(6)
    # NOTE(review): every code outside the 60/68/51 ranges is mapped to "sz";
    # confirm that is intended for the codes found in the --scope all list.
    prefix = "sh" if digits.startswith(("60", "68", "51")) else "sz"
    return prefix, digits
def download_single(code: str) -> Tuple[Optional[pd.DataFrame], str]:
    """Download the 15-minute bars of one stock, trying each source in turn.

    Args:
        code: Stock code; it is normalised by ``get_market_prefix``.

    Returns:
        ``(frame, source)`` where ``source`` is ``"mkline"`` or
        ``"minute_query"`` for the first source that produced a non-empty
        frame, or ``(None, "failed")`` when neither did.
    """
    symbol = "".join(get_market_prefix(code))

    # Ordered fallback chain. The fetchers are wrapped in callables so the
    # backup source (and its "today" stamp) is only evaluated when needed.
    sources = (
        ("mkline", lambda: try_mkline(symbol)),
        (
            "minute_query",
            lambda: try_minute_query_aggregate(
                symbol, datetime.now().strftime("%Y%m%d")
            ),
        ),
    )
    for label, fetch in sources:
        frame = fetch()
        if frame is None or len(frame) == 0:
            continue
        return frame, label
    return None, "failed"
def download_with_increment(code: str, output_dir: Path) -> Tuple[str, int]:
    """Download one stock and merge the result into its Parquet file.

    Args:
        code: Stock code; it is normalised by ``get_market_prefix``.
        output_dir: Directory holding ``<prefix><code>_15min.parquet``.

    Returns:
        ``("failed", 0)`` when no source returned data, otherwise
        ``("ok(<source>)", n)`` where ``n`` is the number of rows downloaded
        in this call (not the number of rows added to the file).
    """
    prefix, clean = get_market_prefix(code)
    parquet_path = output_dir / f"{prefix}{clean}_15min.parquet"

    df_new, source = download_single(code)
    if df_new is None:
        return "failed", 0

    if parquet_path.exists():
        # Incremental merge: on a duplicated timestamp the freshly downloaded
        # row wins, then everything is re-sorted chronologically.
        # NOTE(review): this also replaces a stored "amount" with the "0.0"
        # placeholder emitted by the mkline source -- confirm that is wanted.
        existing = pd.read_parquet(parquet_path)
        combined = (
            pd.concat([existing, df_new], ignore_index=True)
            .drop_duplicates(subset=["day"], keep="last")
            .sort_values("day")
            .reset_index(drop=True)
        )
    else:
        combined = df_new

    # Write to a sibling temp file, then swap it in. Path.replace overwrites
    # the destination on every platform, whereas Path.rename fails on Windows
    # when the target already exists.
    tmp_path = parquet_path.with_suffix(".tmp")
    try:
        combined.to_parquet(tmp_path, index=False)
        tmp_path.replace(parquet_path)
    finally:
        # Only still present when the write or the swap failed.
        if tmp_path.exists():
            tmp_path.unlink()
    return f"ok({source})", len(df_new)
# --- 断点续传 ---
def load_progress() -> dict:
    """Load the resume state from ``PROGRESS_FILE``.

    Returns:
        A dict that always contains the list-valued keys ``completed`` and
        ``failed`` plus ``last_update``. A missing, unreadable or malformed
        progress file yields the empty default state instead of raising, so a
        damaged file cannot block a run.
    """
    progress = {"completed": [], "failed": [], "last_update": ""}
    if not PROGRESS_FILE.exists():
        return progress
    try:
        loaded = json.loads(PROGRESS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both invalid JSON and undecodable bytes.
        logger.warning("Ignoring unreadable progress file %s: %s", PROGRESS_FILE, exc)
        return progress
    if isinstance(loaded, dict):
        progress.update(loaded)
    # Callers index these keys and append to them, so guarantee real lists.
    for key in ("completed", "failed"):
        if not isinstance(progress.get(key), list):
            progress[key] = []
    return progress
def save_progress(progress: dict) -> None:
    """Stamp *progress* with the current time and persist it to ``PROGRESS_FILE``.

    The ``last_update`` key of *progress* is overwritten in place.

    Args:
        progress: Resume state to serialise as JSON.
    """
    progress["last_update"] = datetime.now().isoformat()
    payload = json.dumps(progress, ensure_ascii=False, indent=2)
    # The progress file lives under the default output directory, which may
    # not exist when the script runs with a different --output-dir.
    PROGRESS_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temp file first and swap it in, so an interruption mid-write
    # cannot leave a truncated progress file that breaks the next resume.
    tmp_path = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(PROGRESS_FILE)
# --- 股票列表 ---
def get_stock_list(scope: str) -> List[str]:
    """Return the stock codes for *scope*, each zero-padded to six characters.

    Args:
        scope: ``"hs300"`` (read from ``HS300_FILE``) or ``"all"`` (read from
            ``ALL_STOCKS_FILE``).

    Returns:
        The codes found in the first recognised code column, in file order.
        Empty cells are skipped.

    Raises:
        ValueError: If *scope* is unknown or the file has no recognised
            code column.
    """
    # scope -> (csv path, candidate column names in priority order, error prefix)
    sources = {
        "hs300": (HS300_FILE, ["成分券代码", "代码", "code"], "HS300文件中找不到代码列,现有列: "),
        "all": (ALL_STOCKS_FILE, ["代码", "code", "股票代码"], "全市场文件中找不到代码列,现有列: "),
    }
    if scope not in sources:
        raise ValueError(f"Unknown scope: {scope}")

    path, candidates, missing_prefix = sources[scope]
    # Read everything as text: with type inference a single blank cell turns
    # the code column into floats, and "1.0".zfill(6) is not a stock code.
    df = pd.read_csv(path, dtype=str)
    for col in candidates:
        if col in df.columns:
            return [c.strip().zfill(6) for c in df[col].dropna()]
    raise ValueError(f"{missing_prefix}{list(df.columns)}")
# --- 主入口 ---
def main():
    """Command-line entry point: download every requested stock with throttling.

    Parses ``--scope`` / ``--codes`` / ``--resume`` / ``--output-dir``, builds
    the work list, then downloads each stock with up to ``MAX_RETRIES``
    attempts. Progress is saved every 50 stocks and again when the loop ends
    for any reason, including Ctrl-C or an unexpected error.

    NOTE(review): the progress file always lives at ``PROGRESS_FILE`` (under
    the default output directory), even when ``--output-dir`` points elsewhere.
    """
    parser = argparse.ArgumentParser(description="15分钟线数据下载")
    parser.add_argument("--scope", choices=["hs300", "all"], help="下载范围")
    parser.add_argument("--codes", nargs="+", help="指定股票代码")
    parser.add_argument("--resume", action="store_true", help="断点续传")
    parser.add_argument("--output-dir", default=str(OUTPUT_DIR), help="输出目录")
    args = parser.parse_args()

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Build the list of stock codes.
    if args.codes:
        codes = args.codes
    elif args.scope:
        codes = get_stock_list(args.scope)
    else:
        parser.error("必须指定 --scope 或 --codes")  # exits the process

    # Resume support: skip everything recorded as completed by an earlier run.
    progress = load_progress() if args.resume else {"completed": [], "failed": []}
    skip_set = set(progress["completed"])
    todo = [c for c in codes if c not in skip_set]
    # Report how many of the REQUESTED codes are skipped, not the size of the
    # whole progress file, so the three numbers always add up.
    logger.info("股票总数: %d, 已完成: %d, 待下载: %d", len(codes), len(codes) - len(todo), len(todo))

    t_start = time.time()
    ok_count = 0
    fail_count = 0
    consecutive_fails = 0
    try:
        for i, code in enumerate(todo):
            # Rate limiting between two stocks.
            if i > 0:
                time.sleep(REQUEST_INTERVAL)

            # Retry loop.
            status = "failed"
            rows = 0
            for attempt in range(MAX_RETRIES):
                try:
                    status, rows = download_with_increment(code, output_dir)
                    if status != "failed":
                        break
                except Exception as e:
                    logger.warning(" 重试 %d/%d: %s", attempt + 1, MAX_RETRIES, e)
                # Back off before the next attempt; nothing follows the last one.
                if attempt + 1 < MAX_RETRIES:
                    time.sleep(1)

            if status != "failed":
                ok_count += 1
                consecutive_fails = 0
                progress["completed"].append(code)
                # A stock that failed in an earlier run is no longer a failure.
                progress["failed"] = [c for c in progress["failed"] if c != code]
                logger.info("[%d/%d] %s: %s (%d rows)", i + 1, len(todo), code, status, rows)
            else:
                fail_count += 1
                consecutive_fails += 1
                # Resumed runs retry failed stocks; do not list them twice.
                if code not in progress["failed"]:
                    progress["failed"].append(code)
                logger.warning("[%d/%d] %s: FAILED", i + 1, len(todo), code)

            # Protection against a run of consecutive failures.
            if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
                logger.error("连续失败 %d 次,暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
                time.sleep(CONSECUTIVE_FAIL_PAUSE)
                consecutive_fails = 0

            # Periodic checkpoint.
            if (i + 1) % 50 == 0:
                save_progress(progress)
    finally:
        # Final save. It also runs on Ctrl-C or an unexpected error, so at
        # most the stock being processed is lost instead of up to 49.
        save_progress(progress)

    elapsed = time.time() - t_start
    logger.info("=" * 50)
    logger.info("下载完成: 成功 %d, 失败 %d, 耗时 %.1f", ok_count, fail_count, elapsed)
    logger.info("进度文件: %s", PROGRESS_FILE)


if __name__ == "__main__":
    main()