"""
High-frequency trading signal generator.

Supports tick-level and minute-level data processing, low-latency signal
generation and vectorized factor computation.
"""

import logging
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Deque, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


@dataclass
class TickData:
    """Tick data structure."""

    code: str
    datetime: datetime
    price: float
    volume: int
    amount: float
    direction: int = 0  # 1 = buy, -1 = sell, 0 = neutral


@dataclass
class BarData:
    """Bar (candlestick) data structure."""

    code: str
    datetime: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float
    amount: float


@dataclass
class Signal:
    """Trading signal structure."""

    code: str
    datetime: datetime
    direction: int  # 1 = long, -1 = short, 0 = none
    strength: float  # signal strength in the range 0-1
    price: float  # price at which the signal was raised
    expected_price: float  # expected target price


def _clamp_unit(value: float) -> float:
    """Clamp *value* into the closed interval [0, 1]."""
    return float(min(max(value, 0.0), 1.0))


def _band_strength(distance: float, band: float) -> float:
    """Scale a Bollinger-band overshoot to 0-1 (5% of the band level == 1.0)."""
    scale = band * 0.05
    if not scale > 0:
        # A zero or negative band level gives no meaningful scale.
        return 0.0
    return _clamp_unit(distance / scale)


class HighFrequencyDataPipeline:
    """Streaming pipeline that keeps a bounded window of recent ticks per code."""

    def __init__(self, window_size: int = 100):
        """Create a pipeline keeping at most *window_size* ticks per code."""
        self.window_size = window_size
        self._buffer: Dict[str, Deque[TickData]] = {}
        self._last_bar: Dict[str, Optional[BarData]] = {}
        self._tick_count = 0

    def on_tick(self, tick: TickData) -> Optional[Signal]:
        """Store a new tick in its code's window.

        Currently always returns ``None``; this is the hook where rolling
        computations could be triggered.
        """
        limit = max(self.window_size, 0)
        buffer = self._buffer.get(tick.code)
        if buffer is None or buffer.maxlen != limit:
            # deque(maxlen=...) evicts the oldest tick in O(1); it is rebuilt
            # only when the code is new or window_size was changed.
            buffer = deque(buffer or (), maxlen=limit)
            self._buffer[tick.code] = buffer
        buffer.append(tick)
        self._tick_count += 1
        return None

    def get_tick_buffer(self, code: str) -> List[TickData]:
        """Return a list snapshot of the buffered ticks for *code* (oldest first).

        An unknown code yields an empty list.
        """
        return list(self._buffer.get(code, ()))

    def statistics(self) -> Dict:
        """Return tick count, number of codes seen and total buffered ticks."""
        return {
            'total_ticks': self._tick_count,
            'total_codes': len(self._buffer),
            'buffer_size': sum(len(b) for b in self._buffer.values())
        }


