diff --git a/technical-strategy/02-algorithms/technical_selection_backtest_final.py b/technical-strategy/02-algorithms/technical_selection_backtest_final.py new file mode 100644 index 000000000..92f384a32 --- /dev/null +++ b/technical-strategy/02-algorithms/technical_selection_backtest_final.py @@ -0,0 +1,477 @@ +""" +Technical Selection Strategies Backtest Framework + +Implements three recommended strategies: +1. MACD Divergence + Moving Average +2. Bollinger Bands Lower Rail + Trend +3. Donchian Channel Breakout + +Author: Zhang Fei +Date: 2026-03-24 +""" + +import numpy as np +import pandas as pd +from typing import Dict, List, Tuple, Optional +from dataclasses import dataclass +from datetime import datetime +import logging + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +@dataclass +class Trade: + code: str + entry_date: datetime + exit_date: Optional[datetime] + entry_price: float + exit_price: Optional[float] + direction: int + shares: int + entry_value: float + exit_value: Optional[float] + profit: Optional[float] + profit_pct: Optional[float] + hold_days: Optional[int] + strategy: str + + +@dataclass +class BacktestResult: + strategy: str + start_date: datetime + end_date: datetime + initial_capital: float + final_capital: float + total_return: float + annual_return: float + max_drawdown: float + sharpe_ratio: float + win_rate: float + total_trades: int + win_trades: int + loss_trades: int + avg_profit_pct: float + avg_win_pct: float + avg_loss_pct: float + trades: List[Trade] + + +class TechnicalIndicators: + @staticmethod + def sma(prices, period): + return pd.Series(prices).rolling(window=period, min_periods=1).mean().values + + @staticmethod + def ema(prices, period): + return pd.Series(prices).ewm(span=period, adjust=False).mean().values + + @staticmethod + def macd(prices, fast=12, slow=26, signal=9): + ema_fast = TechnicalIndicators.ema(prices, fast) + ema_slow = TechnicalIndicators.ema(prices, slow) + dif = ema_fast - ema_slow + dea = TechnicalIndicators.ema(dif, signal) + macd = 2 * (dif - dea) + return dif, dea, macd + + @staticmethod + def bollinger_bands(prices, period=20, num_std=2.0): + middle = TechnicalIndicators.sma(prices, period) + std = pd.Series(prices).rolling(window=period, min_periods=1).std().values + upper = middle + num_std * std + lower = middle - num_std * std + return upper, middle, lower + + @staticmethod + def donchian_channel(high, low, period=20): + upper = pd.Series(high).rolling(window=period, min_periods=1).max().values + lower = pd.Series(low).rolling(window=period, min_periods=1).min().values + return upper, lower + + @staticmethod + def atr(high, low, close, period=14): + tr = np.zeros(len(high)) + for i in range(len(high)): + if i == 0: + tr[i] = high[i] - low[i] + else: + tr[i] = max(high[i] - low[i], abs(high[i] - close[i-1]), abs(low[i] - close[i-1])) + return pd.Series(tr).rolling(window=period, min_periods=1).mean().values + + +class MACDDivergenceStrategy: + def __init__(self, ma_period=20, divergence_period=20, stop_loss=0.05, take_profit=0.20): + self.ma_period = ma_period + self.divergence_period = divergence_period + self.stop_loss = stop_loss + self.take_profit = take_profit + self.name = "MACD Divergence + MA" + + def check_buy_signal(self, data, idx): + if idx < self.divergence_period + self.ma_period: + return False + + current_price = data['close'].iloc[idx] + recent_low = data['close'].iloc[idx-self.divergence_period:idx].min() + + if current_price > recent_low: + return False + + dif, _, _ = TechnicalIndicators.macd(data['close'].values) + recent_dif_low = dif[idx-self.divergence_period:idx].min() + + if dif[idx] <= recent_dif_low: + return False + + ma = TechnicalIndicators.sma(data['close'].values, self.ma_period) + if current_price < ma[idx]: + return False + + return True + + def check_sell_signal(self, data, trade, idx): + current_price = data['close'].iloc[idx] + ma = TechnicalIndicators.sma(data['close'].values, self.ma_period) + + if current_price < ma[idx]: + return True + + profit_pct = (current_price - trade.entry_price) / trade.entry_price + if profit_pct <= -self.stop_loss or profit_pct >= self.take_profit: + return True + + return False + + +class BollingerBandsStrategy: + def __init__(self, bb_period=20, bb_std=2.0, stop_loss=0.05, take_profit=0.15): + self.bb_period = bb_period + self.bb_std = bb_std + self.stop_loss = stop_loss + self.take_profit = take_profit + self.name = "Bollinger Bands + Trend" + + def rsi(self, prices, period=14): + delta = np.diff(prices) + gain = np.where(delta > 0, delta, 0) + loss = np.where(delta < 0, -delta, 0) + avg_gain = np.zeros_like(prices) + avg_loss = np.zeros_like(prices) + + if len(prices) > period: + avg_gain[period] = np.mean(gain[:period]) + avg_loss[period] = np.mean(loss[:period]) + for i in range(period + 1, len(prices)): + avg_gain[i] = (avg_gain[i-1] * (period - 1) + gain[i-1]) / period + avg_loss[i] = (avg_loss[i-1] * (period - 1) + loss[i-1]) / period + + rs = avg_gain / (avg_loss + 1e-10) + return 100 - (100 / (1 + rs)) + + def check_buy_signal(self, data, idx): + if idx < self.bb_period + 20: + return False + + current_price = data['close'].iloc[idx] + bb_upper, bb_mid, bb_lower = TechnicalIndicators.bollinger_bands(data['close'].values, self.bb_period, self.bb_std) + + if current_price > bb_lower[idx] * 1.02: + return False + + ma5 = TechnicalIndicators.sma(data['close'].values, 5) + ma10 = TechnicalIndicators.sma(data['close'].values, 10) + ma20 = TechnicalIndicators.sma(data['close'].values, 20) + + if not (ma5[idx] > ma10[idx] > ma20[idx]): + return False + + rsi = self.rsi(data['close'].values) + if rsi[idx] > 35: + return False + + return True + + def check_sell_signal(self, data, trade, idx): + current_price = data['close'].iloc[idx] + bb_upper, bb_mid, bb_lower = TechnicalIndicators.bollinger_bands(data['close'].values, self.bb_period, self.bb_std) + + if current_price >= bb_mid[idx]: + return True + + ma20 = TechnicalIndicators.sma(data['close'].values, 20) + if current_price < ma20[idx]: + return True + + profit_pct = (current_price - trade.entry_price) / trade.entry_price + if profit_pct <= -self.stop_loss or profit_pct >= self.take_profit: + return True + + return False + + +class DonchianChannelStrategy: + def __init__(self, channel_period=20, exit_period=10, atr_period=14, atr_multiplier=2.0): + self.channel_period = channel_period + self.exit_period = exit_period + self.atr_period = atr_period + self.atr_multiplier = atr_multiplier + self.name = "Donchian Channel" + + def check_buy_signal(self, data, idx): + if idx < self.channel_period: + return False + + current_price = data['close'].iloc[idx] + dc_upper, dc_lower = TechnicalIndicators.donchian_channel(data['high'].values, data['low'].values, self.channel_period) + + if idx > 0: + prev_price = data['close'].iloc[idx-1] + if prev_price > dc_upper[idx-1]: + return False + if current_price > dc_upper[idx]: + return True + + return False + + def check_sell_signal(self, data, trade, idx): + current_price = data['close'].iloc[idx] + dc_upper, dc_lower = TechnicalIndicators.donchian_channel(data['high'].values, data['low'].values, self.exit_period) + + if current_price < dc_lower[idx]: + return True + + atr = TechnicalIndicators.atr(data['high'].values, data['low'].values, data['close'].values, self.atr_period) + stop_price = trade.entry_price - self.atr_multiplier * atr[idx] + + if current_price < stop_price: + return True + + return False + + +class BacktestEngine: + def __init__(self, initial_capital=100000.0): + self.initial_capital = initial_capital + self.commission_rate = 0.0003 + + def backtest(self, data, strategy, strategy_name): + logger.info(f"Starting backtest: {strategy_name}") + + data = data.copy().reset_index(drop=True) + capital = self.initial_capital + trades = [] + open_positions = {} + + for idx in range(len(data)): + current_date = data['date'].iloc[idx] if 'date' in data.columns else idx + current_price = data['close'].iloc[idx] + + for code, trade in list(open_positions.items()): + if strategy.check_sell_signal(data, trade, idx): + exit_price = current_price + commission = exit_price * trade.shares * self.commission_rate + exit_value = exit_price * trade.shares - commission + profit = exit_value - trade.entry_value + profit_pct = profit / trade.entry_value + + trade.exit_date = current_date + trade.exit_price = exit_price + trade.exit_value = exit_value + trade.profit = profit + trade.profit_pct = profit_pct + trade.hold_days = idx - trade._entry_idx + + capital += exit_value + trades.append(trade) + del open_positions[code] + + if capital > 0 and len(open_positions) == 0: + if strategy.check_buy_signal(data, idx): + code = data['code'].iloc[idx] if 'code' in data.columns else 'STOCK' + position_size = capital * 0.8 + shares = int(position_size / current_price) + + if shares > 0: + commission = current_price * shares * self.commission_rate + entry_value = current_price * shares + commission + + if entry_value <= capital: + trade = Trade( + code=code, + entry_date=current_date, + exit_date=None, + entry_price=current_price, + exit_price=None, + direction=1, + shares=shares, + entry_value=entry_value, + exit_value=None, + profit=None, + profit_pct=None, + hold_days=None, + strategy=strategy_name + ) + trade._entry_idx = idx + capital -= entry_value + open_positions[code] = trade + + for code, trade in open_positions.items(): + exit_price = data['close'].iloc[-1] + commission = exit_price * trade.shares * self.commission_rate + exit_value = exit_price * trade.shares - commission + profit = exit_value - trade.entry_value + profit_pct = profit / trade.entry_value + + trade.exit_date = data['date'].iloc[-1] if 'date' in data.columns else len(data) - 1 + trade.exit_price = exit_price + trade.exit_value = exit_value + trade.profit = profit + trade.profit_pct = profit_pct + trade.hold_days = len(data) - 1 - trade._entry_idx + + capital += exit_value + trades.append(trade) + + return self._calculate_performance(strategy_name, capital, trades, data) + + def _calculate_performance(self, strategy_name, final_capital, trades, data): + total_return = (final_capital - self.initial_capital) / self.initial_capital + + if 'date' in data.columns: + days = (data['date'].iloc[-1] - data['date'].iloc[0]).days + else: + days = len(data) + annual_return = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0 + + peak = self.initial_capital + max_drawdown = 0 + for trade in sorted(trades, key=lambda t: t._entry_idx if hasattr(t, '_entry_idx') else 0): + peak = max(peak, peak + trade.profit) + drawdown = (peak - (peak + trade.profit)) / peak + max_drawdown = max(max_drawdown, drawdown) + + if trades: + returns = [t.profit_pct for t in trades if t.profit_pct is not None] + sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(252) if len(returns) > 1 and np.std(returns) > 0 else 0 + else: + sharpe_ratio = 0 + + win_trades = [t for t in trades if t.profit_pct and t.profit_pct > 0] + loss_trades = [t for t in trades if t.profit_pct and t.profit_pct <= 0] + win_rate = len(win_trades) / len(trades) if trades else 0 + + avg_profit_pct = np.mean([t.profit_pct for t in trades if t.profit_pct is not None]) if trades else 0 + avg_win_pct = np.mean([t.profit_pct for t in win_trades]) if win_trades else 0 + avg_loss_pct = np.mean([t.profit_pct for t in loss_trades]) if loss_trades else 0 + + return BacktestResult( + strategy=strategy_name, + start_date=data['date'].iloc[0] if 'date' in data.columns else 0, + end_date=data['date'].iloc[-1] if 'date' in data.columns else len(data) - 1, + initial_capital=self.initial_capital, + final_capital=final_capital, + total_return=total_return, + annual_return=annual_return, + max_drawdown=max_drawdown, + sharpe_ratio=sharpe_ratio, + win_rate=win_rate, + total_trades=len(trades), + win_trades=len(win_trades), + loss_trades=len(loss_trades), + avg_profit_pct=avg_profit_pct, + avg_win_pct=avg_win_pct, + avg_loss_pct=avg_loss_pct, + trades=trades + ) + + def print_result(self, result): + print("\n" + "=" * 80) + print(f"Strategy: {result.strategy}") + print("=" * 80) + print(f"Period: {result.start_date} ~ {result.end_date}") + print(f"Initial Capital: {result.initial_capital:,.2f}") + print(f"Final Capital: {result.final_capital:,.2f}") + print("-" * 80) + print(f"Total Return: {result.total_return:.2%}") + print(f"Annual Return: {result.annual_return:.2%}") + print(f"Max Drawdown: {result.max_drawdown:.2%}") + print(f"Sharpe Ratio: {result.sharpe_ratio:.2f}") + print(f"Win Rate: {result.win_rate:.2%}") + print("-" * 80) + print(f"Total Trades: {result.total_trades}") + print(f"Win Trades: {result.win_trades}") + print(f"Loss Trades: {result.loss_trades}") + print("=" * 80) + + +def generate_sample_data(days=500, seed=42): + np.random.seed(seed) + returns = np.random.normal(0.001, 0.02, days) + prices = 100 * np.cumprod(1 + returns) + + return pd.DataFrame({ + 'date': pd.date_range(start='2024-01-01', periods=days, freq='D'), + 'open': prices * (1 + np.random.uniform(-0.01, 0.01, days)), + 'high': prices * (1 + np.abs(np.random.uniform(0, 0.02, days))), + 'low': prices * (1 - np.abs(np.random.uniform(0, 0.02, days))), + 'close': prices, + 'volume': np.random.randint(1000000, 10000000, days), + 'code': 'TEST001' + }) + + +def main(): + print("\n" + "=" * 80) + print("Technical Selection Strategies Backtest System - Zhang Fei") + print("=" * 80) + + data = generate_sample_data(days=500) + print(f"\nGenerated {len(data)} days of sample data") + + engine = BacktestEngine(initial_capital=100000.0) + + print("\n" + "=" * 80) + print("Strategy 1: MACD Divergence + MA") + print("=" * 80) + macd_strategy = MACDDivergenceStrategy() + macd_result = engine.backtest(data, macd_strategy, "MACD Divergence + MA") + engine.print_result(macd_result) + + print("\n" + "=" * 80) + print("Strategy 2: Bollinger Bands + Trend") + print("=" * 80) + bb_strategy = BollingerBandsStrategy() + bb_result = engine.backtest(data, bb_strategy, "Bollinger Bands + Trend") + engine.print_result(bb_result) + + print("\n" + "=" * 80) + print("Strategy 3: Donchian Channel") + print("=" * 80) + dc_strategy = DonchianChannelStrategy() + dc_result = engine.backtest(data, dc_strategy, "Donchian Channel") + engine.print_result(dc_result) + + print("\n" + "=" * 80) + print("Strategy Comparison Summary") + print("=" * 80) + results = [ + ('MACD Divergence + MA', macd_result.total_return, macd_result.max_drawdown, macd_result.sharpe_ratio, macd_result.win_rate, macd_result.total_trades), + ('Bollinger Bands + Trend', bb_result.total_return, bb_result.max_drawdown, bb_result.sharpe_ratio, bb_result.win_rate, bb_result.total_trades), + ('Donchian Channel', dc_result.total_return, dc_result.max_drawdown, dc_result.sharpe_ratio, dc_result.win_rate, dc_result.total_trades), + ] + for name, ret, dd, sharpe, win, trades in results: + print(f"{name:25s} | Return: {ret:6.2%} | Drawdown: {dd:6.2%} | Sharpe: {sharpe:.2f} | Win: {win:.2%} | Trades: {trades}") + print("=" * 80) + + return { + 'macd_divergence': macd_result, + 'bollinger_bands': bb_result, + 'donchian_channel': dc_result + } + + +if __name__ == "__main__": + results = main()