367 lines
12 KiB
Python
367 lines
12 KiB
Python
"""
|
|
高频交易信号生成器
|
|
支持tick级和分钟级数据处理
|
|
低延迟信号生成
|
|
支持向量化计算
|
|
"""
|
|
|
|
import logging
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Deque, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
|
|
|
|
# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class TickData:
    """One tick for a single instrument.

    Attributes:
        code: Instrument identifier.
        datetime: Timestamp of the tick.
        price: Tick price.
        volume: Tick volume.
        amount: Tick amount (presumably turnover; confirm with the data source).
        direction: Side flag; 1 = buy, -1 = sell, 0 = neutral. Defaults to 0.
    """

    code: str
    datetime: datetime
    price: float
    volume: int
    amount: float
    direction: int = 0  # 1 = buy, -1 = sell, 0 = neutral
|
|
|
|
|
|
@dataclass
class BarData:
    """One OHLCV bar (K-line) for a single instrument.

    Attributes:
        code: Instrument identifier.
        datetime: Bar timestamp.
        open, high, low, close: Bar prices.
        volume: Bar volume.
        amount: Bar amount (presumably turnover; confirm with the data source).
    """

    code: str
    datetime: datetime
    # Prices
    open: float
    high: float
    low: float
    close: float
    # Activity
    volume: float
    amount: float
|
|
|
|
|
|
@dataclass
class Signal:
    """A trading signal produced by a signal generator.

    Attributes:
        code: Instrument identifier.
        datetime: Time the signal refers to.
        direction: 1 = long, -1 = short, 0 = none.
        strength: Signal strength, intended range 0-1.
        price: Price at which the signal was generated.
        expected_price: Expected target price.
    """

    code: str
    datetime: datetime
    direction: int  # 1 = long, -1 = short, 0 = none
    strength: float  # intended range 0-1
    price: float  # signal price
    expected_price: float  # expected target price
|
|
|
|
|
|
class HighFrequencyDataPipeline:
    """High-frequency tick stream pipeline.

    Keeps, per instrument code, a rolling buffer of the most recent ticks
    (at most ``window_size`` of them) plus a running count of all ticks seen.
    """

    def __init__(self, window_size: int = 100):
        """
        Args:
            window_size: Maximum number of ticks retained per code. It is read
                on every tick, so changing it later takes effect gradually
                (at most one eviction per incoming tick).
        """
        self.window_size = window_size
        # code -> most recent ticks, oldest first. A deque makes evicting the
        # oldest tick O(1); list.pop(0) shifted the whole window on every tick.
        self._buffer: Dict[str, Deque[TickData]] = {}
        # Initialized here but not read or written by any method of this class.
        self._last_bar: Dict[str, Optional[BarData]] = {}
        self._tick_count = 0

    def on_tick(self, tick: TickData) -> Optional[Signal]:
        """Append ``tick`` to its code's buffer, evicting the oldest tick if over the window.

        Returns:
            Always None at present; this is the hook where rolling
            computations could be triggered to produce a Signal.
        """
        buffer = self._buffer.get(tick.code)
        if buffer is None:
            buffer = self._buffer[tick.code] = deque()

        buffer.append(tick)
        # Keep the window bounded (same one-eviction-per-tick rule as before).
        if len(buffer) > self.window_size:
            buffer.popleft()

        self._tick_count += 1
        return None

    def get_tick_buffer(self, code: str) -> List[TickData]:
        """Return the buffered ticks for ``code``, oldest first.

        The result is a new list (a snapshot), so callers cannot corrupt the
        internal window by mutating it. Unknown codes yield an empty list.
        """
        return list(self._buffer.get(code, ()))

    def statistics(self) -> Dict:
        """Return counters: total ticks seen, number of codes, ticks currently buffered."""
        return {
            'total_ticks': self._tick_count,
            'total_codes': len(self._buffer),
            'buffer_size': sum(len(b) for b in self._buffer.values())
        }
|
|
|
|
|
|
class TechnicalFactorCalculator:
    """Technical factor calculator built on vectorized NumPy/pandas operations.

    Every method takes 1-D array-likes and returns NumPy arrays with the same
    length as the input. Rolling-window factors (MA, Bollinger, ATR, RSI) are
    NaN during their warm-up period; EMA/MACD and OBV are defined from the
    first element.
    """

    def __init__(self):
        # Per-key DataFrame cache; not read or written by any method of this class.
        self.cache: Dict[str, pd.DataFrame] = {}

    def calculate_ma(self, prices: np.ndarray, period: int) -> np.ndarray:
        """Simple moving average over ``period`` samples (first period-1 values are NaN)."""
        return pd.Series(prices).rolling(period).mean().values

    def calculate_ema(self, prices: np.ndarray, period: int) -> np.ndarray:
        """Exponential moving average with ``span=period`` in recursive form (``adjust=False``)."""
        return pd.Series(prices).ewm(span=period, adjust=False).mean().values

    def calculate_macd(self, prices: np.ndarray,
                       fast_period: int = 12,
                       slow_period: int = 26,
                       signal_period: int = 9) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute MACD.

        Returns:
            ``(dif, dea, macd)`` where ``dif = EMA_fast - EMA_slow``,
            ``dea = EMA(dif, signal_period)`` and ``macd = 2 * (dif - dea)``.
        """
        ema_fast = self.calculate_ema(prices, fast_period)
        ema_slow = self.calculate_ema(prices, slow_period)
        dif = ema_fast - ema_slow
        dea = self.calculate_ema(dif, signal_period)
        macd = 2 * (dif - dea)
        return dif, dea, macd

    def calculate_bollinger_bands(self, prices: np.ndarray,
                                  period: int = 20,
                                  num_std: float = 2.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute Bollinger Bands.

        The middle band is the ``period`` rolling mean; the upper and lower bands
        are offset by ``num_std`` rolling standard deviations. pandas' rolling
        ``std`` uses the sample estimator (ddof=1).

        Returns:
            ``(upper, middle, lower)``.
        """
        # Build the rolling window once and reuse it for mean and std.
        window = pd.Series(prices).rolling(period)
        middle = window.mean().values
        std = window.std().values
        upper = middle + num_std * std
        lower = middle - num_std * std
        return upper, middle, lower

    def calculate_rsi(self, prices: np.ndarray, period: int = 14) -> np.ndarray:
        """Relative Strength Index with Wilder's smoothing.

        Args:
            prices: 1-D price series.
            period: Look-back length; must be >= 1.

        Returns:
            Float array with the same length as ``prices``. The first ``period``
            entries are NaN because there are not enough price changes to seed
            the averages. If ``len(prices) <= period``, the whole result is NaN.

        Raises:
            ValueError: If ``period < 1``.
        """
        if period < 1:
            raise ValueError(f"period must be >= 1, got {period}")
        # Work in float so integer price input cannot truncate the averages.
        prices = np.asarray(prices, dtype=float)
        n = len(prices)
        rsi = np.full(n, np.nan)
        if n <= period:
            return rsi

        delta = np.diff(prices)
        gain = np.where(delta > 0, delta, 0.0)
        loss = np.where(delta < 0, -delta, 0.0)

        avg_gain = np.empty(n)
        avg_loss = np.empty(n)
        # Seed with the plain mean of the first `period` changes, stored at index `period`.
        avg_gain[period] = gain[:period].mean()
        avg_loss[period] = loss[:period].mean()

        # Wilder's smoothing: avg_t = (avg_{t-1} * (period - 1) + x_t) / period.
        for i in range(period + 1, n):
            avg_gain[i] = (avg_gain[i - 1] * (period - 1) + gain[i - 1]) / period
            avg_loss[i] = (avg_loss[i - 1] * (period - 1) + loss[i - 1]) / period

        # The epsilon avoids division by zero when there were no losses.
        # NOTE: a perfectly flat window (no gains, no losses) yields 0.
        rs = avg_gain[period:] / (avg_loss[period:] + 1e-10)
        rsi[period:] = 100 - (100 / (1 + rs))
        return rsi

    def calculate_atr(self, high: np.ndarray, low: np.ndarray,
                      close: np.ndarray, period: int = 14) -> np.ndarray:
        """Average True Range as a simple rolling mean of the true range.

        TR[0] = high[0] - low[0]. For i > 0, TR[i] is the maximum of
        high-low, |high - prev_close| and |low - prev_close|. NaN inputs
        propagate into TR.

        Returns:
            Float array with the same length as the inputs; the first period-1
            values are NaN. Empty input yields an empty array.
        """
        # Float throughout so a float close is never truncated into an int buffer.
        high = np.asarray(high, dtype=float)
        low = np.asarray(low, dtype=float)
        close = np.asarray(close, dtype=float)
        n = len(high)
        if n == 0:
            return np.empty(0, dtype=float)

        prev_close = close[:n - 1]
        tr = np.empty(n)
        tr[0] = high[0] - low[0]
        tr[1:] = np.maximum(
            np.maximum(high[1:] - low[1:], np.abs(high[1:] - prev_close)),
            np.abs(low[1:] - prev_close),
        )
        return pd.Series(tr).rolling(period).mean().values

    def calculate_volume_ma(self, volumes: np.ndarray, period: int) -> np.ndarray:
        """Simple moving average of volume over ``period`` samples."""
        return pd.Series(volumes).rolling(period).mean().values

    def calculate_obv(self, close: np.ndarray, volume: np.ndarray) -> np.ndarray:
        """On-Balance Volume.

        OBV[0] = volume[0]. After that, volume is added on an up-close,
        subtracted on a down-close, and ignored on an unchanged close (NaN
        comparisons count as unchanged).

        Returns:
            Array with the same length as ``close``, whose dtype is the
            promotion of the ``close`` and ``volume`` dtypes. Empty input
            yields an empty array.
        """
        close = np.asarray(close)
        volume = np.asarray(volume)
        n = len(close)
        steps = np.empty(n, dtype=np.result_type(close.dtype, volume.dtype))
        if n == 0:
            return steps

        # +1 on an up-close, -1 on a down-close, 0 otherwise.
        direction = np.where(close[1:] > close[:-1], 1,
                             np.where(close[1:] < close[:-1], -1, 0))
        steps[0] = volume[0]
        steps[1:] = direction * volume[1:n]
        # A sequential running total is equivalent to obv[i] = obv[i-1] +/- volume[i].
        return np.cumsum(steps)
