Files
sanguo_vnpy/data_platform/daily_all_update.py
T
2026-05-05 09:20:43 +08:00

865 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
全市场每日增量更新 - 日线 + 15分钟线
功能:
1. 日线:东方财富API(主) + 腾讯API(备) → 更新Parquet + vnpy DB
2. 15分钟线:新浪API(800条) → 增量合并Parquet + 导入vnpy DB
数据源降级链:
日线: 东方财富(amount真实,可拿多年) → 腾讯(amount有时0)
15min: 新浪(amount真实,800条) → 无备源
设计原则:
- 增量更新,不重复下载
- 失败不影响其他股票
- 进度持久化,支持断点续传(日线+15min均有进度文件)
- 限频保护 + 全局源不可用检测
- DB轮转备份(保留7天)
- 日志完整 + 失败率告警
用法:
python3 daily_all_update.py # 全量更新(日线+15min)
python3 daily_all_update.py --skip-daily # 只更新15min
python3 daily_all_update.py --skip-15min # 只更新日线
变更记录:
v1.1 (2026-05-03) - 司马懿评审后修改:
- interval "1m" → "15m"
- 15min增量改为严格按日期追加(不再用drop_duplicates keep=last)
- 日线增加进度文件
- 全局源不可用检测(连续30只首次失败→终止)
- DB轮转备份(保留7天)
- 失败率>5%告警
v1.2 (2026-05-05) - 东方财富集成:
- 日线主源从腾讯切换为东方财富(amount真实,可拿多年历史)
- 腾讯降为日线备源
- 东方财富限频:4s/请求 + 随机抖动±1s
- 反爬:Referer + 完整UA + ut参数 + JSONP
"""
import argparse
import glob
import json
import os
import re
import shutil
import sqlite3
import sys
import time
import logging
import urllib.request
import urllib.error
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List, Tuple
import pandas as pd
# ======================== Configuration ========================
# Data locations. /Volumes/stock is expected to be the NAS mount; main() exits
# when DAILY_DIR / MINUTE_15_DIR are missing.
LOG_DIR = Path("/Volumes/stock/logs/daily_update")
# Daily bars: one parquet file per stock per year, under DAILY_DIR/<year>/.
DAILY_DIR = Path("/Volumes/stock/A股数据/日线数据/daily")
# 15-minute bars: one parquet file per stock.
MINUTE_15_DIR = Path("/Volumes/stock/minute_kline/15min")
VNPY_DB_PATH = Path("/Volumes/stock/sanguo_vnpy/data/quant_trading.db")
# Stock universe: CSV with a code column named "代码", "code" or "股票代码".
ALL_STOCKS_FILE = Path("/Volumes/stock/A股数据/stock_info/stock_basic_info_raw_20260326_113530.csv")
PROGRESS_DIR = Path("/Volumes/stock/logs/daily_update/progress")
REQUEST_INTERVAL_SINA = 0.3  # Sina: stable; 0.3 s between requests is enough
REQUEST_INTERVAL_EM = 4.0  # Eastmoney: strict 3-5 s limit; a jitter is added on top
EM_JITTER = 1.0  # Eastmoney jitter range (± seconds)
MAX_RETRIES = 3  # attempts per data source for each stock
CONSECUTIVE_FAIL_PAUSE = 60  # seconds to sleep once MAX_CONSECUTIVE_FAILS stocks fail in a row
MAX_CONSECUTIVE_FAILS = 10
# Global source-outage detection: if N consecutive stocks fail on their first
# request, the source is considered unavailable and the run is aborted.
GLOBAL_FAIL_THRESHOLD = 30
# Number of days of rotated DB backups to keep.
DB_BACKUP_KEEP_DAYS = 7
# Plain UA used for the Tencent and Sina requests.
HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"}
# Browser-like headers for Eastmoney, which (per the module docstring) needs
# Referer + a full UA to get past its anti-scraping checks.
HEADERS_EM = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Referer": "https://quote.eastmoney.com/",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9",
}
def setup_logging():
    """Configure INFO-level logging to a per-run file in LOG_DIR and to stderr.

    The log file is named ``update_YYYYMMDD_HHMMSS.log``. LOG_DIR is created if
    needed.

    Returns:
        The logger for this module.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_handler = logging.FileHandler(LOG_DIR / f"update_{run_stamp}.log", encoding="utf-8")
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=[file_handler, console_handler],
    )
    return logging.getLogger(__name__)
# Configure logging at import time; the functions below log through this module-level logger.
logger = setup_logging()
def _make_opener():
    """Return a urllib opener that uses no proxy, even if one is configured in the environment."""
    direct = urllib.request.ProxyHandler({})
    return urllib.request.build_opener(direct)
# ======================== 工具函数 ========================
def get_market_prefix(code: str) -> Tuple[str, str]:
    """Split *code* into (exchange prefix, normalized 6-digit code).

    Every character other than ASCII 0-9 is dropped and the remainder is
    left-padded with zeros to 6 digits. Codes starting with 60, 68 or 51 map
    to "sh"; everything else maps to "sz".
    """
    # NOTE(review): Beijing-exchange codes and SH B-shares (900xxx) also fall into
    # "sz" here — confirm the universe never contains them.
    digits = "".join(ch for ch in code if ch in "0123456789").zfill(6)
    market = "sh" if digits.startswith(("60", "68", "51")) else "sz"
    return market, digits
