auto-sync: 2026-05-03 10:45:42

This commit is contained in:
cfdaily
2026-05-03 10:45:42 +08:00
parent c58c8fc2ad
commit ad9a5a9505
+490
View File
@@ -0,0 +1,490 @@
#!/usr/bin/env python3
"""
全市场每日增量更新 - 日线 + 15分钟线
功能:
1. 日线:腾讯K线API → 更新Parquet + vnpy DB
2. 15分钟线:新浪API800条) → 增量合并Parquet + 导入vnpy DB
设计原则:
- 增量更新,不重复下载
- 失败不影响其他股票
- 进度持久化,支持断点续传
- 限频保护
- 日志完整
用法:
python3 daily_all_update.py # 全量更新(日线+15min)
python3 daily_all_update.py --skip-daily # 只更新15min
python3 daily_all_update.py --skip-15min # 只更新日线
"""
import argparse
import json
import os
import re
import sqlite3
import sys
import time
import logging
import urllib.request
import urllib.error
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List, Tuple
import pandas as pd
# ======================== 配置 ========================
LOG_DIR = Path("/Volumes/stock/logs/daily_update")
DAILY_DIR = Path("/Volumes/stock/A股数据/日线数据/daily")
MINUTE_15_DIR = Path("/Volumes/stock/minute_kline/15min")
VNPY_DB_PATH = Path("/Volumes/stock/sanguo_vnpy/data/quant_trading.db")
ALL_STOCKS_FILE = Path("/Volumes/stock/A股数据/stock_info/stock_basic_info_raw_20260326_113530.csv")
PROGRESS_DIR = Path("/Volumes/stock/logs/daily_update/progress")
REQUEST_INTERVAL = 0.3
MAX_RETRIES = 3
CONSECUTIVE_FAIL_PAUSE = 60
MAX_CONSECUTIVE_FAILS = 10
DB_BATCH_SIZE = 50000
HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"}
def setup_logging():
LOG_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = LOG_DIR / f"update_{ts}.log"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=[
logging.FileHandler(log_file, encoding="utf-8"),
logging.StreamHandler(),
],
)
return logging.getLogger(__name__)
logger = setup_logging()
def _make_opener():
return urllib.request.build_opener(urllib.request.ProxyHandler({}))
# ======================== 工具函数 ========================
def get_market_prefix(code: str) -> Tuple[str, str]:
code = re.sub(r"[^0-9]", "", code).zfill(6)
if code.startswith(("60", "68", "51")):
return "sh", code
return "sz", code
def get_all_codes() -> List[str]:
df = pd.read_csv(ALL_STOCKS_FILE)
for col in ["代码", "code", "股票代码"]:
if col in df.columns:
return [str(c).zfill(6) for c in df[col].tolist()]
raise ValueError(f"找不到代码列: {list(df.columns)}")
def nas_mounted() -> bool:
return DAILY_DIR.exists() and MINUTE_15_DIR.exists()
# ======================== 日线更新 ========================
def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
"""腾讯K线API获取日线增量"""
prefix, clean = get_market_prefix(code)
tq = f"{prefix}{clean}"
days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10
url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"
opener = _make_opener()
req = urllib.request.Request(url, headers=HEADERS)
with opener.open(req, timeout=10) as r:
raw = r.read().decode("utf-8", errors="replace")
data = json.loads(raw)
d = data.get("data")
if not isinstance(d, dict):
return None
klines = d.get(tq, {}).get("day", [])
if not klines:
return None
df = pd.DataFrame(klines)
ncols = len(df.columns)
if ncols >= 7:
df.columns = ["date", "open", "close", "high", "low", "volume", "amount"][:ncols]
else:
df.columns = ["date", "open", "close", "high", "low", "volume"][:ncols]
if "amount" not in df.columns:
df["amount"] = 0.0
for c in ["open", "close", "high", "low", "volume", "amount"]:
df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0)
df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
mask = (df["date"] >= start_date) & (df["date"] <= end_date)
result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]]
return result if not result.empty else None
def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int:
"""增量写入日线Parquet"""
prefix, clean = get_market_prefix(code)
year = datetime.now().year
parquet_path = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet"
if parquet_path.exists():
existing = pd.read_parquet(parquet_path)
combined = pd.concat([existing, new_data], ignore_index=True)
combined = combined.drop_duplicates(subset=["date"], keep="last")
combined = combined.sort_values("date").reset_index(drop=True)
else:
parquet_path.parent.mkdir(parents=True, exist_ok=True)
combined = new_data
tmp = parquet_path.with_suffix(".tmp")
combined.to_parquet(tmp, index=False)
tmp.rename(parquet_path)
return len(new_data)
def get_daily_last_date(code: str) -> str:
prefix, clean = get_market_prefix(code)
for year in range(datetime.now().year, 2009, -1):
fpath = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet"
if fpath.exists():
try:
df = pd.read_parquet(fpath, columns=["date"])
if not df.empty:
return str(df["date"].max())[:10]
except Exception:
pass
return ""
def run_daily_update(codes: List[str]) -> dict:
"""日线增量更新"""
logger.info("=" * 60)
logger.info("日线增量更新开始,共 %d", len(codes))
today = datetime.now().strftime("%Y-%m-%d")
stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
all_db_values = []
consecutive_fails = 0
for i, code in enumerate(codes):
if i > 0:
time.sleep(REQUEST_INTERVAL)
last_date = get_daily_last_date(code)
if not last_date:
stats["skipped"] += 1
continue
next_day = (pd.Timestamp(last_date) + timedelta(days=1)).strftime("%Y-%m-%d")
if next_day > today:
stats["skipped"] += 1
continue
data = None
for attempt in range(MAX_RETRIES):
try:
data = fetch_tencent_daily(code, next_day, today)
break
except Exception as e:
if attempt < MAX_RETRIES - 1:
time.sleep(1)
else:
logger.debug("日线 %s 失败: %s", code, e)
if data is None or data.empty:
stats["skipped"] += 1
continue
# 校验
if (data[["open", "high", "low", "close"]] <= 0).any().any():
stats["failed"] += 1
continue
try:
n = update_daily_parquet(code, data)
stats["updated"] += 1
stats["records"] += n
consecutive_fails = 0
# 收集vnpy DB数据
prefix, clean = get_market_prefix(code)
exchange = "SSE" if prefix == "sh" else "SZSE"
for _, row in data.iterrows():
all_db_values.append((
clean, exchange, str(row["date"]), "d",
float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
float(row.get("open", 0)), float(row.get("high", 0)),
float(row.get("low", 0)), float(row.get("close", 0)),
))
stats["db_records"] += 1
except Exception as e:
stats["failed"] += 1
consecutive_fails += 1
logger.warning("日线写入失败 %s: %s", code, e)
if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
logger.error("连续失败 %d 次,暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
time.sleep(CONSECUTIVE_FAIL_PAUSE)
consecutive_fails = 0
if (i + 1) % 500 == 0:
logger.info("日线进度: %d/%d updated=%d", i + 1, len(codes), stats["updated"])
# 写vnpy DB
if all_db_values:
_write_vnpy_db(all_db_values, "日线")
logger.info("日线完成: %s", json.dumps(stats, ensure_ascii=False))
return stats
# ======================== 15分钟线更新 ========================
def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]:
"""新浪15分钟K线API"""
url = (
f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/"
f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}"
)
opener = _make_opener()
req = urllib.request.Request(url, headers=HEADERS)
with opener.open(req, timeout=15) as r:
raw = r.read().decode("utf-8", errors="replace")
m = re.search(r"\((\[.*\])\)", raw, re.DOTALL)
if not m:
return None
data = json.loads(m.group(1))
if not data:
return None
df = pd.DataFrame(data)
cols = ["day", "open", "high", "low", "close", "volume", "amount"]
for c in cols:
if c not in df.columns:
return None
return df[cols]
def update_15min_parquet(code: str) -> Tuple[str, int]:
"""增量下载并合并15分钟线"""
prefix, clean = get_market_prefix(code)
symbol = f"{prefix}{clean}"
parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet"
df_new = None
for attempt in range(MAX_RETRIES):
try:
df_new = try_sina_15min(symbol)
if df_new is not None and len(df_new) > 0:
break
except Exception:
if attempt < MAX_RETRIES - 1:
time.sleep(1)
if df_new is None or df_new.empty:
return "failed", 0
# 数据校验
for col in ["open", "high", "low", "close"]:
df_new[col] = pd.to_numeric(df_new[col], errors="coerce")
df_new["volume"] = pd.to_numeric(df_new["volume"], errors="coerce").fillna(0)
df_new["amount"] = pd.to_numeric(df_new["amount"], errors="coerce").fillna(0)
bad_zero = (df_new[["close", "open"]] <= 0).any(axis=1)
if bad_zero.any():
df_new = df_new[~bad_zero]
bad_ohlc = (df_new["high"] < df_new[["open", "close"]].max(axis=1)) | \
(df_new["low"] > df_new[["open", "close"]].min(axis=1))
if bad_ohlc.any():
df_new = df_new[~bad_ohlc]
if df_new.empty:
return "failed", 0
# 转回object类型
df_new["volume"] = df_new["volume"].astype(str)
df_new["amount"] = df_new["amount"].astype(str)
if parquet_path.exists():
existing = pd.read_parquet(parquet_path)
old_len = len(existing)
combined = pd.concat([existing, df_new], ignore_index=True)
combined = combined.drop_duplicates(subset=["day"], keep="last")
combined = combined.sort_values("day").reset_index(drop=True)
new_rows = len(combined) - old_len
else:
combined = df_new
new_rows = len(df_new)
tmp = parquet_path.with_suffix(".tmp")
combined.to_parquet(tmp, index=False)
tmp.rename(parquet_path)
return "ok", new_rows
def run_15min_update(codes: List[str]) -> dict:
"""15分钟线增量更新"""
logger.info("=" * 60)
logger.info("15分钟线增量更新开始,共 %d", len(codes))
stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
all_db_values = []
consecutive_fails = 0
# 加载进度(支持断点续传)
progress_file = PROGRESS_DIR / "15min_progress.json"
PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
done_set = set()
if progress_file.exists():
try:
done_set = set(json.loads(progress_file.read_text()).get("done", []))
except Exception:
pass
todo = [c for c in codes if c not in done_set]
logger.info("待更新: %d(已完成: %d", len(todo), len(done_set))
for i, code in enumerate(todo):
if i > 0:
time.sleep(REQUEST_INTERVAL)
try:
status, new_rows = update_15min_parquet(code)
except Exception as e:
status, new_rows = "failed", 0
logger.debug("15min %s 异常: %s", code, e)
if status == "ok":
stats["updated"] += 1
stats["records"] += new_rows
consecutive_fails = 0
# 收集vnpy DB数据
prefix, clean = get_market_prefix(code)
parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet"
if parquet_path.exists() and new_rows > 0:
df = pd.read_parquet(parquet_path)
# 只取最新的增量部分
if len(df) > new_rows:
df_new_part = df.iloc[-new_rows:]
else:
df_new_part = df
exchange = "SSE" if prefix == "sh" else "SZSE"
for _, row in df_new_part.iterrows():
all_db_values.append((
clean, exchange, str(row["day"]), "1m",
float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
float(row.get("open", 0)), float(row.get("high", 0)),
float(row.get("low", 0)), float(row.get("close", 0)),
))
stats["db_records"] += len(df_new_part)
else:
stats["failed"] += 1
consecutive_fails += 1
done_set.add(code)
if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
logger.error("连续失败 %d 次,暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
time.sleep(CONSECUTIVE_FAIL_PAUSE)
consecutive_fails = 0
if (i + 1) % 500 == 0:
logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"])
# 保存进度
progress_file.write_text(json.dumps({"done": list(done_set), "ts": datetime.now().isoformat()}))
# 写vnpy DB
if all_db_values:
_write_vnpy_db(all_db_values, "15min")
# 保存最终进度
progress_file.write_text(json.dumps({"done": list(done_set), "ts": datetime.now().isoformat()}))
logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False))
return stats
# ======================== vnpy DB写入 ========================
def _write_vnpy_db(values: list, label: str):
"""增量写入vnpy SQLite DB"""
logger.info("写入vnpy DB [%s]: %d 条记录", label, len(values))
try:
conn = sqlite3.connect(str(VNPY_DB_PATH), timeout=120)
c = conn.cursor()
c.execute("PRAGMA journal_mode=WAL")
for j in range(0, len(values), DB_BATCH_SIZE):
c.executemany(
"""INSERT OR REPLACE INTO dbbardata
(symbol,exchange,datetime,interval,volume,turnover,open_interest,
open_price,high_price,low_price,close_price)
VALUES (?,?,?,?,?,?,?,?,?,?,?)""",
values[j : j + DB_BATCH_SIZE],
)
conn.commit()
logger.info(" DB批次 %d/%d", j // DB_BATCH_SIZE + 1, (len(values) - 1) // DB_BATCH_SIZE + 1)
# 更新overview
c.execute(
"""INSERT OR REPLACE INTO dbbaroverview (symbol,exchange,interval,count,start,end)
SELECT symbol,exchange,interval,COUNT(*),MIN(datetime),MAX(datetime)
FROM dbbardata GROUP BY symbol,exchange,interval"""
)
conn.commit()
conn.close()
logger.info("✅ vnpy DB [%s] 写入完成", label)
except Exception as e:
logger.error("❌ vnpy DB [%s] 写入失败: %s", label, e)
# ======================== 主入口 ========================
def main():
parser = argparse.ArgumentParser(description="全市场每日增量更新")
parser.add_argument("--skip-daily", action="store_true", help="跳过日线更新")
parser.add_argument("--skip-15min", action="store_true", help="跳过15分钟线更新")
args = parser.parse_args()
if not nas_mounted():
logger.error("❌ NAS未挂载,退出")
sys.exit(1)
codes = get_all_codes()
logger.info("全市场股票数: %d", len(codes))
logger.info("更新时间: %s", datetime.now().isoformat())
t_start = time.time()
report = {}
if not args.skip_daily:
report["daily"] = run_daily_update(codes)
if not args.skip_15min:
report["15min"] = run_15min_update(codes)
elapsed = time.time() - t_start
report["elapsed_sec"] = round(elapsed, 1)
logger.info("=" * 60)
logger.info("全部完成,耗时 %.1f", elapsed)
logger.info(json.dumps(report, ensure_ascii=False, indent=2))
# 写入报告
report_file = LOG_DIR / f"report_{datetime.now().strftime('%Y%m%d')}.json"
report_file.write_text(json.dumps(report, ensure_ascii=False, indent=2))
return report
if __name__ == "__main__":
main()