auto-sync: 2026-04-10 15:05:02

This commit is contained in:
cfdaily
2026-04-10 15:05:02 +08:00
parent f4a349378e
commit a039e1ac0a
5 changed files with 805 additions and 0 deletions
@@ -0,0 +1,650 @@
"""
Technical Selection Strategies Backtest Framework with Risk Control
Implements three recommended strategies + Guanyu Risk Control:
1. MACD Divergence + Moving Average
2. Bollinger Bands Lower Rail + Trend
3. Donchian Channel Breakout
4. Four-layer Risk Control System by Guan Yu
Original Author: Zhang Fei
Risk Control: Guan Yu (Yunchang)
Date: 2026-04-10
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime
import logging
# Import risk control module from Guan Yu
from risk_control import RiskController, StockInfo, PortfolioInfo
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class Trade:
code: str
entry_date: datetime
exit_date: Optional[datetime]
entry_price: float
exit_price: Optional[float]
direction: int
shares: int
entry_value: float
exit_value: Optional[float]
profit: Optional[float]
profit_pct: Optional[float]
hold_days: Optional[int]
strategy: str
@dataclass
class BacktestResult:
strategy: str
start_date: datetime
end_date: datetime
initial_capital: float
final_capital: float
total_return: float
annual_return: float
max_drawdown: float
sharpe_ratio: float
win_rate: float
total_trades: int
win_trades: int
loss_trades: int
avg_profit_pct: float
avg_win_pct: float
avg_loss_pct: float
trades: List[Trade]
class TechnicalIndicators:
@staticmethod
def sma(prices, period):
return pd.Series(prices).rolling(window=period, min_periods=1).mean().values
@staticmethod
def ema(prices, period):
return pd.Series(prices).ewm(span=period, adjust=False).mean().values
@staticmethod
def macd(prices, fast=12, slow=26, signal=9):
ema_fast = TechnicalIndicators.ema(prices, fast)
ema_slow = TechnicalIndicators.ema(prices, slow)
dif = ema_fast - ema_slow
dea = TechnicalIndicators.ema(dif, signal)
macd = 2 * (dif - dea)
return dif, dea, macd
@staticmethod
def bollinger_bands(prices, period=20, num_std=2.0):
middle = TechnicalIndicators.sma(prices, period)
std = pd.Series(prices).rolling(window=period, min_periods=1).std().values
upper = middle + num_std * std
lower = middle - num_std * std
return upper, middle, lower
@staticmethod
def donchian_channel(high, low, period=20):
upper = pd.Series(high).rolling(window=period, min_periods=1).max().values
lower = pd.Series(low).rolling(window=period, min_periods=1).min().values
return upper, lower
@staticmethod
def atr(high, low, close, period=14):
tr = np.zeros(len(high))
for i in range(len(high)):
if i == 0:
tr[i] = high[i] - low[i]
else:
tr[i] = max(high[i] - low[i], abs(high[i] - close[i-1]), abs(low[i] - close[i-1]))
return pd.Series(tr).rolling(window=period, min_periods=1).mean().values
class MACDDivergenceStrategy:
def __init__(self, ma_period=20, divergence_period=20, stop_loss=0.05, take_profit=0.20):
self.ma_period = ma_period
self.divergence_period = divergence_period
self.stop_loss = stop_loss
self.take_profit = take_profit
self.name = "MACD Divergence + MA"
def check_buy_signal(self, data, idx):
if idx < self.divergence_period + self.ma_period:
return False
current_price = data['close'].iloc[idx]
recent_low = data['close'].iloc[idx-self.divergence_period:idx].min()
if current_price > recent_low:
return False
dif, _, _ = TechnicalIndicators.macd(data['close'].values)
recent_dif_low = dif[idx-self.divergence_period:idx].min()
if dif[idx] <= recent_dif_low:
return False
ma = TechnicalIndicators.sma(data['close'].values, self.ma_period)
if current_price < ma[idx]:
return False
return True
def check_sell_signal(self, data, trade, idx):
current_price = data['close'].iloc[idx]
ma = TechnicalIndicators.sma(data['close'].values, self.ma_period)
if current_price < ma[idx]:
return True
profit_pct = (current_price - trade.entry_price) / trade.entry_price
if profit_pct <= -self.stop_loss or profit_pct >= self.take_profit:
return True
return False
class BollingerBandsStrategy:
def __init__(self, bb_period=20, bb_std=2.0, stop_loss=0.05, take_profit=0.15):
self.bb_period = bb_period
self.bb_std = bb_std
self.stop_loss = stop_loss
self.take_profit = take_profit
self.name = "Bollinger Bands + Trend"
def rsi(self, prices, period=14):
delta = np.diff(prices)
gain = np.where(delta > 0, delta, 0)
loss = np.where(delta < 0, -delta, 0)
avg_gain = np.zeros_like(prices)
avg_loss = np.zeros_like(prices)
if len(prices) > period:
avg_gain[period] = np.mean(gain[:period])
avg_loss[period] = np.mean(loss[:period])
for i in range(period + 1, len(prices)):
avg_gain[i] = (avg_gain[i-1] * (period - 1) + gain[i-1]) / period
avg_loss[i] = (avg_loss[i-1] * (period - 1) + loss[i-1]) / period
rs = avg_gain / (avg_loss + 1e-10)
return 100 - (100 / (1 + rs))
def check_buy_signal(self, data, idx):
if idx < self.bb_period + 20:
return False
current_price = data['close'].iloc[idx]
bb_upper, bb_mid, bb_lower = TechnicalIndicators.bollinger_bands(data['close'].values, self.bb_period, self.bb_std)
if current_price > bb_lower[idx] * 1.02:
return False
ma5 = TechnicalIndicators.sma(data['close'].values, 5)
ma10 = TechnicalIndicators.sma(data['close'].values, 10)
ma20 = TechnicalIndicators.sma(data['close'].values, 20)
if not (ma5[idx] > ma10[idx] > ma20[idx]):
return False
rsi = self.rsi(data['close'].values)
if rsi[idx] > 35:
return False
return True
def check_sell_signal(self, data, trade, idx):
current_price = data['close'].iloc[idx]
bb_upper, bb_mid, bb_lower = TechnicalIndicators.bollinger_bands(data['close'].values, self.bb_period, self.bb_std)
if current_price >= bb_mid[idx]:
return True
ma20 = TechnicalIndicators.sma(data['close'].values, 20)
if current_price < ma20[idx]:
return True
profit_pct = (current_price - trade.entry_price) / trade.entry_price
if profit_pct <= -self.stop_loss or profit_pct >= self.take_profit:
return True
return False
class DonchianChannelStrategy:
def __init__(self, channel_period=20, exit_period=10, atr_period=14, atr_multiplier=2.0):
self.channel_period = channel_period
self.exit_period = exit_period
self.atr_period = atr_period
self.atr_multiplier = atr_multiplier
self.name = "Donchian Channel"
def check_buy_signal(self, data, idx):
if idx < self.channel_period:
return False
current_price = data['close'].iloc[idx]
dc_upper, dc_lower = TechnicalIndicators.donchian_channel(data['high'].values, data['low'].values, self.channel_period)
if idx > 0:
prev_price = data['close'].iloc[idx-1]
if prev_price > dc_upper[idx-1]:
return False
if current_price > dc_upper[idx]:
return True
return False
def check_sell_signal(self, data, trade, idx):
current_price = data['close'].iloc[idx]
dc_upper, dc_lower = TechnicalIndicators.donchian_channel(data['high'].values, data['low'].values, self.exit_period)
if current_price < dc_lower[idx]:
return True
atr = TechnicalIndicators.atr(data['high'].values, data['low'].values, data['close'].values, self.atr_period)
stop_price = trade.entry_price - self.atr_multiplier * atr[idx]
if current_price < stop_price:
return True
return False
class BacktestEngine:
def __init__(self, initial_capital=100000.0, enable_risk_control=True):
self.initial_capital = initial_capital
self.commission_rate = 0.0003
self.enable_risk_control = enable_risk_control
if enable_risk_control:
self.risk_controller = RiskController()
def backtest(self, data, strategy, strategy_name):
logger.info(f"Starting backtest: {strategy_name} (risk_control={self.enable_risk_control})")
data = data.copy().reset_index(drop=True)
capital = self.initial_capital
trades = []
open_positions = {}
for idx in range(len(data)):
current_date = data['date'].iloc[idx] if 'date' in data.columns else idx
current_price = data['close'].iloc[idx]
# 计算当前组合信息供风控使用
portfolio_info = PortfolioInfo(
total_capital=self.initial_capital,
current_capital=capital + sum(t.entry_value for t in open_positions.values()),
positions={code: trade.shares * current_price for code, trade in open_positions.items()}
)
# 准备股票信息供风控检查
stock_list = []
for code, trade in open_positions.items():
stock_info = StockInfo(
code=code,
name="",
cost_price=trade.entry_price,
current_price=current_price,
is_st=False,
is_limit_down=False,
is_fraud=False,
volume=data['volume'].iloc[idx] / 1e8 if 'volume' in data.columns else 1.0
)
stock_list.append(stock_info)
# 风控收盘后检查
if self.enable_risk_control and stock_list:
risk_result = self.risk_controller.post_trade_check(stock_list, portfolio_info)
# 执行风控止损
if risk_result['stop_loss_required']:
for stop_item in risk_result['stop_loss_stocks']:
code = stop_item['code']
if code in open_positions:
trade = open_positions[code]
exit_price = current_price
commission = exit_price * trade.shares * self.commission_rate
exit_value = exit_price * trade.shares - commission
profit = exit_value - trade.entry_value
profit_pct = profit / trade.entry_value
trade.exit_date = current_date
trade.exit_price = exit_price
trade.exit_value = exit_value
trade.profit = profit
trade.profit_pct = profit_pct
trade.hold_days = idx - trade._entry_idx
capital += exit_value
trades.append(trade)
del open_positions[code]
logger.info(f"[RiskControl] Trigger stop loss: {code} at {current_price:.2f}, drawdown={stop_item['current_drawdown']:.2%}")
# 原策略止损检查
for code, trade in list(open_positions.items()):
if strategy.check_sell_signal(data, trade, idx):
if code in open_positions: # 可能已经被风控止损了
exit_price = current_price
commission = exit_price * trade.shares * self.commission_rate
exit_value = exit_price * trade.shares - commission
profit = exit_value - trade.entry_value
profit_pct = profit / trade.entry_value
trade.exit_date = current_date
trade.exit_price = exit_price
trade.exit_value = exit_value
trade.profit = profit
trade.profit_pct = profit_pct
trade.hold_days = idx - trade._entry_idx
capital += exit_value
trades.append(trade)
del open_positions[code]
# 更新组合信息
portfolio_info = PortfolioInfo(
total_capital=self.initial_capital,
current_capital=capital + sum(t.entry_value for t in open_positions.values()),
positions={code: trade.shares * current_price for code, trade in open_positions.items()}
)
if capital > 0 and len(open_positions) == 0:
if strategy.check_buy_signal(data, idx):
code = data['code'].iloc[idx] if 'code' in data.columns else 'TEST001'
# 风控事前检查
if self.enable_risk_control:
# 准备当前股票信息
current_stock = StockInfo(
code=code,
name="",
cost_price=current_price,
current_price=current_price,
is_st=False,
is_limit_down=False,
is_fraud=False,
volume=data['volume'].iloc[idx] / 1e8 if 'volume' in data.columns else 1.0
)
ok, reason = self.risk_controller.pre_trade_check(current_stock, portfolio_info)
if not ok:
logger.info(f"[RiskControl] Rejected open position: {code}, reason: {reason}")
continue
position_size = capital * 0.8
shares = int(position_size / current_price)
if shares > 0:
commission = current_price * shares * self.commission_rate
entry_value = current_price * shares + commission
if entry_value <= capital:
trade = Trade(
code=code,
entry_date=current_date,
exit_date=None,
entry_price=current_price,
exit_price=None,
direction=1,
shares=shares,
entry_value=entry_value,
exit_value=None,
profit=None,
profit_pct=None,
hold_days=None,
strategy=strategy_name
)
trade._entry_idx = idx
capital -= entry_value
open_positions[code] = trade
for code, trade in open_positions.items():
exit_price = data['close'].iloc[-1]
commission = exit_price * trade.shares * self.commission_rate
exit_value = exit_price * trade.shares - commission
profit = exit_value - trade.entry_value
profit_pct = profit / trade.entry_value
trade.exit_date = data['date'].iloc[-1] if 'date' in data.columns else len(data) - 1
trade.exit_price = exit_price
trade.exit_value = exit_value
trade.profit = profit
trade.profit_pct = profit_pct
trade.hold_days = len(data) - 1 - trade._entry_idx
capital += exit_value
trades.append(trade)
return self._calculate_performance(strategy_name, capital, trades, data)
def _calculate_performance(self, strategy_name, final_capital, trades, data):
total_return = (final_capital - self.initial_capital) / self.initial_capital
if 'date' in data.columns:
days = (data['date'].iloc[-1] - data['date'].iloc[0]).days
else:
days = len(data)
annual_return = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0
peak = self.initial_capital
max_drawdown = 0
for trade in sorted(trades, key=lambda t: t._entry_idx if hasattr(t, '_entry_idx') else 0):
peak = max(peak, peak + trade.profit)
drawdown = (peak - (peak + trade.profit)) / peak
max_drawdown = max(max_drawdown, drawdown)
if trades:
returns = [t.profit_pct for t in trades if t.profit_pct is not None]
sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(252) if len(returns) > 1 and np.std(returns) > 0 else 0
else:
sharpe_ratio = 0
win_trades = [t for t in trades if t.profit_pct and t.profit_pct > 0]
loss_trades = [t for t in trades if t.profit_pct and t.profit_pct <= 0]
win_rate = len(win_trades) / len(trades) if trades else 0
avg_profit_pct = np.mean([t.profit_pct for t in trades if t.profit_pct is not None]) if trades else 0
avg_win_pct = np.mean([t.profit_pct for t in win_trades]) if win_trades else 0
avg_loss_pct = np.mean([t.profit_pct for t in loss_trades]) if loss_trades else 0
return BacktestResult(
strategy=strategy_name,
start_date=data['date'].iloc[0] if 'date' in data.columns else 0,
end_date=data['date'].iloc[-1] if 'date' in data.columns else len(data) - 1,
initial_capital=self.initial_capital,
final_capital=final_capital,
total_return=total_return,
annual_return=annual_return,
max_drawdown=max_drawdown,
sharpe_ratio=sharpe_ratio,
win_rate=win_rate,
total_trades=len(trades),
win_trades=len(win_trades),
loss_trades=len(loss_trades),
avg_profit_pct=avg_profit_pct,
avg_win_pct=avg_win_pct,
avg_loss_pct=avg_loss_pct,
trades=trades
)
def print_result(self, result):
print("\n" + "=" * 80)
print(f"Strategy: {result.strategy}")
print("=" * 80)
print(f"Period: {result.start_date} ~ {result.end_date}")
print(f"Initial Capital: {result.initial_capital:,.2f}")
print(f"Final Capital: {result.final_capital:,.2f}")
print("-" * 80)
print(f"Total Return: {result.total_return:.2%}")
print(f"Annual Return: {result.annual_return:.2%}")
print(f"Max Drawdown: {result.max_drawdown:.2%}")
print(f"Sharpe Ratio: {result.sharpe_ratio:.2f}")
print(f"Win Rate: {result.win_rate:.2%}")
print("-" * 80)
print(f"Total Trades: {result.total_trades}")
print(f"Win Trades: {result.win_trades}")
print(f"Loss Trades: {result.loss_trades}")
print("=" * 80)
def generate_sample_data(code, seed=42, days=500, drift=0.0005):
np.random.seed(seed)
returns = np.random.normal(drift, 0.02, days)
prices = 100 * np.cumprod(1 + returns)
return pd.DataFrame({
'date': pd.date_range(start='2024-01-01', periods=days, freq='D'),
'open': prices * (1 + np.random.uniform(-0.01, 0.01, days)),
'high': prices * (1 + np.abs(np.random.uniform(0, 0.02, days))),
'low': prices * (1 - np.abs(np.random.uniform(0, 0.02, days))),
'close': prices,
'volume': np.random.randint(1000000, 10000000, days),
'code': code
})
def run_backtest_on_multiple_stocks(engine, strategy, strategy_name, n_stocks=10):
"""Run backtest on multiple stocks to get enough trades"""
all_trades = []
total_results = []
for i in range(n_stocks):
# Different drift for different stocks
drift = 0.0005 + (i - n_stocks/2) * 0.0001
code = f"TEST{i+1:03d}"
data = generate_sample_data(code, seed=42+i, days=500, drift=drift)
result = engine.backtest(data, strategy, f"{strategy_name} - {code}")
all_trades.extend(result.trades)
total_results.append(result)
# Aggregate results
if not total_results:
return None
initial_capital = engine.initial_capital * n_stocks
final_capital = sum(r.final_capital for r in total_results)
total_return = (final_capital - initial_capital) / initial_capital
# Find max drawdown across all trades
all_trades_sorted = sorted(all_trades, key=lambda t: t._entry_idx)
peak = 0
max_drawdown = 0
cumulative = 0
for t in all_trades_sorted:
cumulative += t.profit if t.profit else 0
peak = max(peak, cumulative)
drawdown = (peak - cumulative) / (initial_capital + peak) if (initial_capital + peak) > 0 else 0
max_drawdown = max(max_drawdown, drawdown)
# Calculate aggregate statistics
n_total = len(all_trades)
n_win = sum(1 for t in all_trades if t.profit_pct and t.profit_pct > 0)
n_loss = n_total - n_win
if n_total > 0:
returns = [t.profit_pct for t in all_trades if t.profit_pct is not None]
avg_profit_pct = np.mean(returns) if returns else 0
avg_win_pct = np.mean([t.profit_pct for t in all_trades if t.profit_pct and t.profit_pct > 0]) if n_win > 0 else 0
avg_loss_pct = np.mean([-t.profit_pct for t in all_trades if t.profit_pct and t.profit_pct <= 0]) if n_loss > 0 else 0
win_rate = n_win / n_total
sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(252) if len(returns) > 1 and np.std(returns) > 0 else 0
else:
avg_profit_pct = 0
avg_win_pct = 0
avg_loss_pct = 0
win_rate = 0
sharpe_ratio = 0
return BacktestResult(
strategy=strategy_name,
start_date=total_results[0].start_date,
end_date=total_results[-1].end_date,
initial_capital=initial_capital,
final_capital=final_capital,
total_return=total_return,
annual_return=(1 + total_return) ** (365 / 500) - 1,
max_drawdown=max_drawdown,
sharpe_ratio=sharpe_ratio,
win_rate=win_rate,
total_trades=n_total,
win_trades=n_win,
loss_trades=n_loss,
avg_profit_pct=avg_profit_pct,
avg_win_pct=avg_win_pct,
avg_loss_pct=avg_loss_pct,
trades=all_trades
)
def main():
print("\n" + "=" * 80)
print("Technical Selection Strategies Backtest with Risk Control")
print("Original: Zhang Fei | Risk Control: Guan Yu (Yunchang)")
print("=" * 80)
n_stocks = 20
print(f"\nRunning backtest on {n_stocks} simulated stocks...")
print("\n" + "=" * 80)
print("Running backtest WITHOUT risk control...")
print("=" * 80)
engine_no_rc = BacktestEngine(initial_capital=100000.0, enable_risk_control=False)
macd_strategy = MACDDivergenceStrategy()
macd_result_no_rc = run_backtest_on_multiple_stocks(engine_no_rc, macd_strategy, "MACD Divergence + MA (No RC)", n_stocks=n_stocks)
engine_no_rc.print_result(macd_result_no_rc)
bb_strategy = BollingerBandsStrategy()
bb_result_no_rc = run_backtest_on_multiple_stocks(engine_no_rc, bb_strategy, "Bollinger Bands + Trend (No RC)", n_stocks=n_stocks)
dc_strategy = DonchianChannelStrategy()
dc_result_no_rc = run_backtest_on_multiple_stocks(engine_no_rc, dc_strategy, "Donchian Channel (No RC)", n_stocks=n_stocks)
print("\n" + "=" * 80)
print("Running backtest WITH risk control (Guan Yu's four-layer system)...")
print("=" * 80)
engine_rc = BacktestEngine(initial_capital=100000.0, enable_risk_control=True)
macd_result_rc = run_backtest_on_multiple_stocks(engine_rc, macd_strategy, "MACD Divergence + MA (With RC)", n_stocks=n_stocks)
engine_rc.print_result(macd_result_rc)
bb_result_rc = run_backtest_on_multiple_stocks(engine_rc, bb_strategy, "Bollinger Bands + Trend (With RC)", n_stocks=n_stocks)
dc_result_rc = run_backtest_on_multiple_stocks(engine_rc, dc_strategy, "Donchian Channel (With RC)", n_stocks=n_stocks)
print("\n" + "=" * 80)
print("Comparison Summary: WITHOUT vs WITH Risk Control")
print("=" * 80)
print(f"{'Strategy':30s} | {'RC'} | {'Total Return':>10s} | {'Max Drawdown':>12s} | {'Sharpe':>6s} | {'Win Rate':>8s} | {'Trades':>6s}")
print("-" * 80)
# MACD
print(f"{'MACD Divergence + MA':30s} | {'No RC':<6} | {macd_result_no_rc.total_return:>10.2%} | {macd_result_no_rc.max_drawdown:>12.2%} | {macd_result_no_rc.sharpe_ratio:>6.2f} | {macd_result_no_rc.win_rate:>8.2%} | {macd_result_no_rc.total_trades:>6d}")
print(f"{'MACD Divergence + MA':30s} | {'With RC':<6} | {macd_result_rc.total_return:>10.2%} | {macd_result_rc.max_drawdown:>12.2%} | {macd_result_rc.sharpe_ratio:>6.2f} | {macd_result_rc.win_rate:>8.2%} | {macd_result_rc.total_trades:>6d}")
print("-" * 80)
# Bollinger Bands
print(f"{'Bollinger Bands + Trend':30s} | {'No RC':<6} | {bb_result_no_rc.total_return:>10.2%} | {bb_result_no_rc.max_drawdown:>12.2%} | {bb_result_no_rc.sharpe_ratio:>6.2f} | {bb_result_no_rc.win_rate:>8.2%} | {bb_result_no_rc.total_trades:>6d}")
print(f"{'Bollinger Bands + Trend':30s} | {'With RC':<6} | {bb_result_rc.total_return:>10.2%} | {bb_result_rc.max_drawdown:>12.2%} | {bb_result_rc.sharpe_ratio:>6.2f} | {bb_result_rc.win_rate:>8.2%} | {bb_result_rc.total_trades:>6d}")
print("-" * 80)
# Donchian Channel
print(f"{'Donchian Channel':30s} | {'No RC':<6} | {dc_result_no_rc.total_return:>10.2%} | {dc_result_no_rc.max_drawdown:>12.2%} | {dc_result_no_rc.sharpe_ratio:>6.2f} | {dc_result_no_rc.win_rate:>8.2%} | {dc_result_no_rc.total_trades:>6d}")
print(f"{'Donchian Channel':30s} | {'With RC':<6} | {dc_result_rc.total_return:>10.2%} | {dc_result_rc.max_drawdown:>12.2%} | {dc_result_rc.sharpe_ratio:>6.2f} | {dc_result_rc.win_rate:>8.2%} | {dc_result_rc.total_trades:>6d}")
print("=" * 80)
return {
'no_rc': {'macd': macd_result_no_rc, 'bb': bb_result_no_rc, 'dc': dc_result_no_rc},
'with_rc': {'macd': macd_result_rc, 'bb': bb_result_rc, 'dc': dc_result_rc}
}
if __name__ == "__main__":
results = main()