def get_all_codes(path: Optional[Path] = None) -> List[str]:
    """Return the stock codes listed in the stock-universe CSV.

    Args:
        path: CSV file to read; defaults to ALL_STOCKS_FILE.

    Returns:
        Codes in file order, reduced to their digits and zero-padded to 6
        characters. Blank cells are skipped and duplicates removed.

    Raises:
        ValueError: if none of the known code columns ("代码", "code",
            "股票代码") is present.
    """
    csv_path = ALL_STOCKS_FILE if path is None else path
    # Read as text: with numeric inference a code column containing a blank cell
    # becomes float, and str(1.0).zfill(6) would yield "0001.0".
    df = pd.read_csv(csv_path, dtype=str)
    for col in ["代码", "code", "股票代码"]:
        if col in df.columns:
            codes = []
            for raw in df[col].dropna():
                # Strip exchange suffixes/prefixes such as "000001.SZ" or "sh600000".
                digits = re.sub(r"[^0-9]", "", raw)
                if digits:
                    codes.append(digits.zfill(6))
            # De-duplicate while keeping file order.
            return list(dict.fromkeys(codes))
    raise ValueError(f"找不到代码列: {list(df.columns)}")
def nas_mounted() -> bool:
    """Return True only if both the daily and the 15-minute data directories exist."""
    required_dirs = (DAILY_DIR, MINUTE_15_DIR)
    return all(directory.exists() for directory in required_dirs)
# ======================== DB备份 ========================
def rotate_db_backup():
    """Back up the vnpy SQLite DB once per day and prune old backups.

    The backup is ``quant_trading_YYYYMMDD.db.bak`` next to the DB. If today's
    backup already exists, the function returns immediately (no copy, no
    pruning). The copy is taken with SQLite's online-backup API into a
    ``.tmp`` file that is renamed into place only on success. Backups dated
    more than DB_BACKUP_KEEP_DAYS days ago are then deleted. Errors are
    logged, never raised.
    """
    from contextlib import closing, suppress

    backup_dir = VNPY_DB_PATH.parent
    today = datetime.now().strftime("%Y%m%d")
    backup_file = backup_dir / f"quant_trading_{today}.db.bak"
    # Already backed up today: nothing to do.
    if backup_file.exists():
        logger.info("DB今日已备份: %s", backup_file)
        return
    if not VNPY_DB_PATH.exists():
        # sqlite3.connect() would silently create an empty DB instead of failing.
        logger.error("❌ DB备份失败: %s", f"not found: {VNPY_DB_PATH}")
        return
    logger.info("开始DB备份: %s → %s", VNPY_DB_PATH.name, backup_file.name)
    tmp_file = backup_file.with_name(backup_file.name + ".tmp")
    try:
        # The online-backup API yields a consistent snapshot that includes pages
        # still held in the -wal file (this DB is put in WAL mode when written);
        # a plain copy of the main file could miss them.
        with closing(sqlite3.connect(str(VNPY_DB_PATH), timeout=120)) as src:
            with closing(sqlite3.connect(str(tmp_file))) as dst:
                src.backup(dst)
        tmp_file.replace(backup_file)
        logger.info("✅ DB备份完成 (%.1f MB)", backup_file.stat().st_size / 1024 / 1024)
    except Exception as e:
        logger.error("❌ DB备份失败: %s", e)
        # backup_file is only created on success, so a later run today retries;
        # just remove the partial temp file.
        with suppress(OSError):
            tmp_file.unlink()
        return

    # Prune expired backups.
    cutoff = datetime.now() - timedelta(days=DB_BACKUP_KEEP_DAYS)
    for f in backup_dir.glob("quant_trading_*.db.bak"):
        # Parse the date from the full file name: Path.stem only drops ".bak",
        # so splitting the stem would give "YYYYMMDD.db", which never parses.
        m = re.fullmatch(r"quant_trading_(\d{8})\.db\.bak", f.name)
        if m is None:
            continue
        try:
            file_date = datetime.strptime(m.group(1), "%Y%m%d")
        except ValueError:
            continue
        if file_date >= cutoff:
            continue
        try:
            f.unlink()
        except OSError as e:
            logger.warning("清理过期备份失败 %s: %s", f.name, e)
        else:
            logger.info("清理过期备份: %s", f.name)
# ======================== 进度文件 ========================
def load_progress(name: str) -> set:
    """Return the codes recorded as finished by today's run of task *name*.

    The progress file only exists to resume an interrupted run. A file whose
    ``ts`` is not from today (or is missing) is ignored. Otherwise a completed
    run would leave every code marked done, and every later run would silently
    update nothing. A run that crosses midnight therefore restarts from scratch
    if resumed the next day. An unreadable file is logged and treated as empty.
    """
    progress_file = PROGRESS_DIR / f"{name}_progress.json"
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    if not progress_file.exists():
        return set()
    try:
        payload = json.loads(progress_file.read_text(encoding="utf-8"))
        done = payload.get("done", [])
        ts = payload.get("ts", "")
    except Exception as e:
        logger.warning("进度文件读取失败,忽略 %s: %s", progress_file, e)
        return set()
    try:
        saved_day = datetime.fromisoformat(ts).date()
    except (TypeError, ValueError):
        saved_day = None
    if saved_day != datetime.now().date():
        logger.info("忽略非今日进度文件 %s (ts=%s)", progress_file.name, ts)
        return set()
    return set(done)
