#!/usr/bin/env python3
"""Real-time stock quotes with three-source fallback: Sina -> Eastmoney -> Tencent.

Each source is queried in turn; the first one that yields a quote with a
positive ``current`` price wins. All parsers return dicts with the same keys
(``symbol``, ``name``, ``current``, ``prev_close``, ``open``, ``high``,
``low``, ``volume``, ``amount``, ``timestamp``, ``source``, ``fetched_at``).
"""
import urllib.request
import json
import re
import logging
from datetime import datetime
from typing import Optional, Tuple

# NOTE: configuring the root logger at import time is kept as-is so existing
# importers see the same logging behaviour.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)

HEADERS_SINA = {
    "User-Agent": "Mozilla/5.0", "Referer": "https://finance.sina.com.cn",
    "Accept-Language": "zh-CN,zh;q=0.9"
}
HEADERS_EM = {"Referer": "https://www.eastmoney.com"}

# Import-time timestamp. Kept only for backward compatibility with code that
# imports it; quote results use a fresh per-call timestamp (see _now_str),
# otherwise a long-running process would stamp every quote with its start time.
FETCHED_AT = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Code prefixes routed to the "sh" market; everything else is routed to "sz".
# Shared by _get_prefix and _em_secid so both always agree on the market.
_SH_PREFIXES = ("60", "68", "51", "58", "11")

# URL substrings whose responses are decoded as GBK when the server does not
# declare a charset. NOTE(review): "gtimg" was added on the assumption that the
# Tencent endpoint is GBK-encoded like Sina's -- confirm against live responses.
_GBK_HOSTS = ("sina", "gtimg")

# Payload of the first double-quoted string in a ``var x="...";`` response.
_QUOTED = re.compile(r'"([^"]*)"')


