diff --git a/data_platform/updater.py b/data_platform/updater.py index 858b6611..a83f287d 100644 --- a/data_platform/updater.py +++ b/data_platform/updater.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 -"""增量更新 - Parquet+vnpy DB双写""" +"""增量更新 - 补vnpy DB增量数据(腾讯K线主源) +Parquet双写暂缓(需要处理date类型兼容),优先补DB数据 +""" import os import re import sys @@ -35,7 +37,7 @@ def get_all_symbols(): for f in (Path(DAILY_DIR) / latest_year).glob('*.parquet'): code, exchange = parse_filename(f.name) if code: - symbols.append((code, exchange, f.name)) + symbols.append((code, exchange)) return symbols @@ -57,84 +59,44 @@ def get_last_date(code: str, exchange: str) -> str: return "" -def fetch_incremental(code: str, start_date: str, end_date: str): - """获取增量数据:腾讯K线(主源,稳定无代理问题)""" - # 直接用腾讯K线API(akshare有代理问题,作为降级备源) - try: - import urllib.request - import json as _json - prefix = 'sh' if code.startswith(('6', '5', '1')) else 'sz' - tq = f"{prefix}{code}" - days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10 - url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days}," - # 用无代理opener避免akshare代理污染 - proxy_handler = urllib.request.ProxyHandler({}) - opener = urllib.request.build_opener(proxy_handler) - req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) - resp = opener.open(req, timeout=10) - raw = resp.read().decode('utf-8', errors='replace') - data = _json.loads(raw) - d = data.get('data') - if not isinstance(d, dict): - return None - klines = d.get(tq, {}).get('day', []) - if not klines: - return None - df = pd.DataFrame(klines, columns=['date', 'open', 'close', 'high', 'low', 'volume']) - for c in ['open', 'close', 'high', 'low', 'volume']: - df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0) - df['amount'] = 0.0 - df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d') - mask = (df['date'] >= start_date) & (df['date'] <= end_date) - result = df.loc[mask, ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']] - if result.empty: - return None - return result - except Exception as e: - logger.warning(f'腾讯K线失败 {code}: {e}') +def fetch_tencent_daily(code: str, start_date: str, end_date: str): + """腾讯K线API获取日线增量数据""" + import urllib.request + import json as _json + + prefix = 'sh' if code.startswith(('6', '5', '1')) else 'sz' + tq = f"{prefix}{code}" + days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10 + url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days}," + + proxy_handler = urllib.request.ProxyHandler({}) + opener = urllib.request.build_opener(proxy_handler) + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + resp = opener.open(req, timeout=10) + raw = resp.read().decode('utf-8', errors='replace') + data = _json.loads(raw) + d = data.get('data') + if not isinstance(d, dict): + return None + klines = d.get(tq, {}).get('day', []) + if not klines: return None - -def append_to_parquet(code: str, exchange: str, new_data: pd.DataFrame): - """原子写入:临时文件+rename,追加到对应年份目录""" - prefix = 'sh' if exchange == 'SSE' else 'sz' - for _, row in new_data.iterrows(): - year = row['date'][:4] - year_dir = Path(DAILY_DIR) / year - year_dir.mkdir(parents=True, exist_ok=True) - fpath = year_dir / f"{prefix}{code}_daily.parquet" - - if fpath.exists(): - existing = pd.read_parquet(fpath) - combined = pd.concat([existing, pd.DataFrame([row])], ignore_index=True) - combined = combined.drop_duplicates(subset=['date'], keep='last') - combined = combined.sort_values('date').reset_index(drop=True) - else: - combined = pd.DataFrame([row]) - - tmp_path = str(fpath) + ".tmp" - combined.to_parquet(tmp_path, index=False) - os.rename(tmp_path, str(fpath)) - - -def append_to_vnpy_db(code: str, exchange: str, new_data: pd.DataFrame): - """写入vnpy DB (先本地/tmp,完成后复制到NAS)""" - values = [] - for _, row in new_data.iterrows(): - values.append(( - code, exchange, str(row['date']), 'd', - float(row.get('volume', 0)), float(row.get('amount', 0)), 0.0, - float(row.get('open', 0)), float(row.get('high', 0)), - float(row.get('low', 0)), float(row.get('close', 0)), - )) - return values + # kline format: [date, open, close, high, low, volume] + df = pd.DataFrame(klines, columns=['date', 'open', 'close', 'high', 'low', 'volume']) + for c in ['open', 'close', 'high', 'low', 'volume']: + df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0) + df['amount'] = 0.0 + df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d') + mask = (df['date'] >= start_date) & (df['date'] <= end_date) + result = df.loc[mask, ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']] + return result if not result.empty else None def main(): today = datetime.now().strftime("%Y-%m-%d") - max_end = today # 不超过今天 - logger.info(f"=== 增量更新开始 {today} ===") + logger.info(f"=== vnpy DB增量更新开始 {today} ===") # 获取所有股票 symbols = get_all_symbols() @@ -146,57 +108,49 @@ def main(): new_records = 0 all_db_values = [] - for i, (code, exchange, fname) in enumerate(symbols): - # 获取最后日期 + for i, (code, exchange) in enumerate(symbols): last_date = get_last_date(code, exchange) if not last_date: skipped += 1 continue - # 计算需要补的起始日期(下一天) next_day = (pd.Timestamp(last_date) + pd.Timedelta(days=1)).strftime("%Y-%m-%d") - if next_day > max_end: + if next_day > today: skipped += 1 continue - # 获取增量数据 - data = fetch_incremental(code, next_day, max_end) + data = fetch_tencent_daily(code, next_day, today) if data is None or data.empty: skipped += 1 continue - # 校验(简单fatal检查) + # 简单校验 if (data[['open', 'high', 'low', 'close']] <= 0).any().any(): logger.warning(f"{code} 有非正价格,跳过") failed += 1 continue - # 写Parquet - try: - append_to_parquet(code, exchange, data) - except Exception as e: - logger.error(f"Parquet写入失败 {code}: {e}") - failed += 1 - continue - # 收集vnpy DB数据 - db_vals = append_to_vnpy_db(code, exchange, data) - all_db_values.extend(db_vals) + for _, row in data.iterrows(): + all_db_values.append(( + code, exchange, str(row['date']), 'd', + float(row.get('volume', 0)), float(row.get('amount', 0)), 0.0, + float(row.get('open', 0)), float(row.get('high', 0)), + float(row.get('low', 0)), float(row.get('close', 0)), + )) new_records += len(data) updated += 1 if (i + 1) % 500 == 0: - logger.info(f"进度: {i+1}/{len(symbols)} updated={updated} skipped={skipped} failed={failed}") + logger.info(f"进度: {i+1}/{len(symbols)} updated={updated} skipped={skipped} failed={failed} records={new_records}") - # 限频:akshare 1秒间隔 - time.sleep(0.5) + time.sleep(0.3) # 写vnpy DB if all_db_values: logger.info(f"写入vnpy DB: {len(all_db_values)} 条记录") try: - # 复制NAS DB到本地 shutil.copy2(VNPY_DB_PATH, LOCAL_DB_TMP) conn = sqlite3.connect(LOCAL_DB_TMP) c = conn.cursor() @@ -214,7 +168,6 @@ def main(): conn.commit() conn.close() - # 复制回NAS shutil.copy2(LOCAL_DB_TMP, VNPY_DB_PATH) logger.info("✅ vnpy DB更新完成") except Exception as e: