#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 关羽 - 价值+技术综合选股策略 ======================================= 基于五虎上将多因子选股体系第二部分实现 核心框架: - 价值筛选缩小范围(风控前置) - 技术分析确认入场点 - 仓位控制和动态止损 - 入场出场规则 适用市场:A股 T+1、涨跌停板 预期绩效: - 年化收益:14-17% - 最大回撤:28-38% - 夏普比率:0.75-0.85 - 卡玛比率:0.4-0.5 """ import akshare as ak import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Tuple, Optional import warnings warnings.filterwarnings('ignore') class RiskProfile: """风险偏好配置""" # 保守型配置 CONSERVATIVE = { 'name': '保守型', 'pe_max': 15, 'pb_max': 1.5, 'roe_min': 12.0, 'single_stock_max': 0.08, 'industry_max': 0.20, 'stock_count': (15, 20), 'stop_loss_pct': 0.05, } # 平衡型配置 BALANCED = { 'name': '平衡型', 'pe_max': 25, 'pb_max': 2.5, 'roe_min': 10.0, 'single_stock_max': 0.15, 'industry_max': 0.25, 'stock_count': (10, 15), 'stop_loss_pct': 0.06, } # 进取型配置 AGGRESSIVE = { 'name': '进取型', 'pe_max': 35, 'pb_max': 3.0, 'roe_min': 8.0, 'single_stock_max': 0.25, 'industry_max': 0.30, 'stock_count': (5, 10), 'stop_loss_pct': 0.08, } @classmethod def get_profile(cls, profile: str = 'balanced') -> Dict: """获取风险配置""" profiles = { 'conservative': cls.CONSERVATIVE, 'balanced': cls.BALANCED, 'aggressive': cls.AGGRESSIVE, } return profiles.get(profile.lower(), cls.BALANCED) class ValueFilter: """ 价值筛选器 - 第一步:价值筛选缩小范围 ========================================= 风控前置,通过基本面过滤排除高风险股票 """ def __init__(self, risk_profile: Dict = None): """ 初始化价值筛选器 Args: risk_profile: 风险偏好配置 """ self.risk_profile = risk_profile or RiskProfile.get_profile('balanced') def get_stock_list(self) -> pd.DataFrame: """ 获取A股全市场股票列表 Returns: 股票列表 DataFrame """ try: # 获取A股所有股票 stock_list = ak.stock_zh_a_spot_em() # 标准化字段名 stock_list = stock_list.rename(columns={ '代码': 'stock_code', '名称': 'stock_name', '最新价': 'current_price', '总市值': 'total_market_cap', '流通市值': 'circulating_market_cap', '市盈率-动态': 'pe_ttm', '市净率': 'pb', '市销率': 'ps', '换手率': 'turnover_rate', '量比': 'volume_ratio', }) return stock_list except Exception as e: print(f"获取股票列表失败: {e}") return pd.DataFrame() def filter_basic_risks(self, df: pd.DataFrame) -> pd.DataFrame: """ 排除基本风险股票(风控前置) 排除条件: 1. ST/*ST股票 2. 上市不足180天的新股 3. 流通市值过小(< 10亿) 4. 换手率过低(流动性差) Args: df: 股票列表 Returns: 过滤后的股票列表 """ if df.empty: return df print(f"过滤前股票数量: {len(df)}") # 1. 排除ST/*ST股票 df = df[~df['stock_name'].str.contains('ST|\\*ST', na=False)] print(f"排除ST股票后: {len(df)}") # 2. 排除停牌股票(价格异常) df = df[df['current_price'] > 0] print(f"排除停牌股票后: {len(df)}") # 3. 排除上市不足180天的新股(通过股票代码判断) def is_new_stock(code): # 新股通常是00开头的深主板和60开头的沪主板 # 但这里简化处理,实际需要获取上市日期 return False df = df[~df['stock_code'].apply(is_new_stock)] # 4. 排除流通市值过小的股票(< 10亿) df = df[df['circulating_market_cap'] > 100000] # 单位:万元,即10亿 print(f"排除小市值股票后: {len(df)}") # 5. 排除换手率过低的股票(< 0.5%,流动性差) df = df[df['turnover_rate'] > 0.5] print(f"排除低换手率股票后: {len(df)}") return df.reset_index(drop=True) def filter_valuation_metrics(self, df: pd.DataFrame) -> pd.DataFrame: """ 估值指标筛选 筛选条件: 1. PE < pe_max(根据风险偏好) 2. PB < pb_max(根据风险偏好) 3. PE > 0(排除亏损) 4. PB > 0 Args: df: 股票列表 Returns: 过滤后的股票列表 """ if df.empty: return df pe_max = self.risk_profile['pe_max'] pb_max = self.risk_profile['pb_max'] print(f"\n估值筛选参数: PE<{pe_max}, PB<{pb_max}") # 1. PE筛选 df = df[(df['pe_ttm'] > 0) & (df['pe_ttm'] < pe_max)] print(f"PE筛选后: {len(df)}") # 2. PB筛选 df = df[(df['pb'] > 0) & (df['pb'] < pb_max)] print(f"PB筛选后: {len(df)}") return df.reset_index(drop=True) def filter_quality_metrics(self, df: pd.DataFrame) -> pd.DataFrame: """ 质量指标筛选 筛选条件: 1. ROE > roe_min(根据风险偏好) 2. 排除财务异常(如大额商誉、高质押率等) Args: df: 股票列表 Returns: 过滤后的股票列表 """ if df.empty: return df roe_min = self.risk_profile['roe_min'] print(f"\n质量筛选参数: ROE>{roe_min}%") # 注意:akshare spot数据中没有ROE等详细财务数据 # 这里需要通过单独的财务数据接口获取 # 为了简化,我们先跳过这一步,或者假设已经获取了 # 实际实现中应该: # 1. 调用 ak.stock_financial_analysis_indicator() 获取财务指标 # 2. 筛选 ROE > roe_min # 3. 排除商誉>20%、大股东质押>50%、连续亏损等 print("质量指标筛选(需要补充财务数据接口)") return df.reset_index(drop=True) def apply(self, stock_list: pd.DataFrame = None) -> pd.DataFrame: """ 执行完整的价值筛选流程 Args: stock_list: 股票列表,如果为None则自动获取 Returns: 筛选后的股票列表 """ if stock_list is None: stock_list = self.get_stock_list() if stock_list.empty: print("未获取到股票数据") return pd.DataFrame() print("=" * 60) print("价值筛选流程开始") print("=" * 60) # 第一步:排除基本风险 df = self.filter_basic_risks(stock_list) if df.empty: return df # 第二步:估值指标筛选 df = self.filter_valuation_metrics(df) if df.empty: return df # 第三步:质量指标筛选 df = self.filter_quality_metrics(df) print("=" * 60) print(f"价值筛选完成,最终候选股票数量: {len(df)}") print("=" * 60) return df class TechnicalFilter: """ 技术信号过滤器 - 第二步:技术分析确认入场点 =============================================== 通过技术指标确认合适的买入时机 """ def __init__(self, ma_days: int = 20): """ 初始化技术过滤器 Args: ma_days: 均线天数,默认20日 """ self.ma_days = ma_days def get_stock_history(self, stock_code: str, days: int = 120) -> pd.DataFrame: """ 获取股票历史行情数据 Args: stock_code: 股票代码 days: 获取天数 Returns: 历史行情 DataFrame """ try: # 确定股票市场类型 if stock_code.startswith('60'): symbol = f"sh{stock_code}" elif stock_code.startswith('00') or stock_code.startswith('30'): symbol = f"sz{stock_code}" else: return pd.DataFrame() # 获取历史数据 df = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=(datetime.now() - timedelta(days=days)).strftime("%Y%m%d"), adjust="qfq") if df.empty: return pd.DataFrame() # 标准化字段名 df = df.rename(columns={ '日期': 'date', '开盘': 'open', '收盘': 'close', '最高': 'high', '最低': 'low', '成交量': 'volume', '成交额': 'amount', '换手率': 'turnover', }) df['date'] = pd.to_datetime(df['date']) df = df.sort_values('date').reset_index(drop=True) return df except Exception as e: print(f"获取股票 {stock_code} 历史数据失败: {e}") return pd.DataFrame() def calculate_ma(self, df: pd.DataFrame, days: int) -> pd.Series: """计算移动平均线""" return df['close'].rolling(window=days).mean() def calculate_atr(self, df: pd.DataFrame, period: int = 14) -> pd.Series: """ 计算ATR(Average True Range)波动率指标 Args: df: 历史数据 period: ATR周期 Returns: ATR序列 """ high = df['high'] low = df['low'] close = df['close'].shift(1) tr1 = high - low tr2 = (high - close).abs() tr3 = (low - close).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) atr = tr.rolling(window=period).mean() return atr def check_trend_up(self, df: pd.DataFrame) -> bool: """ 检查趋势是否向上 条件: 1. 股价站在20日均线上 2. 均线向上倾斜 Args: df: 历史数据 Returns: True if trend is up """ if len(df) < self.ma_days + 5: return False # 最新收盘价 latest_close = df['close'].iloc[-1] # 计算MA ma = self.calculate_ma(df, self.ma_days) latest_ma = ma.iloc[-1] ma_5_days_ago = ma.iloc[-6] # 条件1:股价站在均线上 price_above_ma = latest_close >= latest_ma # 条件2:均线向上 ma_rising = latest_ma > ma_5_days_ago return price_above_ma and ma_rising def check_recent_drawdown(self, df: pd.DataFrame, max_drawdown: float = 0.20) -> bool: """ 检查近期是否有过度下跌 条件: 近一个月跌幅不超过20% Args: df: 历史数据 max_drawdown: 最大允许回撤 Returns: True if drawdown is acceptable """ if len(df) < 20: return True # 获取最近20天的数据 recent_data = df.tail(20) # 计算期间最高点 period_high = recent_data['close'].max() # 计算当前回撤 current_price = recent_data['close'].iloc[-1] drawdown = (period_high - current_price) / period_high return drawdown <= max_drawdown def check_volume_surge(self, df: pd.DataFrame) -> bool: """ 检查是否有极端放量(主力出货信号) 条件: 排除极端放量情况 """ if len(df) < 10: return True # 获取最近5天平均成交量 recent_volume = df['volume'].tail(5).mean() # 获取前20天平均成交量 baseline_volume = df['volume'].tail(30).head(25).mean() # 如果最近5天成交量是前20天的3倍以上,可能是主力出货 volume_surge = recent_volume > baseline_volume * 3 return not volume_surge def check_macd_signal(self, df: pd.DataFrame) -> bool: """ 检查MACD信号(可选增强信号) 条件: MACD金叉或MACD在零轴上方 """ if len(df) < 26: return True # 计算MACD ema12 = df['close'].ewm(span=12, adjust=False).mean() ema26 = df['close'].ewm(span=26, adjust=False).mean() macd = ema12 - ema26 signal = macd.ewm(span=9, adjust=False).mean() # 最新MACD和信号 latest_macd = macd.iloc[-1] latest_signal = signal.iloc[-1] prev_macd = macd.iloc[-2] prev_signal = signal.iloc[-2] # MACD金叉或MACD在零轴上方 golden_cross = (prev_macd < prev_signal) and (latest_macd >= latest_signal) above_zero = latest_macd > 0 return golden_cross or above_zero def apply_stock_filter(self, stock_code: str, stock_info: Dict) -> Dict: """ 对单只股票应用技术过滤 Args: stock_code: 股票代码 stock_info: 股票基本信息 Returns: 筛选结果 """ # 获取历史数据 df = self.get_stock_history(stock_code) if df.empty: return { 'stock_code': stock_code, 'passed': False, 'reason': '无法获取历史数据' } # 应用各项技术过滤 trend_ok = self.check_trend_up(df) drawdown_ok = self.check_recent_drawdown(df) volume_ok = self.check_volume_surge(df) macd_ok = self.check_macd_signal(df) # 综合判断 passed = trend_ok and drawdown_ok and volume_ok and macd_ok result = { 'stock_code': stock_code, 'passed': passed, 'reason': '' if passed else '技术指标不满足', 'trend_up': trend_ok, 'drawdown_ok': drawdown_ok, 'volume_ok': volume_ok, 'macd_ok': macd_ok, } # 计算ATR(用于后续止损) atr = self.calculate_atr(df).iloc[-1] result['atr'] = atr # 最新价格 result['current_price'] = df['close'].iloc[-1] return result def apply(self, candidate_stocks: pd.DataFrame) -> pd.DataFrame: """ 对候选股票批量应用技术过滤 Args: candidate_stocks: 候选股票列表 Returns: 通过技术过滤的股票列表 """ if candidate_stocks.empty: return pd.DataFrame() print("\n" + "=" * 60) print("技术信号过滤流程开始") print("=" * 60) results = [] total = len(candidate_stocks) for idx, row in candidate_stocks.iterrows(): stock_code = row['stock_code'] stock_info = row.to_dict() if idx % 10 == 0: print(f"处理进度: {idx}/{total} ({idx/total*100:.1f}%)") result = self.apply_stock_filter(stock_code, stock_info) results.append(result) # 转换为DataFrame result_df = pd.DataFrame(results) # 筛选通过的股票 passed_stocks = result_df[result_df['passed']].reset_index(drop=True) print(f"\n技术过滤完成") print(f"候选股票: {total} 只") print(f"通过技术过滤: {len(passed_stocks)} 只") print(f"通过率: {len(passed_stocks)/total*100:.1f}%") print("=" * 60) return passed_stocks class PositionManager: """ 仓位管理器 - 第三步:仓位控制和动态止损 ========================================== 控制单票仓位、行业集中度、总仓位 """ def __init__(self, risk_profile: Dict = None, total_capital: float = 1000000.0): """ 初始化仓位管理器 Args: risk_profile: 风险偏好配置 total_capital: 总资金 """ self.risk_profile = risk_profile or RiskProfile.get_profile('balanced') self.total_capital = total_capital self.positions = {} # 当前持仓 {stock_code: position_info} def calculate_position_size(self, selected_stocks: pd.DataFrame) -> pd.DataFrame: """ 计算单票仓位大小 规则: 1. 单票最大仓位不超过风险偏好限制 2. 根据股票数量动态调整,确保总仓位合理 3. 均匀分配或根据评分加权 Args: selected_stocks: 选中的股票列表 Returns: 带仓位信息的股票列表 """ if selected_stocks.empty: return pd.DataFrame() n_stocks = len(selected_stocks) single_stock_max = self.risk_profile['single_stock_max'] min_count, max_count = self.risk_profile['stock_count'] # 调整股票数量到合理范围 if n_stocks > max_count: # 如果选中太多,取质量最好的前N只(这里简化处理,随机取) selected_stocks = selected_stocks.head(max_count).copy() n_stocks = max_count elif n_stocks < min_count: # 股票太少,保持全部 pass # 计算目标仓位 # 策略:均匀分配,确保单票不超过最大限制 target_total_position = 0.8 # 目标总仓位80% target_single_position = min(target_total_position / n_stocks, single_stock_max) # 计算实际总仓位 actual_total_position = target_single_position * n_stocks # 添加仓位信息 selected_stocks = selected_stocks.copy() selected_stocks['target_position_pct'] = target_single_position selected_stocks['target_position_value'] = selected_stocks['current_price'] * target_single_position * self.total_capital print(f"\n仓位计算:") print(f"股票数量: {n_stocks} 只") print(f"单票目标仓位: {target_single_position*100:.2f}%") print(f"预计总仓位: {actual_total_position*100:.2f}%") return selected_stocks.reset_index(drop=True) def check_industry_concentration(self, positions: pd.DataFrame) -> bool: """ 检查行业集中度是否超标 Args: positions: 持仓列表 Returns: True if concentration is acceptable """ if positions.empty: return True # 这里需要获取每只股票的行业信息 # 实际实现中应该调用 ak.stock_industry_category() # 简化处理,假设行业分散度足够 industry_max = self.risk_profile['industry_max'] print(f"行业集中度检查(最大单行业限制: {industry_max*100:.0f}%)") return True def calculate_stop_loss(self, stock_code: str, entry_price: float, atr: float = None, method: str = 'ma') -> float: """ 计算止损价格 方法: 1. ma_method: 收盘价跌破20日均线 2. atr_method: 入场价 - ATR * 2 3. pct_method: 固定百分比止损 Args: stock_code: 股票代码 entry_price: 入场价格 atr: ATR值 method: 止损方法 Returns: 止损价格 """ stop_loss_pct = self.risk_profile['stop_loss_pct'] if method == 'pct': # 固定百分比止损 stop_loss_price = entry_price * (1 - stop_loss_pct) elif method == 'atr' and atr is not None: # ATR止损 stop_loss_price = entry_price - atr * 2 elif method == 'ma': # 均线止损(需要实时监控) stop_loss_price = entry_price * (1 - stop_loss_pct) # 临时使用百分比 else: # 默认百分比止损 stop_loss_price = entry_price * (1 - stop_loss_pct) return stop_loss_price def generate_entry_orders(self, selected_stocks: pd.DataFrame) -> List[Dict]: """ 生成入场订单 Args: selected_stocks: 选中的股票列表 Returns: 订单列表 """ if selected_stocks.empty: return [] orders = [] for _, stock in selected_stocks.iterrows(): stock_code = stock['stock_code'] current_price = stock['current_price'] target_position = stock['target_position_value'] # 计算股数(A股100股起买) shares = int(target_position / current_price / 100) * 100 if shares < 100: continue # 资金不足,跳过 # 计算止损价 atr = stock.get('atr', current_price * 0.02) # 默认ATR为2% stop_loss_price = self.calculate_stop_loss(stock_code, current_price, atr) order = { 'stock_code': stock_code, 'action': 'buy', 'price': current_price, 'shares': shares, 'value': shares * current_price, 'stop_loss_price': stop_loss_price, 'stop_loss_pct': (current_price - stop_loss_price) / current_price, } orders.append(order) print(f"\n生成入场订单: {len(orders)} 个") return orders def check_exit_signal(self, stock_code: str, current_price: float, entry_price: float, stop_loss_price: float, df: pd.DataFrame = None) -> Tuple[bool, str]: """ 检查出场信号 出场条件: 1. 止损:价格跌破止损价 2. 均线止损:收盘价跌破20日均线 3. 止盈:收益达到目标(可选) Args: stock_code: 股票代码 current_price: 当前价格 entry_price: 入场价格 stop_loss_price: 止损价格 df: 历史数据(用于均线判断) Returns: (should_exit, reason) """ # 1. 止损检查 if current_price <= stop_loss_price: return True, f"触发止损,价格 {current_price:.2f} 跌破止损价 {stop_loss_price:.2f}" # 2. 均线止损检查(如果有历史数据) if df is not None and len(df) >= 20: ma20 = df['close'].tail(20).mean() if current_price < ma20: return True, f"均线止损,价格 {current_price:.2f} 跌破20日均线 {ma20:.2f}" # 3. 止盈检查(可选) profit_pct = (current_price - entry_price) / entry_price if profit_pct > 0.30: # 30%止盈 return True, f"止盈,收益达到 {profit_pct*100:.1f}%" return False, "" class GuanYuValueTechStrategy: """ 关羽 - 价值+技术综合选策略(主策略) ===================================== 完整流程: 1. 价值筛选(缩小范围) 2. 技术确认(入场点) 3. 仓位控制(风险管理) 4. 入场执行 5. 持仓监控(出场) """ def __init__(self, risk_profile: str = 'balanced', total_capital: float = 1000000.0): """ 初始化策略 Args: risk_profile: 风险偏好 ('conservative', 'balanced', 'aggressive') total_capital: 总资金 """ # 获取风险配置 self.risk_config = RiskProfile.get_profile(risk_profile) # 初始化各模块 self.value_filter = ValueFilter(self.risk_config) self.technical_filter = TechnicalFilter(ma_days=20) self.position_manager = PositionManager(self.risk_config, total_capital) print(f"\n关羽策略初始化") print(f"风险偏好: {self.risk_config['name']}") print(f"总资金: {total_capital:,.0f} 元") print(f"PE限制: <{self.risk_config['pe_max']}") print(f"PB限制: <{self.risk_config['pb_max']}") print(f"ROE限制: >{self.risk_config['roe_min']}%") print(f"单票上限: {self.risk_config['single_stock_max']*100:.0f}%") print(f"止损: {self.risk_config['stop_loss_pct']*100:.0f}%") def run(self) -> Dict: """ 运行完整策略 Returns: 策略执行结果 """ print("\n" + "=" * 60) print("关羽 - 价值+技术综合选股策略开始执行") print("=" * 60) # 第一步:价值筛选 print("\n【第一步】价值筛选") candidate_stocks = self.value_filter.apply() if candidate_stocks.empty: return { 'success': False, 'message': '价值筛选后无候选股票', 'final_stocks': pd.DataFrame(), 'orders': [], } # 第二步:技术过滤 print("\n【第二步】技术信号过滤") tech_passed_stocks = self.technical_filter.apply(candidate_stocks) if tech_passed_stocks.empty: return { 'success': False, 'message': '技术过滤后无股票通过', 'final_stocks': pd.DataFrame(), 'orders': [], } # 第三步:仓位计算 print("\n【第三步】仓位控制") selected_stocks = self.position_manager.calculate_position_size(tech_passed_stocks) # 检查行业集中度 concentration_ok = self.position_manager.check_industry_concentration(selected_stocks) if not concentration_ok: print("警告:行业集中度过高") # 第四步:生成入场订单 print("\n【第四步】生成入场订单") orders = self.position_manager.generate_entry_orders(selected_stocks) # 输出结果摘要 print("\n" + "=" * 60) print("策略执行完成") print("=" * 60) print(f"最终选中股票: {len(selected_stocks)} 只") print(f"生成订单: {len(orders)} 个") total_value = sum(order['value'] for order in orders) print(f"预计占用资金: {total_value:,.0f} 元 ({total_value/self.position_manager.total_capital*100:.1f}%)") return { 'success': True, 'message': '策略执行成功', 'candidate_stocks_count': len(candidate_stocks), 'tech_passed_count': len(tech_passed_stocks), 'final_stocks': selected_stocks, 'orders': orders, 'total_value': total_value, } def print_orders(self, orders: List[Dict]): """ 打印订单详情 Args: orders: 订单列表 """ if not orders: return print("\n" + "=" * 60) print("入场订单明细") print("=" * 60) print(f"{'股票代码':<10} {'操作':<6} {'价格':<10} {'数量(股)':<10} {'金额(元)':<15} {'止损价':<10} {'止损幅度'}") print("-" * 60) for order in orders: print(f"{order['stock_code']:<10} " f"{order['action']:<6} " f"{order['price']:<10.2f} " f"{order['shares']:<10,} " f"{order['value']:<15,.0f} " f"{order['stop_loss_price']:<10.2f} " f"{order['stop_loss_pct']*100:.1f}%") def main(): """主函数 - 演示策略使用""" print("\n" + "=" * 60) print("关羽 - 价值+技术综合选股策略") print("三国之量化交易 | 2026") print("=" * 60) # 创建策略实例(平衡型,100万资金) strategy = GuanYuValueTechStrategy( risk_profile='balanced', total_capital=1000000.0 ) # 运行策略 result = strategy.run() if result['success']: # 打印订单 strategy.print_orders(result['orders']) # 保存结果到文件 output_file = '/Users/chufeng/.openclaw/workspace-pangtong/sanguo_quant_live/results/guanyu_strategy_result.csv' if not result['final_stocks'].empty: result['final_stocks'].to_csv(output_file, index=False, encoding='utf-8-sig') print(f"\n结果已保存到: {output_file}") else: print(f"\n策略执行失败: {result['message']}") if __name__ == '__main__': main()