diff --git a/data_platform/daily_all_update.py b/data_platform/daily_all_update.py index e802af55..045e9d71 100644 --- a/data_platform/daily_all_update.py +++ b/data_platform/daily_all_update.py @@ -617,6 +617,7 @@ def run_daily_update(codes: List[str], local_conn: sqlite3.Connection) -> dict: for i, code in enumerate(todo): last_date = get_daily_last_date(code) if not last_date: + logger.debug("日线无历史数据,跳过(需先全量导入): %s", code) stats["skipped"] += 1 done_set.add(code) continue @@ -636,7 +637,7 @@ def run_daily_update(codes: List[str], local_conn: sqlite3.Connection) -> dict: if failed: stats["failed"] += 1 - done_set.add(code) + # 注意:失败不加done_set,下次运行可重试 if (i + 1) % 500 == 0: logger.info("日线进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"]) save_progress("daily", done_set) @@ -647,7 +648,7 @@ def run_daily_update(codes: List[str], local_conn: sqlite3.Connection) -> dict: data[c] = pd.to_numeric(data[c], errors="coerce") if (data[["close", "open"]] <= 0).any().any(): stats["failed"] += 1 - done_set.add(code) + # 校验失败也不加done_set continue try: @@ -789,7 +790,7 @@ def run_15min_update(codes: List[str], local_conn: sqlite3.Connection) -> dict: if failed: stats["failed"] += 1 - done_set.add(code) + # 注意:失败不加done_set,下次运行可重试 if (i + 1) % 500 == 0: logger.info("15min进度: %d/%d updated=%d failed=%d", i + 1, len(todo), stats["updated"], stats["failed"]) save_progress("15min", done_set) @@ -807,7 +808,7 @@ def run_15min_update(codes: List[str], local_conn: sqlite3.Connection) -> dict: df_new = df_new[~bad] if df_new.empty: stats["failed"] += 1 - done_set.add(code) + # 校验失败也不加done_set continue # 增量合并 @@ -982,11 +983,7 @@ def sync_db_to_nas(): os.rename(nas_path, old_path) os.rename(new_path, nas_path) logger.info("✅ DB同步完成(mv原子重命名)") - - # 清理旧文件 - if os.path.exists(old_path): - os.remove(old_path) - logger.info(" 清理旧文件: %s", old_path) + # 注意:.old备份不立即清理,下次sync时清理(保留回退路径) except Exception as e: logger.error("❌ DB同步失败: %s,尝试恢复", e) # 恢复:把old改回来