Files
2026-05-02 19:49:48 +08:00

105 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""多源降级管理器 - 日线(akshare→腾讯) + 实时(新浪→东财→腾讯)"""
import pandas as pd
import urllib.request
import json
import logging
from datetime import datetime, timedelta
from typing import Optional
# Module-level logging setup. NOTE: basicConfig() runs at import time, so merely
# importing this module configures the root logger (INFO level, timestamped
# "time level message" format) if the root logger has no handlers yet.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
# Logger named after this module; used for the fallback warnings emitted below.
logger = logging.getLogger(__name__)
class FallbackManager:
    """Fetch market data from several sources, falling back on failure.

    Daily bars:  akshare -> Tencent K-line API.
    Realtime:    delegated to ``realtime.get_realtime_quote``.

    The name of the source that served the most recent successful request is
    available through :meth:`get_source_used`.
    """

    # Positional layout assumed for one Tencent K-line row.
    # NOTE(review): the 7th field is treated as "amount", as the original code
    # did - confirm against a live API response.
    _TENCENT_ROW_FIELDS = ("date", "open", "close", "high", "low", "volume", "amount")
    # Column order of every DataFrame returned by the daily fetchers.
    _DAILY_COLUMNS = ["date", "open", "high", "low", "close", "volume", "amount"]
    _NUMERIC_COLUMNS = ["open", "high", "low", "close", "volume", "amount"]

    def __init__(self):
        # Empty until a fetch succeeds.
        self._source_used = ""

    def get_source_used(self) -> str:
        """Return the name of the source used by the last successful fetch."""
        return self._source_used

    def get_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Return daily bars for *symbol*, trying akshare and then Tencent.

        Args:
            symbol: Stock code, optionally tagged with an exchange
                (e.g. ``"600000"``, ``"SH600000"``, ``"600000.SH"``).
            start_date: Inclusive start date, ``YYYY-MM-DD``.
            end_date: Inclusive end date, ``YYYY-MM-DD``.

        Returns:
            A non-empty DataFrame with columns
            ``date, open, high, low, close, volume, amount``.

        Raises:
            RuntimeError: If every source fails or returns no rows.
        """
        sources = (
            ("akshare", self._fetch_akshare_daily, "akshare日线失败"),
            ("tencent_kline", self._fetch_tencent_daily, "腾讯K线失败"),
        )
        for source_name, fetch, failure_label in sources:
            try:
                df = fetch(symbol, start_date, end_date)
            except Exception as exc:
                # Deliberately broad: any failure in one source must not
                # prevent the next source from being tried.
                logger.warning("%s %s: %s", failure_label, symbol, exc)
                continue
            if df is not None and not df.empty:
                self._source_used = source_name
                return df
        raise RuntimeError(f"所有日线数据源失败: {symbol} {start_date}~{end_date}")

    def get_realtime(self, symbol: str) -> dict:
        """Return the realtime quote for *symbol*.

        The fallback chain itself lives in ``realtime.get_realtime_quote``;
        this method records the ``"source"`` key of its result (or
        ``"unknown"`` when absent) as the source used.
        """
        # Imported lazily, as in the original, so the daily path does not
        # depend on the realtime module being importable.
        from realtime import get_realtime_quote

        result = get_realtime_quote(symbol)
        self._source_used = result.get("source", "unknown")
        return result

    @staticmethod
    def _split_symbol(symbol: str) -> tuple:
        """Split *symbol* into ``(exchange, code)``.

        ``exchange`` is ``"sh"``, ``"sz"`` or ``""`` when the symbol carries
        no exchange tag. The tag may be a prefix or a suffix, in any letter
        case, optionally separated from the code by a dot.
        """
        text = symbol.strip()
        lowered = text.lower()
        for tag in ("sh", "sz"):
            if lowered.startswith(tag):
                return tag, text[len(tag):].lstrip(".")
            if lowered.endswith(tag):
                return tag, text[:-len(tag)].rstrip(".")
        return "", text

    @classmethod
    def _tencent_symbol(cls, symbol: str) -> str:
        """Return the exchange-prefixed symbol used by the Tencent API.

        An explicit exchange tag on *symbol* is honoured. Only when there is
        none is the exchange guessed from the first digit of the code
        (6/5/1 -> ``sh``, anything else -> ``sz``).
        """
        exchange, code = cls._split_symbol(symbol)
        if not exchange:
            exchange = "sh" if code.startswith(("6", "5", "1")) else "sz"
        return f"{exchange}{code}"

    @classmethod
    def _parse_tencent_klines(cls, klines, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
        """Convert raw Tencent K-line rows into the standard daily DataFrame.

        Args:
            klines: List of positional rows as returned under the ``"day"`` key.
            start_date: Inclusive lower bound, ``YYYY-MM-DD``.
            end_date: Inclusive upper bound, ``YYYY-MM-DD``.

        Returns:
            Rows within ``[start_date, end_date]`` (possibly empty), or
            ``None`` when *klines* is empty or has fewer than the six
            mandatory fields.
        """
        if not klines:
            return None
        frame = pd.DataFrame(klines)
        field_names = list(cls._TENCENT_ROW_FIELDS)
        if frame.shape[1] < len(field_names) - 1:
            # date/open/close/high/low/volume are all required.
            return None
        # Keep only the leading fields we have names for; extra trailing
        # fields would otherwise make the column assignment fail.
        frame = frame.iloc[:, :len(field_names)].copy()
        frame.columns = field_names[:frame.shape[1]]
        if "amount" not in frame.columns:
            frame["amount"] = 0.0
        for column in cls._NUMERIC_COLUMNS:
            frame[column] = pd.to_numeric(frame[column], errors="coerce").fillna(0)
        frame["date"] = pd.to_datetime(frame["date"]).dt.strftime("%Y-%m-%d")
        # ISO-formatted dates compare correctly as plain strings.
        in_range = (frame["date"] >= start_date) & (frame["date"] <= end_date)
        return frame.loc[in_range, cls._DAILY_COLUMNS]

    def _fetch_akshare_daily(self, symbol: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
        """Fetch unadjusted daily bars through ``akshare.stock_zh_a_hist``.

        Returns ``None`` when akshare yields no data; exceptions propagate to
        the caller.
        """
        import akshare as ak

        _, code = self._split_symbol(symbol)
        # akshare expects compact YYYYMMDD dates.
        df = ak.stock_zh_a_hist(
            symbol=code,
            period="daily",
            start_date=start_date.replace("-", ""),
            end_date=end_date.replace("-", ""),
            adjust="",
        )
        if df is None or df.empty:
            return None
        df = df.rename(columns={
            "日期": "date",
            "开盘": "open",
            "收盘": "close",
            "最高": "high",
            "最低": "low",
            "成交量": "volume",
            "成交额": "amount",
        })
        df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
        for column in self._NUMERIC_COLUMNS:
            df[column] = pd.to_numeric(df[column], errors="coerce").fillna(0)
        return df[self._DAILY_COLUMNS]

    def _fetch_tencent_daily(self, symbol: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
        """Fetch daily bars from the Tencent K-line API.

        Returns ``None`` (after logging a warning) on any request or parsing
        failure, and when the response contains no rows for the symbol.

        Raises:
            ValueError: If *start_date* or *end_date* is not ``YYYY-MM-DD``.
        """
        tq_symbol = self._tencent_symbol(symbol)
        span = datetime.strptime(end_date, "%Y-%m-%d") - datetime.strptime(start_date, "%Y-%m-%d")
        # Requested bar count: the calendar span padded by 10.
        days = span.days + 10
        url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={tq_symbol},day,{start_date},,{days},"
        try:
            # An empty ProxyHandler makes the request bypass any system proxy.
            opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
            req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
            with opener.open(req, timeout=10) as response:
                payload = json.loads(response.read())
            data = payload.get("data")
            if not isinstance(data, dict):
                return None
            klines = data.get(tq_symbol, {}).get("day", [])
            return self._parse_tencent_klines(klines, start_date, end_date)
        except Exception as exc:
            # Best-effort source: report the failure and let the caller move on.
            logger.warning("腾讯K线请求失败: %s", exc)
            return None