Files
sanguo_quant_live/zhangfei-technical/02-algorithms/technical_selection_strategies_backtest.py
cfdaily affcfa0c72 按照工作流规则进行目录整理
**主要调整:**
1. 重命名将军工作区目录:
   - data-engineering → zhaoyun-data (赵云数据工程)
   - risk-management → guanyu-risk (关羽风控管理)
   - platform → jiangwei-platform (姜维平台)
   - technical-strategy → zhangfei-technical (张飞技术策略)

2. 创建新目录:
   - archive/ (归档目录)
   - simayi-quality/ (司马懿质量保证)
   - pangtong-value/ (庞统价值投资)

3. 移动内容:
   - value-investing → pangtong-value/research (庞统价值投资)
   - running_data → zhaoyun-data/data (运行数据)
   - 文件任务管理系统文档 → archive/file-task-system

4. 清理文件:
   - 删除所有日志文件
   - 删除agent脚本
   - 删除knowledge-base (使用统一知识库)

5. 创建标准结构:
   - 各将军目录下创建research/, scripts/, reports/, references/子目录

6. 更新.gitignore:
   - 排除日志文件和临时文件

**依据:** management/workflow-rules.md
**制定:** 庞统(凤雏)
**审核:** 诸葛亮
2026-03-25 17:27:35 +08:00

690 lines
25 KiB
Python

