auto-sync: 2026-05-02 19:03:00

This commit is contained in:
cfdaily
2026-05-02 19:03:00 +08:00
parent 78cf0b706e
commit 149a31f427
+48 -95
View File
@@ -1,5 +1,7 @@
#!/usr/bin/env python3
"""增量更新 - Parquet+vnpy DB双写"""
"""增量更新 - 补vnpy DB增量数据(腾讯K线主源)
Parquet双写暂缓(需要处理date类型兼容),优先补DB数据
"""
import os
import re
import sys
@@ -35,7 +37,7 @@ def get_all_symbols():
for f in (Path(DAILY_DIR) / latest_year).glob('*.parquet'):
code, exchange = parse_filename(f.name)
if code:
symbols.append((code, exchange, f.name))
symbols.append((code, exchange))
return symbols
@@ -57,84 +59,44 @@ def get_last_date(code: str, exchange: str) -> str:
return ""
def fetch_incremental(code: str, start_date: str, end_date: str):
"""获取增量数据:腾讯K线(主源,稳定无代理问题)"""
# 直接用腾讯K线API(akshare有代理问题,作为降级备源)
try:
import urllib.request
import json as _json
prefix = 'sh' if code.startswith(('6', '5', '1')) else 'sz'
tq = f"{prefix}{code}"
days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10
url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"
# 用无代理opener避免akshare代理污染
proxy_handler = urllib.request.ProxyHandler({})
opener = urllib.request.build_opener(proxy_handler)
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = opener.open(req, timeout=10)
raw = resp.read().decode('utf-8', errors='replace')
data = _json.loads(raw)
d = data.get('data')
if not isinstance(d, dict):
return None
klines = d.get(tq, {}).get('day', [])
if not klines:
return None
df = pd.DataFrame(klines, columns=['date', 'open', 'close', 'high', 'low', 'volume'])
for c in ['open', 'close', 'high', 'low', 'volume']:
df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0)
df['amount'] = 0.0
df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')
mask = (df['date'] >= start_date) & (df['date'] <= end_date)
result = df.loc[mask, ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']]
if result.empty:
return None
return result
except Exception as e:
logger.warning(f'腾讯K线失败 {code}: {e}')
def fetch_tencent_daily(code: str, start_date: str, end_date: str):
"""腾讯K线API获取日线增量数据"""
import urllib.request
import json as _json
prefix = 'sh' if code.startswith(('6', '5', '1')) else 'sz'
tq = f"{prefix}{code}"
days = (pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 10
url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq},day,{start_date},,{days},"
proxy_handler = urllib.request.ProxyHandler({})
opener = urllib.request.build_opener(proxy_handler)
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = opener.open(req, timeout=10)
raw = resp.read().decode('utf-8', errors='replace')
data = _json.loads(raw)
d = data.get('data')
if not isinstance(d, dict):
return None
klines = d.get(tq, {}).get('day', [])
if not klines:
return None
def append_to_parquet(code: str, exchange: str, new_data: pd.DataFrame):
"""原子写入:临时文件+rename,追加到对应年份目录"""
prefix = 'sh' if exchange == 'SSE' else 'sz'
for _, row in new_data.iterrows():
year = row['date'][:4]
year_dir = Path(DAILY_DIR) / year
year_dir.mkdir(parents=True, exist_ok=True)
fpath = year_dir / f"{prefix}{code}_daily.parquet"
if fpath.exists():
existing = pd.read_parquet(fpath)
combined = pd.concat([existing, pd.DataFrame([row])], ignore_index=True)
combined = combined.drop_duplicates(subset=['date'], keep='last')
combined = combined.sort_values('date').reset_index(drop=True)
else:
combined = pd.DataFrame([row])
tmp_path = str(fpath) + ".tmp"
combined.to_parquet(tmp_path, index=False)
os.rename(tmp_path, str(fpath))
def append_to_vnpy_db(code: str, exchange: str, new_data: pd.DataFrame):
"""写入vnpy DB (先本地/tmp,完成后复制到NAS)"""
values = []
for _, row in new_data.iterrows():
values.append((
code, exchange, str(row['date']), 'd',
float(row.get('volume', 0)), float(row.get('amount', 0)), 0.0,
float(row.get('open', 0)), float(row.get('high', 0)),
float(row.get('low', 0)), float(row.get('close', 0)),
))
return values
# kline format: [date, open, close, high, low, volume]
df = pd.DataFrame(klines, columns=['date', 'open', 'close', 'high', 'low', 'volume'])
for c in ['open', 'close', 'high', 'low', 'volume']:
df[c] = pd.to_numeric(df[c], errors='coerce').fillna(0)
df['amount'] = 0.0
df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')
mask = (df['date'] >= start_date) & (df['date'] <= end_date)
result = df.loc[mask, ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']]
return result if not result.empty else None
def main():
today = datetime.now().strftime("%Y-%m-%d")
max_end = today # 不超过今天
logger.info(f"=== 增量更新开始 {today} ===")
logger.info(f"=== vnpy DB增量更新开始 {today} ===")
# 获取所有股票
symbols = get_all_symbols()
@@ -146,57 +108,49 @@ def main():
new_records = 0
all_db_values = []
for i, (code, exchange, fname) in enumerate(symbols):
# 获取最后日期
for i, (code, exchange) in enumerate(symbols):
last_date = get_last_date(code, exchange)
if not last_date:
skipped += 1
continue
# 计算需要补的起始日期(下一天)
next_day = (pd.Timestamp(last_date) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
if next_day > max_end:
if next_day > today:
skipped += 1
continue
# 获取增量数据
data = fetch_incremental(code, next_day, max_end)
data = fetch_tencent_daily(code, next_day, today)
if data is None or data.empty:
skipped += 1
continue
# 校验(简单fatal检查)
# 简单校验
if (data[['open', 'high', 'low', 'close']] <= 0).any().any():
logger.warning(f"{code} 有非正价格,跳过")
failed += 1
continue
# 写Parquet
try:
append_to_parquet(code, exchange, data)
except Exception as e:
logger.error(f"Parquet写入失败 {code}: {e}")
failed += 1
continue
# 收集vnpy DB数据
db_vals = append_to_vnpy_db(code, exchange, data)
all_db_values.extend(db_vals)
for _, row in data.iterrows():
all_db_values.append((
code, exchange, str(row['date']), 'd',
float(row.get('volume', 0)), float(row.get('amount', 0)), 0.0,
float(row.get('open', 0)), float(row.get('high', 0)),
float(row.get('low', 0)), float(row.get('close', 0)),
))
new_records += len(data)
updated += 1
if (i + 1) % 500 == 0:
logger.info(f"进度: {i+1}/{len(symbols)} updated={updated} skipped={skipped} failed={failed}")
logger.info(f"进度: {i+1}/{len(symbols)} updated={updated} skipped={skipped} failed={failed} records={new_records}")
# 限频:akshare 1秒间隔
time.sleep(0.5)
time.sleep(0.3)
# 写vnpy DB
if all_db_values:
logger.info(f"写入vnpy DB: {len(all_db_values)} 条记录")
try:
# 复制NAS DB到本地
shutil.copy2(VNPY_DB_PATH, LOCAL_DB_TMP)
conn = sqlite3.connect(LOCAL_DB_TMP)
c = conn.cursor()
@@ -214,7 +168,6 @@ def main():
conn.commit()
conn.close()
# 复制回NAS
shutil.copy2(LOCAL_DB_TMP, VNPY_DB_PATH)
logger.info("✅ vnpy DB更新完成")
except Exception as e: