#!/usr/bin/env python3 """多源降级管理器 - 日线(akshare→腾讯) + 实时(新浪→东财→腾讯)""" import pandas as pd import urllib.request import json import logging from datetime import datetime, timedelta from typing import Optional logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') logger = logging.getLogger(__name__) class FallbackManager: def __init__(self): self._source_used = "" def get_source_used(self) -> str: return self._source_used def get_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame: """获取日线数据,降级链:akshare → 腾讯K线""" # 1. akshare try: df = self._fetch_akshare_daily(symbol, start_date, end_date) if df is not None and not df.empty: self._source_used = "akshare" return df except Exception as e: logger.warning(f"akshare日线失败 {symbol}: {e}") # 2. 腾讯K线 try: df = self._fetch_tencent_daily(symbol, start_date, end_date) if df is not None and not df.empty: self._source_used = "tencent_kline" return df except Exception as e: logger.warning(f"腾讯K线失败 {symbol}: {e}") raise RuntimeError(f"所有日线数据源失败: {symbol} {start_date}~{end_date}") def get_realtime(self, symbol: str) -> dict: """获取实时行情,降级链:新浪→东财→腾讯""" from realtime import get_realtime_quote result = get_realtime_quote(symbol) self._source_used = result.get("source", "unknown") return result def _fetch_akshare_daily(self, symbol: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: import akshare as ak code = symbol.replace("SH", "").replace("SZ", "").replace("sh", "").replace("sz", "") s = start_date.replace("-", "") e = end_date.replace("-", "") df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=s, end_date=e, adjust="") if df is None or df.empty: return None df = df.rename(columns={"日期": "date", "开盘": "open", "收盘": "close", "最高": "high", "最低": "low", "成交量": "volume", "成交额": "amount"}) df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") for c in ["open", "high", "low", "close", "volume", "amount"]: df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) return df[["date", "open", "high", "low", "close", "volume", "amount"]] def _fetch_tencent_daily(self, symbol: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: """腾讯K线API获取日线""" code = symbol.replace("SH", "").replace("SZ", "").replace("sh", "").replace("sz", "") if code.startswith(("6", "5", "1")): prefix = "sh" else: prefix = "sz" tq_symbol = f"{prefix}{code}" days = (datetime.strptime(end_date, "%Y-%m-%d") - datetime.strptime(start_date, "%Y-%m-%d")).days + 10 url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq_symbol},day,{start_date},,{days}," try: import urllib.request, json as _json opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with opener.open(req, timeout=10) as r: resp = _json.loads(r.read()) d = resp.get("data") if not isinstance(d, dict): return None klines = d.get(tq_symbol, {}).get("day", []) if not klines: return None df = pd.DataFrame(klines) ncols = len(df.columns) if ncols >= 7: df.columns = ["date", "open", "close", "high", "low", "volume", "amount"][:ncols] else: df.columns = ["date", "open", "close", "high", "low", "volume"][:ncols] if "amount" not in df.columns: df["amount"] = 0.0 for c in ["open", "close", "high", "low", "volume", "amount"]: df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") mask = (df["date"] >= start_date) & (df["date"] <= end_date) return df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]] except Exception as e: logger.warning(f"腾讯K线请求失败: {e}") return None