160 lines
5.4 KiB
Python
160 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
"""实时行情三源降级 - 新浪→东财→腾讯"""
|
|
import urllib.request
|
|
import json
|
|
import re
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
HEADERS_SINA = {
|
|
"User-Agent": "Mozilla/5.0", "Referer": "https://finance.sina.com.cn",
|
|
"Accept-Language": "zh-CN,zh;q=0.9"
|
|
}
|
|
HEADERS_EM = {"Referer": "https://www.eastmoney.com"}
|
|
FETCHED_AT = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def _fetch_url(url: str, headers: dict = None, timeout: int = 10) -> str:
|
|
req = urllib.request.Request(url, headers=headers or {})
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=timeout) as r:
|
|
charset = "gbk" if "sina" in url or "sinajs" in url else "utf-8"
|
|
return r.read().decode(charset, errors="replace")
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def _parse_sina(raw: str, symbol: str) -> Optional[dict]:
|
|
m = re.search(r'"([^"]*)"', raw)
|
|
if not m:
|
|
return None
|
|
parts = m.group(1).split(",")
|
|
if len(parts) < 32:
|
|
return None
|
|
try:
|
|
prev_close = float(parts[2]) if parts[2] else 0
|
|
current = float(parts[3]) if parts[3] else 0
|
|
return {
|
|
"symbol": symbol, "name": parts[0],
|
|
"current": round(current, 2), "prev_close": round(prev_close, 2),
|
|
"open": round(float(parts[1]), 2) if parts[1] else 0,
|
|
"high": round(float(parts[4]), 2) if parts[4] else 0,
|
|
"low": round(float(parts[5]), 2) if parts[5] else 0,
|
|
"volume": int(float(parts[8])) if parts[8] else 0,
|
|
"amount": round(float(parts[9]), 2) if parts[9] else 0,
|
|
"timestamp": f"{parts[30]} {parts[31]}" if len(parts) > 31 else "",
|
|
"source": "sina", "fetched_at": FETCHED_AT,
|
|
}
|
|
except (ValueError, IndexError):
|
|
return None
|
|
|
|
|
|
def _parse_tencent(raw: str, symbol: str) -> Optional[dict]:
|
|
m = re.search(r'"([^"]*)"', raw)
|
|
if not m:
|
|
return None
|
|
fields = m.group(1).split("~")
|
|
if len(fields) < 35:
|
|
return None
|
|
try:
|
|
current = float(fields[3])
|
|
prev_close = float(fields[4])
|
|
if current <= 0:
|
|
return None
|
|
return {
|
|
"symbol": symbol, "name": fields[1],
|
|
"current": round(current, 2), "prev_close": round(prev_close, 2),
|
|
"open": round(float(fields[5]), 2),
|
|
"high": round(float(fields[33]), 2) if fields[33] else 0,
|
|
"low": round(float(fields[34]), 2) if fields[34] else 0,
|
|
"volume": int(float(fields[6])) if fields[6] else 0,
|
|
"amount": round(float(fields[37]) * 10000, 2) if fields[37] else 0,
|
|
"timestamp": fields[30][:8] + " " + fields[30][8:] if fields[30] else "",
|
|
"source": "tencent", "fetched_at": FETCHED_AT,
|
|
}
|
|
except (ValueError, IndexError):
|
|
return None
|
|
|
|
|
|
def _parse_eastmoney(raw: str, symbol: str) -> Optional[dict]:
|
|
try:
|
|
obj = json.loads(raw)
|
|
d = obj.get("data", {}) or {}
|
|
if not d.get("f43"):
|
|
return None
|
|
return {
|
|
"symbol": symbol, "name": d.get("f58", ""),
|
|
"current": round(d["f43"] / 100, 2),
|
|
"prev_close": round(d["f60"] / 100, 2),
|
|
"open": round(d["f46"] / 100, 2),
|
|
"high": round(d["f44"] / 100, 2),
|
|
"low": round(d["f45"] / 100, 2),
|
|
"volume": d.get("f47", 0),
|
|
"amount": round(d.get("f48", 0) / 1e8, 2),
|
|
"timestamp": "",
|
|
"source": "eastmoney", "fetched_at": FETCHED_AT,
|
|
}
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _get_prefix(code: str) -> str:
|
|
code = re.sub(r"[^0-9]", "", code)
|
|
if code.startswith(("60", "68", "51", "58", "11")):
|
|
return "sh", code
|
|
return "sz", code
|
|
|
|
|
|
def _em_secid(code: str) -> str:
|
|
code = re.sub(r"[^0-9]", "", code)
|
|
m = 1 if code.startswith(("60", "68")) else 0
|
|
return f"{m}.{code}"
|
|
|
|
|
|
def get_realtime_quote(code: str) -> dict:
|
|
"""获取实时行情,三源降级:新浪→东财→腾讯"""
|
|
prefix, clean = _get_prefix(code)
|
|
symbol = f"{prefix}{clean}"
|
|
|
|
# 1. 新浪
|
|
raw = _fetch_url(f"http://hq.sinajs.cn/list={symbol}", HEADERS_SINA)
|
|
if raw:
|
|
data = _parse_sina(raw, symbol)
|
|
if data and data["current"] > 0:
|
|
logger.info(f"新浪成功: {symbol} = {data['current']}")
|
|
return data
|
|
|
|
# 2. 东财
|
|
secid = _em_secid(code)
|
|
raw = _fetch_url(
|
|
f"http://push2.eastmoney.com/api/qt/stock/get?secid={secid}"
|
|
f"&fields=f43,f44,f45,f46,f47,f48,f57,f58,f60,f169,f170",
|
|
HEADERS_EM)
|
|
if raw:
|
|
data = _parse_eastmoney(raw, symbol)
|
|
if data and data["current"] > 0:
|
|
logger.info(f"东财成功: {symbol} = {data['current']}")
|
|
return data
|
|
|
|
# 3. 腾讯
|
|
raw = _fetch_url(f"http://qt.gtimg.cn/q={symbol}")
|
|
if raw:
|
|
data = _parse_tencent(raw, symbol)
|
|
if data and data["current"] > 0:
|
|
logger.info(f"腾讯成功: {symbol} = {data['current']}")
|
|
return data
|
|
|
|
return {"error": f"所有数据源均无法获取 {code}", "symbol": symbol, "fetched_at": FETCHED_AT}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
code = sys.argv[1] if len(sys.argv) > 1 else "600519"
|
|
result = get_realtime_quote(code)
|
|
for k, v in result.items():
|
|
print(f" {k}: {v}")
|