Files
sanguo_vnpy/data_platform/download_minute.py
T
2026-05-02 22:35:26 +08:00

334 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
15分钟线数据下载脚本
数据源降级链:
1. 新浪财经15分钟K线API(有真实amount,800条/次)
2. 腾讯 minute/query + 聚合为15分钟(仅当天数据,amount为估算)
功能:
- 支持HS300 / 全市场下载
- 增量下载(追加新数据,不覆盖已有)
- 断点续传(JSON进度文件)
- 限频保护(0.3s间隔 + 重试)
- 与已有84只Parquet格式完全一致(7列,end-of-bar时间戳)
- 数据校验(价格>0, OHLC一致性)
用法:
python3 download_minute.py --scope hs300
python3 download_minute.py --scope all
python3 download_minute.py --codes 000001 600519
python3 download_minute.py --scope hs300 --resume
"""
import argparse
import json
import re
import sys
import time
import logging
import urllib.request
import urllib.error
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Tuple
import pandas as pd
# Logging: INFO level, each record prefixed with timestamp and level name.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
# --- Configuration ---
# Default value of --output-dir: where the per-stock 15-minute Parquet files go.
OUTPUT_DIR = Path("/Volumes/stock/minute_kline/15min")
# JSON checkpoint read by load_progress() and written by save_progress().
PROGRESS_FILE = OUTPUT_DIR / "download_progress.json"
# Seconds main() sleeps between two consecutive stocks.
REQUEST_INTERVAL = 0.3
# Maximum download attempts per stock in main().
MAX_RETRIES = 3
# Seconds main() sleeps once MAX_CONSECUTIVE_FAILS stocks in a row have failed.
CONSECUTIVE_FAIL_PAUSE = 60
MAX_CONSECUTIVE_FAILS = 5
# CSV inputs read by get_stock_list() for --scope hs300 / --scope all.
HS300_FILE = Path("/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data/raw/stock_info/hs300_constituents_latest.csv")
ALL_STOCKS_FILE = Path("/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data/raw/stock_info/stock_basic_info_raw_20260326_113530.csv")
# HTTP headers attached to every outgoing request.
HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"}
def _make_opener():
    """Build a urllib opener that never routes through a proxy.

    An explicit empty ``ProxyHandler`` mapping is installed so that proxy
    settings injected elsewhere (e.g. by akshare) are not picked up.
    """
    no_proxy = urllib.request.ProxyHandler({})
    return urllib.request.build_opener(no_proxy)
# --- 新浪15分钟K线API(主源) ---
def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]:
    """Fetch 15-minute K-line bars from the Sina Finance JSONP endpoint.

    Args:
        symbol: Exchange-prefixed code, e.g. ``sz000001`` or ``sh600519``.
        datalen: Number of bars requested (the API returns at most ~800).

    Returns:
        A DataFrame with the columns ``day, open, high, low, close, volume,
        amount`` in that order, or ``None`` when the request fails, the
        payload cannot be parsed, the payload is empty, or any expected
        column is missing.
    """
    expected = ["day", "open", "high", "low", "close", "volume", "amount"]
    endpoint = (
        f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/"
        f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}"
    )
    try:
        opener = _make_opener()
        request = urllib.request.Request(endpoint, headers=HEADERS)
        with opener.open(request, timeout=15) as resp:
            body = resp.read().decode("utf-8", errors="replace")
        # The JSON array is wrapped in a JSONP call: `...=([ ... ])`.
        match = re.search(r'\((\[.*\])\)', body, re.DOTALL)
        if match is None:
            return None
        rows = json.loads(match.group(1))
        if not rows:
            return None
        frame = pd.DataFrame(rows)
        # Reject payloads that lack any of the seven expected columns.
        if any(name not in frame.columns for name in expected):
            return None
        return frame[expected]
    except Exception as e:
        # Best-effort source: any failure just means "try the next source".
        logger.debug("新浪15min失败 %s: %s", symbol, e)
        return None
# --- 腾讯 minute/query + 聚合(备源,仅当天) ---
def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]:
    """Fetch 1-minute ticks from Tencent minute/query and aggregate to 15 minutes.

    Args:
        symbol: Exchange-prefixed code, e.g. ``sz000001``.
        date: Fallback trading date as ``YYYYMMDD`` (e.g. ``20260502``). It is
            used to build bar timestamps only when the response does not
            carry a well-formed date of its own.

    Returns:
        The 15-minute DataFrame produced by ``_aggregate_1m_to_15m``, or
        ``None`` when the request fails or yields no usable minute rows.
    """
    url = f"http://web.ifzq.gtimg.cn/appstock/app/minute/query?code={symbol}"
    try:
        opener = _make_opener()
        req = urllib.request.Request(url, headers=HEADERS)
        with opener.open(req, timeout=10) as r:
            payload = json.loads(r.read())
        inner = payload.get("data", {}).get(symbol, {}).get("data", {})
        minute_data = inner.get("data", [])
        if not minute_data:
            return None
        # The endpoint takes no date parameter, so the rows belong to whatever
        # session the server chose. Stamping them with the caller's "today"
        # mislabels the bars on non-trading days.
        # NOTE(review): assumes the payload exposes that session date as
        # data.<symbol>.data.date in YYYYMMDD form — confirm against a live
        # response; when absent or malformed the `date` argument is used.
        served_date = str(inner.get("date", "")).strip()
        if re.fullmatch(r"[0-9]{8}", served_date):
            date = served_date
        day = f"{date[:4]}-{date[4:6]}-{date[6:8]}"
        one_min = []
        for line in minute_data:
            # Each row is whitespace-separated: HHMM price vol amount [...]
            parts = line.split()
            if len(parts) >= 4:
                hhmm = parts[0]
                one_min.append({
                    "time": f"{day} {hhmm[:2]}:{hhmm[2:]}:00",
                    "price": float(parts[1]),
                    "vol": float(parts[2]),
                    "amount": float(parts[3]),
                })
        if not one_min:
            return None
        return _aggregate_1m_to_15m(pd.DataFrame(one_min))
    except Exception as e:
        # Best-effort fallback source: swallow and report "no data".
        logger.debug("minute_query失败 %s: %s", symbol, e)
        return None
def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate 1-minute rows into 15-minute bars labelled by bar end time.

    Args:
        df: Frame with columns ``time`` (parseable timestamps), ``price``,
            ``vol`` and ``amount``. The caller's frame is left untouched.

    Returns:
        A DataFrame with columns ``day`` (``YYYY-MM-DD HH:MM:SS`` string of
        the bar end), ``open``, ``high``, ``low``, ``close`` and string-typed
        ``volume`` (sum of ``vol``) and ``amount`` (last ``amount`` in the
        bar), ordered by bar end time.
    """
    # Work on a copy so the caller's frame is not mutated. Sort by time so
    # that "first"/"last" really mean open/close even for unordered input.
    bars = df.assign(time=pd.to_datetime(df["time"]))
    bars = bars.sort_values("time", kind="stable")
    # End-of-bar label: start of the 15-minute bucket plus 15 minutes.
    # NOTE(review): a tick stamped exactly on a boundary (e.g. 11:30 or 15:00)
    # therefore lands in the following bar (11:45 / 15:15) — confirm this
    # matches the existing end-of-bar files.
    bars["group"] = bars["time"].dt.floor("15min") + pd.Timedelta(minutes=15)
    agg = bars.groupby("group").agg(
        open=("price", "first"),
        high=("price", "max"),
        low=("price", "min"),
        close=("price", "last"),
        volume=("vol", "sum"),
        amount=("amount", "last"),
    ).reset_index()
    return pd.DataFrame({
        "day": agg["group"].dt.strftime("%Y-%m-%d %H:%M:%S"),
        "open": agg["open"],
        "high": agg["high"],
        "low": agg["low"],
        "close": agg["close"],
        # Stored as strings to match the layout of the existing Parquet files.
        "volume": agg["volume"].astype(str),
        "amount": agg["amount"].astype(str),
    })
# --- 下载主流程 ---
def get_market_prefix(code: str) -> Tuple[str, str]:
    """Map a stock code to its exchange prefix and normalized 6-digit code.

    Every character other than ASCII digits is dropped and the remainder is
    left-padded with zeros to six digits. Codes beginning with ``60``, ``68``
    or ``51`` are labelled ``sh``; everything else is labelled ``sz``.

    Returns:
        ``(prefix, six_digit_code)``.
    """
    digits = "".join(ch for ch in code if ch in "0123456789").zfill(6)
    shanghai_heads = ("60", "68", "51")
    market = "sh" if digits[:2] in shanghai_heads else "sz"
    return market, digits
def download_single(code: str) -> Tuple[Optional[pd.DataFrame], str]:
    """Download 15-minute bars for one stock, trying each source in order.

    Returns:
        ``(df, source)`` where ``source`` is ``"sina_15min"`` or
        ``"minute_query"`` for the source that delivered a non-empty frame,
        or ``(None, "failed")`` when every source came back empty.
    """
    market, digits = get_market_prefix(code)
    symbol = f"{market}{digits}"
    # Ordered fallback chain; the fetchers are lazy so the backup source is
    # only contacted when the primary one yields nothing.
    sources = (
        ("sina_15min", lambda: try_sina_15min(symbol)),
        (
            "minute_query",
            lambda: try_minute_query_aggregate(
                symbol, datetime.now().strftime("%Y%m%d")
            ),
        ),
    )
    for label, fetch in sources:
        frame = fetch()
        if frame is not None and len(frame) > 0:
            return frame, label
    return None, "failed"