def _now_str() -> str:
    """Return the current local time formatted like FETCHED_AT."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _num(text: str) -> float:
    """Convert a numeric field to float, treating an empty field as 0."""
    return float(text) if text else 0.0


def _fetch_url(url: str, headers: Optional[dict] = None, timeout: int = 10) -> str:
    """Fetch *url* and return the decoded body, or "" on any failure.

    The charset declared in the response headers is preferred; when absent or
    unknown to Python, GBK is used for hosts in _GBK_HOSTS and UTF-8 otherwise.
    Undecodable bytes are replaced rather than raising.
    """
    req = urllib.request.Request(url, headers=headers or {})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read()
            declared = resp.headers.get_content_charset()
    except Exception as exc:
        # Best-effort fetch: the caller falls through to the next source, but
        # the failure is logged instead of being silently discarded.
        logger.warning("请求失败 %s: %s", url, exc)
        return ""

    fallback = "gbk" if any(host in url for host in _GBK_HOSTS) else "utf-8"
    try:
        return body.decode(declared or fallback, errors="replace")
    except LookupError:
        # The server declared a charset name Python does not recognise.
        return body.decode(fallback, errors="replace")


def _parse_sina(raw: str, symbol: str) -> Optional[dict]:
    """Parse a Sina ``var hq_str_xxx="a,b,c,...";`` response.

    Returns None when no quoted payload is found, when it has fewer than 32
    comma-separated fields, or when a numeric field cannot be converted.
    """
    m = _QUOTED.search(raw)
    if not m:
        return None
    parts = m.group(1).split(",")
    if len(parts) < 32:
        return None
    try:
        return {
            "symbol": symbol, "name": parts[0],
            "current": round(_num(parts[3]), 2),
            "prev_close": round(_num(parts[2]), 2),
            "open": round(_num(parts[1]), 2),
            "high": round(_num(parts[4]), 2),
            "low": round(_num(parts[5]), 2),
            "volume": int(_num(parts[8])),
            "amount": round(_num(parts[9]), 2),
            # Fields 30 and 31 are joined as "<date> <time>".
            "timestamp": f"{parts[30]} {parts[31]}",
            "source": "sina", "fetched_at": _now_str(),
        }
    except (ValueError, IndexError):
        return None


def _parse_tencent(raw: str, symbol: str) -> Optional[dict]:
    """Parse a Tencent ``v_xxx="a~b~c~...";`` response.

    Returns None when no quoted payload is found, when it has fewer than 38
    ``~``-separated fields (index 37 is read below), when the current price is
    not positive, or when a numeric field cannot be converted.
    """
    m = _QUOTED.search(raw)
    if not m:
        return None
    fields = m.group(1).split("~")
    if len(fields) < 38:
        return None
    try:
        current = float(fields[3])
        prev_close = float(fields[4])
        if current <= 0:
            return None
        stamp = fields[30]
        return {
            "symbol": symbol, "name": fields[1],
            "current": round(current, 2), "prev_close": round(prev_close, 2),
            "open": round(float(fields[5]), 2),
            "high": round(_num(fields[33]), 2),
            "low": round(_num(fields[34]), 2),
            "volume": int(_num(fields[6])),
            # NOTE(review): this source scales amount by 10000 while the Sina
            # parser uses the raw value and the Eastmoney parser divides by
            # 1e8 -- the units across sources likely differ; confirm upstream.
            "amount": round(float(fields[37]) * 10000, 2) if fields[37] else 0,
            # Split the compact stamp after its first 8 characters.
            "timestamp": stamp[:8] + " " + stamp[8:] if stamp else "",
            "source": "tencent", "fetched_at": _now_str(),
        }
    except (ValueError, IndexError):
        return None


def _parse_eastmoney(raw: str, symbol: str) -> Optional[dict]:
    """Parse an Eastmoney JSON response (price fields are divided by 100).

    Returns None when the body is not valid JSON, when ``data`` or ``f43`` is
    missing/falsy, or when a required field is absent or non-numeric.
    """
    try:
        obj = json.loads(raw)
        d = obj.get("data", {}) or {}
        if not d.get("f43"):
            return None
        return {
            "symbol": symbol, "name": d.get("f58", ""),
            "current": round(d["f43"] / 100, 2),
            "prev_close": round(d["f60"] / 100, 2),
            "open": round(d["f46"] / 100, 2),
            "high": round(d["f44"] / 100, 2),
            "low": round(d["f45"] / 100, 2),
            "volume": d.get("f47", 0),
            # NOTE(review): divided by 1e8 here, unlike the other two parsers;
            # the resulting unit probably differs from theirs -- confirm.
            "amount": round(d.get("f48", 0) / 1e8, 2),
            "timestamp": "",
            "source": "eastmoney", "fetched_at": _now_str(),
        }
    except (ValueError, KeyError, TypeError, AttributeError):
        # ValueError covers invalid JSON; the others cover a non-dict payload,
        # missing price fields, and non-numeric field values.
        return None


def _get_prefix(code: str) -> Tuple[str, str]:
    """Return ``(market_prefix, digits_only_code)`` for *code*.

    Non-digit characters are stripped; codes starting with one of
    _SH_PREFIXES map to "sh", everything else to "sz".
    """
    code = re.sub(r"[^0-9]", "", code)
    if code.startswith(_SH_PREFIXES):
        return "sh", code
    return "sz", code


def _em_secid(code: str) -> str:
    """Return the Eastmoney ``secid`` ("<market>.<code>") for *code*.

    Market is 1 for the same prefixes _get_prefix maps to "sh" and 0
    otherwise, so both helpers always agree on the exchange.
    """
    code = re.sub(r"[^0-9]", "", code)
    m = 1 if code.startswith(_SH_PREFIXES) else 0
    return f"{m}.{code}"


def get_realtime_quote(code: str) -> dict:
    """Fetch a real-time quote, falling back Sina -> Eastmoney -> Tencent.

    Args:
        code: Security code; non-digit characters (e.g. an "sh"/"sz" prefix)
            are ignored.

    Returns:
        The quote dict from the first source whose parsed ``current`` price is
        positive, or ``{"error", "symbol", "fetched_at"}`` when *code* contains
        no digits or every source fails.
    """
    prefix, clean = _get_prefix(code)
    symbol = f"{prefix}{clean}"

    if not clean:
        # Nothing to look up; skip three pointless network round-trips.
        return {"error": f"无效的证券代码: {code}", "symbol": symbol, "fetched_at": _now_str()}

    secid = _em_secid(code)
    sources = (
        ("新浪", f"http://hq.sinajs.cn/list={symbol}", HEADERS_SINA, _parse_sina),
        (
            "东财",
            f"http://push2.eastmoney.com/api/qt/stock/get?secid={secid}"
            f"&fields=f43,f44,f45,f46,f47,f48,f57,f58,f60,f169,f170",
            HEADERS_EM,
            _parse_eastmoney,
        ),
        ("腾讯", f"http://qt.gtimg.cn/q={symbol}", None, _parse_tencent),
    )

    for label, url, headers, parse in sources:
        raw = _fetch_url(url, headers)
        if not raw:
            continue
        data = parse(raw, symbol)
        if data and data["current"] > 0:
            logger.info("%s成功: %s = %s", label, symbol, data["current"])
            return data

    return {"error": f"所有数据源均无法获取 {code}", "symbol": symbol, "fetched_at": _now_str()}


if __name__ == "__main__":
    import sys
    code = sys.argv[1] if len(sys.argv) > 1 else "600519"
    result = get_realtime_quote(code)
    for k, v in result.items():
        print(f" {k}: {v}")