#!/usr/bin/env python3
"""Incremental updater for the vnpy bar database.

Fetches daily bars that are newer than what the NAS Parquet files hold
(Tencent K-line API is the primary source) and upserts them into the vnpy
SQLite database.  Parquet dual-write is deferred (date-dtype compatibility
still has to be handled); filling the DB takes priority.
"""
import functools
import json
import logging
import os
import re
import shutil
import sqlite3
import sys
import time
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple

import pandas as pd

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)

DAILY_DIR = "/Volumes/stock/A股数据/日线数据/daily/"
VNPY_DB_PATH = "/Volumes/stock/sanguo_vnpy/data/quant_trading.db"
LOCAL_DB_TMP = "/tmp/quant_trading_updater.db"  # not referenced by the code in this file
BATCH_SIZE = 50000

_FILENAME_RE = re.compile(r'(sh|sz)(\d{6})_daily\.parquet')
# Positional meaning assigned to the first (up to) seven fields of a K-line row.
_KLINE_COLUMNS = ['date', 'open', 'close', 'high', 'low', 'volume', 'amount']
_NUMERIC_COLUMNS = ['open', 'close', 'high', 'low', 'volume', 'amount']
_OUTPUT_COLUMNS = ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']
_PROGRESS_EVERY = 500
_REQUEST_PAUSE_SECONDS = 0.3


def parse_filename(filename):
    """Split a ``(sh|sz)NNNNNN_daily.parquet`` file name into (code, exchange).

    Returns ``(None, None)`` when the name does not start with that pattern.
    The exchange is ``'SSE'`` for the ``sh`` prefix and ``'SZSE'`` for ``sz``.
    """
    m = _FILENAME_RE.match(filename)
    if not m:
        return None, None
    return m.group(2), 'SSE' if m.group(1) == 'sh' else 'SZSE'


@functools.lru_cache(maxsize=8)
def _year_dirs(root: str) -> Tuple[Path, ...]:
    """List the all-digit sub-directories of *root*, highest name first.

    Cached per root so the directory is listed once per run instead of once
    per symbol.
    """
    dirs = [d for d in Path(root).iterdir() if d.is_dir() and d.name.isdigit()]
    return tuple(sorted(dirs, key=lambda d: d.name, reverse=True))


def get_all_symbols():
    """Scan the latest year directory and return every ``(code, exchange)`` found.

    Returns an empty list (and logs a warning) when DAILY_DIR contains no
    year directory at all.  A missing DAILY_DIR still raises, so an unmounted
    volume is not mistaken for "nothing to do".
    """
    years = _year_dirs(DAILY_DIR)
    if not years:
        logger.warning(f"{DAILY_DIR} 下没有年份目录")
        return []
    symbols = []
    for f in years[0].glob('*.parquet'):
        code, exchange = parse_filename(f.name)
        if code:
            symbols.append((code, exchange))
    return symbols


def get_last_date(code: str, exchange: str) -> str:
    """Return the last date (``YYYY-MM-DD``) stored for a symbol in the Parquet files.

    Year directories are searched newest first; the first file that yields a
    non-null maximum ``date`` wins.  Unreadable files are logged and skipped.
    Returns ``""`` when no date can be determined.
    """
    prefix = 'sh' if exchange == 'SSE' else 'sz'
    fname = f"{prefix}{code}_daily.parquet"
    for year_dir in _year_dirs(DAILY_DIR):
        fpath = year_dir / fname
        if not fpath.exists():
            continue
        try:
            df = pd.read_parquet(fpath, columns=['date'])
            last = df['date'].max() if not df.empty else None
        except Exception as exc:
            # Best effort: fall back to an older year, but leave a trace.
            logger.warning(f"读取 {fpath} 失败: {exc}")
            continue
        # An all-null column would otherwise come back as the string "NaT".
        if last is None or pd.isna(last):
            continue
        return str(last)[:10]
    return ""


def _tencent_prefix(code: str, exchange: Optional[str] = None) -> str:
    """Pick the ``sh``/``sz`` prefix, trusting *exchange* over the code's first digit."""
    if exchange == 'SSE':
        return 'sh'
    if exchange == 'SZSE':
        return 'sz'
    # Legacy heuristic, kept for callers that do not pass an exchange.
    return 'sh' if code.startswith(('6', '5', '1')) else 'sz'