def download_with_increment(code: str, output_dir: Path) -> Tuple[str, int]:
    """Download one stock and merge the new bars into its Parquet file.

    Args:
        code: Stock code; normalized through ``get_market_prefix``.
        output_dir: Directory holding ``<prefix><code>_15min.parquet``.

    Returns:
        ``("ok(<source>)", n)`` with ``n`` the number of validated new rows,
        or ``("failed", 0)`` when nothing was downloaded or no row survived
        validation.
    """
    prefix, clean = get_market_prefix(code)
    parquet_path = output_dir / f"{prefix}{clean}_15min.parquet"
    df_new, source = download_single(code)
    if df_new is None:
        return "failed", 0

    # --- Validation ---
    # Work on an owned copy so column assignments never hit a view.
    df_new = df_new.copy()
    price_cols = ["open", "high", "low", "close"]
    for col in price_cols:
        df_new[col] = pd.to_numeric(df_new[col], errors="coerce")
    df_new["volume"] = pd.to_numeric(df_new["volume"], errors="coerce").fillna(0)
    df_new["amount"] = pd.to_numeric(df_new["amount"], errors="coerce").fillna(0)

    # All four prices must be strictly positive. NaN (unparseable input)
    # fails `> 0` as well, so such rows are rejected here instead of
    # slipping through the comparisons below.
    bad_price = ~(df_new[price_cols] > 0).all(axis=1)
    if bad_price.any():
        logger.warning("价格<=0 %s: %d", code, bad_price.sum())
        df_new = df_new[~bad_price]

    # OHLC consistency: high >= max(open, close) and low <= min(open, close).
    bad_ohlc = (df_new["high"] < df_new[["open", "close"]].max(axis=1)) | \
               (df_new["low"] > df_new[["open", "close"]].min(axis=1))
    if bad_ohlc.any():
        logger.warning("OHLC异常 %s: %d", code, bad_ohlc.sum())
        df_new = df_new[~bad_ohlc]
    if df_new.empty:
        return "failed", 0

    # Detach from the filtered slices before assigning columns again.
    df_new = df_new.copy()
    # Stored as strings (object dtype) to stay compatible with existing files.
    df_new["volume"] = df_new["volume"].astype(str)
    df_new["amount"] = df_new["amount"].astype(str)

    # --- Merge ---
    if parquet_path.exists():
        existing = pd.read_parquet(parquet_path)
        combined = pd.concat([existing, df_new], ignore_index=True)
    else:
        combined = df_new
    # Deduplicate and order on every write, including the very first one;
    # on a timestamp collision the freshly downloaded row wins.
    combined = (
        combined.drop_duplicates(subset=["day"], keep="last")
        .sort_values("day")
        .reset_index(drop=True)
    )

    # --- Atomic write ---
    # Write next to the target, then swap in one step. Path.replace overwrites
    # an existing target on every platform, which Path.rename does not.
    tmp_path = parquet_path.with_suffix(".tmp")
    try:
        combined.to_parquet(tmp_path, index=False)
        tmp_path.replace(parquet_path)
    finally:
        # Only still present if the write or the swap failed.
        if tmp_path.exists():
            tmp_path.unlink()
    return f"ok({source})", len(df_new)
# --- 断点续传 ---
def load_progress() -> dict:
    """Load the resume checkpoint from ``PROGRESS_FILE``.

    Returns:
        A dict that always contains ``completed`` (list), ``failed`` (list)
        and ``last_update`` (str). A missing, unreadable or malformed file
        yields the empty default instead of raising, so an interrupted
        checkpoint write cannot block a later ``--resume`` run.
    """
    progress = {"completed": [], "failed": [], "last_update": ""}
    if not PROGRESS_FILE.exists():
        return progress
    try:
        loaded = json.loads(PROGRESS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        logger.warning("进度文件无法读取, 重新开始 %s: %s", PROGRESS_FILE, e)
        return progress
    if not isinstance(loaded, dict):
        logger.warning("进度文件格式异常, 重新开始 %s", PROGRESS_FILE)
        return progress
    # Keys absent from the file keep their defaults.
    progress.update(loaded)
    return progress
def save_progress(progress: dict):
    """Persist the checkpoint to ``PROGRESS_FILE``.

    ``progress["last_update"]`` is set to the current local time in ISO
    format before writing. The JSON is written to a sibling temp file and
    then swapped in, so an interruption mid-write cannot leave a truncated
    checkpoint behind.
    """
    progress["last_update"] = datetime.now().isoformat()
    payload = json.dumps(progress, ensure_ascii=False, indent=2)
    tmp_file = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    # Explicit encoding: ensure_ascii=False may emit non-ASCII characters.
    tmp_file.write_text(payload, encoding="utf-8")
    tmp_file.replace(PROGRESS_FILE)
# --- 股票列表 ---
def get_stock_list(scope: str) -> List[str]:
    """Return the 6-character stock codes for a download scope.

    Args:
        scope: ``"hs300"`` (read from ``HS300_FILE``) or ``"all"`` (read
            from ``ALL_STOCKS_FILE``).

    Returns:
        Codes from the first recognised code column, stripped of
        surrounding whitespace and zero-padded to six characters. Blank
        cells are skipped.

    Raises:
        ValueError: If ``scope`` is unknown or the CSV has none of the
            expected code columns.
    """
    # scope -> (csv path, candidate code columns in priority order, error text)
    sources = {
        "hs300": (HS300_FILE, ["成分券代码", "代码", "code"], "HS300文件中找不到代码列"),
        "all": (ALL_STOCKS_FILE, ["代码", "code", "股票代码"], "全市场文件中找不到代码列"),
    }
    if scope not in sources:
        raise ValueError(f"Unknown scope: {scope}")
    path, candidates, missing_msg = sources[scope]
    # Read everything as text: numeric inference drops leading zeros, and a
    # column containing a blank cell becomes float ("1.0" -> "0001.0").
    df = pd.read_csv(path, dtype=str)
    for col in candidates:
        if col in df.columns:
            codes = df[col].dropna().str.strip()
            return [c.zfill(6) for c in codes if c]
    raise ValueError(f"{missing_msg},现有列: {list(df.columns)}")
# --- 主入口 ---
def _download_with_retry(code: str, output_dir: Path) -> Tuple[str, int]:
    """Try one stock up to MAX_RETRIES times; return the last (status, rows)."""
    status, rows = "failed", 0
    for attempt in range(MAX_RETRIES):
        try:
            status, rows = download_with_increment(code, output_dir)
        except Exception as e:
            status, rows = "failed", 0
            logger.warning(" 重试 %d/%d: %s", attempt + 1, MAX_RETRIES, e)
        if status != "failed":
            break
        # Back off before the next attempt, whether the failure was an
        # exception or a plain "failed" result.
        if attempt + 1 < MAX_RETRIES:
            time.sleep(1)
    return status, rows


def main():
    """Command-line entry point: download 15-minute bars for a set of stocks.

    Options:
        --scope {hs300,all}  Download a predefined stock list.
        --codes CODE [...]   Download the given codes (takes precedence).
        --resume             Skip codes recorded as completed in the
                             progress file.
        --output-dir DIR     Target directory (default: OUTPUT_DIR).

    Progress is checkpointed every 50 stocks and again when the loop ends,
    including when it is interrupted or raises.
    """
    parser = argparse.ArgumentParser(description="15分钟线数据下载")
    parser.add_argument("--scope", choices=["hs300", "all"], help="下载范围")
    parser.add_argument("--codes", nargs="+", help="指定股票代码")
    parser.add_argument("--resume", action="store_true", help="断点续传")
    parser.add_argument("--output-dir", default=str(OUTPUT_DIR), help="输出目录")
    args = parser.parse_args()
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    if args.codes:
        codes = args.codes
    elif args.scope:
        codes = get_stock_list(args.scope)
    else:
        parser.error("必须指定 --scope 或 --codes")  # exits the process

    progress = load_progress() if args.resume else {"completed": [], "failed": []}
    # Tolerate checkpoints that lack either list.
    progress.setdefault("completed", [])
    progress.setdefault("failed", [])
    skip_set = set(progress["completed"])
    # dict.fromkeys drops duplicate codes while keeping the input order.
    todo = [c for c in dict.fromkeys(codes) if c not in skip_set]
    logger.info("股票总数: %d, 已完成: %d, 待下载: %d", len(codes), len(skip_set), len(todo))

    t_start = time.time()
    ok_count = 0
    fail_count = 0
    consecutive_fails = 0
    try:
        for i, code in enumerate(todo):
            if i > 0:
                time.sleep(REQUEST_INTERVAL)
            status, rows = _download_with_retry(code, output_dir)
            if status != "failed":
                ok_count += 1
                consecutive_fails = 0
                progress["completed"].append(code)
                # A code that failed in an earlier run has now recovered.
                if code in progress["failed"]:
                    progress["failed"].remove(code)
                logger.info("[%d/%d] %s: %s (%d rows)", i + 1, len(todo), code, status, rows)
            else:
                fail_count += 1
                consecutive_fails += 1
                if code not in progress["failed"]:
                    progress["failed"].append(code)
                logger.warning("[%d/%d] %s: FAILED", i + 1, len(todo), code)
                # A run of failures usually means throttling: pause, then
                # start counting again.
                if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
                    logger.error("连续失败 %d 次,暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
                    time.sleep(CONSECUTIVE_FAIL_PAUSE)
                    consecutive_fails = 0
            if (i + 1) % 50 == 0:
                save_progress(progress)
    finally:
        # Also runs on Ctrl-C or an unexpected error, so at most the stock
        # in flight is lost rather than up to 49 finished ones.
        save_progress(progress)

    elapsed = time.time() - t_start
    logger.info("=" * 50)
    logger.info("下载完成: 成功 %d, 失败 %d, 耗时 %.1f", ok_count, fail_count, elapsed)
# Run the downloader only when this file is executed as a script.
if __name__ == "__main__":
    main()