def save_progress(name: str, done_set: set):
    """Persist *done_set* and the current timestamp as the progress file for task *name*.

    The JSON is written to a ``.tmp`` sibling first and then renamed over the
    real file. An interrupted write therefore leaves the previous progress
    file intact instead of a truncated one. PROGRESS_DIR is created if missing.
    """
    PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
    progress_file = PROGRESS_DIR / f"{name}_progress.json"
    tmp_file = progress_file.with_name(progress_file.name + ".tmp")
    tmp_file.write_text(
        json.dumps({
            "done": sorted(done_set),
            "ts": datetime.now().isoformat(),
        }),
        encoding="utf-8",
    )
    tmp_file.replace(progress_file)
# ======================== 全局源不可用检测 ========================
class SourceHealthMonitor:
    """Decide when a data source should be considered globally unavailable.

    The source is declared down once ``threshold`` consecutive stocks have
    failed on their first request; any stock whose first request succeeds
    resets the streak.
    """

    def __init__(self, threshold: int = GLOBAL_FAIL_THRESHOLD):
        self.threshold = threshold
        # Length of the current run of stocks whose first request failed.
        self.consecutive_first_fail = 0

    def report(self, code: str, first_attempt_failed: bool) -> bool:
        """Record the first-request outcome for one stock.

        Args:
            code: the stock code (not used by the implementation).
            first_attempt_failed: whether that stock's first request failed.

        Returns:
            True while the source still counts as healthy; False once the
            failure streak reaches ``threshold`` (the caller should stop).
        """
        if not first_attempt_failed:
            self.consecutive_first_fail = 0
            return True
        self.consecutive_first_fail += 1
        if self.consecutive_first_fail < self.threshold:
            return True
        logger.error(
            "⚠️ 全局源不可用检测触发:连续 %d 只股票首次请求失败,终止本次更新",
            self.consecutive_first_fail,
        )
        return False
# ======================== 日线更新 ========================
def get_em_secid(code: str) -> str:
    """Return the Eastmoney ``secid`` for *code*.

    The form is ``"1.<code>"`` for Shanghai and ``"0.<code>"`` otherwise.
    Delegates to get_market_prefix so both helpers share one exchange mapping
    and the same code normalization (non-digits stripped, zero-padded to 6).
    """
    prefix, clean = get_market_prefix(code)
    market_id = "1" if prefix == "sh" else "0"
    return f"{market_id}.{clean}"
def fetch_eastmoney_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars for *code* from Eastmoney's kline API (primary source).

    About the source:
      - one request can return several years of bars;
      - ``amount`` carries real values;
      - anti-scraping is strict: Referer + UA + ``ut`` are required, and
        requests should be 3-5 s apart (the caller handles the spacing).

    Args:
        code: stock code.
        start_date: inclusive lower bound, ``YYYY-MM-DD``.
        end_date: inclusive upper bound, ``YYYY-MM-DD``.

    Returns:
        A DataFrame with columns date/open/high/low/close/volume/amount, where
        ``date`` is a ``YYYY-MM-DD`` string. An EMPTY DataFrame means the
        source answered normally but has no bars in the range (presumably a
        non-trading day or a suspension). ``None`` means the request failed
        or the payload was unusable.
    """
    import requests as _requests

    out_cols = ["date", "open", "high", "low", "close", "volume", "amount"]
    secid = get_em_secid(code)
    ts = str(int(time.time() * 1000))
    # NOTE(review): fqt=1 requests forward-adjusted prices — confirm this matches
    # what is already stored and what the Tencent fallback returns.
    url = (
        f"https://push2his.eastmoney.com/api/qt/stock/kline/get?"
        f"secid={secid}&klt=101&fqt=1&"
        f"beg={start_date.replace('-', '')}&end={end_date.replace('-', '')}&"
        f"fields1=f1,f2,f3,f4,f5,f6,f7,f8&"
        f"fields2=f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61&"
        f"ut=b2884a393a59ad64002292a3e90d46a5&"
        f"lmt=10000&"
        f"cb=jQuery_em_{ts}&_={ts}"
    )
    try:
        # Context-managed so the session's connection pool is closed on every call.
        with _requests.Session() as session:
            session.trust_env = False  # bypass system/env proxy settings
            # NOTE(review): verify=False disables TLS certificate checks (and urllib3
            # warns on every call); re-enable if the local environment allows it.
            r = session.get(url, headers=HEADERS_EM, timeout=15, verify=False)
            if r.status_code != 200:
                return None
            text = r.text
        # Unwrap the JSONP callback: jQuery_em_<ts>({...})
        payload = json.loads(text[text.index("(") + 1:text.rindex(")")])
    except Exception as e:
        logger.debug("东方财富日线失败 %s: %s", code, e)
        return None
    if not isinstance(payload, dict) or payload.get("rc") != 0:
        return None
    body = payload.get("data")
    if not isinstance(body, dict):
        # "data" may come back null; treat it as a failed request rather than raising.
        return None
    klines = body.get("klines") or []
    if not klines:
        # Well-formed answer with no bars in [start_date, end_date].
        return pd.DataFrame(columns=out_cols)

    # Each line: date,open,close,high,low,volume,amount,amplitude,pct_change,change,turnover_rate
    rows = []
    for line in klines:
        parts = line.split(",")
        if len(parts) < 7:
            continue
        try:
            rows.append({
                "date": parts[0],
                "open": float(parts[1]),
                "close": float(parts[2]),
                "high": float(parts[3]),
                "low": float(parts[4]),
                "volume": float(parts[5]),
                "amount": float(parts[6]),
            })
        except ValueError:
            # Skip a bar with a non-numeric field instead of failing the whole stock.
            continue
    if not rows:
        return None
    df = pd.DataFrame(rows)
    # Normalize dates to YYYY-MM-DD (the source may include a time part).
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
    mask = (df["date"] >= start_date) & (df["date"] <= end_date)
    return df.loc[mask, out_cols]
def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars for *code* from Tencent's fqkline API (fallback source).

    Args:
        code: stock code (normalized via get_market_prefix).
        start_date: inclusive lower bound, ``YYYY-MM-DD``.
        end_date: inclusive upper bound, ``YYYY-MM-DD``.

    Returns:
        A DataFrame with columns date/open/high/low/close/volume/amount.
        ``amount`` is 0 when the response has no such field. An EMPTY
        DataFrame means the response was well-formed but had no bars in the
        range. ``None`` means the payload did not have the expected shape.

    Raises:
        Network and JSON-decoding errors propagate to the caller.
    """
    out_cols = ["date", "open", "high", "low", "close", "volume", "amount"]
    prefix, clean = get_market_prefix(code)
    tq = f"{prefix}{clean}"
    # Bar count to request: calendar days in the range plus slack; extra rows are filtered below.
    days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10
    # NOTE(review): no adjustment flag is passed here, while the Eastmoney primary
    # requests fqt=1 — confirm mixing the two price series is intended.
    url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"
    opener = _make_opener()
    req = urllib.request.Request(url, headers=HEADERS)
    with opener.open(req, timeout=10) as r:
        raw = r.read().decode("utf-8", errors="replace")
    data = json.loads(raw)
    d = data.get("data")
    if not isinstance(d, dict):
        return None
    stock = d.get(tq)
    if not isinstance(stock, dict):
        return None
    klines = stock.get("day")
    if not isinstance(klines, list):
        return None
    if not klines:
        return pd.DataFrame(columns=out_cols)

    df = pd.DataFrame(klines)
    if df.shape[1] < 5:
        # Need at least date + OHLC.
        return None
    # Rows may carry extra trailing fields; use only the first seven positional
    # fields so the rename below cannot hit a column-count mismatch.
    # NOTE(review): the 7th field is assumed to be amount — confirm for this endpoint.
    df = df.iloc[:, :7].copy()
    names = ["date", "open", "close", "high", "low", "volume", "amount"]
    df.columns = names[: df.shape[1]]
    for c in ("volume", "amount"):
        if c not in df.columns:
            df[c] = 0.0
    for c in ["open", "close", "high", "low", "volume", "amount"]:
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0)
    df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
    mask = (df["date"] >= start_date) & (df["date"] <= end_date)
    return df.loc[mask, out_cols]