def _klines_to_frame(klines, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Turn raw K-line rows into a date-filtered OHLCV frame, or None if nothing usable."""
    if not klines:
        return None
    # Rows are read as [date, open, close, high, low, volume(, amount)]; any
    # further fields are dropped so the column assignment cannot mismatch.
    rows = [list(row)[:len(_KLINE_COLUMNS)] for row in klines]
    df = pd.DataFrame(rows)
    ncols = df.shape[1]
    if ncols < 6:
        return None
    df.columns = _KLINE_COLUMNS[:ncols]
    if 'amount' not in df.columns:
        df['amount'] = 0.0
    for col in _NUMERIC_COLUMNS:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
    df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')
    # ISO-formatted strings compare correctly as text.
    mask = (df['date'] >= start_date) & (df['date'] <= end_date)
    result = df.loc[mask, _OUTPUT_COLUMNS]
    return result if not result.empty else None


def fetch_tencent_daily(code: str, start_date: str, end_date: str,
                        exchange: Optional[str] = None):
    """Fetch daily bars for *code* between two dates from the Tencent K-line API.

    Args:
        code: Six-digit symbol code.
        start_date: First date wanted, ``YYYY-MM-DD``.
        end_date: Last date wanted, ``YYYY-MM-DD``.
        exchange: ``'SSE'`` or ``'SZSE'``.  When given it decides the request
            prefix; when omitted the prefix is guessed from the first digit
            of the code, which is ambiguous for some codes.

    Returns:
        A DataFrame with columns date/open/high/low/close/volume/amount, or
        None when the response holds no rows in the requested range.

    Raises:
        Network and decoding errors propagate to the caller.
    """
    tq = f"{_tencent_prefix(code, exchange)}{code}"
    days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10
    url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"
    # An empty ProxyHandler makes the opener ignore proxy settings from the environment.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    with opener.open(req, timeout=10) as resp:
        raw = resp.read().decode('utf-8', errors='replace')
    payload = json.loads(raw).get('data')
    if not isinstance(payload, dict):
        return None
    per_symbol = payload.get(tq)
    if not isinstance(per_symbol, dict):
        return None
    return _klines_to_frame(per_symbol.get('day', []), start_date, end_date)


def _collect_symbol_rows(code: str, exchange: str, today: str) -> Tuple[str, List[tuple]]:
    """Fetch and validate one symbol; return (status, rows ready for dbbardata).

    *status* is one of ``'updated'``, ``'skipped'`` or ``'failed'``; rows are
    only non-empty for ``'updated'``.
    """
    last_date = get_last_date(code, exchange)
    if not last_date:
        return 'skipped', []
    next_day = (pd.Timestamp(last_date) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
    if next_day > today:
        return 'skipped', []
    try:
        data = fetch_tencent_daily(code, next_day, today, exchange)
    except Exception as exc:
        # One bad symbol must not abort the run and discard collected rows.
        logger.warning(f"{code} 拉取失败: {exc}")
        return 'failed', []
    if data is None or data.empty:
        return 'skipped', []
    # Basic sanity check: every price must be strictly positive.
    if (data[['open', 'high', 'low', 'close']] <= 0).any().any():
        logger.warning(f"{code} 有非正价格,跳过")
        return 'failed', []
    rows = [
        (
            code, exchange, str(bar.date), 'd',
            float(bar.volume), float(bar.amount), 0.0,
            float(bar.open), float(bar.high), float(bar.low), float(bar.close),
        )
        for bar in data.itertuples(index=False)
    ]
    return 'updated', rows


def _write_bars(rows: List[tuple]) -> None:
    """Upsert *rows* into dbbardata in batches, then rebuild dbbaroverview."""
    # Written straight to the DB file in place; the original notes say this
    # avoids copying the full (~1.4GB) database and that the increment is
    # small enough for SMB.
    conn = sqlite3.connect(VNPY_DB_PATH, timeout=120)
    try:
        c = conn.cursor()
        # NOTE(review): SQLite documents WAL as unsupported on network
        # filesystems; confirm this pragma is safe for where the DB lives.
        c.execute("PRAGMA journal_mode=WAL")
        total_batches = (len(rows) - 1) // BATCH_SIZE + 1
        for j in range(0, len(rows), BATCH_SIZE):
            c.executemany('''INSERT OR REPLACE INTO dbbardata
                (symbol,exchange,datetime,interval,volume,turnover,open_interest,
                 open_price,high_price,low_price,close_price)
                VALUES (?,?,?,?,?,?,?,?,?,?,?)''', rows[j:j + BATCH_SIZE])
            conn.commit()
            logger.info(f" 写入批次 {j//BATCH_SIZE + 1}/{total_batches}")
        # Rebuild the overview table from the full bar table.
        c.execute('''INSERT OR REPLACE INTO dbbaroverview
            (symbol,exchange,interval,count,start,end)
            SELECT symbol,exchange,interval,COUNT(*),MIN(datetime),MAX(datetime)
            FROM dbbardata GROUP BY symbol,exchange,interval''')
        conn.commit()
    finally:
        conn.close()


def main():
    """Run one incremental update and return a summary dict of the counters."""
    today = datetime.now().strftime("%Y-%m-%d")
    logger.info(f"=== vnpy DB增量更新开始 {today} ===")

    symbols = get_all_symbols()
    logger.info(f"扫描到 {len(symbols)} 只股票")

    counts = {'updated': 0, 'skipped': 0, 'failed': 0}
    new_records = 0
    all_db_values = []

    for idx, (code, exchange) in enumerate(symbols, start=1):
        status, rows = _collect_symbol_rows(code, exchange, today)
        counts[status] += 1
        all_db_values.extend(rows)
        new_records += len(rows)

        # Checked on every iteration so skipped/failed symbols cannot hide it.
        if idx % _PROGRESS_EVERY == 0:
            logger.info(
                f"进度: {idx}/{len(symbols)} updated={counts['updated']} "
                f"skipped={counts['skipped']} failed={counts['failed']} records={new_records}"
            )
        if status == 'updated':
            time.sleep(_REQUEST_PAUSE_SECONDS)

    if all_db_values:
        logger.info(f"写入vnpy DB: {len(all_db_values)} 条记录")
        try:
            _write_bars(all_db_values)
        except Exception as e:
            logger.error(f"❌ vnpy DB更新失败: {e}", exc_info=True)
        else:
            logger.info("✅ vnpy DB更新完成")

    report = {
        "date": today,
        "total_symbols": len(symbols),
        "updated": counts['updated'],
        "skipped": counts['skipped'],
        "failed": counts['failed'],
        "new_records": new_records,
    }
    logger.info("=== 更新完成 ===")
    logger.info(json.dumps(report, ensure_ascii=False, indent=2))
    return report


if __name__ == "__main__":
    main()