auto-sync: 2026-05-02 21:32:53

This commit is contained in:
cfdaily
2026-05-02 21:32:53 +08:00
parent 57615ef5cd
commit 15d791de6d
+68 -52
View File
@@ -2,18 +2,18 @@
""" """
15分钟线数据下载脚本 15分钟线数据下载脚本
数据源降级链 数据源降级链:
1. 腾讯 mkline API直接返回15分钟线 1. 腾讯 mkline API(直接返回15分钟线)
2. 腾讯 minute/query + 聚合为15分钟仅当天数据 2. 腾讯 minute/query + 聚合为15分钟(仅当天数据)
功能 功能:
- 支持HS300 / 全市场下载 - 支持HS300 / 全市场下载
- 增量下载追加新数据不覆盖 - 增量下载(追加新数据,不覆盖)
- 断点续传JSON进度文件 - 断点续传(JSON进度文件)
- 限频保护0.3s间隔 + 重试 - 限频保护(0.3s间隔 + 重试)
- 与已有84只Parquet格式完全一致 - 与已有84只Parquet格式完全一致
用法 用法:
python3 download_minute.py --scope hs300 python3 download_minute.py --scope hs300
python3 download_minute.py --scope all python3 download_minute.py --scope all
python3 download_minute.py --codes 000001 600519 python3 download_minute.py --codes 000001 600519
@@ -67,8 +67,8 @@ def fetch_url(url: str, timeout: int = 10) -> str:
# --- 腾讯 mkline API --- # --- 腾讯 mkline API ---
def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]: def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]:
""" """
腾讯mkline API直接返回15分钟线 腾讯mkline API,直接返回15分钟线
Args: Args:
symbol: 如 "sz000001" symbol: 如 "sz000001"
count: 返回条数 count: 返回条数
@@ -79,12 +79,12 @@ def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]:
data = json.loads(raw) data = json.loads(raw)
# 解析结构: data -> {symbol} -> data -> day/data # 解析结构: data -> {symbol} -> data -> day/data
stock_data = data.get("data", {}).get(symbol, {}).get("data", {}) stock_data = data.get("data", {}).get(symbol, {}).get("data", {})
# mkline返回的是 { "day": [...], "m15": [...] } # mkline返回的是 { "day": [...], "m15": [...] }
klines = stock_data.get("m15", stock_data.get("day", [])) klines = stock_data.get("m15", stock_data.get("day", []))
if not klines: if not klines:
return None return None
rows = [] rows = []
for line in klines: for line in klines:
parts = line.split() parts = line.split()
@@ -100,7 +100,7 @@ def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]:
"volume": str(int(float(parts[5]))), "volume": str(int(float(parts[5]))),
"amount": str(round(float(parts[4]) * int(float(parts[5])), 2)), # close*volume估算 "amount": str(round(float(parts[4]) * int(float(parts[5])), 2)), # close*volume估算
}) })
if not rows: if not rows:
return None return None
return pd.DataFrame(rows) return pd.DataFrame(rows)
@@ -112,8 +112,8 @@ def try_mkline(symbol: str, count: int = 800) -> Optional[pd.DataFrame]:
# --- 腾讯 minute/query API + 聚合 --- # --- 腾讯 minute/query API + 聚合 ---
def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]: def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]:
""" """
腾讯minute/query API返回1分钟线聚合为15分钟线 腾讯minute/query API,返回1分钟线,聚合为15分钟线
Args: Args:
symbol: 如 "sz000001" symbol: 如 "sz000001"
date: 如 "20260502" date: 如 "20260502"
@@ -123,10 +123,10 @@ def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]
raw = fetch_url(url) raw = fetch_url(url)
data = json.loads(raw) data = json.loads(raw)
minute_data = data.get("data", {}).get(symbol, {}).get("data", {}).get("data", []) minute_data = data.get("data", {}).get(symbol, {}).get("data", {}).get("data", [])
if not minute_data: if not minute_data:
return None return None
# 解析1分钟线: "HHMM price vol amount" # 解析1分钟线: "HHMM price vol amount"
one_min = [] one_min = []
for line in minute_data: for line in minute_data:
@@ -139,10 +139,10 @@ def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]
"vol": float(parts[2]), "vol": float(parts[2]),
"amount": float(parts[3]), "amount": float(parts[3]),
}) })
if not one_min: if not one_min:
return None return None
df = pd.DataFrame(one_min) df = pd.DataFrame(one_min)
return _aggregate_1m_to_15m(df) return _aggregate_1m_to_15m(df)
except Exception as e: except Exception as e:
@@ -153,10 +153,10 @@ def try_minute_query_aggregate(symbol: str, date: str) -> Optional[pd.DataFrame]
def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame: def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame:
"""将1分钟线聚合为15分钟线""" """将1分钟线聚合为15分钟线"""
df["time"] = pd.to_datetime(df["time"]) df["time"] = pd.to_datetime(df["time"])
# 15分钟分组按时间段切分9:30-9:45, 9:45-10:00, ... # 15分钟分组:按时间段切分(9:30-9:45, 9:45-10:00, ...)
# end-of-bar对齐已有84只数据用K线结束时间09:45, 10:00... # end-of-bar对齐:已有84只数据用K线结束时间(09:45, 10:00...)
df["group"] = df["time"].dt.floor("15min") + pd.Timedelta(minutes=15) df["group"] = df["time"].dt.floor("15min") + pd.Timedelta(minutes=15)
agg = df.groupby("group").agg( agg = df.groupby("group").agg(
open=("price", "first"), open=("price", "first"),
high=("price", "max"), high=("price", "max"),
@@ -165,7 +165,7 @@ def _aggregate_1m_to_15m(df: pd.DataFrame) -> pd.DataFrame:
volume=("vol", "sum"), volume=("vol", "sum"),
amount=("amount", "last"), # 累计值取最后 amount=("amount", "last"), # 累计值取最后
).reset_index() ).reset_index()
result = pd.DataFrame({ result = pd.DataFrame({
"day": agg["group"].dt.strftime("%Y-%m-%d %H:%M:%S"), "day": agg["group"].dt.strftime("%Y-%m-%d %H:%M:%S"),
"open": agg["open"], "open": agg["open"],
@@ -187,48 +187,64 @@ def get_market_prefix(code: str) -> str:
def download_single(code: str) -> Tuple[Optional[pd.DataFrame], str]:
    """Fetch 15-minute bars for one stock, walking the data-source fallback chain.

    Args:
        code: Stock code; it is split by ``get_market_prefix`` and re-joined
            into the symbol that is handed to the fetchers.

    Returns:
        ``(df, source)`` where ``source`` is ``"mkline"`` or ``"minute_query"``
        for the first source that yielded a non-empty frame, or
        ``(None, "failed")`` when neither source produced data.
    """
    prefix, clean = get_market_prefix(code)
    symbol = f"{prefix}{clean}"

    # Ordered fallback chain. Each fetch is wrapped in a lambda so the backup
    # source (and its "today" timestamp) is only evaluated if the primary
    # source came back empty.
    attempts = (
        ("mkline", lambda: try_mkline(symbol)),
        (
            "minute_query",
            lambda: try_minute_query_aggregate(
                symbol, datetime.now().strftime("%Y%m%d")
            ),
        ),
    )

    for source, fetch in attempts:
        frame = fetch()
        if frame is not None and len(frame) > 0:
            return frame, source

    return None, "failed"
def _drop_invalid_bars(df: pd.DataFrame, code: str) -> pd.DataFrame:
    """Return *df* without rows whose OHLC prices fail basic sanity checks.

    The checks run on a numeric copy of the price columns, so the returned
    frame keeps exactly the column dtypes the data source produced.
    """
    price_cols = ["open", "high", "low", "close"]
    prices = df[price_cols].apply(pd.to_numeric, errors="coerce")

    # Unparsable prices become NaN. Every comparison against NaN is False, so
    # without this explicit check such rows would pass the tests below.
    unparsable = prices.isna().any(axis=1)
    if unparsable.any():
        logger.warning("价格无法解析 %s: %d", code, unparsable.sum())

    # Every price must be strictly positive.
    non_positive = ~unparsable & (prices <= 0).any(axis=1)
    if non_positive.any():
        logger.warning("价格<=0 %s: %d", code, non_positive.sum())

    candidates = ~(unparsable | non_positive)

    # OHLC consistency: high/low must bracket both open and close.
    body = prices[["open", "close"]]
    inconsistent = candidates & (
        (prices["high"] < body.max(axis=1)) | (prices["low"] > body.min(axis=1))
    )
    if inconsistent.any():
        logger.warning("OHLC异常 %s: %d", code, inconsistent.sum())

    return df[candidates & ~inconsistent]


def download_with_increment(code: str, output_dir: Path) -> Tuple[str, int]:
    """Download one stock and merge the new bars into its Parquet file.

    Args:
        code: Stock code, as accepted by ``download_single``.
        output_dir: Directory holding the ``{prefix}{code}_15min.parquet`` files.

    Returns:
        ``(status, rows)``: ``("failed", 0)`` when nothing was downloaded or no
        row survived validation, otherwise ``("ok(<source>)", n)`` where ``n``
        is the number of validated rows fetched in this call.

    Raises:
        Whatever ``pd.read_parquet`` / ``DataFrame.to_parquet`` / the file
        replace raise; a partially written temp file is removed first.
    """
    prefix, clean = get_market_prefix(code)
    parquet_path = output_dir / f"{prefix}{clean}_15min.parquet"

    df_new, source = download_single(code)
    if df_new is None:
        return "failed", 0

    # NOTE(review): validation deliberately does not overwrite the price
    # columns with numeric dtypes. The visible mkline rows store volume/amount
    # as strings, so the existing files presumably hold string columns;
    # mixing str and float in one column would break the merge/write below.
    # Confirm against the existing Parquet schema.
    df_new = _drop_invalid_bars(df_new, code)
    if df_new.empty:
        return "failed", 0

    if parquet_path.exists():
        # Incremental: append, let the freshly downloaded bar win on duplicates.
        existing = pd.read_parquet(parquet_path)
        combined = pd.concat([existing, df_new], ignore_index=True)
        combined = combined.drop_duplicates(subset=["day"], keep="last")
        combined = combined.sort_values("day").reset_index(drop=True)
    else:
        combined = df_new

    # Atomic write: dump to a temp file, then swap it in. Path.replace
    # overwrites an existing target on every platform, whereas Path.rename
    # raises FileExistsError on Windows when the target already exists.
    tmp_path = parquet_path.with_suffix(".tmp")
    try:
        combined.to_parquet(tmp_path, index=False)
        tmp_path.replace(parquet_path)
    except Exception:
        # Do not leave a half-written temp file behind.
        if tmp_path.exists():
            tmp_path.unlink()
        raise

    return f"ok({source})", len(df_new)
@@ -252,15 +268,15 @@ def get_stock_list(scope: str) -> List[str]:
for col in ["成分券代码", "代码", "code"]: for col in ["成分券代码", "代码", "code"]:
if col in df.columns: if col in df.columns:
return [str(c).zfill(6) for c in df[col].tolist()] return [str(c).zfill(6) for c in df[col].tolist()]
raise ValueError(f"HS300文件中找不到代码列现有列: {list(df.columns)}") raise ValueError(f"HS300文件中找不到代码列,现有列: {list(df.columns)}")
if scope == "all": if scope == "all":
df = pd.read_csv(ALL_STOCKS_FILE) df = pd.read_csv(ALL_STOCKS_FILE)
for col in ["代码", "code", "股票代码"]: for col in ["代码", "code", "股票代码"]:
if col in df.columns: if col in df.columns:
return [str(c).zfill(6) for c in df[col].tolist()] return [str(c).zfill(6) for c in df[col].tolist()]
raise ValueError(f"全市场文件中找不到代码列现有列: {list(df.columns)}") raise ValueError(f"全市场文件中找不到代码列,现有列: {list(df.columns)}")
raise ValueError(f"Unknown scope: {scope}") raise ValueError(f"Unknown scope: {scope}")
@@ -272,10 +288,10 @@ def main():
parser.add_argument("--resume", action="store_true", help="断点续传") parser.add_argument("--resume", action="store_true", help="断点续传")
parser.add_argument("--output-dir", default=str(OUTPUT_DIR), help="输出目录") parser.add_argument("--output-dir", default=str(OUTPUT_DIR), help="输出目录")
args = parser.parse_args() args = parser.parse_args()
output_dir = Path(args.output_dir) output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True) output_dir.mkdir(parents=True, exist_ok=True)
# 获取股票列表 # 获取股票列表
if args.codes: if args.codes:
codes = args.codes codes = args.codes
@@ -283,24 +299,24 @@ def main():
codes = get_stock_list(args.scope) codes = get_stock_list(args.scope)
else: else:
parser.error("必须指定 --scope 或 --codes") parser.error("必须指定 --scope 或 --codes")
# 断点续传 # 断点续传
progress = load_progress() if args.resume else {"completed": [], "failed": []} progress = load_progress() if args.resume else {"completed": [], "failed": []}
skip_set = set(progress["completed"]) skip_set = set(progress["completed"])
todo = [c for c in codes if c not in skip_set] todo = [c for c in codes if c not in skip_set]
logger.info("股票总数: %d, 已完成: %d, 待下载: %d", len(codes), len(skip_set), len(todo)) logger.info("股票总数: %d, 已完成: %d, 待下载: %d", len(codes), len(skip_set), len(todo))
t_start = time.time() t_start = time.time()
ok_count = 0 ok_count = 0
fail_count = 0 fail_count = 0
consecutive_fails = 0 consecutive_fails = 0
for i, code in enumerate(todo): for i, code in enumerate(todo):
# 限频 # 限频
if i > 0: if i > 0:
time.sleep(REQUEST_INTERVAL) time.sleep(REQUEST_INTERVAL)
# 重试逻辑 # 重试逻辑
status = "failed" status = "failed"
rows = 0 rows = 0
@@ -312,7 +328,7 @@ def main():
except Exception as e: except Exception as e:
logger.warning(" 重试 %d/%d: %s", attempt + 1, MAX_RETRIES, e) logger.warning(" 重试 %d/%d: %s", attempt + 1, MAX_RETRIES, e)
time.sleep(1) time.sleep(1)
if status != "failed": if status != "failed":
ok_count += 1 ok_count += 1
consecutive_fails = 0 consecutive_fails = 0
@@ -323,20 +339,20 @@ def main():
consecutive_fails += 1 consecutive_fails += 1
progress["failed"].append(code) progress["failed"].append(code)
logger.warning("[%d/%d] %s: FAILED", i + 1, len(todo), code) logger.warning("[%d/%d] %s: FAILED", i + 1, len(todo), code)
# 连续失败保护 # 连续失败保护
if consecutive_fails >= MAX_CONSECUTIVE_FAILS: if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
logger.error("连续失败 %d暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) logger.error("连续失败 %d,暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
time.sleep(CONSECUTIVE_FAIL_PAUSE) time.sleep(CONSECUTIVE_FAIL_PAUSE)
consecutive_fails =0 consecutive_fails =0
# 定期保存进度 # 定期保存进度
if (i + 1) % 50 == 0: if (i + 1) % 50 == 0:
save_progress(progress) save_progress(progress)
# 最终保存 # 最终保存
save_progress(progress) save_progress(progress)
elapsed = time.time() - t_start elapsed = time.time() - t_start
logger.info("=" * 50) logger.info("=" * 50)
logger.info("下载完成: 成功 %d, 失败 %d, 耗时 %.1f", ok_count, fail_count, elapsed) logger.info("下载完成: 成功 %d, 失败 %d, 耗时 %.1f", ok_count, fail_count, elapsed)