auto-sync: 2026-05-02 18:47:13

This commit is contained in:
cfdaily
2026-05-02 18:47:13 +08:00
parent a67629203f
commit fd98ae057b
+96
View File
@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""多源降级管理器 - 日线(akshare→腾讯) + 实时(新浪→东财→腾讯)"""
import pandas as pd
import urllib.request
import json
import logging
from datetime import datetime, timedelta
from typing import Optional
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
class FallbackManager:
def __init__(self):
self._source_used = ""
def get_source_used(self) -> str:
return self._source_used
def get_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
"""获取日线数据,降级链:akshare → 腾讯K线"""
# 1. akshare
try:
df = self._fetch_akshare_daily(symbol, start_date, end_date)
if df is not None and not df.empty:
self._source_used = "akshare"
return df
except Exception as e:
logger.warning(f"akshare日线失败 {symbol}: {e}")
# 2. 腾讯K线
try:
df = self._fetch_tencent_daily(symbol, start_date, end_date)
if df is not None and not df.empty:
self._source_used = "tencent_kline"
return df
except Exception as e:
logger.warning(f"腾讯K线失败 {symbol}: {e}")
raise RuntimeError(f"所有日线数据源失败: {symbol} {start_date}~{end_date}")
def get_realtime(self, symbol: str) -> dict:
"""获取实时行情,降级链:新浪→东财→腾讯"""
from realtime import get_realtime_quote
result = get_realtime_quote(symbol)
self._source_used = result.get("source", "unknown")
return result
def _fetch_akshare_daily(self, symbol: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
import akshare as ak
code = symbol.replace("SH", "").replace("SZ", "").replace("sh", "").replace("sz", "")
s = start_date.replace("-", "")
e = end_date.replace("-", "")
df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date=s, end_date=e, adjust="")
if df is None or df.empty:
return None
df = df.rename(columns={"日期": "date", "开盘": "open", "收盘": "close",
"最高": "high", "最低": "low", "成交量": "volume",
"成交额": "amount"})
df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
for c in ["open", "high", "low", "close", "volume", "amount"]:
df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0)
return df[["date", "open", "high", "low", "close", "volume", "amount"]]
def _fetch_tencent_daily(self, symbol: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
"""腾讯K线API获取日线"""
code = symbol.replace("SH", "").replace("SZ", "").replace("sh", "").replace("sz", "")
if code.startswith(("6", "5", "1")):
prefix = "sh"
else:
prefix = "sz"
tq_symbol = f"{prefix}{code}"
# 计算天数
days = (datetime.strptime(end_date, "%Y-%m-%d") - datetime.strptime(start_date, "%Y-%m-%d")).days + 10
url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?_var=kline_dayqfq&param={tq_symbol},day,,{days},qfqa"
try:
with urllib.request.urlopen(url, timeout=10) as r:
raw = r.read().decode("gbk")
json_str = raw.split("=", 1)[1]
data = json.loads(json_str)
klines = data.get("data", {}).get(tq_symbol, {}).get("day", [])
if not klines:
return None
df = pd.DataFrame(klines, columns=["date", "open", "close", "high", "low", "volume"])
for c in ["open", "close", "high", "low", "volume"]:
df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0)
df["amount"] = 0.0
df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")
# Filter date range
mask = (df["date"] >= start_date) & (df["date"] <= end_date)
return df.loc[mask, ["date", "open", "high", "low", "close", "volume", "amount"]]
except Exception as e:
logger.warning(f"腾讯K线请求失败: {e}")
return None