From fd98ae057b866801e79ad3b7dea464a27a712fd4 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sat, 2 May 2026 18:47:13 +0800 Subject: [PATCH] auto-sync: 2026-05-02 18:47:13 --- data_platform/fallback.py | 96 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 data_platform/fallback.py diff --git a/data_platform/fallback.py b/data_platform/fallback.py new file mode 100644 index 00000000..694cdf94 --- /dev/null +++ b/data_platform/fallback.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +"""多源降级管理器 - 日线(akshare→腾讯) + 实时(新浪→东财→腾讯)""" +import pandas as pd +import urllib.request +import json +import logging +from datetime import datetime, timedelta +from typing import Optional + +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') +logger = logging.getLogger(__name__) + + +class FallbackManager: + def __init__(self): + self._source_used = "" + + def get_source_used(self) -> str: + return self._source_used + + def get_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame: + """获取日线数据,降级链:akshare → 腾讯K线""" + # 1. akshare + try: + df = self._fetch_akshare_daily(symbol, start_date, end_date) + if df is not None and not df.empty: + self._source_used = "akshare" + return df + except Exception as e: + logger.warning(f"akshare日线失败 {symbol}: {e}") + + # 2. 腾讯K线 + try: + df = self._fetch_tencent_daily(symbol, start_date, end_date) + if df is not None and not df.empty: + self._source_used = "tencent_kline" + return df + except Exception as e: + logger.warning(f"腾讯K线失败 {symbol}: {e}") + + raise RuntimeError(f"所有日线数据源失败: {symbol} {start_date}~{end_date}") + + def get_realtime(self, symbol: str) -> dict: + """获取实时行情,降级链:新浪→东财→腾讯""" + from realtime import get_realtime_quote + result = get_realtime_quote(symbol) + self._source_used = result.get("source", "unknown") + return result + + def _fetch_akshare_daily(self, symbol: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: + import akshare as ak + code = symbol.replace("SH", "").replace("SZ", "").replace("sh", "").replace("sz", "") + s = start_date.replace("-", "") + e = end_date.replace("-", "") + df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=s, end_date=e, adjust="") + if df is None or df.empty: + return None + df = df.rename(columns={"日期": "date", "开盘": "open", "收盘": "close", + "最高": "high", "最低": "low", "成交量": "volume", + "成交额": "amount"}) + df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") + for c in ["open", "high", "low", "close", "volume", "amount"]: + df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) + return df[["date", "open", "high", "low", "close", "volume", "amount"]] + + def _fetch_tencent_daily(self, symbol: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: + """腾讯K线API获取日线""" + code = symbol.replace("SH", "").replace("SZ", "").replace("sh", "").replace("sz", "") + if code.startswith(("6", "5", "1")): + prefix = "sh" + else: + prefix = "sz" + tq_symbol = f"{prefix}{code}" + + # 计算天数 + days = (datetime.strptime(end_date, "%Y-%m-%d") - datetime.strptime(start_date, "%Y-%m-%d")).days + 10 + url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?_var=kline_dayqfq¶m={tq_symbol},day,,{days},qfqa" + try: + with urllib.request.urlopen(url, timeout=10) as r: + raw = r.read().decode("gbk") + json_str = raw.split("=", 1)[1] + data = json.loads(json_str) + klines = data.get("data", {}).get(tq_symbol, {}).get("day", []) + if not klines: + return None + df = pd.DataFrame(klines, columns=["date", "open", "close", "high", "low", "volume"]) + for c in ["open", "close", "high", "low", "volume"]: + df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) + df["amount"] = 0.0 + df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") + # Filter date range + mask = (df["date"] >= start_date) & (df["date"] <= end_date) + return df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]] + except Exception as e: + logger.warning(f"腾讯K线请求失败: {e}") + return None