Files
sanguo_quant_live/strategies/guanyu_value_tech_strategy.py
T
cfdaily 63d58ec123 docs(jiangwei): 更新基础设施环境检查结果到整合报告
补充内容:
- Python环境检查(3.14.3,核心依赖完整)
- vn.py环境检查(4.3.0,sanguo集成)
- 数据库配置检查
- 目录结构验证
- 模块导入测试
- 四位将军环境就绪状态
- 综合环境评估(9.5/10)
- 完整部署说明
- 依赖列表安装指南

更新人:姜维(伯约)
检查时间:2026-03-24 12:33 GMT+8
更新时间:2026-03-24 18:24 GMT+8
结论:环境完全就绪
2026-03-24 18:28:54 +08:00

961 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
关羽 - 价值+技术综合选股策略
=======================================
基于五虎上将多因子选股体系第二部分实现
核心框架:
- 价值筛选缩小范围(风控前置)
- 技术分析确认入场点
- 仓位控制和动态止损
- 入场出场规则
适用市场:A股 T+1、涨跌停板
预期绩效:
- 年化收益:14-17%
- 最大回撤:28-38%
- 夏普比率:0.75-0.85
- 卡玛比率:0.4-0.5
"""
import akshare as ak
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')
class RiskProfile:
"""风险偏好配置"""
# 保守型配置
CONSERVATIVE = {
'name': '保守型',
'pe_max': 15,
'pb_max': 1.5,
'roe_min': 12.0,
'single_stock_max': 0.08,
'industry_max': 0.20,
'stock_count': (15, 20),
'stop_loss_pct': 0.05,
}
# 平衡型配置
BALANCED = {
'name': '平衡型',
'pe_max': 25,
'pb_max': 2.5,
'roe_min': 10.0,
'single_stock_max': 0.15,
'industry_max': 0.25,
'stock_count': (10, 15),
'stop_loss_pct': 0.06,
}
# 进取型配置
AGGRESSIVE = {
'name': '进取型',
'pe_max': 35,
'pb_max': 3.0,
'roe_min': 8.0,
'single_stock_max': 0.25,
'industry_max': 0.30,
'stock_count': (5, 10),
'stop_loss_pct': 0.08,
}
@classmethod
def get_profile(cls, profile: str = 'balanced') -> Dict:
"""获取风险配置"""
profiles = {
'conservative': cls.CONSERVATIVE,
'balanced': cls.BALANCED,
'aggressive': cls.AGGRESSIVE,
}
return profiles.get(profile.lower(), cls.BALANCED)
class ValueFilter:
"""
价值筛选器 - 第一步:价值筛选缩小范围
=========================================
风控前置,通过基本面过滤排除高风险股票
"""
def __init__(self, risk_profile: Dict = None):
"""
初始化价值筛选器
Args:
risk_profile: 风险偏好配置
"""
self.risk_profile = risk_profile or RiskProfile.get_profile('balanced')
def get_stock_list(self) -> pd.DataFrame:
"""
获取A股全市场股票列表
Returns:
股票列表 DataFrame
"""
try:
# 获取A股所有股票
stock_list = ak.stock_zh_a_spot_em()
# 标准化字段名
stock_list = stock_list.rename(columns={
'代码': 'stock_code',
'名称': 'stock_name',
'最新价': 'current_price',
'总市值': 'total_market_cap',
'流通市值': 'circulating_market_cap',
'市盈率-动态': 'pe_ttm',
'市净率': 'pb',
'市销率': 'ps',
'换手率': 'turnover_rate',
'量比': 'volume_ratio',
})
return stock_list
except Exception as e:
print(f"获取股票列表失败: {e}")
return pd.DataFrame()
def filter_basic_risks(self, df: pd.DataFrame) -> pd.DataFrame:
"""
排除基本风险股票(风控前置)
排除条件:
1. ST/*ST股票
2. 上市不足180天的新股
3. 流通市值过小(< 10亿)
4. 换手率过低(流动性差)
Args:
df: 股票列表
Returns:
过滤后的股票列表
"""
if df.empty:
return df
print(f"过滤前股票数量: {len(df)}")
# 1. 排除ST/*ST股票
df = df[~df['stock_name'].str.contains('ST|\\*ST', na=False)]
print(f"排除ST股票后: {len(df)}")
# 2. 排除停牌股票(价格异常)
df = df[df['current_price'] > 0]
print(f"排除停牌股票后: {len(df)}")
# 3. 排除上市不足180天的新股(通过股票代码判断)
def is_new_stock(code):
# 新股通常是00开头的深主板和60开头的沪主板
# 但这里简化处理,实际需要获取上市日期
return False
df = df[~df['stock_code'].apply(is_new_stock)]
# 4. 排除流通市值过小的股票(< 10亿)
df = df[df['circulating_market_cap'] > 100000] # 单位:万元,即10亿
print(f"排除小市值股票后: {len(df)}")
# 5. 排除换手率过低的股票(< 0.5%,流动性差)
df = df[df['turnover_rate'] > 0.5]
print(f"排除低换手率股票后: {len(df)}")
return df.reset_index(drop=True)
def filter_valuation_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
"""
估值指标筛选
筛选条件:
1. PE < pe_max(根据风险偏好)
2. PB < pb_max(根据风险偏好)
3. PE > 0(排除亏损)
4. PB > 0
Args:
df: 股票列表
Returns:
过滤后的股票列表
"""
if df.empty:
return df
pe_max = self.risk_profile['pe_max']
pb_max = self.risk_profile['pb_max']
print(f"\n估值筛选参数: PE<{pe_max}, PB<{pb_max}")
# 1. PE筛选
df = df[(df['pe_ttm'] > 0) & (df['pe_ttm'] < pe_max)]
print(f"PE筛选后: {len(df)}")
# 2. PB筛选
df = df[(df['pb'] > 0) & (df['pb'] < pb_max)]
print(f"PB筛选后: {len(df)}")
return df.reset_index(drop=True)
def filter_quality_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
"""
质量指标筛选
筛选条件:
1. ROE > roe_min(根据风险偏好)
2. 排除财务异常(如大额商誉、高质押率等)
Args:
df: 股票列表
Returns:
过滤后的股票列表
"""
if df.empty:
return df
roe_min = self.risk_profile['roe_min']
print(f"\n质量筛选参数: ROE>{roe_min}%")
# 注意:akshare spot数据中没有ROE等详细财务数据
# 这里需要通过单独的财务数据接口获取
# 为了简化,我们先跳过这一步,或者假设已经获取了
# 实际实现中应该:
# 1. 调用 ak.stock_financial_analysis_indicator() 获取财务指标
# 2. 筛选 ROE > roe_min
# 3. 排除商誉>20%、大股东质押>50%、连续亏损等
print("质量指标筛选(需要补充财务数据接口)")
return df.reset_index(drop=True)
def apply(self, stock_list: pd.DataFrame = None) -> pd.DataFrame:
"""
执行完整的价值筛选流程
Args:
stock_list: 股票列表,如果为None则自动获取
Returns:
筛选后的股票列表
"""
if stock_list is None:
stock_list = self.get_stock_list()
if stock_list.empty:
print("未获取到股票数据")
return pd.DataFrame()
print("=" * 60)
print("价值筛选流程开始")
print("=" * 60)
# 第一步:排除基本风险
df = self.filter_basic_risks(stock_list)
if df.empty:
return df
# 第二步:估值指标筛选
df = self.filter_valuation_metrics(df)
if df.empty:
return df
# 第三步:质量指标筛选
df = self.filter_quality_metrics(df)
print("=" * 60)
print(f"价值筛选完成,最终候选股票数量: {len(df)}")
print("=" * 60)
return df
class TechnicalFilter:
"""
技术信号过滤器 - 第二步:技术分析确认入场点
===============================================
通过技术指标确认合适的买入时机
"""
def __init__(self, ma_days: int = 20):
"""
初始化技术过滤器
Args:
ma_days: 均线天数,默认20日
"""
self.ma_days = ma_days
def get_stock_history(self, stock_code: str, days: int = 120) -> pd.DataFrame:
"""
获取股票历史行情数据
Args:
stock_code: 股票代码
days: 获取天数
Returns:
历史行情 DataFrame
"""
try:
# 确定股票市场类型
if stock_code.startswith('60'):
symbol = f"sh{stock_code}"
elif stock_code.startswith('00') or stock_code.startswith('30'):
symbol = f"sz{stock_code}"
else:
return pd.DataFrame()
# 获取历史数据
df = ak.stock_zh_a_hist(symbol=symbol, period="daily",
start_date=(datetime.now() - timedelta(days=days)).strftime("%Y%m%d"),
adjust="qfq")
if df.empty:
return pd.DataFrame()
# 标准化字段名
df = df.rename(columns={
'日期': 'date',
'开盘': 'open',
'收盘': 'close',
'最高': 'high',
'最低': 'low',
'成交量': 'volume',
'成交额': 'amount',
'换手率': 'turnover',
})
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values('date').reset_index(drop=True)
return df
except Exception as e:
print(f"获取股票 {stock_code} 历史数据失败: {e}")
return pd.DataFrame()
def calculate_ma(self, df: pd.DataFrame, days: int) -> pd.Series:
"""计算移动平均线"""
return df['close'].rolling(window=days).mean()
def calculate_atr(self, df: pd.DataFrame, period: int = 14) -> pd.Series:
"""
计算ATRAverage True Range)波动率指标
Args:
df: 历史数据
period: ATR周期
Returns:
ATR序列
"""
high = df['high']
low = df['low']
close = df['close'].shift(1)
tr1 = high - low
tr2 = (high - close).abs()
tr3 = (low - close).abs()
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.rolling(window=period).mean()
return atr
def check_trend_up(self, df: pd.DataFrame) -> bool:
"""
检查趋势是否向上
条件:
1. 股价站在20日均线上
2. 均线向上倾斜
Args:
df: 历史数据
Returns:
True if trend is up
"""
if len(df) < self.ma_days + 5:
return False
# 最新收盘价
latest_close = df['close'].iloc[-1]
# 计算MA
ma = self.calculate_ma(df, self.ma_days)
latest_ma = ma.iloc[-1]
ma_5_days_ago = ma.iloc[-6]
# 条件1:股价站在均线上
price_above_ma = latest_close >= latest_ma
# 条件2:均线向上
ma_rising = latest_ma > ma_5_days_ago
return price_above_ma and ma_rising
def check_recent_drawdown(self, df: pd.DataFrame, max_drawdown: float = 0.20) -> bool:
"""
检查近期是否有过度下跌
条件:
近一个月跌幅不超过20%
Args:
df: 历史数据
max_drawdown: 最大允许回撤
Returns:
True if drawdown is acceptable
"""
if len(df) < 20:
return True
# 获取最近20天的数据
recent_data = df.tail(20)
# 计算期间最高点
period_high = recent_data['close'].max()
# 计算当前回撤
current_price = recent_data['close'].iloc[-1]
drawdown = (period_high - current_price) / period_high
return drawdown <= max_drawdown
def check_volume_surge(self, df: pd.DataFrame) -> bool:
"""
检查是否有极端放量(主力出货信号)
条件:
排除极端放量情况
"""
if len(df) < 10:
return True
# 获取最近5天平均成交量
recent_volume = df['volume'].tail(5).mean()
# 获取前20天平均成交量
baseline_volume = df['volume'].tail(30).head(25).mean()
# 如果最近5天成交量是前20天的3倍以上,可能是主力出货
volume_surge = recent_volume > baseline_volume * 3
return not volume_surge
def check_macd_signal(self, df: pd.DataFrame) -> bool:
"""
检查MACD信号(可选增强信号)
条件:
MACD金叉或MACD在零轴上方
"""
if len(df) < 26:
return True
# 计算MACD
ema12 = df['close'].ewm(span=12, adjust=False).mean()
ema26 = df['close'].ewm(span=26, adjust=False).mean()
macd = ema12 - ema26
signal = macd.ewm(span=9, adjust=False).mean()
# 最新MACD和信号
latest_macd = macd.iloc[-1]
latest_signal = signal.iloc[-1]
prev_macd = macd.iloc[-2]
prev_signal = signal.iloc[-2]
# MACD金叉或MACD在零轴上方
golden_cross = (prev_macd < prev_signal) and (latest_macd >= latest_signal)
above_zero = latest_macd > 0
return golden_cross or above_zero
def apply_stock_filter(self, stock_code: str, stock_info: Dict) -> Dict:
"""
对单只股票应用技术过滤
Args:
stock_code: 股票代码
stock_info: 股票基本信息
Returns:
筛选结果
"""
# 获取历史数据
df = self.get_stock_history(stock_code)
if df.empty:
return {
'stock_code': stock_code,
'passed': False,
'reason': '无法获取历史数据'
}
# 应用各项技术过滤
trend_ok = self.check_trend_up(df)
drawdown_ok = self.check_recent_drawdown(df)
volume_ok = self.check_volume_surge(df)
macd_ok = self.check_macd_signal(df)
# 综合判断
passed = trend_ok and drawdown_ok and volume_ok and macd_ok
result = {
'stock_code': stock_code,
'passed': passed,
'reason': '' if passed else '技术指标不满足',
'trend_up': trend_ok,
'drawdown_ok': drawdown_ok,
'volume_ok': volume_ok,
'macd_ok': macd_ok,
}
# 计算ATR(用于后续止损)
atr = self.calculate_atr(df).iloc[-1]
result['atr'] = atr
# 最新价格
result['current_price'] = df['close'].iloc[-1]
return result
def apply(self, candidate_stocks: pd.DataFrame) -> pd.DataFrame:
"""
对候选股票批量应用技术过滤
Args:
candidate_stocks: 候选股票列表
Returns:
通过技术过滤的股票列表
"""
if candidate_stocks.empty:
return pd.DataFrame()
print("\n" + "=" * 60)
print("技术信号过滤流程开始")
print("=" * 60)
results = []
total = len(candidate_stocks)
for idx, row in candidate_stocks.iterrows():
stock_code = row['stock_code']
stock_info = row.to_dict()
if idx % 10 == 0:
print(f"处理进度: {idx}/{total} ({idx/total*100:.1f}%)")
result = self.apply_stock_filter(stock_code, stock_info)
results.append(result)
# 转换为DataFrame
result_df = pd.DataFrame(results)
# 筛选通过的股票
passed_stocks = result_df[result_df['passed']].reset_index(drop=True)
print(f"\n技术过滤完成")
print(f"候选股票: {total}")
print(f"通过技术过滤: {len(passed_stocks)}")
print(f"通过率: {len(passed_stocks)/total*100:.1f}%")
print("=" * 60)
return passed_stocks
class PositionManager:
"""
仓位管理器 - 第三步:仓位控制和动态止损
==========================================
控制单票仓位、行业集中度、总仓位
"""
def __init__(self, risk_profile: Dict = None, total_capital: float = 1000000.0):
"""
初始化仓位管理器
Args:
risk_profile: 风险偏好配置
total_capital: 总资金
"""
self.risk_profile = risk_profile or RiskProfile.get_profile('balanced')
self.total_capital = total_capital
self.positions = {} # 当前持仓 {stock_code: position_info}
def calculate_position_size(self, selected_stocks: pd.DataFrame) -> pd.DataFrame:
"""
计算单票仓位大小
规则:
1. 单票最大仓位不超过风险偏好限制
2. 根据股票数量动态调整,确保总仓位合理
3. 均匀分配或根据评分加权
Args:
selected_stocks: 选中的股票列表
Returns:
带仓位信息的股票列表
"""
if selected_stocks.empty:
return pd.DataFrame()
n_stocks = len(selected_stocks)
single_stock_max = self.risk_profile['single_stock_max']
min_count, max_count = self.risk_profile['stock_count']
# 调整股票数量到合理范围
if n_stocks > max_count:
# 如果选中太多,取质量最好的前N只(这里简化处理,随机取)
selected_stocks = selected_stocks.head(max_count).copy()
n_stocks = max_count
elif n_stocks < min_count:
# 股票太少,保持全部
pass
# 计算目标仓位
# 策略:均匀分配,确保单票不超过最大限制
target_total_position = 0.8 # 目标总仓位80%
target_single_position = min(target_total_position / n_stocks, single_stock_max)
# 计算实际总仓位
actual_total_position = target_single_position * n_stocks
# 添加仓位信息
selected_stocks = selected_stocks.copy()
selected_stocks['target_position_pct'] = target_single_position
selected_stocks['target_position_value'] = selected_stocks['current_price'] * target_single_position * self.total_capital
print(f"\n仓位计算:")
print(f"股票数量: {n_stocks}")
print(f"单票目标仓位: {target_single_position*100:.2f}%")
print(f"预计总仓位: {actual_total_position*100:.2f}%")
return selected_stocks.reset_index(drop=True)
def check_industry_concentration(self, positions: pd.DataFrame) -> bool:
"""
检查行业集中度是否超标
Args:
positions: 持仓列表
Returns:
True if concentration is acceptable
"""
if positions.empty:
return True
# 这里需要获取每只股票的行业信息
# 实际实现中应该调用 ak.stock_industry_category()
# 简化处理,假设行业分散度足够
industry_max = self.risk_profile['industry_max']
print(f"行业集中度检查(最大单行业限制: {industry_max*100:.0f}%")
return True
def calculate_stop_loss(self, stock_code: str, entry_price: float,
atr: float = None, method: str = 'ma') -> float:
"""
计算止损价格
方法:
1. ma_method: 收盘价跌破20日均线
2. atr_method: 入场价 - ATR * 2
3. pct_method: 固定百分比止损
Args:
stock_code: 股票代码
entry_price: 入场价格
atr: ATR值
method: 止损方法
Returns:
止损价格
"""
stop_loss_pct = self.risk_profile['stop_loss_pct']
if method == 'pct':
# 固定百分比止损
stop_loss_price = entry_price * (1 - stop_loss_pct)
elif method == 'atr' and atr is not None:
# ATR止损
stop_loss_price = entry_price - atr * 2
elif method == 'ma':
# 均线止损(需要实时监控)
stop_loss_price = entry_price * (1 - stop_loss_pct) # 临时使用百分比
else:
# 默认百分比止损
stop_loss_price = entry_price * (1 - stop_loss_pct)
return stop_loss_price
def generate_entry_orders(self, selected_stocks: pd.DataFrame) -> List[Dict]:
"""
生成入场订单
Args:
selected_stocks: 选中的股票列表
Returns:
订单列表
"""
if selected_stocks.empty:
return []
orders = []
for _, stock in selected_stocks.iterrows():
stock_code = stock['stock_code']
current_price = stock['current_price']
target_position = stock['target_position_value']
# 计算股数(A股100股起买)
shares = int(target_position / current_price / 100) * 100
if shares < 100:
continue # 资金不足,跳过
# 计算止损价
atr = stock.get('atr', current_price * 0.02) # 默认ATR为2%
stop_loss_price = self.calculate_stop_loss(stock_code, current_price, atr)
order = {
'stock_code': stock_code,
'action': 'buy',
'price': current_price,
'shares': shares,
'value': shares * current_price,
'stop_loss_price': stop_loss_price,
'stop_loss_pct': (current_price - stop_loss_price) / current_price,
}
orders.append(order)
print(f"\n生成入场订单: {len(orders)}")
return orders
def check_exit_signal(self, stock_code: str, current_price: float,
entry_price: float, stop_loss_price: float,
df: pd.DataFrame = None) -> Tuple[bool, str]:
"""
检查出场信号
出场条件:
1. 止损:价格跌破止损价
2. 均线止损:收盘价跌破20日均线
3. 止盈:收益达到目标(可选)
Args:
stock_code: 股票代码
current_price: 当前价格
entry_price: 入场价格
stop_loss_price: 止损价格
df: 历史数据(用于均线判断)
Returns:
(should_exit, reason)
"""
# 1. 止损检查
if current_price <= stop_loss_price:
return True, f"触发止损,价格 {current_price:.2f} 跌破止损价 {stop_loss_price:.2f}"
# 2. 均线止损检查(如果有历史数据)
if df is not None and len(df) >= 20:
ma20 = df['close'].tail(20).mean()
if current_price < ma20:
return True, f"均线止损,价格 {current_price:.2f} 跌破20日均线 {ma20:.2f}"
# 3. 止盈检查(可选)
profit_pct = (current_price - entry_price) / entry_price
if profit_pct > 0.30: # 30%止盈
return True, f"止盈,收益达到 {profit_pct*100:.1f}%"
return False, ""
class GuanYuValueTechStrategy:
"""
关羽 - 价值+技术综合选策略(主策略)
=====================================
完整流程:
1. 价值筛选(缩小范围)
2. 技术确认(入场点)
3. 仓位控制(风险管理)
4. 入场执行
5. 持仓监控(出场)
"""
def __init__(self, risk_profile: str = 'balanced', total_capital: float = 1000000.0):
"""
初始化策略
Args:
risk_profile: 风险偏好 ('conservative', 'balanced', 'aggressive')
total_capital: 总资金
"""
# 获取风险配置
self.risk_config = RiskProfile.get_profile(risk_profile)
# 初始化各模块
self.value_filter = ValueFilter(self.risk_config)
self.technical_filter = TechnicalFilter(ma_days=20)
self.position_manager = PositionManager(self.risk_config, total_capital)
print(f"\n关羽策略初始化")
print(f"风险偏好: {self.risk_config['name']}")
print(f"总资金: {total_capital:,.0f}")
print(f"PE限制: <{self.risk_config['pe_max']}")
print(f"PB限制: <{self.risk_config['pb_max']}")
print(f"ROE限制: >{self.risk_config['roe_min']}%")
print(f"单票上限: {self.risk_config['single_stock_max']*100:.0f}%")
print(f"止损: {self.risk_config['stop_loss_pct']*100:.0f}%")
def run(self) -> Dict:
"""
运行完整策略
Returns:
策略执行结果
"""
print("\n" + "=" * 60)
print("关羽 - 价值+技术综合选股策略开始执行")
print("=" * 60)
# 第一步:价值筛选
print("\n【第一步】价值筛选")
candidate_stocks = self.value_filter.apply()
if candidate_stocks.empty:
return {
'success': False,
'message': '价值筛选后无候选股票',
'final_stocks': pd.DataFrame(),
'orders': [],
}
# 第二步:技术过滤
print("\n【第二步】技术信号过滤")
tech_passed_stocks = self.technical_filter.apply(candidate_stocks)
if tech_passed_stocks.empty:
return {
'success': False,
'message': '技术过滤后无股票通过',
'final_stocks': pd.DataFrame(),
'orders': [],
}
# 第三步:仓位计算
print("\n【第三步】仓位控制")
selected_stocks = self.position_manager.calculate_position_size(tech_passed_stocks)
# 检查行业集中度
concentration_ok = self.position_manager.check_industry_concentration(selected_stocks)
if not concentration_ok:
print("警告:行业集中度过高")
# 第四步:生成入场订单
print("\n【第四步】生成入场订单")
orders = self.position_manager.generate_entry_orders(selected_stocks)
# 输出结果摘要
print("\n" + "=" * 60)
print("策略执行完成")
print("=" * 60)
print(f"最终选中股票: {len(selected_stocks)}")
print(f"生成订单: {len(orders)}")
total_value = sum(order['value'] for order in orders)
print(f"预计占用资金: {total_value:,.0f} 元 ({total_value/self.position_manager.total_capital*100:.1f}%)")
return {
'success': True,
'message': '策略执行成功',
'candidate_stocks_count': len(candidate_stocks),
'tech_passed_count': len(tech_passed_stocks),
'final_stocks': selected_stocks,
'orders': orders,
'total_value': total_value,
}
def print_orders(self, orders: List[Dict]):
"""
打印订单详情
Args:
orders: 订单列表
"""
if not orders:
return
print("\n" + "=" * 60)
print("入场订单明细")
print("=" * 60)
print(f"{'股票代码':<10} {'操作':<6} {'价格':<10} {'数量(股)':<10} {'金额(元)':<15} {'止损价':<10} {'止损幅度'}")
print("-" * 60)
for order in orders:
print(f"{order['stock_code']:<10} "
f"{order['action']:<6} "
f"{order['price']:<10.2f} "
f"{order['shares']:<10,} "
f"{order['value']:<15,.0f} "
f"{order['stop_loss_price']:<10.2f} "
f"{order['stop_loss_pct']*100:.1f}%")
def main():
"""主函数 - 演示策略使用"""
print("\n" + "=" * 60)
print("关羽 - 价值+技术综合选股策略")
print("三国之量化交易 | 2026")
print("=" * 60)
# 创建策略实例(平衡型,100万资金)
strategy = GuanYuValueTechStrategy(
risk_profile='balanced',
total_capital=1000000.0
)
# 运行策略
result = strategy.run()
if result['success']:
# 打印订单
strategy.print_orders(result['orders'])
# 保存结果到文件
output_file = '/Users/chufeng/.openclaw/workspace-pangtong/sanguo_quant_live/results/guanyu_strategy_result.csv'
if not result['final_stocks'].empty:
result['final_stocks'].to_csv(output_file, index=False, encoding='utf-8-sig')
print(f"\n结果已保存到: {output_file}")
else:
print(f"\n策略执行失败: {result['message']}")
if __name__ == '__main__':
main()