493 lines
17 KiB
Python
493 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
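Daily incremental update for the whole market: daily bars + 15-minute bars.

What it does:
1. Daily bars: Tencent K-line API -> update Parquet + vnpy DB
2. 15-minute bars: Sina API (800 bars) -> incremental merge into Parquet + import into vnpy DB

Design principles:
- Incremental updates; nothing is downloaded twice
- A failure on one stock does not affect the others
- Progress is persisted so an interrupted run can resume
- Rate-limit protection
- Complete logging

Usage:
python3 daily_all_update.py # full update (daily + 15min)
python3 daily_all_update.py --skip-daily # update 15min only
python3 daily_all_update.py --skip-15min # update daily only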
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import sys
|
||
import time
|
||
import logging
|
||
import urllib.request
|
||
import urllib.error
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Optional, List, Tuple
|
||
|
||
import pandas as pd
|
||
|
||
# ======================== 配置 ========================
|
||
|
||
# ---- Filesystem layout (every path is on the mounted /Volumes/stock volume) ----
# Run logs and the per-day JSON report.
LOG_DIR = Path("/Volumes/stock/logs/daily_update")
# Checkpoint files used to resume an interrupted 15-minute run.
PROGRESS_DIR = Path("/Volumes/stock/logs/daily_update/progress")
# Daily-bar Parquet files, stored in one sub-folder per year.
DAILY_DIR = Path("/Volumes/stock/A股数据/日线数据/daily")
# 15-minute-bar Parquet files, one file per symbol.
MINUTE_15_DIR = Path("/Volumes/stock/minute_kline/15min")
# SQLite database that receives the bars (dbbardata / dbbaroverview tables).
VNPY_DB_PATH = Path("/Volumes/stock/sanguo_vnpy/data/quant_trading.db")
# CSV listing every stock code to update.
ALL_STOCKS_FILE = Path("/Volumes/stock/A股数据/stock_info/stock_basic_info_raw_20260326_113530.csv")

# ---- Network pacing and retries ----
REQUEST_INTERVAL = 0.3  # seconds slept between stocks
MAX_RETRIES = 3  # attempts per stock before giving up
MAX_CONSECUTIVE_FAILS = 10  # failures in a row that trigger a pause
CONSECUTIVE_FAIL_PAUSE = 60  # length of that pause, in seconds
HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"}

# ---- Database ----
DB_BATCH_SIZE = 50000  # rows per executemany() call / commit
|
||
|
||
|
||
def setup_logging():
    """Configure root logging and return this module's logger.

    Creates ``LOG_DIR`` when missing, then routes INFO-and-above records both
    to a new ``update_<YYYYmmdd_HHMMSS>.log`` file (UTF-8) inside it and to the
    console stream.

    Returns:
        The logger obtained from ``logging.getLogger(__name__)``.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    to_file = logging.FileHandler(LOG_DIR / f"update_{stamp}.log", encoding="utf-8")
    to_console = logging.StreamHandler()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=[to_file, to_console],
    )
    return logging.getLogger(__name__)


# Logging is set up at import time so every function below can use `logger`.
logger = setup_logging()
|
||
|
||
|
||
def _make_opener():
    """Return a urllib opener built with an empty proxy mapping.

    Passing ``{}`` to ``ProxyHandler`` turns off proxy auto-detection, so
    requests made through this opener go out directly.
    """
    direct_only = urllib.request.ProxyHandler({})
    return urllib.request.build_opener(direct_only)
|
||
|
||
|
||
# ======================== 工具函数 ========================
|
||
|
||
def get_market_prefix(code: str) -> Tuple[str, str]:
    """Split a raw stock code into ``(market_prefix, six_digit_code)``.

    Every character other than ASCII 0-9 is discarded and the remainder is
    left-padded with zeros to six digits. Codes beginning with 60, 68 or 51
    get the prefix ``"sh"``; every other code gets ``"sz"``.
    """
    digits = "".join(ch for ch in code if ch in "0123456789").zfill(6)
    shanghai_heads = ("60", "68", "51")
    market = "sh" if digits.startswith(shanghai_heads) else "sz"
    return market, digits
|
||
|
||
|
||
def get_all_codes() -> List[str]:
    """Load every stock code listed in ``ALL_STOCKS_FILE``.

    The first of the columns ``代码``, ``code`` or ``股票代码`` that exists in
    the CSV is used. Values are stripped, blank cells are skipped, and each
    code is left-padded with zeros to six characters.

    Returns:
        The codes in file order.

    Raises:
        ValueError: If none of the expected code columns is present.
    """
    # Read everything as text: with dtype inference a single blank cell makes
    # pandas load the column as float, and "1.0".zfill(6) is not a stock code.
    df = pd.read_csv(ALL_STOCKS_FILE, dtype=str)
    for col in ("代码", "code", "股票代码"):
        if col in df.columns:
            cleaned = df[col].dropna().astype(str).str.strip()
            return [value.zfill(6) for value in cleaned if value]
    raise ValueError(f"找不到代码列: {list(df.columns)}")
|
||
|
||
|
||
def nas_mounted() -> bool:
    """Return True only when both data directories exist.

    ``DAILY_DIR`` is checked first; ``MINUTE_15_DIR`` is checked only if the
    first one is present.
    """
    return all(folder.exists() for folder in (DAILY_DIR, MINUTE_15_DIR))
|
||
|
||
|
||
# ======================== 日线更新 ========================
|
||
|
||
# Field order of one K-line row as this module interprets it.
# NOTE(review): the 7th field is treated as turnover amount, as the original
# code did — confirm against a live API response.
_TENCENT_DAILY_FIELDS = ["date", "open", "close", "high", "low", "volume", "amount"]


def _parse_tencent_klines(klines, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Turn raw K-line rows into a typed frame limited to [start_date, end_date]."""
    if not klines:
        return None

    df = pd.DataFrame(klines)
    ncols = len(df.columns)
    if ncols < 6:
        # Not even date + OHLC + volume: nothing usable.
        return None

    # Keep only the columns we have names for. Rows may carry extra trailing
    # fields, and assigning 7 names to a wider frame raises a length mismatch.
    names = _TENCENT_DAILY_FIELDS[: min(ncols, len(_TENCENT_DAILY_FIELDS))]
    df = df.iloc[:, : len(names)].copy()
    df.columns = names
    if "amount" not in df.columns:
        df["amount"] = 0.0

    for c in ["open", "close", "high", "low", "volume", "amount"]:
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0)

    # Dates are normalised to YYYY-MM-DD text so the range filter below can
    # compare them lexicographically with the string bounds.
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
    df["date"] = df["date"].astype(str)
    mask = (df["date"] >= start_date) & (df["date"] <= end_date)
    result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]]
    return result if not result.empty else None


def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch incremental daily bars for one stock from the Tencent K-line API.

    Args:
        code: Stock code; normalised with ``get_market_prefix``.
        start_date: First wanted date, ``YYYY-MM-DD``, inclusive.
        end_date: Last wanted date, ``YYYY-MM-DD``, inclusive.

    Returns:
        A DataFrame with columns date, open, high, low, close, volume, amount
        covering only the requested range, or None when the response holds no
        usable rows for it.

    Raises:
        Network and JSON decoding errors propagate to the caller, which is
        expected to retry.
    """
    prefix, clean = get_market_prefix(code)
    tq = f"{prefix}{clean}"
    # Request the calendar span plus a margin of 10 bars.
    days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10
    url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"

    req = urllib.request.Request(url, headers=HEADERS)
    with _make_opener().open(req, timeout=10) as r:
        raw = r.read().decode("utf-8", errors="replace")

    data = json.loads(raw)
    payload = data.get("data") if isinstance(data, dict) else None
    if not isinstance(payload, dict):
        return None
    # A non-dict entry for the symbol means "no data", not a transport error,
    # so return None instead of raising and making the caller retry.
    per_symbol = payload.get(tq)
    if not isinstance(per_symbol, dict):
        return None
    return _parse_tencent_klines(per_symbol.get("day") or [], start_date, end_date)
|
||
|
||
|
||
def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int:
    """Merge new daily bars into the stock's Parquet file for the current year.

    When the file already exists, the new rows are appended, rows sharing a
    ``date`` are collapsed to the most recently appended one, and the result is
    sorted by ``date``. Otherwise the year folder is created and ``new_data``
    is written as-is. The output is written to a ``.tmp`` sibling first and
    then renamed over the target.

    Args:
        code: Stock code; normalised with ``get_market_prefix``.
        new_data: Rows to merge; must contain a ``date`` column.

    Returns:
        ``len(new_data)`` — the number of rows passed in, not the number of
        rows that were actually new.
    """
    prefix, clean = get_market_prefix(code)
    # NOTE(review): the target is always the *current* year's file, whatever
    # year the rows in new_data belong to — confirm this is intended.
    target = DAILY_DIR / str(datetime.now().year) / f"{prefix}{clean}_daily.parquet"

    if not target.exists():
        target.parent.mkdir(parents=True, exist_ok=True)
        merged = new_data
    else:
        previous = pd.read_parquet(target)
        merged = (
            pd.concat([previous, new_data], ignore_index=True)
            .drop_duplicates(subset=["date"], keep="last")
            .sort_values("date")
            .reset_index(drop=True)
        )

    staging = target.with_suffix(".tmp")
    merged.to_parquet(staging, index=False)
    staging.rename(target)
    return len(new_data)
|
||
|
||
|
||
def get_daily_last_date(code: str) -> str:
    """Return the latest stored daily-bar date for a stock.

    Year folders under ``DAILY_DIR`` are scanned from the current year down to
    2010. The first existing, readable, non-empty file decides the result.

    Args:
        code: Stock code; normalised with ``get_market_prefix``.

    Returns:
        The first 10 characters of the maximum ``date`` value in that file,
        or ``""`` when no file yields a date.
    """
    prefix, clean = get_market_prefix(code)
    filename = f"{prefix}{clean}_daily.parquet"

    for year in range(datetime.now().year, 2009, -1):
        fpath = DAILY_DIR / str(year) / filename
        if not fpath.exists():
            continue
        try:
            df = pd.read_parquet(fpath, columns=["date"])
            if not df.empty:
                return str(df["date"].max())[:10]
        except Exception as e:
            # Still best-effort (fall through to older years), but report the
            # file instead of silently ignoring it.
            logger.warning("日线文件读取失败 %s: %s", fpath, e)
    return ""
|
||
|
||
|
||
def _fetch_daily_with_retry(code: str, start_date: str, end_date: str) -> Tuple[Optional[pd.DataFrame], bool]:
    """Call ``fetch_tencent_daily`` up to MAX_RETRIES times.

    Returns ``(data, ok)``. ``ok`` is False only when every attempt raised,
    which lets the caller tell a transport failure from "no new bars".
    """
    for attempt in range(MAX_RETRIES):
        try:
            return fetch_tencent_daily(code, start_date, end_date), True
        except Exception as e:
            if attempt < MAX_RETRIES - 1:
                time.sleep(1)
            else:
                # WARNING rather than DEBUG: logging is configured at INFO, so
                # a DEBUG record would never reach the log.
                logger.warning("日线 %s 失败: %s", code, e)
    return None, False


def _daily_db_rows(code: str, data: pd.DataFrame) -> list:
    """Convert daily bars into tuples matching the ``dbbardata`` insert statement."""
    prefix, clean = get_market_prefix(code)
    exchange = "SSE" if prefix == "sh" else "SZSE"
    return [
        (
            clean, exchange, str(row["date"]), "d",
            float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
            float(row.get("open", 0)), float(row.get("high", 0)),
            float(row.get("low", 0)), float(row.get("close", 0)),
        )
        for _, row in data.iterrows()
    ]


def _update_one_daily(code: str, start_date: str, end_date: str) -> Tuple[str, int, list]:
    """Fetch, validate and store the daily bars of one stock.

    Returns ``(outcome, record_count, db_rows)`` where outcome is one of
    ``"updated"``, ``"skipped"``, ``"invalid"``, ``"fetch_failed"`` or
    ``"write_failed"``.
    """
    data, fetched = _fetch_daily_with_retry(code, start_date, end_date)
    if not fetched:
        return "fetch_failed", 0, []
    if data is None or data.empty:
        return "skipped", 0, []
    # Reject the whole batch when any price is zero or negative.
    if (data[["open", "high", "low", "close"]] <= 0).any().any():
        return "invalid", 0, []
    try:
        n = update_daily_parquet(code, data)
        rows = _daily_db_rows(code, data)
    except Exception as e:
        logger.warning("日线写入失败 %s: %s", code, e)
        return "write_failed", 0, []
    return "updated", n, rows


def run_daily_update(codes: List[str]) -> dict:
    """Incrementally update daily bars for every code.

    For each stock the last stored date is looked up, bars from the following
    day through today are fetched, validated, merged into Parquet, and
    collected for one bulk write to the vnpy DB at the end. Stocks with no
    stored history, or already up to date, are skipped without any request.

    Args:
        codes: Stock codes to process.

    Returns:
        Counters with the keys ``updated``, ``skipped``, ``failed``,
        ``records`` and ``db_records``.
    """
    logger.info("=" * 60)
    logger.info("日线增量更新开始,共 %d 只", len(codes))
    today = datetime.now().strftime("%Y-%m-%d")

    stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
    all_db_values = []
    consecutive_fails = 0
    requests_sent = 0

    for i, code in enumerate(codes):
        last_date = get_daily_last_date(code)
        next_day = ""
        if last_date:
            try:
                next_day = (pd.Timestamp(last_date) + timedelta(days=1)).strftime("%Y-%m-%d")
            except (ValueError, TypeError) as e:
                # One unreadable stored date must not abort the whole run.
                logger.warning("日线 %s 最后日期无法解析 %r: %s", code, last_date, e)

        outcome, n, rows = "skipped", 0, []
        if next_day and next_day <= today:
            # Throttle only real requests; skipped stocks cost no API call.
            if requests_sent:
                time.sleep(REQUEST_INTERVAL)
            requests_sent += 1
            outcome, n, rows = _update_one_daily(code, next_day, today)

        if outcome == "updated":
            stats["updated"] += 1
            stats["records"] += n
            stats["db_records"] += len(rows)
            all_db_values.extend(rows)
            consecutive_fails = 0
        elif outcome == "skipped":
            stats["skipped"] += 1
        else:
            stats["failed"] += 1
            # Bad data says nothing about API or disk health, so only fetch
            # and write failures count toward the pause.
            if outcome != "invalid":
                consecutive_fails += 1

        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
            time.sleep(CONSECUTIVE_FAIL_PAUSE)
            consecutive_fails = 0

        if (i + 1) % 500 == 0:
            logger.info("日线进度: %d/%d updated=%d", i + 1, len(codes), stats["updated"])

    # Write everything to the vnpy DB in one go.
    if all_db_values:
        _write_vnpy_db(all_db_values, "日线")

    logger.info("日线完成: %s", json.dumps(stats, ensure_ascii=False))
    return stats
|
||
|
||
|
||
# ======================== 15分钟线更新 ========================
|
||
|
||
def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]:
    """Download 15-minute bars for one symbol from the Sina K-line endpoint.

    Args:
        symbol: Prefixed symbol as used in the URL, e.g. ``sh600000``.
        datalen: Number of bars requested.

    Returns:
        A DataFrame with the columns day, open, high, low, close, volume and
        amount, or None when the response has no bracketed JSON array, the
        array is empty, or any of those columns is missing.

    Raises:
        Network and JSON decoding errors propagate to the caller.
    """
    endpoint = (
        f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/"
        f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}"
    )
    request = urllib.request.Request(endpoint, headers=HEADERS)
    with _make_opener().open(request, timeout=15) as resp:
        body = resp.read().decode("utf-8", errors="replace")

    # The payload is JSONP: pull out the "[...]" array between the parentheses.
    found = re.search(r"\((\[.*\])\)", body, re.DOTALL)
    if found is None:
        return None

    records = json.loads(found.group(1))
    if not records:
        return None

    frame = pd.DataFrame(records)
    wanted = ["day", "open", "high", "low", "close", "volume", "amount"]
    if any(name not in frame.columns for name in wanted):
        return None
    return frame[wanted]
|
||
|
||
|
||
def _clean_15min_bars(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce bar columns to numbers and drop rows that fail validation."""
    price_cols = ["open", "high", "low", "close"]
    # Work on a copy so the assignments below never hit a slice of another frame.
    df = df.copy()
    for col in price_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    df["volume"] = pd.to_numeric(df["volume"], errors="coerce").fillna(0)
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce").fillna(0)

    # Unparseable prices become NaN, and every comparison with NaN is False,
    # so such rows would pass both filters below. Remove them explicitly.
    df = df.dropna(subset=price_cols)

    bad_zero = (df[["close", "open"]] <= 0).any(axis=1)
    df = df[~bad_zero]

    body_top = df[["open", "close"]].max(axis=1)
    body_bottom = df[["open", "close"]].min(axis=1)
    bad_ohlc = (df["high"] < body_top) | (df["low"] > body_bottom)
    return df[~bad_ohlc].copy()


def update_15min_parquet(code: str) -> Tuple[str, int]:
    """Download the latest 15-minute bars of a stock and merge them into Parquet.

    The download is attempted up to MAX_RETRIES times. Rows with a
    non-numeric price, a non-positive open or close, a high below the
    open/close body or a low above it are dropped. The remaining rows are
    appended to the existing file, de-duplicated on ``day`` (new rows win)
    and sorted by ``day``.

    Args:
        code: Stock code; normalised with ``get_market_prefix``.

    Returns:
        ``("ok", n)`` where ``n`` is the growth in row count of the file, or
        ``("failed", 0)`` when nothing usable was downloaded.
    """
    prefix, clean = get_market_prefix(code)
    symbol = f"{prefix}{clean}"
    parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet"

    df_new = None
    for attempt in range(MAX_RETRIES):
        try:
            df_new = try_sina_15min(symbol)
            if df_new is not None and len(df_new) > 0:
                break
        except Exception as e:
            logger.debug("15min %s 下载异常(第 %d 次): %s", symbol, attempt + 1, e)
            if attempt < MAX_RETRIES - 1:
                time.sleep(1)

    if df_new is None or df_new.empty:
        return "failed", 0

    df_new = _clean_15min_bars(df_new)
    if df_new.empty:
        return "failed", 0

    # Volume and amount are stored as text in the Parquet file.
    df_new["volume"] = df_new["volume"].astype(str)
    df_new["amount"] = df_new["amount"].astype(str)

    if parquet_path.exists():
        existing = pd.read_parquet(parquet_path)
        old_len = len(existing)
        combined = pd.concat([existing, df_new], ignore_index=True)
        combined = combined.drop_duplicates(subset=["day"], keep="last")
        combined = combined.sort_values("day").reset_index(drop=True)
        new_rows = len(combined) - old_len
    else:
        combined = df_new
        new_rows = len(df_new)

    # Write to a sibling file first, then rename it over the target.
    tmp = parquet_path.with_suffix(".tmp")
    combined.to_parquet(tmp, index=False)
    tmp.rename(parquet_path)
    return "ok", new_rows
|
||
|
||
|
||
def _load_15min_progress(progress_file: Path, today: str) -> set:
    """Return the codes already completed today.

    A checkpoint whose ``ts`` is from another day, or which cannot be read,
    gives an empty set. Without that check a finished run would mark every
    code as done forever and later daily runs would update nothing.
    """
    if not progress_file.exists():
        return set()
    try:
        saved = json.loads(progress_file.read_text())
        saved_day = str(saved.get("ts", ""))[:10]
        if saved_day != today:
            logger.info("进度文件日期 %s 非今日,重新开始", saved_day or "未知")
            return set()
        return set(saved.get("done", []))
    except Exception as e:
        logger.warning("进度文件读取失败,重新开始: %s", e)
        return set()


def _save_15min_progress(progress_file: Path, done_set: set) -> None:
    """Persist the completed codes together with the current timestamp."""
    progress_file.write_text(json.dumps({"done": list(done_set), "ts": datetime.now().isoformat()}))


def _collect_15min_db_rows(code: str, new_rows: int) -> list:
    """Re-read the stock's Parquet file and return its last ``new_rows`` bars as DB tuples."""
    if new_rows <= 0:
        return []
    prefix, clean = get_market_prefix(code)
    parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet"
    if not parquet_path.exists():
        return []

    df = pd.read_parquet(parquet_path)
    # The file is sorted by "day", so the newest bars are at the end.
    tail = df.iloc[-new_rows:] if len(df) > new_rows else df
    exchange = "SSE" if prefix == "sh" else "SZSE"
    # NOTE(review): 15-minute bars are stored with interval "1m", as in the
    # original code — confirm this is the intended interval code.
    return [
        (
            clean, exchange, str(row["day"]), "1m",
            float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
            float(row.get("open", 0)), float(row.get("high", 0)),
            float(row.get("low", 0)), float(row.get("close", 0)),
        )
        for _, row in tail.iterrows()
    ]


def run_15min_update(codes: List[str]) -> dict:
    """Incrementally update 15-minute bars for every code.

    Codes already completed today (per the checkpoint file in
    ``PROGRESS_DIR``) are skipped, so an interrupted run can be resumed. Only
    successfully updated codes are checkpointed; failed ones are retried on
    the next invocation. New bars are collected and written to the vnpy DB in
    one bulk call at the end.

    Args:
        codes: Stock codes to process.

    Returns:
        Counters with the keys ``updated``, ``skipped``, ``failed``,
        ``records`` and ``db_records``.
    """
    logger.info("=" * 60)
    logger.info("15分钟线增量更新开始,共 %d 只", len(codes))

    stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
    all_db_values = []
    consecutive_fails = 0

    # Load the checkpoint (resume support).
    progress_file = PROGRESS_DIR / "15min_progress.json"
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    done_set = _load_15min_progress(progress_file, datetime.now().strftime("%Y-%m-%d"))

    todo = [c for c in codes if c not in done_set]
    logger.info("待更新: %d(已完成: %d)", len(todo), len(done_set))

    for i, code in enumerate(todo):
        if i > 0:
            time.sleep(REQUEST_INTERVAL)

        try:
            status, new_rows = update_15min_parquet(code)
        except Exception as e:
            status, new_rows = "failed", 0
            logger.warning("15min %s 异常: %s", code, e)

        if status == "ok":
            stats["updated"] += 1
            stats["records"] += new_rows
            consecutive_fails = 0
            done_set.add(code)
            try:
                rows = _collect_15min_db_rows(code, new_rows)
            except Exception as e:
                # A re-read problem on one stock must not stop the others.
                rows = []
                logger.warning("15min %s 读取增量失败: %s", code, e)
            all_db_values.extend(rows)
            stats["db_records"] += len(rows)
        else:
            stats["failed"] += 1
            consecutive_fails += 1

        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            logger.error("连续失败 %d 次,暂停 %d 秒", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
            time.sleep(CONSECUTIVE_FAIL_PAUSE)
            consecutive_fails = 0

        if (i + 1) % 500 == 0:
            logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"])
            # Periodic checkpoint.
            _save_15min_progress(progress_file, done_set)

    # Write everything to the vnpy DB in one go.
    if all_db_values:
        _write_vnpy_db(all_db_values, "15min")

    # Final checkpoint.
    _save_15min_progress(progress_file, done_set)
    logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False))
    return stats
|
||
|
||
|
||
# ======================== vnpy DB写入 ========================
|
||
|
||
def _write_vnpy_db(values: list, label: str):
    """Upsert bar rows into the vnpy SQLite database and refresh the overview.

    Rows go into ``dbbardata`` with ``INSERT OR REPLACE`` in batches of
    ``DB_BATCH_SIZE``, committing after each batch. ``dbbaroverview`` is then
    rebuilt from an aggregate over all of ``dbbardata``. Any error is logged
    and swallowed so a DB problem does not abort the update run; the
    connection is closed in every case.

    Args:
        values: 11-tuples ordered as (symbol, exchange, datetime, interval,
            volume, turnover, open_interest, open_price, high_price,
            low_price, close_price).
        label: Short tag used only in log messages.
    """
    logger.info("写入vnpy DB [%s]: %d 条记录", label, len(values))
    conn = None
    try:
        conn = sqlite3.connect(str(VNPY_DB_PATH), timeout=120)
        c = conn.cursor()
        c.execute("PRAGMA journal_mode=WAL")

        total_batches = (len(values) - 1) // DB_BATCH_SIZE + 1
        for j in range(0, len(values), DB_BATCH_SIZE):
            c.executemany(
                """INSERT OR REPLACE INTO dbbardata
                (symbol,exchange,datetime,interval,volume,turnover,open_interest,
                open_price,high_price,low_price,close_price)
                VALUES (?,?,?,?,?,?,?,?,?,?,?)""",
                values[j : j + DB_BATCH_SIZE],
            )
            conn.commit()
            logger.info(" DB批次 %d/%d", j // DB_BATCH_SIZE + 1, total_batches)

        # Refresh the per-(symbol, exchange, interval) overview.
        c.execute(
            """INSERT OR REPLACE INTO dbbaroverview (symbol,exchange,interval,count,start,end)
            SELECT symbol,exchange,interval,COUNT(*),MIN(datetime),MAX(datetime)
            FROM dbbardata GROUP BY symbol,exchange,interval"""
        )
        conn.commit()
        logger.info("✅ vnpy DB [%s] 写入完成", label)
    except Exception as e:
        logger.error("❌ vnpy DB [%s] 写入失败: %s", label, e)
    finally:
        # Close on the error path too, so a failed write never leaks the
        # connection or keeps the database file locked.
        if conn is not None:
            conn.close()
|
||
|
||
|
||
# ======================== 主入口 ========================
|
||
|
||
def main():
    """Command-line entry point: run the selected updates and write a report.

    Exits with status 1 when the data directories are not available.
    Otherwise loads the code list, runs the daily and/or 15-minute update
    unless disabled by ``--skip-daily`` / ``--skip-15min``, logs the summary,
    and writes it to ``report_<YYYYmmdd>.json`` in ``LOG_DIR``.

    Returns:
        The report dict: one stats entry per executed update plus
        ``elapsed_sec``.
    """
    parser = argparse.ArgumentParser(description="全市场每日增量更新")
    parser.add_argument("--skip-daily", action="store_true", help="跳过日线更新")
    parser.add_argument("--skip-15min", action="store_true", help="跳过15分钟线更新")
    args = parser.parse_args()

    if not nas_mounted():
        logger.error("❌ NAS未挂载,退出")
        sys.exit(1)

    codes = get_all_codes()
    logger.info("全市场股票数: %d", len(codes))
    logger.info("更新时间: %s", datetime.now().isoformat())

    started = time.time()
    report = {}

    # (skip flag, report key, runner) — executed in this order.
    stages = (
        (args.skip_daily, "daily", run_daily_update),
        (args.skip_15min, "15min", run_15min_update),
    )
    for skipped, key, runner in stages:
        if not skipped:
            report[key] = runner(codes)

    elapsed = time.time() - started
    report["elapsed_sec"] = round(elapsed, 1)
    logger.info("=" * 60)
    logger.info("全部完成,耗时 %.1f 秒", elapsed)

    summary = json.dumps(report, ensure_ascii=False, indent=2)
    logger.info(summary)

    # Persist the report next to the logs.
    report_file = LOG_DIR / f"report_{datetime.now().strftime('%Y%m%d')}.json"
    report_file.write_text(summary)

    return report


if __name__ == "__main__":
    main()
|