def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int:
    """Merge *new_data* into the per-year daily parquet files of *code*.

    Each row goes to ``DAILY_DIR/<year of its date>/<prefix><code>_daily.parquet``
    (the same layout get_daily_last_date scans). A fetch that spans a year
    boundary, such as late-December bars caught up in January, therefore lands
    in the right file instead of the current year's.

    Args:
        code: stock code.
        new_data: bars with a ``date`` column formatted ``YYYY-MM-DD``.

    Returns:
        The number of rows in *new_data*.
    """
    prefix, clean = get_market_prefix(code)
    new_data = new_data.copy()
    new_data["date"] = new_data["date"].astype(str)
    # Dates are "YYYY-MM-DD" strings, so the first four characters are the year.
    for year, chunk in new_data.groupby(new_data["date"].str[:4], sort=True):
        parquet_path = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet"
        _merge_daily_file(parquet_path, chunk)
    return len(new_data)


def _merge_daily_file(parquet_path: Path, rows: pd.DataFrame) -> None:
    """Upsert *rows* into one daily parquet file, keyed on ``date``, and write it atomically.

    On a duplicate date the incoming row wins. The result is sorted by date,
    written to a ``.tmp`` sibling, then moved over the target.
    """
    if parquet_path.exists():
        existing = pd.read_parquet(parquet_path)
        existing["date"] = existing["date"].astype(str)
        combined = pd.concat([existing, rows], ignore_index=True)
        combined = combined.drop_duplicates(subset=["date"], keep="last")
        combined = combined.sort_values("date").reset_index(drop=True)
    else:
        parquet_path.parent.mkdir(parents=True, exist_ok=True)
        combined = rows
    tmp = parquet_path.with_suffix(".tmp")
    combined.to_parquet(tmp, index=False)
    tmp.replace(parquet_path)
def get_daily_last_date(code: str) -> str:
    """Return the latest stored daily-bar date for *code* as ``YYYY-MM-DD``, or "".

    Year directories under DAILY_DIR are scanned from the current year back to
    2010. The answer comes from the first file that exists, can be read and
    has at least one row.
    """
    prefix, clean = get_market_prefix(code)
    filename = f"{prefix}{clean}_daily.parquet"
    year = datetime.now().year
    while year >= 2010:
        candidate = DAILY_DIR / str(year) / filename
        year -= 1
        if not candidate.exists():
            continue
        try:
            frame = pd.read_parquet(candidate, columns=["date"])
            if frame.empty:
                continue
            return str(frame["date"].max())[:10]
        except Exception:
            # Unreadable file: fall back to the previous year.
            continue
    return ""
def _fetch_daily_with_fallback(code: str, start: str, end: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars from Eastmoney first, then Tencent.

    Each source gets MAX_RETRIES attempts; an attempt fails when the fetcher
    returns None or raises. Returns the first non-None result (which may be an
    empty DataFrame), or None when every attempt on both sources failed.
    """
    # (fetcher, seconds between its retries); Eastmoney's rate limit is the stricter one.
    sources = ((fetch_eastmoney_daily, 2), (fetch_tencent_daily, 1))
    for fetch, retry_pause in sources:
        for attempt in range(MAX_RETRIES):
            try:
                data = fetch(code, start, end)
            except Exception as e:
                logger.debug("日线请求异常 %s [%s]: %s", code, fetch.__name__, e)
                data = None
            if data is not None:
                return data
            if attempt < MAX_RETRIES - 1:
                time.sleep(retry_pause)
    return None


def _daily_db_rows(code: str, data: pd.DataFrame) -> list:
    """Build vnpy ``dbbardata`` value tuples (interval "d") from fetched daily bars."""
    prefix, clean = get_market_prefix(code)
    exchange = "SSE" if prefix == "sh" else "SZSE"
    return [
        (
            clean, exchange, str(row["date"]), "d",
            float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
            float(row.get("open", 0)), float(row.get("high", 0)),
            float(row.get("low", 0)), float(row.get("close", 0)),
        )
        for _, row in data.iterrows()
    ]


def run_daily_update(codes: List[str]) -> dict:
    """Incrementally update daily bars for *codes* (parquet files + vnpy DB).

    For every code not already recorded in the progress file:
      1. find the last stored date; codes with no parquet history, or already
         up to date, are counted as ``skipped``;
      2. fetch the missing range (Eastmoney, falling back to Tencent);
      3. validate prices, merge into parquet and queue rows for the vnpy DB.

    Codes whose fetch failed on both sources are counted as ``failed``, so the
    failure-rate alert sees them. They are not marked done, so a rerun retries
    them. When both sources fail for GLOBAL_FAIL_THRESHOLD consecutive codes,
    the run is aborted.

    Returns:
        Stats dict with updated/skipped/failed/records/db_records, plus
        ``source_aborted=True`` when aborted.
    """
    logger.info("=" * 60)
    logger.info("日线增量更新开始,共 %d", len(codes))
    today = datetime.now().strftime("%Y-%m-%d")
    stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
    all_db_values = []
    consecutive_fails = 0
    source_aborted = False
    made_request = False
    # Resume support: skip codes already recorded in the progress file.
    done_set = load_progress("daily")
    todo = [c for c in codes if c not in done_set]
    logger.info("待更新: %d(已完成: %d", len(todo), len(done_set))
    # Global source-outage detection.
    health = SourceHealthMonitor()
    for i, code in enumerate(todo):
        last_date = get_daily_last_date(code)
        next_day = ""
        if last_date:
            next_day = (pd.Timestamp(last_date) + timedelta(days=1)).strftime("%Y-%m-%d")
        if not last_date or next_day > today:
            # No local history to extend, or already up to date: no request needed.
            stats["skipped"] += 1
            done_set.add(code)
        else:
            if made_request:
                # Eastmoney rate limit: base interval + pseudo-random jitter of ±EM_JITTER s.
                # Only spaced between real requests; skipped codes cost no wait.
                jitter = (hash(code) % 200 - 100) / 100.0 * EM_JITTER
                time.sleep(REQUEST_INTERVAL_EM + jitter)
            made_request = True
            data = _fetch_daily_with_fallback(code, next_day, today)
            # None means every attempt of both sources failed, the first ones included.
            if not health.report(code, data is None):
                source_aborted = True
                logger.error("❌ 日线源均不可用(东方财富+腾讯),终止日线更新")
                break
            if data is None:
                stats["failed"] += 1
                consecutive_fails += 1
                logger.warning("日线获取失败 %s (东方财富+腾讯)", code)
            elif data.empty:
                stats["skipped"] += 1
                done_set.add(code)
            elif (data[["open", "high", "low", "close"]] <= 0).any().any():
                stats["failed"] += 1
                consecutive_fails += 1
                done_set.add(code)
                logger.warning("日线数据校验失败(价格<=0) %s", code)
            else:
                try:
                    n = update_daily_parquet(code, data)
                    rows = _daily_db_rows(code, data)
                except Exception as e:
                    stats["failed"] += 1
                    consecutive_fails += 1
                    logger.warning("日线写入失败 %s: %s", code, e)
                else:
                    stats["updated"] += 1
                    stats["records"] += n
                    stats["db_records"] += len(rows)
                    all_db_values.extend(rows)
                    consecutive_fails = 0
                done_set.add(code)
        # Pause and checkpoint are evaluated for every code, whatever its outcome.
        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            logger.error("连续失败 %d 次,暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
            time.sleep(CONSECUTIVE_FAIL_PAUSE)
            consecutive_fails = 0
        if (i + 1) % 500 == 0:
            logger.info("日线进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"])
            save_progress("daily", done_set)
    # NOTE(review): DB rows are written only here, while progress is checkpointed
    # every 500 codes; if the process dies mid-run, checkpointed codes are skipped
    # on resume and their rows never reach the DB.
    if all_db_values:
        _write_vnpy_db(all_db_values, "日线")
    # Save final progress.
    save_progress("daily", done_set)
    if source_aborted:
        stats["source_aborted"] = True
    logger.info("日线完成: %s", json.dumps(stats, ensure_ascii=False))
    return stats
# ======================== 15分钟线更新 ========================
def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]:
    """Fetch up to *datalen* recent 15-minute bars for *symbol* (e.g. "sh600000") from Sina.

    Returns:
        A DataFrame with columns day/open/high/low/close/volume/amount, holding
        the values exactly as the JSON payload delivered them. Returns None if
        the JSONP body has no array, the array is empty, or any of those
        columns is missing. Network and JSON errors propagate.
    """
    wanted = ["day", "open", "high", "low", "close", "volume", "amount"]
    url = (
        f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20=min15_{symbol}=/"
        f"CN_MarketDataService.getKLineData?symbol={symbol}&scale=15&ma=no&datalen={datalen}"
    )
    request = urllib.request.Request(url, headers=HEADERS)
    with _make_opener().open(request, timeout=15) as resp:
        body = resp.read().decode("utf-8", errors="replace")
    match = re.search(r"\((\[.*\])\)", body, re.DOTALL)
    records = json.loads(match.group(1)) if match else None
    if not records:
        return None
    frame = pd.DataFrame(records)
    if any(col not in frame.columns for col in wanted):
        return None
    return frame[wanted]
def get_15min_last_date(parquet_path: Path) -> str:
    """Return the largest ``day`` value in a 15-minute parquet file, as a string.

    Returns "" when the file does not exist, has no rows, or cannot be read.
    """
    if not parquet_path.exists():
        return ""
    try:
        frame = pd.read_parquet(parquet_path, columns=["day"])
        latest = None if frame.empty else str(frame["day"].max())
    except Exception:
        latest = None
    return latest or ""
def _download_15min(symbol: str) -> Optional[pd.DataFrame]:
    """Call try_sina_15min up to MAX_RETRIES times; return the first non-empty frame, else None."""
    for attempt in range(MAX_RETRIES):
        try:
            df = try_sina_15min(symbol)
        except Exception as e:
            logger.debug("15min请求失败 %s: %s", symbol, e)
            df = None
        if df is not None and not df.empty:
            return df
        if attempt < MAX_RETRIES - 1:
            time.sleep(1)
    return None


def _clean_15min(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce downloaded 15-minute bars to numbers and drop invalid bars.

    A bar is dropped when any of open/high/low/close is missing, non-numeric
    or <= 0, or when high/low do not bracket open and close. volume/amount
    default to 0. volume, amount and day are converted back to str (object
    dtype) before storage; open/high/low/close stay float.
    """
    df = df.copy()
    price_cols = ["open", "high", "low", "close"]
    for col in price_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    for col in ("volume", "amount"):
        df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
    prices = df[price_cols]
    # NaN compares False with everything, so missing prices must be rejected explicitly.
    bad = prices.isna().any(axis=1) | (prices <= 0).any(axis=1)
    body = df[["open", "close"]]
    bad |= (df["high"] < body.max(axis=1)) | (df["low"] > body.min(axis=1))
    df = df[~bad].copy()
    df["volume"] = df["volume"].astype(str)
    df["amount"] = df["amount"].astype(str)
    df["day"] = df["day"].astype(str)
    return df


def update_15min_parquet(code: str) -> Tuple[str, int, list]:
    """Download the latest 15-minute bars for *code* and append the new ones to its parquet file.

    Only bars strictly later than the last stored ``day`` are appended;
    existing rows are never rewritten. If an existing file cannot be read, the
    code fails instead of overwriting the file with just the downloaded window.

    Returns:
        A tuple ``(status, new_rows, db_values)``:
          - ``status``: "ok" or "failed";
          - ``new_rows``: the number of appended bars;
          - ``db_values``: vnpy ``dbbardata`` tuples (interval "15m") for
            exactly those bars.
    """
    prefix, clean = get_market_prefix(code)
    symbol = f"{prefix}{clean}"
    parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet"

    df_new = _download_15min(symbol)
    if df_new is None:
        return "failed", 0, []
    df_new = _clean_15min(df_new)
    if df_new.empty:
        return "failed", 0, []

    existing = None
    last_date = ""
    if parquet_path.exists():
        try:
            existing = pd.read_parquet(parquet_path)
        except Exception as e:
            # Falling through to the "no history" branch would replace years of
            # bars with only the downloaded window.
            logger.warning("15min已有数据读取失败,跳过以免覆盖 %s: %s", code, e)
            return "failed", 0, []
        stored_days = existing["day"].dropna().astype(str)
        if not stored_days.empty:
            last_date = stored_days.max()
        existing["day"] = existing["day"].astype(str)

    if last_date:
        # Strict append: keep only bars after the last stored one.
        df_increment = df_new[df_new["day"] > last_date].copy()
        if df_increment.empty:
            return "ok", 0, []
        # Gap check: the downloaded window should reach back to (or before) the
        # bar right after last_date; if it starts later, bars are missing.
        api_first = df_new["day"].min()
        expected_next = str(pd.Timestamp(last_date) + pd.Timedelta(minutes=15))
        if api_first > expected_next:
            logger.warning(
                "⚠️ 15min数据缺口 %s: last=%s, api_first=%s (expected=%s)",
                code, last_date, api_first, expected_next,
            )
        combined = pd.concat([existing, df_increment], ignore_index=True)
        combined = combined.sort_values("day").reset_index(drop=True)
    else:
        # No stored history (missing or empty file): write everything downloaded.
        df_increment = df_new
        combined = df_new.sort_values("day").reset_index(drop=True)
    new_rows = len(df_increment)

    # Atomic write: temp file, then replace.
    tmp = parquet_path.with_suffix(".tmp")
    combined.to_parquet(tmp, index=False)
    tmp.replace(parquet_path)

    exchange = "SSE" if prefix == "sh" else "SZSE"
    db_values = [
        (
            clean, exchange, str(row["day"]), "15m",
            float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
            float(row.get("open", 0)), float(row.get("high", 0)),
            float(row.get("low", 0)), float(row.get("close", 0)),
        )
        for _, row in df_increment.iterrows()
    ]
    return "ok", new_rows, db_values
def run_15min_update(codes: List[str]) -> dict:
    """Incrementally update 15-minute bars for *codes* from Sina (parquet + vnpy DB).

    Codes already recorded in the progress file are skipped. Each remaining
    code is handled by update_15min_parquet. Successful codes are marked done;
    failed ones are not, so a rerun retries them. The run aborts once
    GLOBAL_FAIL_THRESHOLD consecutive codes fail. Queued DB rows are written
    once, after the loop.

    Returns:
        Stats dict with updated/skipped/failed/records/db_records, plus
        ``source_aborted=True`` when aborted.
    """
    logger.info("=" * 60)
    logger.info("15分钟线增量更新开始,共 %d", len(codes))
    stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
    all_db_values = []
    consecutive_fails = 0
    source_aborted = False
    # Resume support.
    done_set = load_progress("15min")
    todo = [c for c in codes if c not in done_set]
    logger.info("待更新: %d(已完成: %d", len(todo), len(done_set))
    # Global source-outage detection.
    health = SourceHealthMonitor()
    for i, code in enumerate(todo):
        if i > 0:
            time.sleep(REQUEST_INTERVAL_SINA)
        try:
            status, new_rows, db_values = update_15min_parquet(code)
        except Exception as e:
            status, new_rows, db_values = "failed", 0, []
            # WARNING, not DEBUG: logging runs at INFO, so DEBUG would hide write/parse errors.
            logger.warning("15min %s 异常: %s", code, e)
        if not health.report(code, status == "failed"):
            source_aborted = True
            logger.error("❌ 新浪15min源不可用,终止15min更新")
            break
        if status == "ok":
            stats["updated"] += 1
            stats["records"] += new_rows
            stats["db_records"] += len(db_values)
            all_db_values.extend(db_values)
            consecutive_fails = 0
            done_set.add(code)
        else:
            stats["failed"] += 1
            consecutive_fails += 1
        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            logger.error("连续失败 %d 次,暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
            time.sleep(CONSECUTIVE_FAIL_PAUSE)
            consecutive_fails = 0
        if (i + 1) % 500 == 0:
            logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"])
            save_progress("15min", done_set)
    # NOTE(review): as with the daily update, DB rows are only written here; a crash
    # after a progress checkpoint loses the DB rows of checkpointed codes.
    if all_db_values:
        _write_vnpy_db(all_db_values, "15min")
    # Save final progress.
    save_progress("15min", done_set)
    if source_aborted:
        stats["source_aborted"] = True
    logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False))
    return stats
