Files
sanguo_vnpy/data_platform/updater.py
T
2026-05-02 19:56:18 +08:00

197 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""增量更新 - 补vnpy DB增量数据(腾讯K线主源)
Parquet双写暂缓(需要处理date类型兼容),优先补DB数据
"""
import os
import re
import sys
import json
import sqlite3
import shutil
import logging
import time
import pandas as pd
from pathlib import Path
from datetime import datetime
# Configure logging once at import time: INFO level, timestamped lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
# Root directory of the daily Parquet files; it is scanned for per-year
# subdirectories whose names are all digits.
DAILY_DIR = "/Volumes/stock/A股数据/日线数据/daily/"
# SQLite database file that receives the incremental daily bars.
VNPY_DB_PATH = "/Volumes/stock/sanguo_vnpy/data/quant_trading.db"
# NOTE(review): not referenced anywhere in the visible code — presumably a
# leftover from a copy-to-local workflow; confirm before removing.
LOCAL_DB_TMP = "/tmp/quant_trading_updater.db"
# Maximum number of rows handed to one executemany() call / commit.
BATCH_SIZE = 50000
def parse_filename(filename):
    """Split a daily Parquet file name into ``(code, exchange)``.

    Args:
        filename: Bare file name such as ``"sh600000_daily.parquet"``.

    Returns:
        ``(code, exchange)`` where ``code`` is the six-digit string and
        ``exchange`` is ``'SSE'`` for the ``sh`` prefix or ``'SZSE'`` for
        ``sz``. ``(None, None)`` when the name does not have exactly that
        shape.
    """
    # fullmatch (not match) so that names with trailing junk, e.g.
    # "sh600000_daily.parquet.tmp", are rejected instead of being parsed.
    m = re.fullmatch(r'(sh|sz)(\d{6})_daily\.parquet', filename)
    if m is None:
        return None, None
    prefix, code = m.groups()
    return code, 'SSE' if prefix == 'sh' else 'SZSE'
def get_all_symbols():
    """Collect every ``(code, exchange)`` pair found in the newest year directory.

    Only the year directory with the numerically largest name under
    ``DAILY_DIR`` is scanned; ``*.parquet`` files whose names
    ``parse_filename`` does not recognise are ignored.

    Returns:
        A list of ``(code, exchange)`` tuples, in the order the files are
        yielded by ``glob``. Empty when ``DAILY_DIR`` contains no year
        directory.

    Raises:
        OSError: If ``DAILY_DIR`` itself cannot be listed.
    """
    # isdecimal() guarantees int() below succeeds; comparing as integers
    # avoids the string ordering where "999" would beat "2024".
    year_dirs = [
        d for d in Path(DAILY_DIR).iterdir()
        if d.is_dir() and d.name.isdecimal()
    ]
    if not year_dirs:
        # max() on an empty sequence would raise ValueError; "no symbols" is
        # the honest answer here.
        return []
    latest_dir = max(year_dirs, key=lambda d: int(d.name))

    symbols = []
    for f in latest_dir.glob('*.parquet'):
        code, exchange = parse_filename(f.name)
        if code:
            symbols.append((code, exchange))
    return symbols
def get_last_date(code: str, exchange: str) -> str:
    """Return the latest date stored for one stock in the Parquet archive.

    Year directories under ``DAILY_DIR`` are visited from the highest name
    downwards; the first file that yields a usable maximum ``date`` wins.

    Args:
        code: Six-digit stock code.
        exchange: ``'SSE'`` selects the ``sh`` file prefix; any other value
            selects ``sz``.

    Returns:
        The first ten characters of the string form of the maximum ``date``
        value (``YYYY-MM-DD`` for ISO strings and timestamps), or ``""`` when
        no file provides one.
    """
    prefix = 'sh' if exchange == 'SSE' else 'sz'
    filename = f"{prefix}{code}_daily.parquet"
    for year_dir in sorted(Path(DAILY_DIR).iterdir(), reverse=True):
        if not year_dir.is_dir() or not year_dir.name.isdigit():
            continue
        fpath = year_dir / filename
        if not fpath.exists():
            continue
        try:
            df = pd.read_parquet(fpath, columns=['date'])
            last = None if df.empty else df['date'].max()
        except Exception as exc:
            # Best effort: an unreadable file must not stop the scan, but it
            # should be visible in the log instead of silently ignored.
            logger.warning(f"读取 {fpath} 失败,尝试更早年份: {exc}")
            continue
        # An empty file or an all-null date column gives no usable date;
        # returning "NaT"/"None" would break the caller's date arithmetic.
        if last is None or pd.isna(last):
            continue
        return str(last)[:10]
    return ""
def _klines_to_frame(klines, start_date, end_date):
    """Turn raw kline rows into a date-filtered OHLCV frame, or None if unusable."""
    df = pd.DataFrame(klines)
    ncols = df.shape[1]
    if ncols < 6:
        # Not even date/open/close/high/low/volume are present.
        return None
    # Row layout: [date, open, close, high, low, volume], optionally followed
    # by amount. Anything beyond the seventh column is ignored so that an
    # unexpectedly wide payload cannot break the column assignment.
    names = ['date', 'open', 'close', 'high', 'low', 'volume']
    if ncols >= 7:
        names.append('amount')
    df = df.iloc[:, :len(names)].copy()
    df.columns = names
    if 'amount' not in df.columns:
        df['amount'] = 0.0
    for c in ['open', 'close', 'high', 'low', 'volume', 'amount']:
        df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0)
    df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')
    # Plain string comparison: valid because every value is YYYY-MM-DD.
    mask = (df['date'] >= start_date) & (df['date'] <= end_date)
    result = df.loc[mask, ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']]
    return result if not result.empty else None


def fetch_tencent_daily(code: str, start_date: str, end_date: str):
    """Fetch daily bars for one stock from the Tencent kline endpoint.

    Args:
        code: Six-digit stock code without market prefix.
        start_date: First wanted day, ``YYYY-MM-DD`` (inclusive).
        end_date: Last wanted day, ``YYYY-MM-DD`` (inclusive).

    Returns:
        A DataFrame with columns ``date, open, high, low, close, volume,
        amount`` restricted to the requested range, or ``None`` when the
        range is empty or the response holds no usable rows.

    Raises:
        urllib.error.URLError, OSError: On network failure or timeout.
        ValueError: On malformed JSON or unparseable dates.
    """
    import urllib.request
    import json as _json

    start_ts = pd.Timestamp(start_date)
    end_ts = pd.Timestamp(end_date)
    if start_ts > end_ts:
        # Empty range: the date filter would discard every row anyway, so
        # skip the network round trip.
        return None

    # NOTE(review): the market prefix is inferred from the leading digit
    # only; confirm this matches every code family callers pass in.
    prefix = 'sh' if code.startswith(('6', '5', '1')) else 'sz'
    tq = f"{prefix}{code}"
    # Request some slack beyond the calendar span.
    days = (end_ts - start_ts).days + 10
    url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"

    # An empty ProxyHandler mapping disables any auto-detected proxy.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    # Context manager so the connection is released even if read() fails.
    with opener.open(req, timeout=10) as resp:
        raw = resp.read().decode('utf-8', errors='replace')

    data = _json.loads(raw)
    d = data.get('data') if isinstance(data, dict) else None
    if not isinstance(d, dict):
        return None
    entry = d.get(tq)
    if not isinstance(entry, dict):
        # Missing symbol, or a non-object placeholder in its slot.
        return None
    klines = entry.get('day', [])
    if not klines:
        return None
    return _klines_to_frame(klines, start_date, end_date)
def _collect_symbol_rows(code, exchange, today):
    """Fetch the bars one symbol is missing and return ``(outcome, rows)``.

    ``outcome`` is ``'updated'``, ``'skipped'`` or ``'failed'``; ``rows`` holds
    ready-to-insert ``dbbardata`` tuples and is empty unless updated.
    """
    last_date = get_last_date(code, exchange)
    if not last_date:
        return 'skipped', []
    next_day = (pd.Timestamp(last_date) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
    if next_day > today:
        # Already up to date.
        return 'skipped', []
    data = fetch_tencent_daily(code, next_day, today)
    if data is None or data.empty:
        return 'skipped', []
    # Basic sanity check: reject the whole symbol on any non-positive price.
    if (data[['open', 'high', 'low', 'close']] <= 0).any().any():
        logger.warning(f"{code} 有非正价格,跳过")
        return 'failed', []
    # NOTE(review): datetime is written as a bare YYYY-MM-DD string; confirm
    # it matches the format of existing rows, otherwise INSERT OR REPLACE
    # will not recognise them as the same bar.
    rows = [
        (code, exchange, str(date), 'd',
         float(volume), float(amount), 0.0,
         float(open_), float(high), float(low), float(close))
        for date, open_, high, low, close, volume, amount in zip(
            data['date'], data['open'], data['high'], data['low'],
            data['close'], data['volume'], data['amount'])
    ]
    return 'updated', rows


def _write_bars(rows):
    """Upsert ``rows`` into ``dbbardata`` in batches, then refresh ``dbbaroverview``."""
    conn = sqlite3.connect(VNPY_DB_PATH, timeout=120)
    try:
        c = conn.cursor()
        # NOTE(review): SQLite documents that WAL does not work over a
        # network filesystem, and this database is described as living on a
        # NAS/SMB share. Confirm the mode is safe there.
        c.execute("PRAGMA journal_mode=WAL")
        total_batches = (len(rows) - 1) // BATCH_SIZE + 1
        for j in range(0, len(rows), BATCH_SIZE):
            c.executemany('''INSERT OR REPLACE INTO dbbardata
                (symbol,exchange,datetime,interval,volume,turnover,open_interest,
                open_price,high_price,low_price,close_price)
                VALUES (?,?,?,?,?,?,?,?,?,?,?)''', rows[j:j + BATCH_SIZE])
            # Commit per batch so a late failure keeps the earlier batches.
            conn.commit()
            logger.info(f" 写入批次 {j//BATCH_SIZE + 1}/{total_batches}")
        # Rebuild the per-symbol overview from the full bar table.
        c.execute('''INSERT OR REPLACE INTO dbbaroverview (symbol,exchange,interval,count,start,end)
            SELECT symbol,exchange,interval,COUNT(*),MIN(datetime),MAX(datetime)
            FROM dbbardata GROUP BY symbol,exchange,interval''')
        conn.commit()
    finally:
        # Release the handle even when a statement fails.
        conn.close()


def main():
    """Run one incremental update of the vnpy bar database.

    For every symbol found by ``get_all_symbols`` the bars after its last
    archived date are fetched, validated and collected; all collected rows
    are then written to the SQLite database in one pass.

    Returns:
        A dict with keys ``date``, ``total_symbols``, ``updated``,
        ``skipped``, ``failed`` and ``new_records``.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    logger.info(f"=== vnpy DB增量更新开始 {today} ===")

    symbols = get_all_symbols()
    logger.info(f"扫描到 {len(symbols)} 只股票")

    updated = 0
    skipped = 0
    failed = 0
    new_records = 0
    all_db_values = []

    for i, (code, exchange) in enumerate(symbols):
        try:
            outcome, rows = _collect_symbol_rows(code, exchange, today)
        except Exception as e:
            # One bad symbol (timeout, malformed payload, unreadable file)
            # must not abort the run and discard rows already collected.
            logger.warning(f"{code} 处理失败,跳过: {e}")
            outcome, rows = 'failed', []

        if outcome == 'updated':
            all_db_values.extend(rows)
            new_records += len(rows)
            updated += 1
        elif outcome == 'failed':
            failed += 1
        else:
            skipped += 1

        # Report every 500 symbols regardless of this symbol's outcome.
        if (i + 1) % 500 == 0:
            logger.info(f"进度: {i+1}/{len(symbols)} updated={updated} skipped={skipped} failed={failed} records={new_records}")

        if outcome == 'updated':
            # NOTE(review): pause placement reconstructed from a source whose
            # indentation was lost; kept as "after each successful update".
            time.sleep(0.3)

    # Write straight to the database file instead of copying it locally
    # first; the increment is small compared with the full file.
    if all_db_values:
        logger.info(f"写入vnpy DB: {len(all_db_values)} 条记录")
        try:
            _write_bars(all_db_values)
            logger.info("✅ vnpy DB更新完成")
        except Exception as e:
            logger.error(f"❌ vnpy DB更新失败: {e}")

    report = {
        "date": today,
        "total_symbols": len(symbols),
        "updated": updated,
        "skipped": skipped,
        "failed": failed,
        "new_records": new_records,
    }
    logger.info(f"=== 更新完成 ===")
    logger.info(json.dumps(report, ensure_ascii=False, indent=2))
    return report
# Run the incremental update only when executed as a script, not on import.
if __name__ == "__main__":
    main()