auto-sync: 2026-05-03 12:52:27

This commit is contained in:
cfdaily
2026-05-03 12:52:27 +08:00
parent b1c3fbb262
commit 970b396ef4
+269 -57
View File
@@ -9,20 +9,32 @@
设计原则: 设计原则:
- 增量更新,不重复下载 - 增量更新,不重复下载
- 失败不影响其他股票 - 失败不影响其他股票
- 进度持久化,支持断点续传 - 进度持久化,支持断点续传(日线+15min均有进度文件)
- 限频保护 - 限频保护 + 全局源不可用检测
- 日志完整 - DB轮转备份(保留7天)
- 日志完整 + 失败率告警
用法: 用法:
python3 daily_all_update.py # 全量更新(日线+15min) python3 daily_all_update.py # 全量更新(日线+15min)
python3 daily_all_update.py --skip-daily # 只更新15min python3 daily_all_update.py --skip-daily # 只更新15min
python3 daily_all_update.py --skip-15min # 只更新日线 python3 daily_all_update.py --skip-15min # 只更新日线
变更记录:
v1.1 (2026-05-03) - 司马懿评审后修改:
- interval "1m""15m"
- 15min增量改为严格按日期追加(不再用drop_duplicates keep=last
- 日线增加进度文件
- 全局源不可用检测(连续30只首次失败→终止)
- DB轮转备份(保留7天)
- 失败率>5%告警
""" """
import argparse import argparse
import glob
import json import json
import os import os
import re import re
import shutil
import sqlite3 import sqlite3
import sys import sys
import time import time
@@ -48,7 +60,10 @@ REQUEST_INTERVAL = 0.3
MAX_RETRIES = 3 MAX_RETRIES = 3
CONSECUTIVE_FAIL_PAUSE = 60 CONSECUTIVE_FAIL_PAUSE = 60
MAX_CONSECUTIVE_FAILS = 10 MAX_CONSECUTIVE_FAILS = 10
DB_BATCH_SIZE = 50000 # 全局源不可用检测:连续N只股票首次请求就失败,判定源不可用
GLOBAL_FAIL_THRESHOLD = 30
# DB轮转备份数
DB_BACKUP_KEEP_DAYS = 7
HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"} HEADERS = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)"}
@@ -97,6 +112,91 @@ def nas_mounted() -> bool:
return DAILY_DIR.exists() and MINUTE_15_DIR.exists() return DAILY_DIR.exists() and MINUTE_15_DIR.exists()
# ======================== DB备份 ========================
def rotate_db_backup():
"""轮转备份vnpy DB,保留最近N天"""
backup_dir = VNPY_DB_PATH.parent
today = datetime.now().strftime("%Y%m%d")
backup_file = backup_dir / f"quant_trading_{today}.db.bak"
# 如果今天已备份过,跳过
if backup_file.exists():
logger.info("DB今日已备份: %s", backup_file)
return
logger.info("开始DB备份: %s%s", VNPY_DB_PATH.name, backup_file.name)
try:
shutil.copy2(str(VNPY_DB_PATH), str(backup_file))
logger.info("✅ DB备份完成 (%.1f MB)", backup_file.stat().st_size / 1024 / 1024)
except Exception as e:
logger.error("❌ DB备份失败: %s", e)
return
# 清理过期备份
cutoff = datetime.now() - timedelta(days=DB_BACKUP_KEEP_DAYS)
for f in backup_dir.glob("quant_trading_*.db.bak"):
try:
# 从文件名提取日期
date_str = f.stem.split("_")[-1] # quant_trading_20260503.db.bak → 20260503
file_date = datetime.strptime(date_str, "%Y%m%d")
if file_date < cutoff:
f.unlink()
logger.info("清理过期备份: %s", f.name)
except (ValueError, OSError):
pass
# ======================== 进度文件 ========================
def load_progress(name: str) -> set:
"""加载进度文件,返回已完成的代码集合"""
progress_file = PROGRESS_DIR / f"{name}_progress.json"
PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
if progress_file.exists():
try:
return set(json.loads(progress_file.read_text()).get("done", []))
except Exception:
pass
return set()
def save_progress(name: str, done_set: set):
"""保存进度文件"""
progress_file = PROGRESS_DIR / f"{name}_progress.json"
progress_file.write_text(json.dumps({
"done": sorted(list(done_set)),
"ts": datetime.now().isoformat(),
}))
# ======================== 全局源不可用检测 ========================
class SourceHealthMonitor:
"""监控数据源健康状态,连续N只首次请求失败则判定源不可用"""
def __init__(self, threshold: int = GLOBAL_FAIL_THRESHOLD):
self.threshold = threshold
self.consecutive_first_fail = 0
def report(self, code: str, first_attempt_failed: bool) -> bool:
"""
报告单只股票的首次请求结果
返回True表示源仍健康,False表示源不可用应终止
"""
if first_attempt_failed:
self.consecutive_first_fail += 1
if self.consecutive_first_fail >= self.threshold:
logger.error(
"⚠️ 全局源不可用检测触发:连续 %d 只股票首次请求失败,终止本次更新",
self.consecutive_first_fail,
)
return False
else:
self.consecutive_first_fail = 0
return True
# ======================== 日线更新 ======================== # ======================== 日线更新 ========================
def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
@@ -129,7 +229,6 @@ def fetch_tencent_daily(code: str, start_date: str, end_date: str) -> Optional[p
for c in ["open", "close", "high", "low", "volume", "amount"]: for c in ["open", "close", "high", "low", "volume", "amount"]:
df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0)
df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
# 确保date为字符串比较
df["date"] = df["date"].astype(str) df["date"] = df["date"].astype(str)
mask = (df["date"] >= start_date) & (df["date"] <= end_date) mask = (df["date"] >= start_date) & (df["date"] <= end_date)
result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]] result = df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]]
@@ -142,7 +241,6 @@ def update_daily_parquet(code: str, new_data: pd.DataFrame) -> int:
year = datetime.now().year year = datetime.now().year
parquet_path = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet" parquet_path = DAILY_DIR / str(year) / f"{prefix}{clean}_daily.parquet"
# 确保date为字符串类型,避免和已有parquet的datetime.date冲突
new_data = new_data.copy() new_data = new_data.copy()
new_data["date"] = new_data["date"].astype(str) new_data["date"] = new_data["date"].astype(str)
@@ -186,38 +284,67 @@ def run_daily_update(codes: List[str]) -> dict:
stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0} stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
all_db_values = [] all_db_values = []
consecutive_fails = 0 consecutive_fails = 0
source_aborted = False
# 进度文件(硬伤1修复)
done_set = load_progress("daily")
todo = [c for c in codes if c not in done_set]
logger.info("待更新: %d(已完成: %d", len(todo), len(done_set))
# 全局源健康监控(硬伤2修复)
health = SourceHealthMonitor()
for i, code in enumerate(todo):
if source_aborted:
break
for i, code in enumerate(codes):
if i > 0: if i > 0:
time.sleep(REQUEST_INTERVAL) time.sleep(REQUEST_INTERVAL)
last_date = get_daily_last_date(code) last_date = get_daily_last_date(code)
if not last_date: if not last_date:
stats["skipped"] += 1 stats["skipped"] += 1
done_set.add(code)
continue continue
next_day = (pd.Timestamp(last_date) + timedelta(days=1)).strftime("%Y-%m-%d") next_day = (pd.Timestamp(last_date) + timedelta(days=1)).strftime("%Y-%m-%d")
if next_day > today: if next_day > today:
stats["skipped"] += 1 stats["skipped"] += 1
done_set.add(code)
continue continue
data = None data = None
first_attempt_failed = False
for attempt in range(MAX_RETRIES): for attempt in range(MAX_RETRIES):
try: try:
data = fetch_tencent_daily(code, next_day, today) data = fetch_tencent_daily(code, next_day, today)
if data is not None:
break break
if attempt == 0:
first_attempt_failed = True
except Exception as e: except Exception as e:
if attempt == 0:
first_attempt_failed = True
if attempt < MAX_RETRIES - 1: if attempt < MAX_RETRIES - 1:
time.sleep(1) time.sleep(1)
else: else:
logger.debug("日线 %s 失败: %s", code, e) logger.debug("日线 %s 失败: %s", code, e)
# 全局源不可用检测
if not health.report(code, first_attempt_failed and data is None):
source_aborted = True
logger.error("❌ 腾讯日线源不可用,终止日线更新")
break
if data is None or data.empty: if data is None or data.empty:
stats["skipped"] += 1 stats["skipped"] += 1
done_set.add(code)
continue continue
# 校验 # 校验
if (data[["open", "high", "low", "close"]] <= 0).any().any(): if (data[["open", "high", "low", "close"]] <= 0).any().any():
stats["failed"] += 1 stats["failed"] += 1
consecutive_fails += 1
done_set.add(code)
continue continue
try: try:
@@ -242,18 +369,27 @@ def run_daily_update(codes: List[str]) -> dict:
consecutive_fails += 1 consecutive_fails += 1
logger.warning("日线写入失败 %s: %s", code, e) logger.warning("日线写入失败 %s: %s", code, e)
done_set.add(code)
if consecutive_fails >= MAX_CONSECUTIVE_FAILS: if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
logger.error("连续失败 %d 次,暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE) logger.error("连续失败 %d 次,暂停 %d", consecutive_fails, CONSECUTIVE_FAIL_PAUSE)
time.sleep(CONSECUTIVE_FAIL_PAUSE) time.sleep(CONSECUTIVE_FAIL_PAUSE)
consecutive_fails = 0 consecutive_fails = 0
if (i + 1) % 500 == 0: if (i + 1) % 500 == 0:
logger.info("日线进度: %d/%d updated=%d", i + 1, len(codes), stats["updated"]) logger.info("日线进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"])
save_progress("daily", done_set)
# 写vnpy DB # 写vnpy DB
if all_db_values: if all_db_values:
_write_vnpy_db(all_db_values, "日线") _write_vnpy_db(all_db_values, "日线")
# 保存最终进度
save_progress("daily", done_set)
if source_aborted:
stats["source_aborted"] = True
logger.info("日线完成: %s", json.dumps(stats, ensure_ascii=False)) logger.info("日线完成: %s", json.dumps(stats, ensure_ascii=False))
return stats return stats
@@ -284,8 +420,24 @@ def try_sina_15min(symbol: str, datalen: int = 800) -> Optional[pd.DataFrame]:
return df[cols] return df[cols]
def update_15min_parquet(code: str) -> Tuple[str, int]: def get_15min_last_date(parquet_path: Path) -> str:
"""增量下载并合并15分钟线""" """获取15min Parquet中最后一条的时间戳"""
if not parquet_path.exists():
return ""
try:
df = pd.read_parquet(parquet_path, columns=["day"])
if not df.empty:
return str(df["day"].max())
except Exception:
pass
return ""
def update_15min_parquet(code: str) -> Tuple[str, int, list]:
"""
增量下载并合并15分钟线(阻塞项2修复:严格按日期追加)
返回: (status, new_rows, db_values)
"""
prefix, clean = get_market_prefix(code) prefix, clean = get_market_prefix(code)
symbol = f"{prefix}{clean}" symbol = f"{prefix}{clean}"
parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet" parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet"
@@ -301,7 +453,7 @@ def update_15min_parquet(code: str) -> Tuple[str, int]:
time.sleep(1) time.sleep(1)
if df_new is None or df_new.empty: if df_new is None or df_new.empty:
return "failed", 0 return "failed", 0, []
# 数据校验 # 数据校验
for col in ["open", "high", "low", "close"]: for col in ["open", "high", "low", "close"]:
@@ -317,27 +469,62 @@ def update_15min_parquet(code: str) -> Tuple[str, int]:
if bad_ohlc.any(): if bad_ohlc.any():
df_new = df_new[~bad_ohlc] df_new = df_new[~bad_ohlc]
if df_new.empty: if df_new.empty:
return "failed", 0 return "failed", 0, []
# 转回object类型 # 转回object类型
df_new["volume"] = df_new["volume"].astype(str) df_new["volume"] = df_new["volume"].astype(str)
df_new["amount"] = df_new["amount"].astype(str) df_new["amount"] = df_new["amount"].astype(str)
df_new["day"] = df_new["day"].astype(str)
if parquet_path.exists(): # 获取已有数据最后日期
last_date = get_15min_last_date(parquet_path)
if last_date:
# 严格追加:只保留 day > last_date 的新数据(阻塞项2修复)
df_increment = df_new[df_new["day"] > last_date].copy()
if df_increment.empty:
# 没有新数据,不需要写入
return "ok", 0, []
# 缺口检测:API返回的最早数据比last_date晚超过1根K线(15min
api_first = df_new["day"].min()
expected_next = str(pd.Timestamp(last_date) + pd.Timedelta(minutes=15))
if api_first > expected_next:
logger.warning(
"⚠️ 15min数据缺口 %s: last=%s, api_first=%s (expected=%s)",
code, last_date, api_first, expected_next,
)
# 合并
existing = pd.read_parquet(parquet_path) existing = pd.read_parquet(parquet_path)
old_len = len(existing) existing["day"] = existing["day"].astype(str)
combined = pd.concat([existing, df_new], ignore_index=True) combined = pd.concat([existing, df_increment], ignore_index=True)
combined = combined.drop_duplicates(subset=["day"], keep="last")
combined = combined.sort_values("day").reset_index(drop=True) combined = combined.sort_values("day").reset_index(drop=True)
new_rows = len(combined) - old_len new_rows = len(df_increment)
else: else:
combined = df_new # 无已有数据,直接写入全部
combined = df_new.sort_values("day").reset_index(drop=True)
df_increment = df_new
new_rows = len(df_new) new_rows = len(df_new)
# 原子写入
tmp = parquet_path.with_suffix(".tmp") tmp = parquet_path.with_suffix(".tmp")
combined.to_parquet(tmp, index=False) combined.to_parquet(tmp, index=False)
tmp.rename(parquet_path) tmp.rename(parquet_path)
return "ok", new_rows
# 构建DB写入数据(interval="15m",阻塞项1修复)
exchange = "SSE" if prefix == "sh" else "SZSE"
db_values = []
for _, row in df_increment.iterrows():
db_values.append((
clean, exchange, str(row["day"]), "15m",
float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
float(row.get("open", 0)), float(row.get("high", 0)),
float(row.get("low", 0)), float(row.get("close", 0)),
))
return "ok", new_rows, db_values
def run_15min_update(codes: List[str]) -> dict: def run_15min_update(codes: List[str]) -> dict:
@@ -348,55 +535,43 @@ def run_15min_update(codes: List[str]) -> dict:
stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0} stats = {"updated": 0, "skipped": 0, "failed": 0, "records": 0, "db_records": 0}
all_db_values = [] all_db_values = []
consecutive_fails = 0 consecutive_fails = 0
source_aborted = False
# 加载进度(支持断点续传) # 进度文件
progress_file = PROGRESS_DIR / "15min_progress.json" done_set = load_progress("15min")
PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
done_set = set()
if progress_file.exists():
try:
done_set = set(json.loads(progress_file.read_text()).get("done", []))
except Exception:
pass
todo = [c for c in codes if c not in done_set] todo = [c for c in codes if c not in done_set]
logger.info("待更新: %d(已完成: %d", len(todo), len(done_set)) logger.info("待更新: %d(已完成: %d", len(todo), len(done_set))
# 全局源健康监控
health = SourceHealthMonitor()
for i, code in enumerate(todo): for i, code in enumerate(todo):
if source_aborted:
break
if i > 0: if i > 0:
time.sleep(REQUEST_INTERVAL) time.sleep(REQUEST_INTERVAL)
try: try:
status, new_rows = update_15min_parquet(code) status, new_rows, db_values = update_15min_parquet(code)
first_attempt_failed = (status == "failed")
except Exception as e: except Exception as e:
status, new_rows = "failed", 0 status, new_rows, db_values = "failed", 0, []
first_attempt_failed = True
logger.debug("15min %s 异常: %s", code, e) logger.debug("15min %s 异常: %s", code, e)
# 全局源不可用检测
if not health.report(code, first_attempt_failed):
source_aborted = True
logger.error("❌ 新浪15min源不可用,终止15min更新")
break
if status == "ok": if status == "ok":
stats["updated"] += 1 stats["updated"] += 1
stats["records"] += new_rows stats["records"] += new_rows
stats["db_records"] += len(db_values)
all_db_values.extend(db_values)
consecutive_fails = 0 consecutive_fails = 0
# 收集vnpy DB数据
prefix, clean = get_market_prefix(code)
parquet_path = MINUTE_15_DIR / f"{prefix}{clean}_15min.parquet"
if parquet_path.exists() and new_rows > 0:
df = pd.read_parquet(parquet_path)
# 只取最新的增量部分
if len(df) > new_rows:
df_new_part = df.iloc[-new_rows:]
else:
df_new_part = df
exchange = "SSE" if prefix == "sh" else "SZSE"
for _, row in df_new_part.iterrows():
all_db_values.append((
clean, exchange, str(row["day"]), "1m",
float(row.get("volume", 0)), float(row.get("amount", 0)), 0.0,
float(row.get("open", 0)), float(row.get("high", 0)),
float(row.get("low", 0)), float(row.get("close", 0)),
))
stats["db_records"] += len(df_new_part)
else: else:
stats["failed"] += 1 stats["failed"] += 1
consecutive_fails += 1 consecutive_fails += 1
@@ -410,15 +585,18 @@ def run_15min_update(codes: List[str]) -> dict:
if (i + 1) % 500 == 0: if (i + 1) % 500 == 0:
logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"]) logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"])
# 保存进度 save_progress("15min", done_set)
progress_file.write_text(json.dumps({"done": list(done_set), "ts": datetime.now().isoformat()}))
# 写vnpy DB # 写vnpy DB
if all_db_values: if all_db_values:
_write_vnpy_db(all_db_values, "15min") _write_vnpy_db(all_db_values, "15min")
# 保存最终进度 # 保存最终进度
progress_file.write_text(json.dumps({"done": list(done_set), "ts": datetime.now().isoformat()})) save_progress("15min", done_set)
if source_aborted:
stats["source_aborted"] = True
logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False)) logger.info("15min完成: %s", json.dumps(stats, ensure_ascii=False))
return stats return stats
@@ -449,7 +627,7 @@ def _write_vnpy_db(values: list, label: str):
conn = sqlite3.connect(str(VNPY_DB_PATH), timeout=120) conn = sqlite3.connect(str(VNPY_DB_PATH), timeout=120)
c = conn.cursor() c = conn.cursor()
c.execute("PRAGMA journal_mode=WAL") c.execute("PRAGMA journal_mode=WAL")
c.execute(f"ATTACH DATABASE ? AS tmpdb", (local_tmp,)) c.execute("ATTACH DATABASE ? AS tmpdb", (local_tmp,))
c.execute("INSERT OR REPLACE INTO main.dbbardata SELECT * FROM tmpdb.dbbardata") c.execute("INSERT OR REPLACE INTO main.dbbardata SELECT * FROM tmpdb.dbbardata")
conn.commit() conn.commit()
c.execute("DETACH DATABASE tmpdb") c.execute("DETACH DATABASE tmpdb")
@@ -470,6 +648,27 @@ def _write_vnpy_db(values: list, label: str):
os.remove(local_tmp) os.remove(local_tmp)
# ======================== 告警与报告 ========================
def check_failure_rate(stats: dict, label: str) -> bool:
"""检查失败率,返回True表示异常"""
total = stats.get("updated", 0) + stats.get("failed", 0) + stats.get("skipped", 0)
failed = stats.get("failed", 0)
if total == 0:
return False
rate = failed / total
if rate > 0.05:
logger.error(
"❌ 数据更新异常 [%s]:失败率 %.1f%% (%d/%d)",
label, rate * 100, failed, total,
)
return True
if stats.get("source_aborted"):
logger.error("❌ [%s] 源不可用导致终止", label)
return True
return False
# ======================== 主入口 ======================== # ======================== 主入口 ========================
def main(): def main():
@@ -488,17 +687,30 @@ def main():
t_start = time.time() t_start = time.time()
report = {} report = {}
has_alert = False
# DB备份(硬伤3修复)
rotate_db_backup()
if not args.skip_daily: if not args.skip_daily:
report["daily"] = run_daily_update(codes) report["daily"] = run_daily_update(codes)
if check_failure_rate(report["daily"], "日线"):
has_alert = True
if not args.skip_15min: if not args.skip_15min:
report["15min"] = run_15min_update(codes) report["15min"] = run_15min_update(codes)
if check_failure_rate(report["15min"], "15min"):
has_alert = True
elapsed = time.time() - t_start elapsed = time.time() - t_start
report["elapsed_sec"] = round(elapsed, 1) report["elapsed_sec"] = round(elapsed, 1)
report["has_alert"] = has_alert
logger.info("=" * 60) logger.info("=" * 60)
logger.info("全部完成,耗时 %.1f", elapsed) if has_alert:
logger.error("⚠️ 本次更新存在异常,请检查日志")
else:
logger.info("✅ 全部完成,耗时 %.1f", elapsed)
logger.info(json.dumps(report, ensure_ascii=False, indent=2)) logger.info(json.dumps(report, ensure_ascii=False, indent=2))
# 写入报告 # 写入报告