# ======================== vnpy DB写入 ========================
def _write_vnpy_db(values: list, label: str):
    """Upsert bar rows into the vnpy SQLite DB (``dbbardata``) and refresh ``dbbaroverview``.

    Rows are first written to a throw-away local SQLite file. That file is
    then ATTACHed to the NAS DB and copied over with one INSERT. Overview rows
    are recomputed only for the (symbol, exchange, interval) groups touched.
    Errors are logged, not raised; the local file is always removed.

    Args:
        values: tuples of (symbol, exchange, datetime, interval, volume,
            turnover, open_interest, open_price, high_price, low_price,
            close_price).
        label: short tag used in log messages.
    """
    import tempfile
    from contextlib import closing

    # Data columns only; never copy `id` (see below).
    cols = (
        "symbol, exchange, datetime, interval, volume, turnover, open_interest, "
        "open_price, high_price, low_price, close_price"
    )
    logger.info("写入vnpy DB [%s]: %d 条记录", label, len(values))
    fd, local_tmp = tempfile.mkstemp(prefix="vnpy_update_", suffix=".db")
    os.close(fd)  # SQLite opens the empty file itself as a new database
    try:
        # 1. Stage the increment in the local DB.
        with closing(sqlite3.connect(local_tmp, timeout=30)) as conn:
            conn.execute("""CREATE TABLE IF NOT EXISTS dbbardata (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                exchange TEXT NOT NULL,
                datetime TEXT NOT NULL,
                interval TEXT NOT NULL,
                volume REAL DEFAULT 0,
                turnover REAL DEFAULT 0,
                open_interest REAL DEFAULT 0,
                open_price REAL,
                high_price REAL,
                low_price REAL,
                close_price REAL,
                UNIQUE(symbol, exchange, datetime, interval)
            )""")
            conn.executemany(
                f"INSERT OR REPLACE INTO dbbardata ({cols}) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
                values,
            )
            conn.commit()
        logger.info(" 本地写入完成: %s (%d rows)", local_tmp, len(values))

        # 2. ATTACH to the NAS DB and import in one transaction.
        with closing(sqlite3.connect(str(VNPY_DB_PATH), timeout=120)) as conn:
            # NOTE(review): SQLite documents that WAL mode does not work over a network
            # filesystem; confirm how /Volumes/stock is mounted before relying on it.
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("ATTACH DATABASE ? AS tmpdb", (local_tmp,))
            # The staging table's ids restart at 1 on every call. Copying them would make
            # INSERT OR REPLACE collide with the PRIMARY KEY of unrelated existing bars
            # and delete them, so only the data columns are copied; main assigns new ids.
            conn.execute(
                f"INSERT OR REPLACE INTO main.dbbardata ({cols}) "
                f"SELECT {cols} FROM tmpdb.dbbardata"
            )
            # Refresh overview rows for the groups present in this batch only.
            conn.execute(
                """INSERT OR REPLACE INTO main.dbbaroverview
                       (symbol, exchange, interval, count, start, "end")
                   SELECT b.symbol, b.exchange, b.interval,
                          COUNT(*), MIN(b.datetime), MAX(b.datetime)
                   FROM (SELECT DISTINCT symbol, exchange, interval
                         FROM tmpdb.dbbardata) AS g
                   JOIN main.dbbardata AS b
                     ON b.symbol = g.symbol
                    AND b.exchange = g.exchange
                    AND b.interval = g.interval
                   GROUP BY b.symbol, b.exchange, b.interval"""
            )
            conn.commit()
            conn.execute("DETACH DATABASE tmpdb")
        logger.info("✅ vnpy DB [%s] 写入完成", label)
    except Exception as e:
        logger.error("❌ vnpy DB [%s] 写入失败: %s", label, e)
    finally:
        if os.path.exists(local_tmp):
            os.remove(local_tmp)