class TechnicalFactorCalculator:
    """Technical factor calculator built on NumPy / pandas array operations."""

    def __init__(self):
        self.cache: Dict[str, pd.DataFrame] = {}

    def calculate_ma(self, prices: np.ndarray, period: int) -> np.ndarray:
        """Return the simple moving average; the first ``period - 1`` values are NaN."""
        return pd.Series(prices).rolling(period).mean().values

    def calculate_ema(self, prices: np.ndarray, period: int) -> np.ndarray:
        """Return the exponential moving average with ``span=period``."""
        return pd.Series(prices).ewm(span=period, adjust=False).mean().values

    def calculate_macd(
        self,
        prices: np.ndarray,
        fast_period: int = 12,
        slow_period: int = 26,
        signal_period: int = 9,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Return ``(dif, dea, macd)``.

        ``dif`` is fast EMA minus slow EMA, ``dea`` is the EMA of ``dif`` and
        ``macd`` is ``2 * (dif - dea)``.
        """
        ema_fast = self.calculate_ema(prices, fast_period)
        ema_slow = self.calculate_ema(prices, slow_period)
        dif = ema_fast - ema_slow
        dea = self.calculate_ema(dif, signal_period)
        macd = 2 * (dif - dea)
        return dif, dea, macd

    def calculate_bollinger_bands(
        self,
        prices: np.ndarray,
        period: int = 20,
        num_std: float = 2.0,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Return ``(upper, middle, lower)`` Bollinger bands.

        The middle band is the rolling mean; upper and lower are offset by
        ``num_std`` rolling standard deviations.
        """
        rolling = pd.Series(prices).rolling(period)
        middle = rolling.mean().values
        std = rolling.std().values
        upper = middle + num_std * std
        lower = middle - num_std * std
        return upper, middle, lower

    def calculate_rsi(self, prices: np.ndarray, period: int = 14) -> np.ndarray:
        """Return the Relative Strength Index using Wilder's smoothing.

        The result has the same length as *prices*. The first *period*
        entries are NaN because no average is defined there; if there are
        not more than *period* prices the whole result is NaN.
        """
        prices = np.asarray(prices, dtype=float)
        n = len(prices)
        rsi = np.full(n, np.nan)
        if period < 1 or n <= period:
            return rsi

        delta = np.diff(prices)
        gain = np.where(delta > 0, delta, 0.0)
        loss = np.where(delta < 0, -delta, 0.0)

        avg_gain = np.zeros(n)
        avg_loss = np.zeros(n)
        # Seed with the plain mean of the first `period` changes.
        avg_gain[period] = gain[:period].mean()
        avg_loss[period] = loss[:period].mean()

        # Wilder's smoothing is recursive, so it stays an explicit loop.
        for i in range(period + 1, n):
            avg_gain[i] = (avg_gain[i - 1] * (period - 1) + gain[i - 1]) / period
            avg_loss[i] = (avg_loss[i - 1] * (period - 1) + loss[i - 1]) / period

        # The small epsilon avoids division by zero when there are no losses.
        rs = avg_gain[period:] / (avg_loss[period:] + 1e-10)
        rsi[period:] = 100 - (100 / (1 + rs))
        return rsi

    def calculate_atr(
        self,
        high: np.ndarray,
        low: np.ndarray,
        close: np.ndarray,
        period: int = 14,
    ) -> np.ndarray:
        """Return the Average True Range (rolling mean of the true range).

        The true range of the first bar is ``high - low``; later bars also
        consider the gap to the previous close. Empty input yields an empty
        array.
        """
        high = np.asarray(high, dtype=float)
        low = np.asarray(low, dtype=float)
        close = np.asarray(close, dtype=float)
        if high.size == 0:
            return np.array([], dtype=float)

        tr = high - low
        if high.size > 1:
            prev_close = close[:-1]
            gap_up = np.abs(high[1:] - prev_close)
            gap_down = np.abs(low[1:] - prev_close)
            tr[1:] = np.maximum(tr[1:], np.maximum(gap_up, gap_down))
        return pd.Series(tr).rolling(period).mean().values

    def calculate_volume_ma(self, volumes: np.ndarray, period: int) -> np.ndarray:
        """Return the simple moving average of volume."""
        return pd.Series(volumes).rolling(period).mean().values

    def calculate_obv(self, close: np.ndarray, volume: np.ndarray) -> np.ndarray:
        """Return On-Balance Volume as a float array.

        Starts at ``volume[0]``; each later bar adds its volume when the close
        rose, subtracts it when the close fell and leaves the total unchanged
        otherwise. Empty input yields an empty array.
        """
        close = np.asarray(close, dtype=float)
        volume = np.asarray(volume, dtype=float)
        if close.size == 0:
            return np.array([], dtype=float)

        change = np.diff(close)
        later_volume = volume[1:close.size]
        steps = np.empty(close.size, dtype=float)
        steps[0] = volume[0]
        steps[1:] = np.where(
            change > 0, later_volume, np.where(change < 0, -later_volume, 0.0)
        )
        return np.cumsum(steps)


class HighFrequencySignalGenerator:
    """High-frequency trading signal generator.

    Combines several technical factors (MACD, RSI, Bollinger bands, ATR) to
    produce trading signals.
    """

    def __init__(self):
        self.factor_calculator = TechnicalFactorCalculator()
        self.data_pipeline = HighFrequencyDataPipeline()
        self.signal_history: List[Signal] = []
        self.performance_stats = {
            'total_signals': 0,
            'long_signals': 0,
            'short_signals': 0,
        }

    def generate_signal_from_bar(self, bar: BarData, data: pd.DataFrame) -> Optional[Signal]:
        """Generate a signal from bar history.

        Args:
            bar: Supplies the ``code`` and ``datetime`` stamped on the signal.
            data: Frame with ``close``, ``high`` and ``low`` columns; the last
                row is treated as the current bar.

        Returns:
            A recorded ``Signal`` when one of the rules fires, otherwise
            ``None``. ``None`` is also returned when *data* has fewer than two
            rows, since a crossover needs the previous bar.
        """
        if len(data) < 2:
            return None

        close = data['close'].values
        high = data['high'].values
        low = data['low'].values

        calc = self.factor_calculator
        dif, dea, _ = calc.calculate_macd(close)
        rsi = calc.calculate_rsi(close)
        bb_upper, bb_mid, bb_lower = calc.calculate_bollinger_bands(close)
        atr = calc.calculate_atr(high, low, close, 14)

        current_rsi = float(rsi[-1])
        current_dif = float(dif[-1])
        current_dea = float(dea[-1])
        current_price = float(close[-1])

        # Indicators still in warm-up are NaN; every comparison with NaN is
        # False, so no rule fires until the inputs are defined.
        signal: Optional[Signal] = None
        if current_dif > current_dea and dif[-2] <= dea[-2] and current_rsi < 70:
            # MACD golden cross: buy.
            signal = Signal(
                code=bar.code,
                datetime=bar.datetime,
                direction=1,
                strength=_clamp_unit(
                    (70 - current_rsi) / 70 + (current_dif - current_dea)
                ),
                price=current_price,
                expected_price=current_price + 2 * float(atr[-1]),
            )
        elif current_dif < current_dea and dif[-2] >= dea[-2] and current_rsi > 30:
            # MACD death cross: sell.
            signal = Signal(
                code=bar.code,
                datetime=bar.datetime,
                direction=-1,
                strength=_clamp_unit(
                    (current_rsi - 30) / 30 + (current_dea - current_dif)
                ),
                price=current_price,
                expected_price=current_price - 2 * float(atr[-1]),
            )
        elif current_price < bb_lower[-1] and current_rsi < 30:
            # Below the lower Bollinger band while oversold: buy toward the mid band.
            lower = float(bb_lower[-1])
            signal = Signal(
                code=bar.code,
                datetime=bar.datetime,
                direction=1,
                strength=_band_strength(lower - current_price, lower),
                price=current_price,
                expected_price=float(bb_mid[-1]),
            )
        elif current_price > bb_upper[-1] and current_rsi > 70:
            # Above the upper Bollinger band while overbought: sell toward the mid band.
            upper = float(bb_upper[-1])
            signal = Signal(
                code=bar.code,
                datetime=bar.datetime,
                direction=-1,
                strength=_band_strength(current_price - upper, upper),
                price=current_price,
                expected_price=float(bb_mid[-1]),
            )

        if signal is not None:
            self._record_signal(signal)
        return signal

    def _record_signal(self, signal: Signal):
        """Append *signal* to the history and update the counters."""
        self.signal_history.append(signal)
        self.performance_stats['total_signals'] += 1
        if signal.direction == 1:
            self.performance_stats['long_signals'] += 1
        elif signal.direction == -1:
            self.performance_stats['short_signals'] += 1

    def get_performance_stats(self) -> Dict:
        """Return a copy of the signal counters."""
        return self.performance_stats.copy()


class PerformanceMonitor:
    """Performance monitor tracking processing latency of the algorithm."""

    def __init__(self, window_size: int = 1000):
        """Create a monitor keeping the most recent *window_size* latencies."""
        self.latencies: List[float] = []
        self.window_size = window_size
        self.processed_ticks = 0
        self.start_time = datetime.now()
        self._start: Optional[float] = None

    def on_process_start(self):
        """Mark the start of one processing step."""
        # perf_counter is monotonic and high resolution, unlike wall-clock time.
        self._start = time.perf_counter()

    def on_process_end(self):
        """Mark the end of a processing step and record its latency in ms.

        A call without a preceding ``on_process_start`` is ignored with a
        warning.
        """
        started = getattr(self, '_start', None)
        if started is None:
            logger.warning("on_process_end called before on_process_start; ignored")
            return

        latency = (time.perf_counter() - started) * 1000  # ms
        self.latencies.append(latency)
        self.processed_ticks += 1

        # Keep only the most recent window of samples.
        if len(self.latencies) > self.window_size:
            self.latencies.pop(0)

    def get_stats(self) -> Dict:
        """Return latency statistics (ms), processed count and throughput.

        Every key is present even when no sample has been recorded yet.
        """
        if not self.latencies:
            return {
                'latency_ms_avg': 0,
                'latency_ms_p50': 0,
                'latency_ms_p99': 0,
                'latency_ms_max': 0,
                'total_processed': 0,
                'tps': 0
            }

        latencies_np = np.array(self.latencies)
        elapsed = (datetime.now() - self.start_time).total_seconds()

        return {
            'latency_ms_avg': float(latencies_np.mean()),
            'latency_ms_p50': float(np.percentile(latencies_np, 50)),
            'latency_ms_p99': float(np.percentile(latencies_np, 99)),
            'latency_ms_max': float(latencies_np.max()),
            'total_processed': self.processed_ticks,
            'tps': self.processed_ticks / elapsed if elapsed > 0 else 0
        }

    def print_stats(self):
        """Print the current statistics to stdout."""
        stats = self.get_stats()
        print("=" * 60)
        print("高频算法性能监控")
        print("=" * 60)
        print(f"平均延迟: {stats['latency_ms_avg']:.2f} ms")
        print(f"P50延迟: {stats['latency_ms_p50']:.2f} ms")
        print(f"P99延迟: {stats['latency_ms_p99']:.2f} ms")
        print(f"最大延迟: {stats['latency_ms_max']:.2f} ms")
        print(f"总处理tick数: {stats['total_processed']}")
        print(f"每秒处理: {stats['tps']:.2f} ticks")
        print("=" * 60)


# Public API
__all__ = [
    'TickData',
    'BarData',
    'Signal',
    'HighFrequencyDataPipeline',
    'TechnicalFactorCalculator',
    'HighFrequencySignalGenerator',
    'PerformanceMonitor',
]