Files
sanguo_quant_live/pangtong-value/scripts/dynamic_stock_selection.py
T

302 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
动态选股算法 - 价值投资策略
截止时间:18:00
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class DynamicStockSelection:
    """Dynamic stock-selection pipeline for a value-investing strategy.

    The pipeline builds a simulated stock universe, scores every stock on
    value / quality / growth / risk factors, applies hard screening rules,
    assembles an industry-diversified portfolio, derives several strategy
    variants, then prints and saves the results.
    """

    def __init__(self):
        """Record the start time and print the start-up banner."""
        self.start_time = datetime.now()
        print("🚀 动态选股算法启动!")
        print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
        print("🎯 目标时间: 18:00")

    def load_data(self, n_stocks=3000, seed=42):
        """Generate a simulated universe of stocks with random fundamentals.

        Args:
            n_stocks: Number of simulated stocks to generate.
            seed: Seed passed to ``np.random.seed`` so the draws are
                reproducible. Note that this reseeds NumPy's global RNG.

        Returns:
            A DataFrame with one row per stock and one column per metric.
        """
        print("📊 加载财务数据...")
        # Simulated data only. The column order below fixes the order of the
        # random draws, so it must not be rearranged if results are to stay
        # reproducible for a given seed.
        np.random.seed(seed)
        data = pd.DataFrame({
            'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
            'industry': np.random.choice(
                ['金融', '科技', '消费', '医药', '工业', '能源', '材料', '公用事业'],
                n_stocks,
            ),
            'market_cap': np.random.uniform(50, 1000, n_stocks),  # in 亿 (1e8)
            'pe_ratio': np.random.uniform(5, 50, n_stocks),
            'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
            'ps_ratio': np.random.uniform(0.5, 10, n_stocks),
            'dividend_yield': np.random.uniform(0, 0.05, n_stocks),
            'roe': np.random.uniform(0.05, 0.3, n_stocks),
            'gross_margin': np.random.uniform(0.2, 0.6, n_stocks),
            'net_margin': np.random.uniform(0.05, 0.25, n_stocks),
            'debt_to_equity': np.random.uniform(0.1, 1.5, n_stocks),
            'current_ratio': np.random.uniform(1, 3, n_stocks),
            'revenue_growth': np.random.uniform(-0.2, 0.5, n_stocks),
            'profit_growth': np.random.uniform(-0.3, 0.6, n_stocks),
            'fcf_yield': np.random.uniform(0, 0.1, n_stocks),
            'volatility': np.random.uniform(0.2, 0.6, n_stocks),
            'liquidity': np.random.uniform(1e5, 1e7, n_stocks),
        })
        print(f"✅ 加载 {n_stocks} 只股票数据完成")
        return data

    def calculate_factors(self, data):
        """Compute percentile-rank factor scores and a composite score.

        Args:
            data: Stock universe containing the raw metric columns.

        Returns:
            A copy of ``data`` with ``value_score``, ``quality_score``,
            ``growth_score``, ``risk_score`` and ``composite_score`` added.
            The input frame is left untouched.
        """
        print("🔢 计算因子得分...")
        # Score a copy so the caller's frame is not mutated.
        data = data.copy()

        def pct(column):
            # Percentile rank in (0, 1]; higher raw value -> higher rank.
            return data[column].rank(pct=True)

        # 1. Value: cheaper valuation scores higher, higher dividend scores higher.
        data['value_score'] = (
            (1 - pct('pe_ratio')) * 0.4 +
            (1 - pct('pb_ratio')) * 0.3 +
            (1 - pct('ps_ratio')) * 0.2 +
            pct('dividend_yield') * 0.1
        )
        # 2. Quality: higher profitability and lower leverage score higher.
        data['quality_score'] = (
            pct('roe') * 0.3 +
            pct('gross_margin') * 0.2 +
            pct('net_margin') * 0.2 +
            (1 - pct('debt_to_equity')) * 0.15 +
            pct('current_ratio') * 0.15
        )
        # 3. Growth: higher revenue/profit growth scores higher.
        data['growth_score'] = (
            pct('revenue_growth') * 0.5 +
            pct('profit_growth') * 0.5
        )
        # 4. Risk: lower volatility and higher liquidity score higher.
        data['risk_score'] = (
            (1 - pct('volatility')) * 0.6 +
            pct('liquidity') * 0.4
        )
        # 5. Composite: value 40%, quality 30%, growth 20%, risk 10%.
        data['composite_score'] = (
            data['value_score'] * 0.4 +
            data['quality_score'] * 0.3 +
            data['growth_score'] * 0.2 +
            data['risk_score'] * 0.1
        )
        print("✅ 因子计算完成")
        return data

    def apply_filters(self, data):
        """Screen the universe with hard valuation/quality/health/liquidity rules.

        Args:
            data: Scored stock universe.

        Returns:
            A new DataFrame holding only the rows that pass every rule.
        """
        print("🔍 应用筛选条件...")
        passes = (
            # 1. Valuation: PE < 30 and PB < 3.
            (data['pe_ratio'] < 30) & (data['pb_ratio'] < 3)
            # 2. Quality: ROE > 10% and gross margin > 20%.
            & (data['roe'] > 0.1) & (data['gross_margin'] > 0.2)
            # 3. Financial health: debt-to-equity < 1 and current ratio > 1.
            & (data['debt_to_equity'] < 1) & (data['current_ratio'] > 1)
        )
        filtered = data[passes]
        # 4. Liquidity: keep stocks above the median of the survivors
        #    (the median is taken after the rules above, not over all stocks).
        liquidity_median = filtered['liquidity'].median()
        filtered = filtered[filtered['liquidity'] > liquidity_median].copy()
        print(f"✅ 筛选后剩余 {len(filtered)} 只股票")
        return filtered

    def portfolio_construction(self, data, portfolio_size=20):
        """Build an industry-diversified portfolio weighted by composite score.

        Each industry contributes its top ``portfolio_size // n_industries``
        stocks (at least one); any shortfall is filled with the best remaining
        stocks, and the result is trimmed to ``portfolio_size``.

        Args:
            data: Candidate stocks with ``stock_code``, ``industry`` and
                ``composite_score`` columns.
            portfolio_size: Maximum number of stocks in the portfolio.

        Returns:
            The selected rows sorted by descending composite score, with an
            added ``weight`` column. Empty (but with a ``weight`` column) when
            ``data`` has no rows.
        """
        print("🏗️ 构建投资组合...")
        if len(data) == 0:
            # Nothing survived screening: return an empty, well-formed frame
            # instead of failing on a column lookup.
            portfolio = data.head(0).assign(weight=pd.Series(dtype=float))
            print(f"✅ 构建 {len(portfolio)} 只股票的投资组合")
            return portfolio

        # Diversify across industries.
        industries = data['industry'].unique()
        n_per_industry = max(1, portfolio_size // len(industries))
        picks = []
        for industry in industries:
            industry_stocks = data[data['industry'] == industry]
            if len(industry_stocks) > 0:
                picks.append(industry_stocks.nlargest(n_per_industry, 'composite_score'))
        # Concatenate once rather than growing a frame inside the loop.
        portfolio = pd.concat(picks) if picks else data.head(0)

        # Fill any remaining slots with the best stocks not yet selected.
        if len(portfolio) < portfolio_size:
            remaining = portfolio_size - len(portfolio)
            remaining_stocks = data[~data['stock_code'].isin(portfolio['stock_code'])]
            top_remaining = remaining_stocks.nlargest(remaining, 'composite_score')
            portfolio = pd.concat([portfolio, top_remaining])

        # Sort by composite score and trim to the requested size.
        portfolio = portfolio.nlargest(portfolio_size, 'composite_score')

        # Weight by composite score; fall back to equal weights when the
        # scores do not sum to a positive number (avoids NaN/inf weights).
        total_score = portfolio['composite_score'].sum()
        if total_score > 0:
            portfolio = portfolio.assign(weight=portfolio['composite_score'] / total_score)
        else:
            portfolio = portfolio.assign(weight=1.0 / max(len(portfolio), 1))
        print(f"✅ 构建 {len(portfolio)} 只股票的投资组合")
        return portfolio

    def strategy_variants(self, data, top_n=20):
        """Select the top stocks under several alternative ranking schemes.

        Args:
            data: Scored candidates with the factor score columns and
                ``dividend_yield``. The frame is not modified.
            top_n: Number of stocks to keep per variant.

        Returns:
            Dict mapping variant name to its top-``top_n`` DataFrame.
        """
        print("🔄 生成策略变体...")
        # Work on a copy: the blended-score columns added below must not leak
        # into the caller's frame (which is typically a filtered slice).
        data = data.copy()
        strategies = {}
        # 1. Pure value: valuation score only.
        strategies['pure_value'] = data.nlargest(top_n, 'value_score')
        # 2. Quality value: valuation blended with quality.
        data['value_quality'] = data['value_score'] * 0.6 + data['quality_score'] * 0.4
        strategies['value_quality'] = data.nlargest(top_n, 'value_quality')
        # 3. Growth value: valuation blended with growth.
        data['value_growth'] = data['value_score'] * 0.7 + data['growth_score'] * 0.3
        strategies['value_growth'] = data.nlargest(top_n, 'value_growth')
        # 4. High dividend.
        strategies['high_dividend'] = data.nlargest(top_n, 'dividend_yield')
        # 5. Low volatility (highest risk score = lowest risk).
        strategies['low_volatility'] = data.nlargest(top_n, 'risk_score')
        print(f"✅ 生成 {len(strategies)} 个策略变体")
        return strategies

    def run(self):
        """Run the full pipeline.

        Returns:
            Tuple ``(main_portfolio, strategy_variants)``.
        """
        print(f"\n{'='*60}")
        print("🚀 动态选股算法开始运行")
        print(f"{'='*60}")
        # 1. Load data.
        data = self.load_data()
        # 2. Compute factor scores.
        data = self.calculate_factors(data)
        # 3. Apply screening rules.
        filtered_data = self.apply_filters(data)
        # 4. Build the main portfolio.
        main_portfolio = self.portfolio_construction(filtered_data, portfolio_size=20)
        # 5. Generate strategy variants.
        strategy_variants = self.strategy_variants(filtered_data)
        # 6. Print and save the results.
        self.output_results(main_portfolio, strategy_variants, data)
        return main_portfolio, strategy_variants

    def output_results(self, portfolio, strategies, full_data):
        """Print a summary of the selection and save it via ``save_results``.

        Args:
            portfolio: Main portfolio including the ``weight`` column.
            strategies: Dict of variant name to DataFrame.
            full_data: Full scored universe; currently unused, kept for
                interface compatibility.
        """
        print(f"\n{'='*60}")
        print("📊 选股结果汇总")
        print(f"{'='*60}")
        # Main portfolio; the header reports the actual holding count.
        print(f"\n🏆 主投资组合({len(portfolio)}只股票):")
        print(f"{'='*40}")
        portfolio_display = portfolio[[
            'stock_code', 'industry', 'market_cap', 'pe_ratio',
            'pb_ratio', 'roe', 'composite_score', 'weight',
        ]].copy()
        portfolio_display['weight'] = portfolio_display['weight'].apply(lambda x: f"{x*100:.1f}%")
        portfolio_display['roe'] = portfolio_display['roe'].apply(lambda x: f"{x*100:.1f}%")
        print(portfolio_display.to_string(index=False))
        # Portfolio characteristics.
        print("\n📈 组合特征:")
        print(f"{'='*40}")
        print(f"平均PE: {portfolio['pe_ratio'].mean():.1f}")
        print(f"平均PB: {portfolio['pb_ratio'].mean():.2f}")
        print(f"平均ROE: {portfolio['roe'].mean()*100:.1f}%")
        print(f"平均股息率: {portfolio['dividend_yield'].mean()*100:.2f}%")
        print(f"平均市值: {portfolio['market_cap'].mean():.1f}亿")
        # Industry distribution.
        print("\n🏭 行业分布:")
        print(f"{'='*40}")
        industry_dist = portfolio['industry'].value_counts()
        for industry, count in industry_dist.items():
            print(f"{industry}: {count}只 ({count/len(portfolio)*100:.1f}%)")
        # Strategy variant comparison.
        print("\n🔄 策略变体对比:")
        print(f"{'='*40}")
        for strategy_name, strategy_stocks in strategies.items():
            avg_pe = strategy_stocks['pe_ratio'].mean()
            avg_pb = strategy_stocks['pb_ratio'].mean()
            avg_roe = strategy_stocks['roe'].mean()
            print(f"{strategy_name}: PE={avg_pe:.1f}, PB={avg_pb:.2f}, ROE={avg_roe*100:.1f}%")
        # Timing.
        elapsed = (datetime.now() - self.start_time).total_seconds()
        print(f"\n⏰ 算法运行时间: {elapsed:.2f}")
        print(f"🕐 完成时间: {datetime.now().strftime('%H:%M:%S')}")
        # Persist the results.
        self.save_results(portfolio, strategies)

    def save_results(self, portfolio, strategies, output_dir="selection_results"):
        """Write the portfolio, each variant, and a text report to disk.

        Args:
            portfolio: Main portfolio including the ``weight`` column.
            strategies: Dict of variant name to DataFrame.
            output_dir: Directory to write into; created if missing.
        """
        import os

        os.makedirs(output_dir, exist_ok=True)
        # Main portfolio.
        portfolio.to_csv(f"{output_dir}/main_portfolio.csv", index=False)
        # One CSV per strategy variant.
        for strategy_name, strategy_stocks in strategies.items():
            strategy_stocks.to_csv(f"{output_dir}/{strategy_name}_portfolio.csv", index=False)
        # Summary report. The text is Chinese, so pin UTF-8 rather than rely
        # on the platform default encoding.
        with open(f"{output_dir}/selection_report.txt", 'w', encoding='utf-8') as f:
            f.write("="*60 + "\n")
            f.write("动态选股算法结果报告\n")
            f.write("="*60 + "\n\n")
            f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"股票数量: {len(portfolio)}\n\n")
            f.write("主投资组合:\n")
            f.write("-"*40 + "\n")
            f.writelines(
                f"{row['stock_code']} | {row['industry']} | PE:{row['pe_ratio']:.1f} | PB:{row['pb_ratio']:.2f} | ROE:{row['roe']*100:.1f}% | 权重:{row['weight']*100:.1f}%\n"
                for _, row in portfolio.iterrows()
            )
        print(f"\n💾 结果已保存到 {output_dir}/ 目录")
def main():
    """Entry point: run the selector and return ``(portfolio, strategies)``."""
    return DynamicStockSelection().run()


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()