#!/usr/bin/env python3 """实时行情三源降级 - 新浪→东财→腾讯""" import urllib.request import json import re import logging from datetime import datetime from typing import Optional logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') logger = logging.getLogger(__name__) HEADERS_SINA = { "User-Agent": "Mozilla/5.0", "Referer": "https://finance.sina.com.cn", "Accept-Language": "zh-CN,zh;q=0.9" } HEADERS_EM = {"Referer": "https://www.eastmoney.com"} FETCHED_AT = datetime.now().strftime("%Y-%m-%d %H:%M:%S") def _fetch_url(url: str, headers: dict = None, timeout: int = 10) -> str: req = urllib.request.Request(url, headers=headers or {}) try: with urllib.request.urlopen(req, timeout=timeout) as r: charset = "gbk" if "sina" in url or "sinajs" in url else "utf-8" return r.read().decode(charset, errors="replace") except Exception: return "" def _parse_sina(raw: str, symbol: str) -> Optional[dict]: m = re.search(r'"([^"]*)"', raw) if not m: return None parts = m.group(1).split(",") if len(parts) < 32: return None try: prev_close = float(parts[2]) if parts[2] else 0 current = float(parts[3]) if parts[3] else 0 return { "symbol": symbol, "name": parts[0], "current": round(current, 2), "prev_close": round(prev_close, 2), "open": round(float(parts[1]), 2) if parts[1] else 0, "high": round(float(parts[4]), 2) if parts[4] else 0, "low": round(float(parts[5]), 2) if parts[5] else 0, "volume": int(float(parts[8])) if parts[8] else 0, "amount": round(float(parts[9]), 2) if parts[9] else 0, "timestamp": f"{parts[30]} {parts[31]}" if len(parts) > 31 else "", "source": "sina", "fetched_at": FETCHED_AT, } except (ValueError, IndexError): return None def _parse_tencent(raw: str, symbol: str) -> Optional[dict]: m = re.search(r'"([^"]*)"', raw) if not m: return None fields = m.group(1).split("~") if len(fields) < 35: return None try: current = float(fields[3]) prev_close = float(fields[4]) if current <= 0: return None return { "symbol": symbol, "name": fields[1], "current": round(current, 2), "prev_close": round(prev_close, 2), "open": round(float(fields[5]), 2), "high": round(float(fields[33]), 2) if fields[33] else 0, "low": round(float(fields[34]), 2) if fields[34] else 0, "volume": int(float(fields[6])) if fields[6] else 0, "amount": round(float(fields[37]) * 10000, 2) if fields[37] else 0, "timestamp": fields[30][:8] + " " + fields[30][8:] if fields[30] else "", "source": "tencent", "fetched_at": FETCHED_AT, } except (ValueError, IndexError): return None def _parse_eastmoney(raw: str, symbol: str) -> Optional[dict]: try: obj = json.loads(raw) d = obj.get("data", {}) or {} if not d.get("f43"): return None return { "symbol": symbol, "name": d.get("f58", ""), "current": round(d["f43"] / 100, 2), "prev_close": round(d["f60"] / 100, 2), "open": round(d["f46"] / 100, 2), "high": round(d["f44"] / 100, 2), "low": round(d["f45"] / 100, 2), "volume": d.get("f47", 0), "amount": round(d.get("f48", 0) / 1e8, 2), "timestamp": "", "source": "eastmoney", "fetched_at": FETCHED_AT, } except Exception: return None def _get_prefix(code: str) -> str: code = re.sub(r"[^0-9]", "", code) if code.startswith(("60", "68", "51", "58", "11")): return "sh", code return "sz", code def _em_secid(code: str) -> str: code = re.sub(r"[^0-9]", "", code) m = 1 if code.startswith(("60", "68")) else 0 return f"{m}.{code}" def get_realtime_quote(code: str) -> dict: """获取实时行情,三源降级:新浪→东财→腾讯""" prefix, clean = _get_prefix(code) symbol = f"{prefix}{clean}" # 1. 新浪 raw = _fetch_url(f"http://hq.sinajs.cn/list={symbol}", HEADERS_SINA) if raw: data = _parse_sina(raw, symbol) if data and data["current"] > 0: logger.info(f"新浪成功: {symbol} = {data['current']}") return data # 2. 东财 secid = _em_secid(code) raw = _fetch_url( f"http://push2.eastmoney.com/api/qt/stock/get?secid={secid}" f"&fields=f43,f44,f45,f46,f47,f48,f57,f58,f60,f169,f170", HEADERS_EM) if raw: data = _parse_eastmoney(raw, symbol) if data and data["current"] > 0: logger.info(f"东财成功: {symbol} = {data['current']}") return data # 3. 腾讯 raw = _fetch_url(f"http://qt.gtimg.cn/q={symbol}") if raw: data = _parse_tencent(raw, symbol) if data and data["current"] > 0: logger.info(f"腾讯成功: {symbol} = {data['current']}") return data return {"error": f"所有数据源均无法获取 {code}", "symbol": symbol, "fetched_at": FETCHED_AT} if __name__ == "__main__": import sys code = sys.argv[1] if len(sys.argv) > 1 else "600519" result = get_realtime_quote(code) for k, v in result.items(): print(f" {k}: {v}")