#!/usr/bin/env python3 """ 动态选股算法 - 价值投资策略 截止时间:18:00 """ import numpy as np import pandas as pd from datetime import datetime, timedelta import warnings warnings.filterwarnings('ignore') class DynamicStockSelection: """动态选股算法""" def __init__(self): self.start_time = datetime.now() print(f"🚀 动态选股算法启动!") print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}") print(f"🎯 目标时间: 18:00") def load_data(self): """加载数据""" print(f"📊 加载财务数据...") # 模拟3000只股票数据 np.random.seed(42) n_stocks = 3000 data = pd.DataFrame({ 'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)], 'industry': np.random.choice(['金融', '科技', '消费', '医药', '工业', '能源', '材料', '公用事业'], n_stocks), 'market_cap': np.random.uniform(50, 1000, n_stocks), # 亿 'pe_ratio': np.random.uniform(5, 50, n_stocks), 'pb_ratio': np.random.uniform(0.5, 5, n_stocks), 'ps_ratio': np.random.uniform(0.5, 10, n_stocks), 'dividend_yield': np.random.uniform(0, 0.05, n_stocks), 'roe': np.random.uniform(0.05, 0.3, n_stocks), 'gross_margin': np.random.uniform(0.2, 0.6, n_stocks), 'net_margin': np.random.uniform(0.05, 0.25, n_stocks), 'debt_to_equity': np.random.uniform(0.1, 1.5, n_stocks), 'current_ratio': np.random.uniform(1, 3, n_stocks), 'revenue_growth': np.random.uniform(-0.2, 0.5, n_stocks), 'profit_growth': np.random.uniform(-0.3, 0.6, n_stocks), 'fcf_yield': np.random.uniform(0, 0.1, n_stocks), 'volatility': np.random.uniform(0.2, 0.6, n_stocks), 'liquidity': np.random.uniform(1e5, 1e7, n_stocks) }) print(f"✅ 加载 {n_stocks} 只股票数据完成") return data def calculate_factors(self, data): """计算因子得分""" print(f"🔢 计算因子得分...") # 1. 估值因子得分(越低估值,得分越高) data['value_score'] = ( (1 - data['pe_ratio'].rank(pct=True)) * 0.4 + (1 - data['pb_ratio'].rank(pct=True)) * 0.3 + (1 - data['ps_ratio'].rank(pct=True)) * 0.2 + data['dividend_yield'].rank(pct=True) * 0.1 ) # 2. 质量因子得分(越高质量,得分越高) data['quality_score'] = ( data['roe'].rank(pct=True) * 0.3 + data['gross_margin'].rank(pct=True) * 0.2 + data['net_margin'].rank(pct=True) * 0.2 + (1 - data['debt_to_equity'].rank(pct=True)) * 0.15 + data['current_ratio'].rank(pct=True) * 0.15 ) # 3. 成长因子得分(越高成长,得分越高) data['growth_score'] = ( data['revenue_growth'].rank(pct=True) * 0.5 + data['profit_growth'].rank(pct=True) * 0.5 ) # 4. 风险因子得分(越低风险,得分越高) data['risk_score'] = ( (1 - data['volatility'].rank(pct=True)) * 0.6 + data['liquidity'].rank(pct=True) * 0.4 ) # 5. 综合得分 data['composite_score'] = ( data['value_score'] * 0.4 + # 估值权重40% data['quality_score'] * 0.3 + # 质量权重30% data['growth_score'] * 0.2 + # 成长权重20% data['risk_score'] * 0.1 # 风险权重10% ) print(f"✅ 因子计算完成") return data def apply_filters(self, data): """应用筛选条件""" print(f"🔍 应用筛选条件...") filtered = data.copy() # 1. 估值筛选(PE < 30, PB < 3) filtered = filtered[ (filtered['pe_ratio'] < 30) & (filtered['pb_ratio'] < 3) ] # 2. 质量筛选(ROE > 10%, 毛利率 > 20%) filtered = filtered[ (filtered['roe'] > 0.1) & (filtered['gross_margin'] > 0.2) ] # 3. 财务健康筛选(负债率 < 100%, 流动比率 > 1) filtered = filtered[ (filtered['debt_to_equity'] < 1) & (filtered['current_ratio'] > 1) ] # 4. 流动性筛选(流动性 > 中位数) liquidity_median = filtered['liquidity'].median() filtered = filtered[filtered['liquidity'] > liquidity_median] print(f"✅ 筛选后剩余 {len(filtered)} 只股票") return filtered def portfolio_construction(self, data, portfolio_size=20): """构建投资组合""" print(f"🏗️ 构建投资组合...") # 按行业分散 industries = data['industry'].unique() portfolio = pd.DataFrame() for industry in industries: industry_stocks = data[data['industry'] == industry] if len(industry_stocks) > 0: # 每个行业选择前N名 n_per_industry = max(1, portfolio_size // len(industries)) top_stocks = industry_stocks.nlargest(n_per_industry, 'composite_score') portfolio = pd.concat([portfolio, top_stocks]) # 如果组合数量不足,补充剩余名额 if len(portfolio) < portfolio_size: remaining = portfolio_size - len(portfolio) remaining_stocks = data[~data['stock_code'].isin(portfolio['stock_code'])] top_remaining = remaining_stocks.nlargest(remaining, 'composite_score') portfolio = pd.concat([portfolio, top_remaining]) # 按综合得分排序 portfolio = portfolio.nlargest(portfolio_size, 'composite_score') # 计算权重(基于综合得分) portfolio['weight'] = portfolio['composite_score'] / portfolio['composite_score'].sum() print(f"✅ 构建 {len(portfolio)} 只股票的投资组合") return portfolio def strategy_variants(self, data): """生成策略变体""" print(f"🔄 生成策略变体...") strategies = {} # 1. 纯价值策略(只看估值) value_stocks = data.nlargest(20, 'value_score') strategies['pure_value'] = value_stocks # 2. 质量价值策略(估值+质量) data['value_quality'] = data['value_score'] * 0.6 + data['quality_score'] * 0.4 value_quality_stocks = data.nlargest(20, 'value_quality') strategies['value_quality'] = value_quality_stocks # 3. 成长价值策略(估值+成长) data['value_growth'] = data['value_score'] * 0.7 + data['growth_score'] * 0.3 value_growth_stocks = data.nlargest(20, 'value_growth') strategies['value_growth'] = value_growth_stocks # 4. 高股息策略 high_dividend_stocks = data.nlargest(20, 'dividend_yield') strategies['high_dividend'] = high_dividend_stocks # 5. 低波动策略 low_vol_stocks = data.nlargest(20, 'risk_score') strategies['low_volatility'] = low_vol_stocks print(f"✅ 生成 {len(strategies)} 个策略变体") return strategies def run(self): """运行选股算法""" print(f"\n{'='*60}") print("🚀 动态选股算法开始运行") print(f"{'='*60}") # 1. 加载数据 data = self.load_data() # 2. 计算因子 data = self.calculate_factors(data) # 3. 应用筛选 filtered_data = self.apply_filters(data) # 4. 构建主投资组合 main_portfolio = self.portfolio_construction(filtered_data, portfolio_size=20) # 5. 生成策略变体 strategy_variants = self.strategy_variants(filtered_data) # 6. 输出结果 self.output_results(main_portfolio, strategy_variants, data) return main_portfolio, strategy_variants def output_results(self, portfolio, strategies, full_data): """输出结果""" print(f"\n{'='*60}") print("📊 选股结果汇总") print(f"{'='*60}") # 主投资组合 print(f"\n🏆 主投资组合(20只股票):") print(f"{'='*40}") portfolio_display = portfolio[['stock_code', 'industry', 'market_cap', 'pe_ratio', 'pb_ratio', 'roe', 'composite_score', 'weight']].copy() portfolio_display['weight'] = portfolio_display['weight'].apply(lambda x: f"{x*100:.1f}%") portfolio_display['roe'] = portfolio_display['roe'].apply(lambda x: f"{x*100:.1f}%") print(portfolio_display.to_string(index=False)) # 组合特征 print(f"\n📈 组合特征:") print(f"{'='*40}") print(f"平均PE: {portfolio['pe_ratio'].mean():.1f}") print(f"平均PB: {portfolio['pb_ratio'].mean():.2f}") print(f"平均ROE: {portfolio['roe'].mean()*100:.1f}%") print(f"平均股息率: {portfolio['dividend_yield'].mean()*100:.2f}%") print(f"平均市值: {portfolio['market_cap'].mean():.1f}亿") # 行业分布 print(f"\n🏭 行业分布:") print(f"{'='*40}") industry_dist = portfolio['industry'].value_counts() for industry, count in industry_dist.items(): print(f"{industry}: {count}只 ({count/len(portfolio)*100:.1f}%)") # 策略变体表现 print(f"\n🔄 策略变体对比:") print(f"{'='*40}") for strategy_name, strategy_stocks in strategies.items(): avg_pe = strategy_stocks['pe_ratio'].mean() avg_pb = strategy_stocks['pb_ratio'].mean() avg_roe = strategy_stocks['roe'].mean() print(f"{strategy_name}: PE={avg_pe:.1f}, PB={avg_pb:.2f}, ROE={avg_roe*100:.1f}%") # 时间统计 elapsed = (datetime.now() - self.start_time).total_seconds() print(f"\n⏰ 算法运行时间: {elapsed:.2f}秒") print(f"🕐 完成时间: {datetime.now().strftime('%H:%M:%S')}") # 保存结果 self.save_results(portfolio, strategies) def save_results(self, portfolio, strategies): """保存结果""" import os # 创建输出目录 output_dir = "selection_results" os.makedirs(output_dir, exist_ok=True) # 保存主投资组合 portfolio.to_csv(f"{output_dir}/main_portfolio.csv", index=False) # 保存策略变体 for strategy_name, strategy_stocks in strategies.items(): strategy_stocks.to_csv(f"{output_dir}/{strategy_name}_portfolio.csv", index=False) # 保存汇总报告 with open(f"{output_dir}/selection_report.txt", 'w') as f: f.write("="*60 + "\n") f.write("动态选股算法结果报告\n") f.write("="*60 + "\n\n") f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"股票数量: {len(portfolio)}\n\n") f.write("主投资组合:\n") f.write("-"*40 + "\n") for _, row in portfolio.iterrows(): f.write(f"{row['stock_code']} | {row['industry']} | PE:{row['pe_ratio']:.1f} | PB:{row['pb_ratio']:.2f} | ROE:{row['roe']*100:.1f}% | 权重:{row['weight']*100:.1f}%\n") print(f"\n💾 结果已保存到 {output_dir}/ 目录") def main(): """主函数""" selector = DynamicStockSelection() portfolio, strategies = selector.run() return portfolio, strategies if __name__ == "__main__": main()