"""
技术选股策略回测框架
实现三种推荐策略:
1. MACD底背离+均线
2. 布林带下轨+趋势
3. 唐奇安通道突破
作者:张飞
日期:2026年3月24日
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class Trade:
"""交易记录"""
code: str
entry_date: datetime
exit_date: Optional[datetime]
entry_price: float
exit_price: Optional[float]
direction: int # 1多头, -1空头
shares: int
entry_value: float
exit_value: Optional[float]
profit: Optional[float]
profit_pct: Optional[float]
hold_days: Optional[int]
strategy: str
@dataclass
class BacktestResult:
"""回测结果"""
strategy: str
start_date: datetime
end_date: datetime
initial_capital: float
final_capital: float
total_return: float
annual_return: float
max_drawdown: float
sharpe_ratio: float
win_rate: float
total_trades: int
win_trades: int
loss_trades: int
avg_profit_pct: float
avg_win_pct: float
avg_loss_pct: float
trades: List[Trade]
class TechnicalIndicators:
"""技术指标计算器"""
@staticmethod
def calculate_sma(prices: np.ndarray, period: int) -> np.ndarray:
"""简单移动平均"""
return pd.Series(prices).rolling(window=period, min_periods=1).mean().values
@staticmethod
def calculate_ema(prices: np.ndarray, period: int) -> np.ndarray:
"""指数移动平均"""
return pd.Series(prices).ewm(span=period, adjust=False).mean().values
@staticmethod
def calculate_macd(prices: np.ndarray,
fast: int = 12,
slow: int = 26,
signal: int = 9) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
计算MACD
返回: (DIF, DEA, MACD)
"""
ema_fast = TechnicalIndicators.calculate_ema(prices, fast)
ema_slow = TechnicalIndicators.calculate_ema(prices, slow)
dif = ema_fast - ema_slow
dea = TechnicalIndicators.calculate_ema(dif, signal)
macd = 2 * (dif - dea)
return dif, dea, macd
@staticmethod
def calculate_bollinger_bands(prices: np.ndarray,
period: int = 20,
num_std: float = 2.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
计算布林带
返回: (上轨, 中轨, 下轨)
"""
sma = TechnicalIndicators.calculate_sma(prices, period)
std = pd.Series(prices).rolling(window=period, min_periods=1).std().values
upper = sma + num_std * std
lower = sma - num_std * std
return upper, sma, lower
@staticmethod
def calculate_donchian_channel(high: np.ndarray,
low: np.ndarray,
period: int = 20) -> Tuple[np.ndarray, np.ndarray]:
"""
计算唐奇安通道
返回: (上轨, 下轨)
"""
upper = pd.Series(high).rolling(window=period, min_periods=1).max().values
lower = pd.Series(low).rolling(window=period, min_periods=1).min().values
return upper, lower
@staticmethod
def calculate_atr(high: np.ndarray,
low: np.ndarray,
close: np.ndarray,
period: int = 14) -> np.ndarray:
"""计算ATR平均真实波幅"""
tr = np.zeros(len(high))
for i in range(len(high)):
if i == 0:
tr[i] = high[i] - low[i]
else:
hl = high[i] - low[i]
hc = abs(high[i] - close[i-1])
lc = abs(low[i] - close[i-1])
tr[i] = max(hl, hc, lc)
atr = pd.Series(tr).rolling(window=period, min_periods=1).mean().values
return atr
class MACDDivergenceStrategy:
"""
MACD底背离 + 均线过滤策略
买入条件:
1. 股价创近期新低(20日最低)
2. MACD DIF值没有创新低(底背离)
3. 价格站上20日均线(趋势向上确认)
4. 成交量放大(可选)
卖出条件:
1. 收盘价跌破20日均线
2. 或亏损达到5%
3. 或盈利达到20%
"""
def __init__(self, ma_period: int = 20,
divergence_period: int = 20,
stop_loss_pct: float = 0.05,
take_profit_pct: float = 0.20):
self.ma_period = ma_period
self.divergence_period = divergence_period
self.stop_loss_pct = stop_loss_pct
self.take_profit_pct = take_profit_pct
self.name = "MACD底背离+均线"
def check_buy_signal(self, data: pd.DataFrame, idx: int) -> bool:
"""检查买入信号"""
条件"""
if idx < self.divergence_period + self.ma_period:
return False
# 1. 股价创近期新低(20日最低)
recent_low = data['close'].iloc[idx-self.divergence_period:idx].min()
current_price = data['close'].iloc[idx]
if current_price > recent_low:
return False
# 2. MACD DIF没有创新低(底背离)
dif, _, _ = TechnicalIndicators.calculate_macd(data['close'].values)
recent_dif_low = dif[idx-self.divergence_period:idx].min()
current_dif = dif[idx]
if current_dif <= recent_dif_low:
return False # 不是底背离
# 3. 价格站上20日均线
ma = TechnicalIndicators.calculate_sma(data['close'].values, self.ma_period)
if current_price < ma[idx]:
return False
# 4. 检查背离确认(前一日价格也是低点)
if idx > 0 and data['close'].iloc[idx-1] > recent_low:
return False
return True
def check_sell_signal(self, data: pd.DataFrame, trade: Trade, idx: int) -> bool:
"""检查卖出信号"""
current_price = data['close'].iloc[idx]
# 1. 破位20日均线
ma = TechnicalIndicators.calculate_sma(data['close'].values, self.ma_period)
if current_price < ma[idx]:
return True
# 2. 止损
profit_pct = (current_price - trade.entry_price) / trade.entry_price
if profit_pct <= -self.stop_loss_pct:
return True
# 3. 止盈
if profit_pct >= self.take_profit_pct:
return True
return False
class BollingerBandsStrategy:
"""
布林孺下轨 + 趋势过滤策略
买入条件:
1. 股价触及或跌破布林带下轨
2. 均线系统多头排列 (MA5 > MA10 > MA20)
3. RSI < 30 (超卖确认)
卖出条件:
1. 收盘价站上布林带中轨 (回归均值)
2. 或跌破20日均线 (趋势破坏)
3. 或止损/止盈
"""
def __init__(self, bb_period: int = 20,
bb_std: float = 2.0,
stop_loss_pct: float = 0.05,
take_profit_pct: float = 0.15):
self.bb_period = bb_period
self.bb_std = bb_std
self.stop_loss_pct = stop_loss_pct
self.take_profit_pct = take_profit_pct
self.name = "布林带下轨+趋势"
def calculate_rsi(self, prices: np.ndarray, period: int = 14) -> np.ndarray:
"""计算RSI"""
delta = np.diff(prices)
gain = np.where(delta > 0, delta, 0)
loss = np.where(delta < 0, -delta, 0)
avg_gain = np.zeros_like(prices)
avg_loss = np.zeros_like(prices)
if len(prices) > period:
avg_gain[period] = np.mean(gain[:period])
avg_loss[period] = np.mean(loss[:period])
for i in range(period + 1, len(prices)):
avg_gain[i] = (avg_gain[i-1] * (period - 1) + gain[i-1]) / period
avg_loss[i] = (avg_loss[i-1] * (period - 1) + loss[i-1]) / period
rs = avg_gain / (avg_loss + 1e-10)
rsi = 100 - (100 / (1 + rs))
return rsi
def check_buy_signal(self, data: pd.DataFrame, idx: int) -> bool:
"""检查买入信号"""
if idx < self.bb_period + 20:
return False
current_price = data['close'].iloc[idx]
# 1. 触及或跌破布林带下轨
bb_upper, bb_mid, bb_lower = TechnicalIndicators.calculate_bollinger_bands(
data['close'].values, self.bb_period, self.bb_std
)
if current_price > bb_lower[idx] * 1.02: # 允许2%误差
return False
# 2. 均线多头排列
ma5 = TechnicalIndicators.calculate_sma(data['close'].values, 5)
ma10 = TechnicalIndicators.calculate_sma(data['close'].values, 10)
ma20 = TechnicalIndicators.calculate_sma(data['close'].values, 20)
if not (ma5[idx] > ma10[idx] > ma20[idx]):
return False
# 3. RSI超卖
rsi = self.calculate_rsi(data['close'].values)
if rsi[idx] > 35: # 稍微放宽到35
return False
return True
def check_sell_signal(self, data: pd.DataFrame, trade: Trade, idx: int) -> bool:
"""检查卖出信号"""
current_price = data['close'].iloc[idx]
# 1. 回归中轨
bb_upper, bb_mid, bb_lower = TechnicalIndicators.calculate_bollinger_bands(
data['close'].values, self.bb_period, self.bb_std
)
if current_price >= bb_mid[idx]:
return True
# 2. 跌破20日均线(趋势破坏)
ma20 = TechnicalIndicators.calculate_sma(data['close'].values, 20)
if current_price < ma20[idx]:
return True
# 3. 止损
profit_pct = (current_price - trade.entry_price) / trade.entry_price
if profit_pct <= -self.stop_loss_pct:
return True
# 4. 止盈
if profit_pct >= self.take_profit_pct:
return True
return False
class DonchianChannelStrategy:
"""
唐奇安通道突破策略 (经典趋势跟踪)
买入条件:
1. 收盘价突破20日唐奇安通道上轨
2. 成交量放大确认 (可选)
卖出条件:
1. 收盘价跌破10日唐奇安通道下轨
2. 或ATR止损 (2倍ATR)
"""
def __init__(self, channel_period: int = 20,
exit_period: int = 10,
atr_period: int = 14,
atr_multiplier: float = 2.0):
self.channel_period = channel_period
self.exit_period = exit_period
self.atr_period = atr_period
self.atr_multiplier = atr_multiplier
self.name = "唐奇安通道突破"
def check_buy_signal(self, data: pd.DataFrame, idx: int) -> bool:
"""检查买入信号"""
if idx < self.channel_period:
return False
current_price = data['close'].iloc[idx]
# 1. 突破上轨
dc_upper, dc_lower = TechnicalIndicators.calculate_donchian_channel(
data['high'].values, data['low'].values, self.channel_period
)
# 前一日未突破,今日突破
if idx > 0:
prev_price = data['close'].iloc[idx-1]
if prev_price > dc_upper[idx-1]:
return False # 已经在通道上沿
if current_price > dc_upper[idx]:
return True
return False
def check_sell_signal(self, data: pd.DataFrame, trade: Trade, idx: int) -> bool:
"""检查卖出信号"""
current_price = data['close'].iloc[idx]
# 1. 跌破10日通道下轨
dc_upper_exit, dc_lower_exit = TechnicalIndicators.calculate_donchian_channel(
data['high'].values, data['low'].values, self.exit_period
)
if current_price < dc_lower_exit[idx]:
return True
# 2. ATR止损
atr = TechnicalIndicators.calculate_atr(
data['high'].values, data['low'].values, data['close'].values, self.atr_period
)
stop_price = trade.entry_price - self.atr_multiplier * atr[idx]
if current_price < stop_price:
return True
return False
class BacktestEngine:
"""回测引擎"""
def __init__(self, initial_capital: float = 100000.0):
self.initial_capital = initial_capital
self.commission_rate = 0.0003 # 万三手续费
def backtest(self, data: pd.DataFrame, strategy, strategy_name: str) -> BacktestResult:
"""执行回测"""
logger.info(f"开始回测策略: {strategy_name}")
data = data.copy().reset_index(drop=True)
capital = self.initial_capital
trades: List[Trade] = []
open_positions: Dict[str, Trade] = {}
for idx in range(len(data)):
current_date = data['date'].iloc[idx] if 'date' in data.columns else idx
current_price = data['close'].iloc[idx]
# 检查平仓信号
for code, trade in list(open_positions.items()):
if strategy.check_sell_signal(data, trade, idx):
# 平仓
exit_price = current_price
commission = exit_price * trade.shares * self.commission_rate
exit_value = exit_price * trade.shares - commission
profit = exit_value - trade.entry_value
profit_pct = profit / trade.entry_value
trade.exit_date = current_date
trade.exit_price = exit_price
trade.exit_value = exit_value
trade.profit = profit
trade.profit_pct = profit_pct
trade.hold_days = idx - data.index.get_loc(trade.entry_date) if hasattr(trade.entry_date, 'strftime') else 0
capital += exit_value
trades.append(trade)
del open_positions[code]
logger.debug(f"平仓 {code} @ {exit_price:.2f}, 收益: {profit_pct:.2%}")
# 检查开仓信号
if capital > 0:
if strategy.check_buy_signal(data, idx):
if len(open_positions) == 0: # 单持仓策略,简化回测
code = data['code'].iloc[idx] if 'code' in data.columns else 'STOCK'
# 固定仓位:80%资金
position_size = capital * 0.8
shares = int(position_size / current_price)
if shares > 0:
commission = current_price * shares * self.commission_rate
entry_value = current_price * shares + commission
if entry_value <= capital:
trade = Trade(
code=code,
entry_date=current_date,
exit_date=None,
entry_price=current_price,
exit_price=None,
direction=1,
shares=shares,
entry_value=entry_value,
exit_value=None,
profit=None,
profit_pct=None,
hold_days=None,
strategy=strategy_name
)
capital -= entry_value
open_positions[code] = trade
logger.debug(f"开仓 {code} @ {current_price:.2f}, 数量: {shares}")
# 强制平仓未结束的持仓
for code, trade in open_positions.items():
exit_price = data['close'].iloc[-1]
commission = exit_price * trade.shares * self.commission_rate
exit_value = exit_price * trade.shares - commission
profit = exit_value - trade.entry_value
profit_pct = profit / trade.entry_value
trade.exit_date = data['date'].iloc[-1] if 'date' in data.columns else len(data) - 1
trade.exit_price = exit_price
trade.exit_value = exit_value
trade.profit = profit
trade.profit_pct = profit_pct
trade.hold_days = len(data) - 1
capital += exit_value
trades.append(trade)
# 计算绩效指标
result = self._calculate_performance(
strategy_name, capital, trades, data
)
logger.info(f"回测完成: {strategy_name}, 总收益: {result.total_return:.2%}")
return result
def _calculate_performance(self, strategy_name: str,
final_capital: float,
trades: List[Trade],
data: pd.DataFrame) -> BacktestResult:
"""计算绩效指标"""
total_return = (final_capital - self.initial_capital) / self.initial_capital
# 计算年化收益
if 'date' in data.columns:
days = (data['date'].iloc[-1] - data['date'].iloc[0]).days
else:
days = len(data)
annual_return = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0
# 最大回撤(简化版,基于交易)
peak = self.initial_capital
max_drawdown = 0
capital_curve = [self.initial_capital]
for trade in sorted(trades, key=lambda t: t.entry_date if hasattr(t.entry_date, 'strftime') else 0):
capital_curve.append(capital_curve[-1] + trade.profit)
peak = max(peak, capital_curve[-1])
drawdown = (peak - capital_curve[-1]) / peak
max_drawdown = max(max_drawdown, drawdown)
# 夏普比率(简化)
if len(trades) > 1:
returns = [t.profit_pct for t in trades if t.profit_pct is not None]
if returns:
mean_return = np.mean(returns)
std_return = np.std(returns)
sharpe_ratio = mean_return / std_return * np.sqrt(252) if std_return > 0 else 0
else:
sharpe_ratio = 0
else:
sharpe_ratio = 0
# 胜率
win_trades = [t for t in trades if t.profit_pct and t.profit_pct > 0]
loss_trades = [t for t in trades if t.profit_pct and t.profit_pct <= 0]
win_rate = len(win_trades) / len(trades) if trades else 0
# 平均收益
avg_profit_pct = np.mean([t.profit_pct for t in trades if t.profit_pct is not None]) if trades else 0
avg_win_pct = np.mean([t.profit_pct for t in win_trades]) if win_trades else 0
avg_loss_pct = np.mean([t.profit_pct for t in loss_trades]) if loss_trades else 0
return BacktestResult(
strategy=strategy_name,
start_date=data['date'].iloc[0] if 'date' in data.columns else 0,
end_date=data['date'].iloc[-1] if 'date' in data.columns else len(data) - 1,
initial_capital=self.initial_capital,
final_capital=final_capital,
total_return=total_return,
annual_return=annual_return,
max_drawdown=max_drawdown,
sharpe_ratio=sharpe_ratio,
win_rate=win_rate,
total_trades=len(trades),
win_trades=len(win_trades),
loss_trades=len(loss_trades),
avg_profit_pct=avg_profit_pct,
avg_win_pct=avg_win_pct,
avg_loss_pct=avg_loss_pct,
trades=trades
)
def print_result(self, result: BacktestResult):
"""打印回测结果"""
print("\n" + "=" * 80)
print(f"策略回测结果: {result.strategy}")
print("=" * 80)
print(f"回测期间: {result.start_date} ~ {result.end_date}")
print(f"初始资金: {result.initial_capital:,.2f}")
print(f"最终资金: {result.final_capital:,.2f}")
print("-" * 80)
print(f"总收益: {result.total_return:.2%}")
print(f"年化收益: {result.annual_return:.2%}")
print(f"最大回撤: {result.max_drawdown:.2%}")
print(f"夏普比率: {result.sharpe_ratio:.2f}")
print(f"胜率: {result.win_rate:.2%}")
print("-" * 80)
print(f"总交易次数: {result.total_trades}")
print(f"盈利次数: {result.win_trades}")
print(f"亏损次数: {result.loss_trades}")
print(f"平均收益: {result.avg_profit_pct:.2%}")
print(f"平均盈利: {result.avg_win_pct:.2%}")
print(f"平均亏损: {result.avg_loss_pct:.2%}")
print("=" * 80)
# 打印交易明细
if result.trades:
print("\n交易明细:")
print("-" * 80)
for i, trade in enumerate(result.trades, 1):
print(f"{i}. {trade.code}")
print(f" 买入: {trade.entry_price:.2f} @ {trade.entry_date}")
print(f" 卖出: {trade.exit_price:.2f} @ {trade.exit_date}")
print(f" 收益: {trade.profit_pct:.2%}, 持有: {trade.hold_days}")
print("-" * 80)
def generate_sample_data(days: int = 500, seed: int = 42) -> pd.DataFrame:
"""生成模拟数据用于测试"""
np.random.seed(seed)
# 随机游走价格
returns = np.random.normal(0.001, 0.02, days)
prices = 100 * np.cumprod(1 + returns)
data = pd.DataFrame({
'date': pd.date_range(start='2024-01-01', periods=days, freq='D'),
'open': prices * (1 + np.random.uniform(-0.01, 0.01, days)),
'high': prices * (1 + np.abs(np.random.uniform(0, 0.02, days))),
'low': prices * (1 - np.abs(np.random.uniform(0, 0.02, days))),
'close': prices,
'volume': np.random.randint(1000000, 10000000, days),
'code': 'TEST001'
})
return data
def main():
"""主函数 - 演示三种策略回测"""
print("\n" + "=" * 80)
print("技术选股策略回测系统 - 张飞出品")
print("=" * 80)
# 生成模拟数据
print("\n生成模拟数据...")
data = generate_sample_data(days=500)
# 创建回测引擎
engine = BacktestEngine(initial_capital=100000.0)
# 1. MACD底背离+均线策略
print("\n" + "=" * 80)
print("策略1: MACD底背离 + 均线过滤")
print("=" * 80)
macd_strategy = MACDDivergenceStrategy(
ma_period=20,
divergence_period=20,
stop_loss_pct=0.05,
take_profit_pct=0.20
)
macd_result = engine.backtest(data, macd_strategy, "MACD底背离+均线")
engine.print_result(macd_result)
# 2. 布林带下轨+趋势策略
print("\n" + "=" * 80)
print("策略2: 布林带下轨 + 趋势过滤")
print("=" * 80)
bb_strategy = BollingerBandsStrategy(
bb_period=20,
bb_std=2.0,
stop_loss_pct=0.05,
take_profit_pct=0.15
)
bb_result = engine.backtest(data, bb_strategy, "布林带下轨+趋势")
engine.print_result(bb_result)
# 3. 唐奇安通道突破策略
print("\n" + "=" * 80)
print("策略3: 唐奇安通道突破")
print("=" * 80)
dc_strategy = DonchianChannelStrategy(
channel_period=20,
exit_period=10,
atr_period=14,
atr_multiplier=2.0
)
dc_result = engine.backtest(data, dc_strategy, "唐奇安通道突破")
engine.print_result(dc_result)
# 策略对比
print("\n" + "=" * 80)
print("策略对比汇总")
print("=" * 80)
comparison = pd.DataFrame({
'策略': [macd_result.strategy, bb_result.strategy, dc_result.strategy],
'总收益': [macd_result.total_return, bb_result.total_return, dc_result.total_return],
'年化收益': [macd_result.annual_return, bb_result.annual_return, dc_result.annual_return],
'最大回撤': [macd_result.max_drawdown, bb_result.max_drawdown, dc_result.max_drawdown],
'夏普比率': [macd_result.sharpe_ratio, bb_result.sharpe_ratio, dc_result.sharpe_ratio],
'胜率': [macd_result.win_rate, bb_result.win_rate, dc_result.win_rate],
'交易次数': [macd_result.total_trades, bb_result.total_trades, dc_result.total_trades],
})
print(comparison.to_string(index=False))
print("=" * 80)
return {
'macd_divergence': macd_result,
'bollinger_bands': bb_result,
'donchian_channel': dc_result
}
if __name__ == "__main__":
results = main()