197 lines
6.7 KiB
Python
197 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
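"""Incremental update: backfill missing daily bars into the vnpy DB (Tencent K-line API as the primary source).

Parquet dual-write is deferred (date-type compatibility still has to be handled); filling the DB comes first.
"""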
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import sqlite3
|
|
import shutil
|
|
import logging
|
|
import time
|
|
import pandas as pd
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# Logging: timestamp, level, message; INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)
logger = logging.getLogger(__name__)

# Directory scanned for per-year sub-directories of "<sh|sz><code>_daily.parquet" files.
DAILY_DIR = "/Volumes/stock/A股数据/日线数据/daily/"

# SQLite database that receives the incremental bars.
VNPY_DB_PATH = "/Volumes/stock/sanguo_vnpy/data/quant_trading.db"

# Local working copy: the DB is copied here, modified, then copied back.
LOCAL_DB_TMP = "/tmp/quant_trading_updater.db"

# Maximum number of rows handed to a single executemany() call.
BATCH_SIZE = 50000
|
|
|
|
|
|
def parse_filename(filename):
    """Split a daily-bar Parquet file name into ``(code, exchange)``.

    The accepted form is ``<sh|sz><6 digits>_daily.parquet``. The ``sh``
    prefix maps to ``'SSE'`` and ``sz`` maps to ``'SZSE'``.

    Args:
        filename: Bare file name, without any directory component.

    Returns:
        ``(code, exchange)`` when the name matches, otherwise ``(None, None)``.
    """
    # fullmatch instead of match: match() only anchors the start, so names
    # with trailing text (e.g. "sh600000_daily.parquet.tmp") used to pass.
    # [0-9] instead of \d keeps non-ASCII digits out of the code.
    m = re.fullmatch(r'(sh|sz)([0-9]{6})_daily\.parquet', filename)
    if m is None:
        return None, None
    market, code = m.groups()
    return code, 'SSE' if market == 'sh' else 'SZSE'
|
|
|
|
|
|
def get_all_symbols(daily_dir=None):
    """Collect ``(code, exchange)`` pairs from the most recent year directory.

    Year directories are the sub-directories whose name consists only of
    digits; the one with the greatest name is scanned for ``*.parquet``
    files, and each file name is decoded with ``parse_filename``.

    Args:
        daily_dir: Directory to scan. Defaults to the module-level
            ``DAILY_DIR`` when omitted.

    Returns:
        A sorted list of ``(code, exchange)`` tuples. Empty when there is no
        year directory or no file name matches.
    """
    root = Path(DAILY_DIR if daily_dir is None else daily_dir)
    year_names = [d.name for d in root.iterdir() if d.is_dir() and d.name.isdigit()]
    if not year_names:
        # max() on an empty sequence would raise a bare ValueError.
        logger.warning(f"{root} 下未找到年份目录")
        return []

    latest_year = max(year_names)
    symbols = []
    for f in (root / latest_year).glob('*.parquet'):
        code, exchange = parse_filename(f.name)
        if code:
            symbols.append((code, exchange))
    # glob() order is filesystem-dependent; sort so runs are reproducible.
    symbols.sort()
    return symbols
|
|
|
|
|
|
def get_last_date(code: str, exchange: str, daily_dir=None) -> str:
    """Return the latest ``date`` stored for one symbol in the Parquet files.

    Year directories (digit-only names) are visited from the greatest name
    downwards. The first file that yields a non-null maximum ``date`` wins.

    Args:
        code: Symbol code as used in the file name.
        exchange: ``'SSE'`` selects the ``sh`` file prefix; anything else
            selects ``sz``.
        daily_dir: Directory holding the year directories. Defaults to the
            module-level ``DAILY_DIR`` when omitted.

    Returns:
        The first 10 characters of ``str(max date)``, or ``""`` when no
        usable file is found.
    """
    prefix = 'sh' if exchange == 'SSE' else 'sz'
    root = Path(DAILY_DIR if daily_dir is None else daily_dir)
    fname = f"{prefix}{code}_daily.parquet"

    for year_dir in sorted(root.iterdir(), reverse=True):
        if not year_dir.is_dir() or not year_dir.name.isdigit():
            continue
        fpath = year_dir / fname
        if not fpath.exists():
            continue
        try:
            df = pd.read_parquet(fpath, columns=['date'])
            if df.empty:
                continue
            last = df['date'].max()
        except Exception as exc:
            # Still best-effort (fall through to older years), but no longer
            # silent: an unreadable file should be visible in the log.
            logger.warning(f"{fpath} 读取失败: {exc}")
            continue
        if pd.isna(last):
            # An all-null column would otherwise be returned as "NaT".
            continue
        return str(last)[:10]
    return ""
|
|
|
|
|
|
def _tencent_symbol(code, exchange=None):
    """Return the exchange-prefixed symbol (e.g. ``sh600000``) used in the request."""
    if exchange is not None:
        prefix = 'sh' if exchange == 'SSE' else 'sz'
    else:
        # Fallback guess from the first digit.
        # NOTE(review): this guess may be wrong for some codes; pass
        # ``exchange`` whenever the caller knows it.
        prefix = 'sh' if code.startswith(('6', '5', '1')) else 'sz'
    return f"{prefix}{code}"


def _klines_to_frame(klines, start_date, end_date):
    """Convert raw K-line rows to a filtered frame, or None if nothing usable."""
    # Positional layout: [date, open, close, high, low, volume], optionally
    # followed by a 7th field that is treated as amount.
    # NOTE(review): the meaning of the 7th field is assumed - confirm against the API.
    fields = ['date', 'open', 'close', 'high', 'low', 'volume', 'amount']

    df = pd.DataFrame(klines)
    if df.shape[1] < 6:
        # Malformed rows: the mandatory fields are not all present.
        return None
    # Keep only the leading fields so rows carrying extra trailing items do
    # not break the column assignment.
    df = df.iloc[:, :len(fields)].copy()
    df.columns = fields[:df.shape[1]]
    if 'amount' not in df.columns:
        df['amount'] = 0.0

    for c in fields[1:]:
        df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0)
    df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')

    # ISO dates compare correctly as plain strings.
    mask = (df['date'] >= start_date) & (df['date'] <= end_date)
    result = df.loc[mask, ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']]
    return result if not result.empty else None


def fetch_tencent_daily(code: str, start_date: str, end_date: str, exchange=None):
    """Fetch daily bars for one symbol from the Tencent K-line endpoint.

    Args:
        code: Symbol code without the exchange prefix.
        start_date: First date wanted, ``YYYY-MM-DD`` (inclusive).
        end_date: Last date wanted, ``YYYY-MM-DD`` (inclusive).
        exchange: Optional. ``'SSE'`` selects the ``sh`` prefix, any other
            value selects ``sz``. When omitted, the prefix is guessed from
            the first digit of ``code``.

    Returns:
        A DataFrame with columns ``date, open, high, low, close, volume,
        amount`` limited to ``[start_date, end_date]``, or ``None`` when the
        response holds no usable rows.

    Raises:
        Network and JSON decoding errors are not caught here; they
        propagate to the caller.
    """
    import urllib.request

    tq = _tencent_symbol(code, exchange)
    days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10
    url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"

    # An empty ProxyHandler mapping disables auto-detected proxies.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    # Context manager so the connection is closed even if read() fails.
    with opener.open(req, timeout=10) as resp:
        raw = resp.read().decode('utf-8', errors='replace')

    payload = json.loads(raw)
    d = payload.get('data') if isinstance(payload, dict) else None
    if not isinstance(d, dict):
        return None
    entry = d.get(tq)
    klines = entry.get('day') if isinstance(entry, dict) else None
    if not klines:
        return None

    return _klines_to_frame(klines, start_date, end_date)
|
|
|
|
|
|
def _bars_to_db_rows(code, exchange, data):
    """Convert one symbol's bar frame into tuples in ``dbbardata`` column order."""
    # reindex supplies 0 for a missing volume/amount column, matching the
    # previous row.get(..., 0) defaults.
    cols = data.reindex(
        columns=['date', 'volume', 'amount', 'open', 'high', 'low', 'close'],
        fill_value=0,
    )
    return [
        (code, exchange, str(day), 'd',
         float(volume), float(amount), 0.0,
         float(open_), float(high), float(low), float(close))
        for day, volume, amount, open_, high, low, close
        in cols.itertuples(index=False, name=None)
    ]


def _symbol_increment_rows(code, exchange, today):
    """Fetch and validate new bars for one symbol.

    Returns ``(status, rows)`` where status is ``'updated'``, ``'skipped'``
    or ``'failed'`` and rows is a (possibly empty) list of DB tuples.
    """
    last_date = get_last_date(code, exchange)
    if not last_date:
        return 'skipped', []

    try:
        next_day = (pd.Timestamp(last_date) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
    except (ValueError, TypeError):
        logger.warning(f"{code} 最后日期无法解析: {last_date!r}")
        return 'failed', []
    if next_day > today:
        return 'skipped', []

    try:
        data = fetch_tencent_daily(code, next_day, today)
    except Exception as exc:
        # One bad symbol must not abort the run and discard everything
        # collected so far.
        logger.warning(f"{code} 拉取失败: {exc}")
        return 'failed', []
    finally:
        # Throttle after every request, not only after successful updates.
        time.sleep(0.3)

    if data is None or data.empty:
        return 'skipped', []

    # Basic sanity check: every price must be strictly positive.
    if (data[['open', 'high', 'low', 'close']] <= 0).any().any():
        logger.warning(f"{code} 有非正价格,跳过")
        return 'failed', []

    return 'updated', _bars_to_db_rows(code, exchange, data)


def _write_vnpy_db(rows):
    """Apply ``rows`` to a local copy of the vnpy DB, then copy it back."""
    shutil.copy2(VNPY_DB_PATH, LOCAL_DB_TMP)
    conn = sqlite3.connect(LOCAL_DB_TMP)
    try:
        cur = conn.cursor()
        for start in range(0, len(rows), BATCH_SIZE):
            cur.executemany(
                '''INSERT OR REPLACE INTO dbbardata
                   (symbol,exchange,datetime,interval,volume,turnover,open_interest,
                    open_price,high_price,low_price,close_price)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?)''',
                rows[start:start + BATCH_SIZE],
            )
            conn.commit()

        # Rebuild the overview table from the bar table.
        cur.execute(
            '''INSERT OR REPLACE INTO dbbaroverview (symbol,exchange,interval,count,start,end)
               SELECT symbol,exchange,interval,COUNT(*),MIN(datetime),MAX(datetime)
               FROM dbbardata GROUP BY symbol,exchange,interval'''
        )
        conn.commit()
    finally:
        # Close even on failure; the copy-back below is then never reached.
        conn.close()

    # NOTE(review): this overwrites the live DB in place and is not atomic.
    shutil.copy2(LOCAL_DB_TMP, VNPY_DB_PATH)


def main():
    """Run the incremental update for every symbol and write the result to the vnpy DB.

    Returns:
        A dict with keys ``date``, ``total_symbols``, ``updated``,
        ``skipped``, ``failed`` and ``new_records``.
    """
    today = datetime.now().strftime("%Y-%m-%d")

    logger.info(f"=== vnpy DB增量更新开始 {today} ===")

    # Discover all symbols.
    symbols = get_all_symbols()
    logger.info(f"扫描到 {len(symbols)} 只股票")

    counts = {'updated': 0, 'skipped': 0, 'failed': 0}
    new_records = 0
    all_db_values = []

    for i, (code, exchange) in enumerate(symbols, start=1):
        status, rows = _symbol_increment_rows(code, exchange, today)
        counts[status] += 1
        if rows:
            all_db_values.extend(rows)
            new_records += len(rows)

        # Reported for every 500th symbol, whatever its outcome.
        if i % 500 == 0:
            logger.info(
                f"进度: {i}/{len(symbols)} updated={counts['updated']} "
                f"skipped={counts['skipped']} failed={counts['failed']} records={new_records}"
            )

    # Write to the vnpy DB.
    if all_db_values:
        logger.info(f"写入vnpy DB: {len(all_db_values)} 条记录")
        try:
            _write_vnpy_db(all_db_values)
            logger.info("✅ vnpy DB更新完成")
        except Exception as e:
            # logger.exception keeps the message and adds the traceback.
            logger.exception(f"❌ vnpy DB更新失败: {e}")

    report = {
        "date": today,
        "total_symbols": len(symbols),
        "updated": counts['updated'],
        "skipped": counts['skipped'],
        "failed": counts['failed'],
        "new_records": new_records,
    }
    logger.info("=== 更新完成 ===")
    logger.info(json.dumps(report, ensure_ascii=False, indent=2))
    return report


if __name__ == "__main__":
    main()
|