# ======================== 告警与报告 ========================
def check_failure_rate(stats: dict, label: str, *, max_rate: float = 0.05) -> bool:
    """Return True, and log the reason(s), if an update's stats warrant an alert.

    Two conditions are checked, and each is logged when it applies:
      - the run was aborted because its source was unavailable
        (``stats["source_aborted"]``);
      - failed / (updated + failed + skipped) exceeds *max_rate*.

    The abort check does not depend on the totals, so a run aborted before
    any code was counted still alerts.
    """
    alert = False
    if stats.get("source_aborted"):
        logger.error("❌ [%s] 源不可用导致终止", label)
        alert = True
    failed = stats.get("failed", 0)
    total = stats.get("updated", 0) + failed + stats.get("skipped", 0)
    if total:
        rate = failed / total
        if rate > max_rate:
            logger.error(
                "❌ 数据更新异常 [%s]:失败率 %.1f%% (%d/%d)",
                label, rate * 100, failed, total,
            )
            alert = True
    return alert
# ======================== 主入口 ========================
def main():
    """Command-line entry point: run the selected updates and write a JSON report.

    Exits with status 1 when the NAS data directories are missing. Otherwise
    it does the following, then returns the report dict:
      - backs up the vnpy DB;
      - runs the daily and/or 15-minute updates (``--skip-daily`` /
        ``--skip-15min``);
      - logs whether any of them raised an alert;
      - writes ``report_YYYYMMDD.json`` (UTF-8) into LOG_DIR.
    """
    parser = argparse.ArgumentParser(description="全市场每日增量更新")
    parser.add_argument("--skip-daily", action="store_true", help="跳过日线更新")
    parser.add_argument("--skip-15min", action="store_true", help="跳过15分钟线更新")
    args = parser.parse_args()
    # NOTE(review): `logger = setup_logging()` already ran at import time and created
    # LOG_DIR under /Volumes/stock; if the NAS is not mounted that makes a local
    # directory at the mount point. Consider checking the mount before configuring logging.
    if not nas_mounted():
        logger.error("❌ NAS未挂载,退出")
        sys.exit(1)
    codes = get_all_codes()
    logger.info("全市场股票数: %d", len(codes))
    logger.info("更新时间: %s", datetime.now().isoformat())
    t_start = time.time()
    report = {}
    has_alert = False
    # Back up the vnpy DB before any update writes to it.
    rotate_db_backup()
    if not args.skip_daily:
        report["daily"] = run_daily_update(codes)
        if check_failure_rate(report["daily"], "日线"):
            has_alert = True
    if not args.skip_15min:
        report["15min"] = run_15min_update(codes)
        if check_failure_rate(report["15min"], "15min"):
            has_alert = True
    elapsed = time.time() - t_start
    report["elapsed_sec"] = round(elapsed, 1)
    report["has_alert"] = has_alert
    logger.info("=" * 60)
    if has_alert:
        logger.error("⚠️ 本次更新存在异常,请检查日志")
    else:
        logger.info("✅ 全部完成,耗时 %.1f", elapsed)
    report_text = json.dumps(report, ensure_ascii=False, indent=2)
    logger.info(report_text)
    # Write the report. ensure_ascii=False keeps Chinese text as-is, so pin the
    # file encoding rather than relying on the locale default.
    report_file = LOG_DIR / f"report_{datetime.now().strftime('%Y%m%d')}.json"
    report_file.write_text(report_text, encoding="utf-8")
    return report


if __name__ == "__main__":
    main()