|
|
|
|
|
|
class HighFrequencySignalGenerator:
    """High-frequency trading signal generator.

    Combines MACD, RSI, Bollinger Bands and ATR computed over a bar history.
    Each call emits at most one directional signal. Every emitted signal is
    kept in ``signal_history`` and counted in ``performance_stats``.
    """

    # RSI look-back. The generator needs at least _RSI_PERIOD + 1 bars before
    # it evaluates any rule.
    _RSI_PERIOD = 14
    # ATR look-back used for the MACD-branch target prices.
    _ATR_PERIOD = 14

    def __init__(self):
        self.factor_calculator = TechnicalFactorCalculator()
        # Created here but not used by the methods of this class.
        self.data_pipeline = HighFrequencyDataPipeline()
        self.signal_history: List[Signal] = []
        self.performance_stats = {
            'total_signals': 0,
            'long_signals': 0,
            'short_signals': 0,
        }

    def generate_signal_from_bar(self, bar: BarData,
                                 data: pd.DataFrame) -> Optional[Signal]:
        """Evaluate the latest bar in ``data`` and return a signal, if any.

        Rules are checked in order, and the first match wins:

        1. MACD golden cross (DIF crosses above DEA) with RSI < 70 -> long,
           target = close + 2 * ATR.
        2. MACD death cross (DIF crosses below DEA) with RSI > 30 -> short,
           target = close - 2 * ATR.
        3. Close below the lower Bollinger band with RSI < 30 -> long,
           target = middle band.
        4. Close above the upper Bollinger band with RSI > 70 -> short,
           target = middle band.

        Signal strength is clipped to [0, 1]; NaN strength maps to 0.

        Args:
            bar: Bar being processed. Only ``code`` and ``datetime`` are used;
                they are copied onto the signal. Prices come from ``data``.
            data: Bar history with 'close', 'high' and 'low' columns. The last
                row is treated as the current bar.

        Returns:
            The emitted Signal, which is also recorded. Returns None when no
            rule fires or when ``data`` has at most ``_RSI_PERIOD`` rows.
        """
        close = data['close'].values
        if len(close) <= self._RSI_PERIOD:
            # Not enough history to seed RSI or to inspect the previous bar's cross state.
            return None
        high = data['high'].values
        low = data['low'].values

        calc = self.factor_calculator
        dif, dea, _ = calc.calculate_macd(close)
        rsi = calc.calculate_rsi(close, self._RSI_PERIOD)
        bb_upper, bb_mid, bb_lower = calc.calculate_bollinger_bands(close)
        atr = calc.calculate_atr(high, low, close, self._ATR_PERIOD)

        price = close[-1]
        cur_rsi = rsi[-1]
        cur_dif, cur_dea = dif[-1], dea[-1]
        prev_dif, prev_dea = dif[-2], dea[-2]

        # NOTE(review): the MACD-branch strength adds a unitless RSI term to
        # the DIF-DEA gap (price units). The formula is kept and only clipped.
        if cur_dif > cur_dea and prev_dif <= prev_dea and cur_rsi < 70:
            return self._emit(bar, 1, (70 - cur_rsi) / 70 + (cur_dif - cur_dea),
                              price, price + 2 * atr[-1])

        if cur_dif < cur_dea and prev_dif >= prev_dea and cur_rsi > 30:
            return self._emit(bar, -1, (cur_rsi - 30) / 30 + (cur_dea - cur_dif),
                              price, price - 2 * atr[-1])

        # Bollinger strength: penetration beyond the band relative to 5% of the band level.
        if price < bb_lower[-1] and cur_rsi < 30:
            return self._emit(bar, 1, (bb_lower[-1] - price) / (bb_lower[-1] * 0.05),
                              price, bb_mid[-1])

        if price > bb_upper[-1] and cur_rsi > 70:
            return self._emit(bar, -1, (price - bb_upper[-1]) / (bb_upper[-1] * 0.05),
                              price, bb_mid[-1])

        return None

    def _emit(self, bar: BarData, direction: int, raw_strength: float,
              price: float, expected_price: float) -> Signal:
        """Build a Signal with its strength clipped to [0, 1], record it, and return it."""
        signal = Signal(
            code=bar.code,
            datetime=bar.datetime,
            direction=direction,
            strength=self._clip_strength(raw_strength),
            price=price,
            expected_price=expected_price,
        )
        self._record_signal(signal)
        return signal

    @staticmethod
    def _clip_strength(raw: float) -> float:
        """Map a raw score into [0, 1]: NaN -> 0, +inf -> 1, -inf -> 0."""
        return float(np.clip(np.nan_to_num(raw, nan=0.0, posinf=1.0, neginf=0.0), 0.0, 1.0))

    def _record_signal(self, signal: Signal):
        """Append ``signal`` to the history and update the direction counters."""
        self.signal_history.append(signal)
        self.performance_stats['total_signals'] += 1
        if signal.direction == 1:
            self.performance_stats['long_signals'] += 1
        elif signal.direction == -1:
            self.performance_stats['short_signals'] += 1

    def get_performance_stats(self) -> Dict:
        """Return a shallow copy of the signal counters."""
        return self.performance_stats.copy()
