feat: 张飞完成技术选股策略回测代码实现
实现了三种推荐技术选股策略的完整回测框架: 1. MACD底背离+均线策略 - 买入:股价创新低但MACD未创新低(底背离),且站上20日均线 - 卖出:跌破20日均线或止损5%/止盈20% 2. 布林带下轨+趋势策略 - 买入:触及/跌破布林带下轨,均线多头排列(MA5>MA10>MA20),RSI<35 - 卖出:站上中轨或跌破20日均线或止损5%/止盈15% 3. 唐奇安通道突破策略 - 买入:收盘价突破20日通道上轨 - 卖出:跌破10日通道下轨或ATR止损(2倍ATR) 包含完整回测引擎、绩效指标计算、交易记录跟踪。 作者:张飞 日期:2026-03-24
This commit is contained in:
@@ -0,0 +1,477 @@
|
||||
"""
|
||||
Technical Selection Strategies Backtest Framework
|
||||
|
||||
Implements three recommended strategies:
|
||||
1. MACD Divergence + Moving Average
|
||||
2. Bollinger Bands Lower Rail + Trend
|
||||
3. Donchian Channel Breakout
|
||||
|
||||
Author: Zhang Fei
|
||||
Date: 2026-03-24
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
import logging
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Trade:
|
||||
code: str
|
||||
entry_date: datetime
|
||||
exit_date: Optional[datetime]
|
||||
entry_price: float
|
||||
exit_price: Optional[float]
|
||||
direction: int
|
||||
shares: int
|
||||
entry_value: float
|
||||
exit_value: Optional[float]
|
||||
profit: Optional[float]
|
||||
profit_pct: Optional[float]
|
||||
hold_days: Optional[int]
|
||||
strategy: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class BacktestResult:
|
||||
strategy: str
|
||||
start_date: datetime
|
||||
end_date: datetime
|
||||
initial_capital: float
|
||||
final_capital: float
|
||||
total_return: float
|
||||
annual_return: float
|
||||
max_drawdown: float
|
||||
sharpe_ratio: float
|
||||
win_rate: float
|
||||
total_trades: int
|
||||
win_trades: int
|
||||
loss_trades: int
|
||||
avg_profit_pct: float
|
||||
avg_win_pct: float
|
||||
avg_loss_pct: float
|
||||
trades: List[Trade]
|
||||
|
||||
|
||||
class TechnicalIndicators:
|
||||
@staticmethod
|
||||
def sma(prices, period):
|
||||
return pd.Series(prices).rolling(window=period, min_periods=1).mean().values
|
||||
|
||||
@staticmethod
|
||||
def ema(prices, period):
|
||||
return pd.Series(prices).ewm(span=period, adjust=False).mean().values
|
||||
|
||||
@staticmethod
|
||||
def macd(prices, fast=12, slow=26, signal=9):
|
||||
ema_fast = TechnicalIndicators.ema(prices, fast)
|
||||
ema_slow = TechnicalIndicators.ema(prices, slow)
|
||||
dif = ema_fast - ema_slow
|
||||
dea = TechnicalIndicators.ema(dif, signal)
|
||||
macd = 2 * (dif - dea)
|
||||
return dif, dea, macd
|
||||
|
||||
@staticmethod
|
||||
def bollinger_bands(prices, period=20, num_std=2.0):
|
||||
middle = TechnicalIndicators.sma(prices, period)
|
||||
std = pd.Series(prices).rolling(window=period, min_periods=1).std().values
|
||||
upper = middle + num_std * std
|
||||
lower = middle - num_std * std
|
||||
return upper, middle, lower
|
||||
|
||||
@staticmethod
|
||||
def donchian_channel(high, low, period=20):
|
||||
upper = pd.Series(high).rolling(window=period, min_periods=1).max().values
|
||||
lower = pd.Series(low).rolling(window=period, min_periods=1).min().values
|
||||
return upper, lower
|
||||
|
||||
@staticmethod
|
||||
def atr(high, low, close, period=14):
|
||||
tr = np.zeros(len(high))
|
||||
for i in range(len(high)):
|
||||
if i == 0:
|
||||
tr[i] = high[i] - low[i]
|
||||
else:
|
||||
tr[i] = max(high[i] - low[i], abs(high[i] - close[i-1]), abs(low[i] - close[i-1]))
|
||||
return pd.Series(tr).rolling(window=period, min_periods=1).mean().values
|
||||
|
||||
|
||||
class MACDDivergenceStrategy:
|
||||
def __init__(self, ma_period=20, divergence_period=20, stop_loss=0.05, take_profit=0.20):
|
||||
self.ma_period = ma_period
|
||||
self.divergence_period = divergence_period
|
||||
self.stop_loss = stop_loss
|
||||
self.take_profit = take_profit
|
||||
self.name = "MACD Divergence + MA"
|
||||
|
||||
def check_buy_signal(self, data, idx):
|
||||
if idx < self.divergence_period + self.ma_period:
|
||||
return False
|
||||
|
||||
current_price = data['close'].iloc[idx]
|
||||
recent_low = data['close'].iloc[idx-self.divergence_period:idx].min()
|
||||
|
||||
if current_price > recent_low:
|
||||
return False
|
||||
|
||||
dif, _, _ = TechnicalIndicators.macd(data['close'].values)
|
||||
recent_dif_low = dif[idx-self.divergence_period:idx].min()
|
||||
|
||||
if dif[idx] <= recent_dif_low:
|
||||
return False
|
||||
|
||||
ma = TechnicalIndicators.sma(data['close'].values, self.ma_period)
|
||||
if current_price < ma[idx]:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def check_sell_signal(self, data, trade, idx):
|
||||
current_price = data['close'].iloc[idx]
|
||||
ma = TechnicalIndicators.sma(data['close'].values, self.ma_period)
|
||||
|
||||
if current_price < ma[idx]:
|
||||
return True
|
||||
|
||||
profit_pct = (current_price - trade.entry_price) / trade.entry_price
|
||||
if profit_pct <= -self.stop_loss or profit_pct >= self.take_profit:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
class BollingerBandsStrategy:
|
||||
def __init__(self, bb_period=20, bb_std=2.0, stop_loss=0.05, take_profit=0.15):
|
||||
self.bb_period = bb_period
|
||||
self.bb_std = bb_std
|
||||
self.stop_loss = stop_loss
|
||||
self.take_profit = take_profit
|
||||
self.name = "Bollinger Bands + Trend"
|
||||
|
||||
def rsi(self, prices, period=14):
|
||||
delta = np.diff(prices)
|
||||
gain = np.where(delta > 0, delta, 0)
|
||||
loss = np.where(delta < 0, -delta, 0)
|
||||
avg_gain = np.zeros_like(prices)
|
||||
avg_loss = np.zeros_like(prices)
|
||||
|
||||
if len(prices) > period:
|
||||
avg_gain[period] = np.mean(gain[:period])
|
||||
avg_loss[period] = np.mean(loss[:period])
|
||||
for i in range(period + 1, len(prices)):
|
||||
avg_gain[i] = (avg_gain[i-1] * (period - 1) + gain[i-1]) / period
|
||||
avg_loss[i] = (avg_loss[i-1] * (period - 1) + loss[i-1]) / period
|
||||
|
||||
rs = avg_gain / (avg_loss + 1e-10)
|
||||
return 100 - (100 / (1 + rs))
|
||||
|
||||
def check_buy_signal(self, data, idx):
|
||||
if idx < self.bb_period + 20:
|
||||
return False
|
||||
|
||||
current_price = data['close'].iloc[idx]
|
||||
bb_upper, bb_mid, bb_lower = TechnicalIndicators.bollinger_bands(data['close'].values, self.bb_period, self.bb_std)
|
||||
|
||||
if current_price > bb_lower[idx] * 1.02:
|
||||
return False
|
||||
|
||||
ma5 = TechnicalIndicators.sma(data['close'].values, 5)
|
||||
ma10 = TechnicalIndicators.sma(data['close'].values, 10)
|
||||
ma20 = TechnicalIndicators.sma(data['close'].values, 20)
|
||||
|
||||
if not (ma5[idx] > ma10[idx] > ma20[idx]):
|
||||
return False
|
||||
|
||||
rsi = self.rsi(data['close'].values)
|
||||
if rsi[idx] > 35:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def check_sell_signal(self, data, trade, idx):
|
||||
current_price = data['close'].iloc[idx]
|
||||
bb_upper, bb_mid, bb_lower = TechnicalIndicators.bollinger_bands(data['close'].values, self.bb_period, self.bb_std)
|
||||
|
||||
if current_price >= bb_mid[idx]:
|
||||
return True
|
||||
|
||||
ma20 = TechnicalIndicators.sma(data['close'].values, 20)
|
||||
if current_price < ma20[idx]:
|
||||
return True
|
||||
|
||||
profit_pct = (current_price - trade.entry_price) / trade.entry_price
|
||||
if profit_pct <= -self.stop_loss or profit_pct >= self.take_profit:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
class DonchianChannelStrategy:
|
||||
def __init__(self, channel_period=20, exit_period=10, atr_period=14, atr_multiplier=2.0):
|
||||
self.channel_period = channel_period
|
||||
self.exit_period = exit_period
|
||||
self.atr_period = atr_period
|
||||
self.atr_multiplier = atr_multiplier
|
||||
self.name = "Donchian Channel"
|
||||
|
||||
def check_buy_signal(self, data, idx):
|
||||
if idx < self.channel_period:
|
||||
return False
|
||||
|
||||
current_price = data['close'].iloc[idx]
|
||||
dc_upper, dc_lower = TechnicalIndicators.donchian_channel(data['high'].values, data['low'].values, self.channel_period)
|
||||
|
||||
if idx > 0:
|
||||
prev_price = data['close'].iloc[idx-1]
|
||||
if prev_price > dc_upper[idx-1]:
|
||||
return False
|
||||
if current_price > dc_upper[idx]:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def check_sell_signal(self, data, trade, idx):
|
||||
current_price = data['close'].iloc[idx]
|
||||
dc_upper, dc_lower = TechnicalIndicators.donchian_channel(data['high'].values, data['low'].values, self.exit_period)
|
||||
|
||||
if current_price < dc_lower[idx]:
|
||||
return True
|
||||
|
||||
atr = TechnicalIndicators.atr(data['high'].values, data['low'].values, data['close'].values, self.atr_period)
|
||||
stop_price = trade.entry_price - self.atr_multiplier * atr[idx]
|
||||
|
||||
if current_price < stop_price:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
class BacktestEngine:
|
||||
def __init__(self, initial_capital=100000.0):
|
||||
self.initial_capital = initial_capital
|
||||
self.commission_rate = 0.0003
|
||||
|
||||
def backtest(self, data, strategy, strategy_name):
|
||||
logger.info(f"Starting backtest: {strategy_name}")
|
||||
|
||||
data = data.copy().reset_index(drop=True)
|
||||
capital = self.initial_capital
|
||||
trades = []
|
||||
open_positions = {}
|
||||
|
||||
for idx in range(len(data)):
|
||||
current_date = data['date'].iloc[idx] if 'date' in data.columns else idx
|
||||
current_price = data['close'].iloc[idx]
|
||||
|
||||
for code, trade in list(open_positions.items()):
|
||||
if strategy.check_sell_signal(data, trade, idx):
|
||||
exit_price = current_price
|
||||
commission = exit_price * trade.shares * self.commission_rate
|
||||
exit_value = exit_price * trade.shares - commission
|
||||
profit = exit_value - trade.entry_value
|
||||
profit_pct = profit / trade.entry_value
|
||||
|
||||
trade.exit_date = current_date
|
||||
trade.exit_price = exit_price
|
||||
trade.exit_value = exit_value
|
||||
trade.profit = profit
|
||||
trade.profit_pct = profit_pct
|
||||
trade.hold_days = idx - trade._entry_idx
|
||||
|
||||
capital += exit_value
|
||||
trades.append(trade)
|
||||
del open_positions[code]
|
||||
|
||||
if capital > 0 and len(open_positions) == 0:
|
||||
if strategy.check_buy_signal(data, idx):
|
||||
code = data['code'].iloc[idx] if 'code' in data.columns else 'STOCK'
|
||||
position_size = capital * 0.8
|
||||
shares = int(position_size / current_price)
|
||||
|
||||
if shares > 0:
|
||||
commission = current_price * shares * self.commission_rate
|
||||
entry_value = current_price * shares + commission
|
||||
|
||||
if entry_value <= capital:
|
||||
trade = Trade(
|
||||
code=code,
|
||||
entry_date=current_date,
|
||||
exit_date=None,
|
||||
entry_price=current_price,
|
||||
exit_price=None,
|
||||
direction=1,
|
||||
shares=shares,
|
||||
entry_value=entry_value,
|
||||
exit_value=None,
|
||||
profit=None,
|
||||
profit_pct=None,
|
||||
hold_days=None,
|
||||
strategy=strategy_name
|
||||
)
|
||||
trade._entry_idx = idx
|
||||
capital -= entry_value
|
||||
open_positions[code] = trade
|
||||
|
||||
for code, trade in open_positions.items():
|
||||
exit_price = data['close'].iloc[-1]
|
||||
commission = exit_price * trade.shares * self.commission_rate
|
||||
exit_value = exit_price * trade.shares - commission
|
||||
profit = exit_value - trade.entry_value
|
||||
profit_pct = profit / trade.entry_value
|
||||
|
||||
trade.exit_date = data['date'].iloc[-1] if 'date' in data.columns else len(data) - 1
|
||||
trade.exit_price = exit_price
|
||||
trade.exit_value = exit_value
|
||||
trade.profit = profit
|
||||
trade.profit_pct = profit_pct
|
||||
trade.hold_days = len(data) - 1 - trade._entry_idx
|
||||
|
||||
capital += exit_value
|
||||
trades.append(trade)
|
||||
|
||||
return self._calculate_performance(strategy_name, capital, trades, data)
|
||||
|
||||
def _calculate_performance(self, strategy_name, final_capital, trades, data):
|
||||
total_return = (final_capital - self.initial_capital) / self.initial_capital
|
||||
|
||||
if 'date' in data.columns:
|
||||
days = (data['date'].iloc[-1] - data['date'].iloc[0]).days
|
||||
else:
|
||||
days = len(data)
|
||||
annual_return = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0
|
||||
|
||||
peak = self.initial_capital
|
||||
max_drawdown = 0
|
||||
for trade in sorted(trades, key=lambda t: t._entry_idx if hasattr(t, '_entry_idx') else 0):
|
||||
peak = max(peak, peak + trade.profit)
|
||||
drawdown = (peak - (peak + trade.profit)) / peak
|
||||
max_drawdown = max(max_drawdown, drawdown)
|
||||
|
||||
if trades:
|
||||
returns = [t.profit_pct for t in trades if t.profit_pct is not None]
|
||||
sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(252) if len(returns) > 1 and np.std(returns) > 0 else 0
|
||||
else:
|
||||
sharpe_ratio = 0
|
||||
|
||||
win_trades = [t for t in trades if t.profit_pct and t.profit_pct > 0]
|
||||
loss_trades = [t for t in trades if t.profit_pct and t.profit_pct <= 0]
|
||||
win_rate = len(win_trades) / len(trades) if trades else 0
|
||||
|
||||
avg_profit_pct = np.mean([t.profit_pct for t in trades if t.profit_pct is not None]) if trades else 0
|
||||
avg_win_pct = np.mean([t.profit_pct for t in win_trades]) if win_trades else 0
|
||||
avg_loss_pct = np.mean([t.profit_pct for t in loss_trades]) if loss_trades else 0
|
||||
|
||||
return BacktestResult(
|
||||
strategy=strategy_name,
|
||||
start_date=data['date'].iloc[0] if 'date' in data.columns else 0,
|
||||
end_date=data['date'].iloc[-1] if 'date' in data.columns else len(data) - 1,
|
||||
initial_capital=self.initial_capital,
|
||||
final_capital=final_capital,
|
||||
total_return=total_return,
|
||||
annual_return=annual_return,
|
||||
max_drawdown=max_drawdown,
|
||||
sharpe_ratio=sharpe_ratio,
|
||||
win_rate=win_rate,
|
||||
total_trades=len(trades),
|
||||
win_trades=len(win_trades),
|
||||
loss_trades=len(loss_trades),
|
||||
avg_profit_pct=avg_profit_pct,
|
||||
avg_win_pct=avg_win_pct,
|
||||
avg_loss_pct=avg_loss_pct,
|
||||
trades=trades
|
||||
)
|
||||
|
||||
def print_result(self, result):
|
||||
print("\n" + "=" * 80)
|
||||
print(f"Strategy: {result.strategy}")
|
||||
print("=" * 80)
|
||||
print(f"Period: {result.start_date} ~ {result.end_date}")
|
||||
print(f"Initial Capital: {result.initial_capital:,.2f}")
|
||||
print(f"Final Capital: {result.final_capital:,.2f}")
|
||||
print("-" * 80)
|
||||
print(f"Total Return: {result.total_return:.2%}")
|
||||
print(f"Annual Return: {result.annual_return:.2%}")
|
||||
print(f"Max Drawdown: {result.max_drawdown:.2%}")
|
||||
print(f"Sharpe Ratio: {result.sharpe_ratio:.2f}")
|
||||
print(f"Win Rate: {result.win_rate:.2%}")
|
||||
print("-" * 80)
|
||||
print(f"Total Trades: {result.total_trades}")
|
||||
print(f"Win Trades: {result.win_trades}")
|
||||
print(f"Loss Trades: {result.loss_trades}")
|
||||
print("=" * 80)
|
||||
|
||||
|
||||
def generate_sample_data(days=500, seed=42):
|
||||
np.random.seed(seed)
|
||||
returns = np.random.normal(0.001, 0.02, days)
|
||||
prices = 100 * np.cumprod(1 + returns)
|
||||
|
||||
return pd.DataFrame({
|
||||
'date': pd.date_range(start='2024-01-01', periods=days, freq='D'),
|
||||
'open': prices * (1 + np.random.uniform(-0.01, 0.01, days)),
|
||||
'high': prices * (1 + np.abs(np.random.uniform(0, 0.02, days))),
|
||||
'low': prices * (1 - np.abs(np.random.uniform(0, 0.02, days))),
|
||||
'close': prices,
|
||||
'volume': np.random.randint(1000000, 10000000, days),
|
||||
'code': 'TEST001'
|
||||
})
|
||||
|
||||
|
||||
def main():
|
||||
print("\n" + "=" * 80)
|
||||
print("Technical Selection Strategies Backtest System - Zhang Fei")
|
||||
print("=" * 80)
|
||||
|
||||
data = generate_sample_data(days=500)
|
||||
print(f"\nGenerated {len(data)} days of sample data")
|
||||
|
||||
engine = BacktestEngine(initial_capital=100000.0)
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Strategy 1: MACD Divergence + MA")
|
||||
print("=" * 80)
|
||||
macd_strategy = MACDDivergenceStrategy()
|
||||
macd_result = engine.backtest(data, macd_strategy, "MACD Divergence + MA")
|
||||
engine.print_result(macd_result)
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Strategy 2: Bollinger Bands + Trend")
|
||||
print("=" * 80)
|
||||
bb_strategy = BollingerBandsStrategy()
|
||||
bb_result = engine.backtest(data, bb_strategy, "Bollinger Bands + Trend")
|
||||
engine.print_result(bb_result)
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Strategy 3: Donchian Channel")
|
||||
print("=" * 80)
|
||||
dc_strategy = DonchianChannelStrategy()
|
||||
dc_result = engine.backtest(data, dc_strategy, "Donchian Channel")
|
||||
engine.print_result(dc_result)
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("Strategy Comparison Summary")
|
||||
print("=" * 80)
|
||||
results = [
|
||||
('MACD Divergence + MA', macd_result.total_return, macd_result.max_drawdown, macd_result.sharpe_ratio, macd_result.win_rate, macd_result.total_trades),
|
||||
('Bollinger Bands + Trend', bb_result.total_return, bb_result.max_drawdown, bb_result.sharpe_ratio, bb_result.win_rate, bb_result.total_trades),
|
||||
('Donchian Channel', dc_result.total_return, dc_result.max_drawdown, dc_result.sharpe_ratio, dc_result.win_rate, dc_result.total_trades),
|
||||
]
|
||||
for name, ret, dd, sharpe, win, trades in results:
|
||||
print(f"{name:25s} | Return: {ret:6.2%} | Drawdown: {dd:6.2%} | Sharpe: {sharpe:.2f} | Win: {win:.2%} | Trades: {trades}")
|
||||
print("=" * 80)
|
||||
|
||||
return {
|
||||
'macd_divergence': macd_result,
|
||||
'bollinger_bands': bb_result,
|
||||
'donchian_channel': dc_result
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
results = main()
|
||||
Reference in New Issue
Block a user