|
|
|
|
|
|
class PerformanceMonitor:
    """Latency and throughput monitor for a high-frequency processing loop.

    Wrap each unit of work in on_process_start() / on_process_end(). The most
    recent ``window_size`` latencies (in ms) are kept for the percentile stats.
    ``processed_ticks`` counts every completed measurement.
    """

    def __init__(self, window_size: int = 1000):
        self.latencies: List[float] = []  # most recent latencies, in ms, oldest first
        self.window_size = window_size
        self.processed_ticks = 0
        # Wall-clock start time, used for the throughput (tps) figure.
        self.start_time = datetime.now()
        # Monotonic start of the current measurement; None until on_process_start().
        self._start: Optional[float] = None

    def on_process_start(self):
        """Mark the start of one unit of work."""
        # perf_counter is monotonic and high-resolution. datetime.now() can
        # jump when the system clock is adjusted.
        self._start = time.perf_counter()

    def on_process_end(self):
        """Mark the end of one unit of work and record its latency in milliseconds.

        Raises:
            RuntimeError: If on_process_start() has never been called.
        """
        if self._start is None:
            raise RuntimeError("on_process_start() must be called before on_process_end()")
        latency = (time.perf_counter() - self._start) * 1000  # ms
        self.latencies.append(latency)
        self.processed_ticks += 1

        # Keep the window bounded.
        if len(self.latencies) > self.window_size:
            self.latencies.pop(0)

    def get_stats(self) -> Dict:
        """Return latency statistics (ms) over the window, plus totals and throughput.

        The result always contains the same keys, including when nothing is
        in the window yet. In that case the latency figures are 0.
        """
        elapsed = (datetime.now() - self.start_time).total_seconds()
        tps = self.processed_ticks / elapsed if elapsed > 0 else 0

        if not self.latencies:
            return {
                'latency_ms_avg': 0,
                'latency_ms_p50': 0,
                'latency_ms_p99': 0,
                'latency_ms_max': 0,
                'total_processed': self.processed_ticks,
                'tps': tps
            }

        latencies_np = np.array(self.latencies)
        return {
            'latency_ms_avg': float(latencies_np.mean()),
            'latency_ms_p50': float(np.percentile(latencies_np, 50)),
            'latency_ms_p99': float(np.percentile(latencies_np, 99)),
            'latency_ms_max': float(latencies_np.max()),
            'total_processed': self.processed_ticks,
            'tps': tps
        }

    def print_stats(self):
        """Print the statistics from get_stats() as a human-readable report."""
        stats = self.get_stats()
        print("=" * 60)
        print("高频算法性能监控")
        print("=" * 60)
        print(f"平均延迟: {stats['latency_ms_avg']:.2f} ms")
        print(f"P50延迟: {stats['latency_ms_p50']:.2f} ms")
        print(f"P99延迟: {stats['latency_ms_p99']:.2f} ms")
        print(f"最大延迟: {stats['latency_ms_max']:.2f} ms")
        print(f"总处理tick数: {stats['total_processed']}")
        print(f"每秒处理: {stats['tps']:.2f} ticks")
        print("=" * 60)
|
|
|
|
|
|
# Public API: the names exported by ``from <this module> import *``.
__all__ = [
    'TickData', 'BarData', 'Signal',
    'HighFrequencyDataPipeline', 'TechnicalFactorCalculator',
    'HighFrequencySignalGenerator', 'PerformanceMonitor',
]
|