整理pangtong-value:脚本已在scripts/,删除research/中重复脚本

This commit is contained in:
cfdaily
2026-03-25 21:06:50 +08:00
parent f6b0484038
commit e18d0ed3e6
7 changed files with 0 additions and 2399 deletions
@@ -1,302 +0,0 @@
#!/usr/bin/env python3
"""
动态选股算法 - 价值投资策略
截止时间:18:00
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class DynamicStockSelection:
    """Value-oriented multi-factor stock screen over a simulated universe.

    Pipeline (see ``run``): load data -> score factors -> apply hard
    filters -> build an industry-diversified portfolio and five
    single-theme variants -> print and persist the results.
    """

    def __init__(self):
        """Record the start time and print the startup banner."""
        self.start_time = datetime.now()
        print(f"🚀 动态选股算法启动!")
        print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
        print(f"🎯 目标时间: 18:00")

    def load_data(self):
        """Return a simulated fundamentals table for 3000 stocks.

        The global NumPy RNG is seeded with 42, so repeated calls return
        identical data.
        """
        print(f"📊 加载财务数据...")
        # Simulated universe of 3000 stocks.
        np.random.seed(42)
        n_stocks = 3000
        data = pd.DataFrame({
            'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
            'industry': np.random.choice(['金融', '科技', '消费', '医药', '工业', '能源', '材料', '公用事业'], n_stocks),
            'market_cap': np.random.uniform(50, 1000, n_stocks),  # in 亿 (1e8)
            'pe_ratio': np.random.uniform(5, 50, n_stocks),
            'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
            'ps_ratio': np.random.uniform(0.5, 10, n_stocks),
            'dividend_yield': np.random.uniform(0, 0.05, n_stocks),
            'roe': np.random.uniform(0.05, 0.3, n_stocks),
            'gross_margin': np.random.uniform(0.2, 0.6, n_stocks),
            'net_margin': np.random.uniform(0.05, 0.25, n_stocks),
            'debt_to_equity': np.random.uniform(0.1, 1.5, n_stocks),
            'current_ratio': np.random.uniform(1, 3, n_stocks),
            'revenue_growth': np.random.uniform(-0.2, 0.5, n_stocks),
            'profit_growth': np.random.uniform(-0.3, 0.6, n_stocks),
            'fcf_yield': np.random.uniform(0, 0.1, n_stocks),
            'volatility': np.random.uniform(0.2, 0.6, n_stocks),
            'liquidity': np.random.uniform(1e5, 1e7, n_stocks),
        })
        print(f"✅ 加载 {n_stocks} 只股票数据完成")
        return data

    def calculate_factors(self, data):
        """Add percentile-rank factor scores to ``data`` in place and return it.

        Adds ``value_score``, ``quality_score``, ``growth_score``,
        ``risk_score`` and their weighted blend ``composite_score``.
        """
        print(f"🔢 计算因子得分...")

        def high(column):
            # Percentile rank: larger raw value -> larger score.
            return data[column].rank(pct=True)

        def low(column):
            # Inverted percentile rank: smaller raw value -> larger score.
            return 1 - data[column].rank(pct=True)

        # 1. Valuation: cheaper multiples and higher dividend yield score higher.
        data['value_score'] = (
            low('pe_ratio') * 0.4 +
            low('pb_ratio') * 0.3 +
            low('ps_ratio') * 0.2 +
            high('dividend_yield') * 0.1
        )
        # 2. Quality: profitability up, leverage down.
        data['quality_score'] = (
            high('roe') * 0.3 +
            high('gross_margin') * 0.2 +
            high('net_margin') * 0.2 +
            low('debt_to_equity') * 0.15 +
            high('current_ratio') * 0.15
        )
        # 3. Growth.
        data['growth_score'] = (
            high('revenue_growth') * 0.5 +
            high('profit_growth') * 0.5
        )
        # 4. Risk: lower volatility and higher liquidity score higher.
        data['risk_score'] = (
            low('volatility') * 0.6 +
            high('liquidity') * 0.4
        )
        # 5. Composite: value 40%, quality 30%, growth 20%, risk 10%.
        data['composite_score'] = (
            data['value_score'] * 0.4 +
            data['quality_score'] * 0.3 +
            data['growth_score'] * 0.2 +
            data['risk_score'] * 0.1
        )
        print(f"✅ 因子计算完成")
        return data

    def apply_filters(self, data):
        """Return a new frame with only the rows passing every hard screen."""
        print(f"🔍 应用筛选条件...")
        filtered = data.copy()
        # 1. Valuation: PE < 30 and PB < 3.
        filtered = filtered[
            (filtered['pe_ratio'] < 30) &
            (filtered['pb_ratio'] < 3)
        ]
        # 2. Quality: ROE > 10% and gross margin > 20%.
        filtered = filtered[
            (filtered['roe'] > 0.1) &
            (filtered['gross_margin'] > 0.2)
        ]
        # 3. Balance sheet: debt/equity < 1 and current ratio > 1.
        filtered = filtered[
            (filtered['debt_to_equity'] < 1) &
            (filtered['current_ratio'] > 1)
        ]
        # 4. Liquidity: above the median of the stocks that survived 1-3.
        liquidity_median = filtered['liquidity'].median()
        filtered = filtered[filtered['liquidity'] > liquidity_median]
        print(f"✅ 筛选后剩余 {len(filtered)} 只股票")
        return filtered

    def portfolio_construction(self, data, portfolio_size=20):
        """Build an industry-diversified, score-weighted portfolio.

        Each industry contributes its top
        ``max(1, portfolio_size // n_industries)`` stocks by
        ``composite_score``. Remaining slots go to the best stocks not yet
        picked. The result is trimmed to ``portfolio_size`` rows and gains a
        ``weight`` column proportional to ``composite_score``.

        An empty ``data`` (or non-positive ``portfolio_size``) yields an
        empty frame with the input columns plus ``weight``.
        """
        print(f"🏗️ 构建投资组合...")

        if data.empty or portfolio_size <= 0:
            # Nothing to pick; keep the schema so downstream code still works.
            portfolio = data.head(0).copy()
            portfolio['weight'] = pd.Series(dtype=float)
            print(f"✅ 构建 {len(portfolio)} 只股票的投资组合")
            return portfolio

        # Diversify by industry: same quota for every industry.
        industries = data['industry'].unique()
        per_industry = max(1, portfolio_size // len(industries))
        picks = [
            data[data['industry'] == industry].nlargest(per_industry, 'composite_score')
            for industry in industries
        ]
        portfolio = pd.concat(picks)

        # Top up with the best remaining stocks if the quotas left gaps.
        if len(portfolio) < portfolio_size:
            shortfall = portfolio_size - len(portfolio)
            leftovers = data[~data['stock_code'].isin(portfolio['stock_code'])]
            portfolio = pd.concat(
                [portfolio, leftovers.nlargest(shortfall, 'composite_score')]
            )

        # Rank by composite score; copy so adding a column never writes
        # through to the caller's frame.
        portfolio = portfolio.nlargest(portfolio_size, 'composite_score').copy()

        # Score-proportional weights; fall back to equal weights if the
        # scores sum to zero.
        total_score = portfolio['composite_score'].sum()
        if total_score:
            portfolio['weight'] = portfolio['composite_score'] / total_score
        else:
            portfolio['weight'] = 1.0 / len(portfolio) if len(portfolio) else 0.0

        print(f"✅ 构建 {len(portfolio)} 只股票的投资组合")
        return portfolio

    def strategy_variants(self, data):
        """Return five top-20 selections keyed by strategy name.

        ``data`` is not modified; the blended ranking columns are added to
        an internal copy only.
        """
        print(f"🔄 生成策略变体...")
        frame = data.copy()  # never add columns to the caller's frame
        strategies = {}

        # 1. Pure value: valuation score only.
        strategies['pure_value'] = frame.nlargest(20, 'value_score')

        # 2. Value + quality blend.
        frame['value_quality'] = frame['value_score'] * 0.6 + frame['quality_score'] * 0.4
        strategies['value_quality'] = frame.nlargest(20, 'value_quality')

        # 3. Value + growth blend.
        frame['value_growth'] = frame['value_score'] * 0.7 + frame['growth_score'] * 0.3
        strategies['value_growth'] = frame.nlargest(20, 'value_growth')

        # 4. High dividend yield.
        strategies['high_dividend'] = frame.nlargest(20, 'dividend_yield')

        # 5. Low volatility (highest risk score).
        strategies['low_volatility'] = frame.nlargest(20, 'risk_score')

        print(f"✅ 生成 {len(strategies)} 个策略变体")
        return strategies

    def run(self):
        """Run the full pipeline and return ``(main_portfolio, strategy_variants)``."""
        print(f"\n{'='*60}")
        print("🚀 动态选股算法开始运行")
        print(f"{'='*60}")
        # 1. Load data.
        data = self.load_data()
        # 2. Score factors.
        data = self.calculate_factors(data)
        # 3. Apply hard filters.
        filtered_data = self.apply_filters(data)
        # 4. Build the main portfolio.
        main_portfolio = self.portfolio_construction(filtered_data, portfolio_size=20)
        # 5. Build the strategy variants.
        strategy_variants = self.strategy_variants(filtered_data)
        # 6. Report and persist.
        self.output_results(main_portfolio, strategy_variants, data)
        return main_portfolio, strategy_variants

    def output_results(self, portfolio, strategies, full_data):
        """Print the portfolio, its aggregates and the variants, then save them.

        ``full_data`` is accepted for interface compatibility and is not used.
        """
        print(f"\n{'='*60}")
        print("📊 选股结果汇总")
        print(f"{'='*60}")

        # Main portfolio; the header reports the actual number of holdings.
        print(f"\n🏆 主投资组合({len(portfolio)}只股票):")
        print(f"{'='*40}")
        portfolio_display = portfolio[['stock_code', 'industry', 'market_cap', 'pe_ratio', 'pb_ratio', 'roe', 'composite_score', 'weight']].copy()
        portfolio_display['weight'] = portfolio_display['weight'].apply(lambda x: f"{x*100:.1f}%")
        portfolio_display['roe'] = portfolio_display['roe'].apply(lambda x: f"{x*100:.1f}%")
        print(portfolio_display.to_string(index=False))

        # Portfolio-level averages.
        print(f"\n📈 组合特征:")
        print(f"{'='*40}")
        print(f"平均PE: {portfolio['pe_ratio'].mean():.1f}")
        print(f"平均PB: {portfolio['pb_ratio'].mean():.2f}")
        print(f"平均ROE: {portfolio['roe'].mean()*100:.1f}%")
        print(f"平均股息率: {portfolio['dividend_yield'].mean()*100:.2f}%")
        print(f"平均市值: {portfolio['market_cap'].mean():.1f}亿")

        # Industry breakdown.
        print(f"\n🏭 行业分布:")
        print(f"{'='*40}")
        industry_dist = portfolio['industry'].value_counts()
        for industry, count in industry_dist.items():
            print(f"{industry}: {count}只 ({count/len(portfolio)*100:.1f}%)")

        # Variant comparison.
        print(f"\n🔄 策略变体对比:")
        print(f"{'='*40}")
        for strategy_name, strategy_stocks in strategies.items():
            avg_pe = strategy_stocks['pe_ratio'].mean()
            avg_pb = strategy_stocks['pb_ratio'].mean()
            avg_roe = strategy_stocks['roe'].mean()
            print(f"{strategy_name}: PE={avg_pe:.1f}, PB={avg_pb:.2f}, ROE={avg_roe*100:.1f}%")

        # Timing.
        elapsed = (datetime.now() - self.start_time).total_seconds()
        print(f"\n⏰ 算法运行时间: {elapsed:.2f}")
        print(f"🕐 完成时间: {datetime.now().strftime('%H:%M:%S')}")

        # Persist.
        self.save_results(portfolio, strategies)

    def save_results(self, portfolio, strategies):
        """Write the portfolio, each variant and a text report to ``selection_results/``."""
        import os

        output_dir = "selection_results"
        os.makedirs(output_dir, exist_ok=True)

        # Main portfolio.
        portfolio.to_csv(f"{output_dir}/main_portfolio.csv", index=False)

        # One CSV per strategy variant.
        for strategy_name, strategy_stocks in strategies.items():
            strategy_stocks.to_csv(f"{output_dir}/{strategy_name}_portfolio.csv", index=False)

        # Summary report. The text is Chinese, so pin UTF-8 instead of
        # relying on the platform's default encoding.
        with open(f"{output_dir}/selection_report.txt", 'w', encoding='utf-8') as f:
            f.write("="*60 + "\n")
            f.write("动态选股算法结果报告\n")
            f.write("="*60 + "\n\n")
            f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"股票数量: {len(portfolio)}\n\n")
            f.write("主投资组合:\n")
            f.write("-"*40 + "\n")
            for _, row in portfolio.iterrows():
                f.write(f"{row['stock_code']} | {row['industry']} | PE:{row['pe_ratio']:.1f} | PB:{row['pb_ratio']:.2f} | ROE:{row['roe']*100:.1f}% | 权重:{row['weight']*100:.1f}%\n")
        print(f"\n💾 结果已保存到 {output_dir}/ 目录")
def main():
    """Run the dynamic stock selection once and return its two results.

    Returns:
        The ``(portfolio, strategies)`` pair produced by
        ``DynamicStockSelection.run``.
    """
    main_portfolio, variants = DynamicStockSelection().run()
    return main_portfolio, variants


if __name__ == "__main__":
    main()
@@ -1,402 +0,0 @@
#!/usr/bin/env python3
"""
A股价值投资多因子综合评分模型
庞统副军师 - 全新深度调研开发
"""
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
class MultiFactorScoringModel:
    """Multi-factor composite scoring model for A-share value investing.

    The model works on a private copy of the frame passed to the
    constructor and adds score columns to that copy.
    """

    def __init__(self, data):
        """Copy ``data``, record the start time and print the banner."""
        self.data = data.copy()
        self.start_time = datetime.now()
        print(f"🚀 A股价值投资多因子综合评分模型启动")
        print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
        print(f"🎯 保持active状态直到明早10点")

    def calculate_factors(self):
        """Add the six factor-score columns to ``self.data`` and return it.

        NOTE(review): the sub-weights inside each factor do not sum to 1
        (value 0.70, quality 0.60, growth 0.30, ...), and
        ``calculate_composite_score`` applies a second set of weights on
        top, so effective factor weights differ from the printed ones.
        Left unchanged; confirm the intent with the model owner.
        """
        print(f"🔢 计算多因子得分...")
        # 1. Value: cheaper multiples and higher dividend yield score higher.
        self.data['value_score'] = (
            (1 - self.data['pe_ratio'].rank(pct=True)) * 0.25 +
            (1 - self.data['pb_ratio'].rank(pct=True)) * 0.20 +
            self.data['dividend_yield'].rank(pct=True) * 0.15 +
            (1 - self.data['ps_ratio'].rank(pct=True)) * 0.10
        )
        # 2. Quality: profitability up, leverage down.
        self.data['quality_score'] = (
            self.data['roe_2025'].rank(pct=True) * 0.20 +
            self.data['gross_margin_2025'].rank(pct=True) * 0.15 +
            self.data['net_margin_2025'].rank(pct=True) * 0.10 +
            (1 - self.data['debt_to_equity'].rank(pct=True)) * 0.10 +
            self.data['current_ratio'].rank(pct=True) * 0.05
        )
        # 3. Growth.
        self.data['growth_score'] = (
            self.data['revenue_growth_3y'].rank(pct=True) * 0.15 +
            self.data['profit_growth_3y'].rank(pct=True) * 0.10 +
            self.data['market_share_growth'].rank(pct=True) * 0.05
        )
        # 4. China-specific factors; the raw scores are used unranked.
        self.data['china_special_score'] = (
            self.data['policy_support_score'] * 0.10 +
            self.data['reform_progress_score'] * 0.08 +
            self.data['specialized_score'] * 0.07
        )
        # 5. Alternative data. Sentiment is contrarian: the more
        #    pessimistic the sentiment, the higher the score.
        self.data['alternative_score'] = (
            (1 - self.data['sentiment_score']) * 0.05 +
            self.data['search_heat'].rank(pct=True) * 0.03 +
            self.data['social_media_mentions'].rank(pct=True) * 0.02
        )
        # 6. Risk control: lower risk scores higher. A rating missing
        #    from the table counts as 0 so one unexpected label cannot
        #    turn the whole risk and composite score into NaN.
        rating_points = self.data['credit_rating'].map({
            'AAA': 1.0, 'AA': 0.8, 'A': 0.6, 'BBB': 0.4, 'BB': 0.2
        }).fillna(0.0)
        self.data['risk_score'] = (
            (1 - self.data['volatility_1y'].rank(pct=True)) * 0.04 +
            self.data['liquidity_score'] * 0.03 +
            rating_points * 0.03
        )
        print(f"✅ 因子得分计算完成")
        return self.data

    def calculate_composite_score(self, weights=None):
        """Blend the factor scores into a raw and a 0-100 composite score.

        Args:
            weights: Mapping with keys ``value``, ``quality``, ``growth``,
                ``china_special``, ``alternative``, ``risk`` and
                ``industry_diversification``. Defaults to
                25/20/15/15/10/10/5 percent.

        Returns:
            ``(self.data, weights)``.
        """
        print(f"📊 计算综合得分...")
        if weights is None:
            weights = {
                'value': 0.25,                     # value factor 25%
                'quality': 0.20,                   # quality factor 20%
                'growth': 0.15,                    # growth factor 15%
                'china_special': 0.15,             # China-specific 15%
                'alternative': 0.10,               # alternative data 10%
                'risk': 0.10,                      # risk control 10%
                'industry_diversification': 0.05   # industry spread 5%
            }

        # A duplicated index is replaced with a fresh one before the
        # positional assignment below.
        if self.data.index.duplicated().any():
            self.data = self.data.reset_index(drop=True)

        # Industry diversification: stocks in crowded industries score lower.
        industry_counts = self.data['industry'].value_counts()
        industry_weight = 1 / industry_counts[self.data['industry']].values / len(industry_counts)
        self.data['industry_score'] = industry_weight * 100

        self.data['composite_score'] = (
            self.data['value_score'] * weights['value'] +
            self.data['quality_score'] * weights['quality'] +
            self.data['growth_score'] * weights['growth'] +
            self.data['china_special_score'] * weights['china_special'] +
            self.data['alternative_score'] * weights['alternative'] +
            self.data['risk_score'] * weights['risk'] +
            self.data['industry_score'] * weights['industry_diversification']
        )

        # Min-max scale to 0-100. When every score is identical the range
        # is zero; report 0 for all rows instead of dividing by zero.
        raw = self.data['composite_score']
        lowest = raw.min()
        spread = raw.max() - lowest
        if spread > 0:
            self.data['composite_score_normalized'] = (raw - lowest) / spread * 100
        else:
            self.data['composite_score_normalized'] = raw * 0.0

        print(f"✅ 综合得分计算完成")
        return self.data, weights

    def select_top_stocks(self, n=50, method='composite'):
        """Return the ``n`` best rows under the ranking named by ``method``.

        Unknown method names fall back to the normalised composite score.
        """
        print(f"🏆 选择Top {n}股票...")
        ranking_column = {
            'composite': 'composite_score_normalized',
            'value': 'value_score',
            'quality': 'quality_score',
            'growth': 'growth_score',
            'china_special': 'china_special_score',
        }.get(method, 'composite_score_normalized')
        top_stocks = self.data.nlargest(n, ranking_column)
        print(f"✅ 选择 {len(top_stocks)} 只Top股票")
        return top_stocks

    def analyze_portfolio(self, portfolio):
        """Return a dict of summary statistics for ``portfolio``.

        Percentage shares are reported as 0.0 for an empty portfolio
        rather than dividing by zero.
        """
        print(f"📈 分析投资组合特征...")
        holdings = len(portfolio)

        def share(count):
            # Percentage of holdings; defined as 0 when there are none.
            return count / holdings * 100 if holdings else 0.0

        analysis = {
            '股票数量': holdings,
            '平均综合得分': portfolio['composite_score_normalized'].mean(),
            '平均PE': portfolio['pe_ratio'].mean(),
            '平均PB': portfolio['pb_ratio'].mean(),
            '平均ROE': portfolio['roe_2025'].mean() * 100,
            '平均股息率': portfolio['dividend_yield'].mean() * 100,
            '平均营收增长': portfolio['revenue_growth_3y'].mean() * 100,
            '平均盈利增长': portfolio['profit_growth_3y'].mean() * 100,
            '平均政策得分': portfolio['policy_support_score'].mean(),
            '平均改革进展': portfolio['reform_progress_score'].mean(),
            '平均专精得分': portfolio['specialized_score'].mean(),
            '平均情绪得分': portfolio['sentiment_score'].mean(),
            '平均波动率': portfolio['volatility_1y'].mean() * 100,
            '平均流动性得分': portfolio['liquidity_score'].mean()
        }
        # Industry breakdown.
        industry_dist = portfolio['industry'].value_counts()
        analysis['行业数量'] = len(industry_dist)
        analysis['最大行业占比'] = share(industry_dist.max())
        # Province breakdown.
        province_dist = portfolio['province'].value_counts()
        analysis['地域数量'] = len(province_dist)
        # Share of state-owned enterprises.
        analysis['国企占比'] = share(portfolio['is_soe'].sum())
        # Share of AAA-rated names.
        credit_dist = portfolio['credit_rating'].value_counts()
        analysis['AAA评级占比'] = share(credit_dist.get('AAA', 0))
        print(f"✅ 组合分析完成")
        return analysis

    def run_model(self):
        """Score the universe, compare five selection methods, report.

        Returns:
            ``(scored_data, portfolios, analyses)`` where the two dicts
            are keyed by method name.
        """
        print(f"\n{'='*70}")
        print("🚀 开始A股价值投资多因子综合评分模型")
        print(f"{'='*70}")
        # 1. Factor scores.
        scored_data = self.calculate_factors()
        # 2. Composite score.
        scored_data, weights = self.calculate_composite_score()
        # 3. Top-50 selection under each method.
        print(f"\n🔍 各种选股方法结果对比:")
        methods = [
            ('综合得分', 'composite'),
            ('价值因子', 'value'),
            ('质量因子', 'quality'),
            ('成长因子', 'growth'),
            ('中国特色', 'china_special')
        ]
        portfolios = {}
        analyses = {}
        for method_name, method_key in methods:
            print(f"\n📊 {method_name}选股结果:")
            portfolio = self.select_top_stocks(50, method_key)
            analysis = self.analyze_portfolio(portfolio)
            portfolios[method_key] = portfolio
            analyses[method_key] = analysis
            # Headline metrics.
            print(f" 平均PE: {analysis['平均PE']:.1f}")
            print(f" 平均PB: {analysis['平均PB']:.2f}")
            print(f" 平均ROE: {analysis['平均ROE']:.1f}%")
            print(f" 平均股息率: {analysis['平均股息率']:.2f}%")
            print(f" 平均营收增长: {analysis['平均营收增长']:.1f}%")
        # 4. Side-by-side comparison (also persists the results).
        self.output_comparison(analyses, weights)
        return scored_data, portfolios, analyses

    def output_comparison(self, analyses, weights):
        """Print the method comparison table and commentary, then save."""
        print(f"\n{'='*70}")
        print("📊 各种选股方法对比分析")
        print(f"{'='*70}")

        # Comparison table, one row per method present in ``analyses``.
        comparison_data = []
        methods = ['composite', 'value', 'quality', 'growth', 'china_special']
        method_names = ['综合得分', '价值因子', '质量因子', '成长因子', '中国特色']
        for method_key, method_name in zip(methods, method_names):
            if method_key in analyses:
                analysis = analyses[method_key]
                comparison_data.append({
                    '选股方法': method_name,
                    '平均PE': f"{analysis['平均PE']:.1f}",
                    '平均PB': f"{analysis['平均PB']:.2f}",
                    '平均ROE%': f"{analysis['平均ROE']:.1f}",
                    '平均股息率%': f"{analysis['平均股息率']:.2f}",
                    '平均营收增长%': f"{analysis['平均营收增长']:.1f}",
                    '平均政策得分': f"{analysis['平均政策得分']:.3f}",
                    '平均改革进展': f"{analysis['平均改革进展']:.3f}",
                    '平均专精得分': f"{analysis['平均专精得分']:.3f}",
                    '平均情绪得分': f"{analysis['平均情绪得分']:.3f}",
                    '平均波动率%': f"{analysis['平均波动率']:.1f}",
                    '国企占比%': f"{analysis['国企占比']:.1f}"
                })
        comparison_df = pd.DataFrame(comparison_data)
        print(comparison_df.to_string(index=False))

        # Weights in use.
        print(f"\n🔢 综合得分权重分配:")
        for factor, weight in weights.items():
            print(f" {factor}: {weight*100:.1f}%")

        # Static conclusions and recommended strategy text.
        print(f"\n🎯 模型结论和建议:")
        for line in (
            "1. 🏆 综合得分方法最平衡",
            " 优势: 平衡价值、质量、成长、特色、风险",
            " 特征: 合理估值+良好质量+适度成长+中国特色",
            "2. ✅ 价值因子方法最安全",
            " 优势: 估值最低,安全边际最大",
            " 风险: 可能存在价值陷阱,需结合质量分析",
            "3. 📈 质量因子方法最稳健",
            " 优势: 财务质量最好,波动率较低",
            " 特征: 高ROE、高盈利质量、财务健康",
            "4. ⚠️ 成长因子方法风险最高",
            " 风险: 估值最高,波动率最大",
            " 建议: 必须严格控制估值,避免成长陷阱",
            "5. 🇨🇳 中国特色方法机会独特",
            " 优势: 政策支持、国企改革、专精特新机会",
            " 应用: 作为补充策略,把握中国特色机会",
        ):
            print(line)
        print(f"\n🚀 推荐的投资策略:")
        for line in (
            "1. 核心策略: 综合得分选股 (权重70%)",
            " 采用多因子综合评分,平衡各种因素",
            "2. 卫星策略: 中国特色机会捕捉 (权重20%)",
            " 重点把握政策支持、国企改革、专精特新机会",
            "3. 战术策略: 情绪极端逆向投资 (权重10%)",
            " 在市场情绪极端时进行逆向投资",
            "4. 风险控制策略:",
            " 个股风险控制: 分散投资,仓位限制",
            " 行业风险控制: 行业中性,避免过度集中",
            " 市场风险控制: 动态仓位调整,止损机制",
            " 流动性风险控制: 关注流动性,避免流动性风险",
        ):
            print(line)

        # Timing.
        elapsed = (datetime.now() - self.start_time).total_seconds()
        print(f"\n⏰ 模型运行时间: {elapsed:.2f}")
        print(f"🕐 完成时间: {datetime.now().strftime('%H:%M:%S')}")

        # Persist.
        self.save_results(comparison_df, weights)

    def save_results(self, comparison_df, weights):
        """Write comparison, weights, scored data and a report to ``multi_factor_results/``."""
        import os

        output_dir = "multi_factor_results"
        os.makedirs(output_dir, exist_ok=True)

        # Method comparison.
        comparison_df.to_csv(f"{output_dir}/method_comparison.csv", index=False)
        # Weight configuration (single-row table).
        weights_df = pd.DataFrame([weights])
        weights_df.to_csv(f"{output_dir}/factor_weights.csv", index=False)
        # Full scored universe.
        self.data.to_csv(f"{output_dir}/scored_stock_data.csv", index=False)

        # Text report. The content is Chinese, so pin UTF-8 instead of
        # relying on the platform's default encoding.
        with open(f"{output_dir}/model_report.txt", 'w', encoding='utf-8') as f:
            f.write("="*70 + "\n")
            f.write("A股价值投资多因子综合评分模型报告\n")
            f.write("="*70 + "\n\n")
            f.write(f"模型运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"股票数量: {len(self.data)}\n")
            f.write(f"因子数量: 6大类因子\n\n")
            f.write("权重分配:\n")
            f.write("-"*40 + "\n")
            for factor, weight in weights.items():
                f.write(f"{factor}: {weight*100:.1f}%\n")
            f.write("\n各种选股方法对比:\n")
            f.write("-"*40 + "\n")
            f.write(comparison_df.to_string())
        print(f"\n💾 模型结果已保存到 {output_dir}/ 目录")
def main():
    """Simulate a 3500-stock A-share universe and run the scoring model on it.

    The NumPy global RNG is seeded with 123 and the columns are drawn in a
    fixed order, so the generated universe is reproducible.

    Returns:
        The ``(scored_data, portfolios, analyses)`` triple from
        ``MultiFactorScoringModel.run_model``.
    """
    from datetime import datetime
    import numpy as np

    print(f"🚀 庞统副军师 - 多因子综合评分模型深度调研")
    print(f"🕐 当前时间: {datetime.now().strftime('%H:%M:%S')}")

    # Simulated data; a real run would load market data instead.
    np.random.seed(123)
    n_stocks = 3500

    # Uniformly distributed columns, listed in draw order.
    fundamentals = [
        ('pe_ratio', 8, 60),
        ('pb_ratio', 0.8, 8),
        ('ps_ratio', 0.5, 15),
        ('dividend_yield', 0, 0.08),
        ('roe_2025', 0.02, 0.35),
        ('gross_margin_2025', 0.15, 0.65),
        ('net_margin_2025', 0.05, 0.3),
        ('debt_to_equity', 0.1, 2.0),
        ('current_ratio', 0.8, 5),
        ('revenue_growth_3y', -0.2, 0.8),
        ('profit_growth_3y', -0.3, 1.0),
        ('market_share_growth', -0.1, 0.5),
        ('policy_support_score', 0, 1),
        ('reform_progress_score', 0, 1),
        ('specialized_score', 0, 1),
        ('sentiment_score', 0, 1),
        ('search_heat', 0, 1),
    ]
    risk_inputs = [
        ('volatility_1y', 0.2, 0.8),
        ('liquidity_score', 0, 1),
    ]

    columns = {
        'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
        'industry': np.random.choice(['金融', '科技', '消费', '医药', '工业', '能源', '材料', '公用事业', '房地产', '交通运输'], n_stocks),
        'province': np.random.choice(['北京', '上海', '广东', '浙江', '江苏', '山东', '福建', '四川', '湖北', '湖南'], n_stocks),
    }
    for column, low, high in fundamentals:
        columns[column] = np.random.uniform(low, high, n_stocks)
    columns['social_media_mentions'] = np.random.randint(0, 10000, n_stocks)
    for column, low, high in risk_inputs:
        columns[column] = np.random.uniform(low, high, n_stocks)
    columns['credit_rating'] = np.random.choice(['AAA', 'AA', 'A', 'BBB', 'BB'], n_stocks)
    columns['is_soe'] = np.random.choice([True, False], n_stocks, p=[0.3, 0.7])

    data = pd.DataFrame(columns)
    print(f"📊 加载 {n_stocks} 只A股数据")

    # Run the model.
    scored_data, portfolios, analyses = MultiFactorScoringModel(data).run_model()
    return scored_data, portfolios, analyses


if __name__ == "__main__":
    main()
@@ -1,391 +0,0 @@
#!/usr/bin/env python3
"""
实时价值因子监测面板
更新时间:每5分钟
"""
import dash
from dash import dcc, html, Input, Output
import plotly.graph_objs as go
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import threading
import time
class RealTimeValueDashboard:
    """Dash application showing value-factor charts over simulated data.

    The data is regenerated on every callback (timer tick, button click
    or filter change); ``update_data`` is the place to plug in a real feed.
    """

    def __init__(self):
        """Create the Dash app, set the refresh period and build the layout."""
        self.app = dash.Dash(__name__)
        self.data = None
        self.update_interval = 300  # seconds between timer refreshes (5 minutes)
        self.last_update = datetime.now()
        # Build the layout (which also registers the callbacks).
        self.setup_layout()

    def setup_layout(self):
        """Build the page layout and register the callbacks."""
        self.app.layout = html.Div([
            # Title bar.
            html.Div([
                html.H1("📊 实时价值因子监测面板", style={'textAlign': 'center', 'color': '#2E86C1'}),
                html.Div([
                    html.Span("🕐 最后更新: ", style={'fontWeight': 'bold'}),
                    html.Span(id='last-update-time'),
                    html.Span(" | ", style={'margin': '0 10px'}),
                    html.Span("📈 监控股票数: ", style={'fontWeight': 'bold'}),
                    html.Span(id='stock-count'),
                    html.Span(" | ", style={'margin': '0 10px'}),
                    html.Span("⚡ 更新频率: ", style={'fontWeight': 'bold'}),
                    html.Span("5分钟")
                ], style={'textAlign': 'center', 'marginBottom': '20px'})
            ]),
            # Row 1: headline charts.
            html.Div([
                html.Div([
                    html.H3("🏆 价值投资Top 10", style={'textAlign': 'center'}),
                    dcc.Graph(id='top-10-chart', style={'height': '400px'})
                ], className='six columns'),
                html.Div([
                    html.H3("📊 因子分布", style={'textAlign': 'center'}),
                    dcc.Graph(id='factor-distribution', style={'height': '400px'})
                ], className='six columns')
            ], className='row'),
            # Row 2: detailed analysis.
            html.Div([
                html.Div([
                    html.H3("💰 估值因子热力图", style={'textAlign': 'center'}),
                    dcc.Graph(id='valuation-heatmap', style={'height': '400px'})
                ], className='six columns'),
                html.Div([
                    html.H3("📈 质量因子趋势", style={'textAlign': 'center'}),
                    dcc.Graph(id='quality-trend', style={'height': '400px'})
                ], className='six columns')
            ], className='row'),
            # Row 3: controls and table.
            html.Div([
                html.Div([
                    html.H3("⚙️ 控制面板", style={'textAlign': 'center'}),
                    html.Div([
                        html.Label("选择行业:"),
                        dcc.Dropdown(
                            id='industry-selector',
                            options=[
                                {'label': '全部行业', 'value': 'all'},
                                {'label': '金融', 'value': 'financial'},
                                {'label': '科技', 'value': 'tech'},
                                {'label': '消费', 'value': 'consumer'},
                                {'label': '医药', 'value': 'medical'},
                                {'label': '工业', 'value': 'industrial'}
                            ],
                            value='all',
                            style={'marginBottom': '20px'}
                        ),
                        html.Label("选择市值范围:"),
                        # The slider component declares no `style` prop
                        # (passing one raises TypeError in Dash 2.x), so
                        # the bottom margin goes on a wrapper Div.
                        html.Div(
                            dcc.RangeSlider(
                                id='market-cap-slider',
                                min=0,
                                max=1000,
                                step=50,
                                value=[100, 500],
                                marks={i: f'{i}亿' for i in range(0, 1001, 100)}
                            ),
                            style={'marginBottom': '20px'}
                        ),
                        html.Button('🔄 立即更新数据', id='update-button', n_clicks=0,
                                    style={'width': '100%', 'padding': '10px', 'backgroundColor': '#2E86C1', 'color': 'white'})
                    ])
                ], className='six columns'),
                html.Div([
                    html.H3("📋 实时数据表", style={'textAlign': 'center'}),
                    html.Div(id='real-time-table', style={'height': '400px', 'overflowY': 'scroll'})
                ], className='six columns')
            ], className='row'),
            # Refresh timer; dcc.Interval takes milliseconds.
            dcc.Interval(
                id='interval-component',
                interval=self.update_interval * 1000,
                n_intervals=0
            )
        ])
        # Register the callbacks.
        self.setup_callbacks()

    def setup_callbacks(self):
        """Register the single callback that refreshes every output."""
        @self.app.callback(
            [Output('last-update-time', 'children'),
             Output('stock-count', 'children'),
             Output('top-10-chart', 'figure'),
             Output('factor-distribution', 'figure'),
             Output('valuation-heatmap', 'figure'),
             Output('quality-trend', 'figure'),
             Output('real-time-table', 'children')],
            [Input('interval-component', 'n_intervals'),
             Input('update-button', 'n_clicks'),
             Input('industry-selector', 'value'),
             Input('market-cap-slider', 'value')]
        )
        def update_dashboard(n_intervals, n_clicks, industry, market_cap_range):
            """Regenerate the data, apply the filters and rebuild every output."""
            # n_intervals / n_clicks only act as triggers; their values are unused.
            self.update_data()
            filtered_data = self.filter_data(industry, market_cap_range)
            current_time = datetime.now().strftime('%H:%M:%S')
            return [
                current_time,
                len(filtered_data),
                self.create_top_10_chart(filtered_data),        # 1. Top-10 bar chart
                self.create_factor_distribution(filtered_data), # 2. factor histograms
                self.create_valuation_heatmap(filtered_data),   # 3. PE x PB heatmap
                self.create_quality_trend(filtered_data),       # 4. per-industry quality
                self.create_real_time_table(filtered_data)      # 5. data table
            ]

    def update_data(self):
        """Replace ``self.data`` with a freshly simulated 3000-stock snapshot."""
        # A real data source belongs here; simulated data is used for now.
        np.random.seed(int(time.time()))
        n_stocks = 3000
        self.data = pd.DataFrame({
            'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
            'industry': np.random.choice(['financial', 'tech', 'consumer', 'medical', 'industrial'], n_stocks),
            'market_cap': np.random.uniform(50, 1000, n_stocks),
            'pe_ratio': np.random.uniform(5, 50, n_stocks),
            'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
            'roe': np.random.uniform(0.05, 0.3, n_stocks),
            'gross_margin': np.random.uniform(0.2, 0.6, n_stocks),
            'value_score': np.random.uniform(0, 1, n_stocks),
            'quality_score': np.random.uniform(0, 1, n_stocks),
            'composite_score': np.random.uniform(0, 1, n_stocks)
        })
        self.last_update = datetime.now()

    def filter_data(self, industry, market_cap_range):
        """Return the rows matching the industry and the inclusive market-cap range.

        Args:
            industry: Industry code, or ``'all'`` for no industry filter.
            market_cap_range: ``[low, high]`` bounds, both inclusive.

        Returns:
            A filtered copy; an empty frame when no data is loaded yet.
        """
        if self.data is None:
            return pd.DataFrame()
        filtered = self.data.copy()
        # Industry filter.
        if industry != 'all':
            filtered = filtered[filtered['industry'] == industry]
        # Market-cap filter.
        filtered = filtered[(filtered['market_cap'] >= market_cap_range[0]) &
                            (filtered['market_cap'] <= market_cap_range[1])]
        return filtered

    def create_top_10_chart(self, data):
        """Return a bar chart of the ten highest composite scores (``{}`` if empty)."""
        if len(data) == 0:
            return {}
        top_10 = data.nlargest(10, 'composite_score')
        fig = go.Figure(data=[
            go.Bar(
                x=top_10['stock_code'],
                y=top_10['composite_score'],
                text=top_10['composite_score'].round(3),
                textposition='auto',
                marker_color='#2E86C1'
            )
        ])
        fig.update_layout(
            title='价值投资综合得分Top 10',
            xaxis_title='股票代码',
            yaxis_title='综合得分',
            yaxis_range=[0, 1]
        )
        return fig

    def create_factor_distribution(self, data):
        """Return overlaid histograms of PE and ROE (``{}`` if empty)."""
        if len(data) == 0:
            return {}
        fig = go.Figure()
        # Valuation factor distribution.
        fig.add_trace(go.Histogram(
            x=data['pe_ratio'],
            name='市盈率分布',
            opacity=0.7,
            marker_color='#E74C3C'
        ))
        # Quality factor distribution.
        fig.add_trace(go.Histogram(
            x=data['roe'],
            name='ROE分布',
            opacity=0.7,
            marker_color='#2ECC71'
        ))
        fig.update_layout(
            title='因子分布图',
            xaxis_title='因子值',
            yaxis_title='频数',
            barmode='overlay'
        )
        return fig

    def create_valuation_heatmap(self, data):
        """Return a heatmap of mean composite score per PE x PB bucket (``{}`` if empty)."""
        if len(data) == 0:
            return {}
        # Ten equal-width buckets per axis; cells hold the mean composite score.
        heatmap_data = data.pivot_table(
            values='composite_score',
            index=pd.cut(data['pe_ratio'], bins=10),
            columns=pd.cut(data['pb_ratio'], bins=10),
            aggfunc='mean'
        )
        fig = go.Figure(data=go.Heatmap(
            z=heatmap_data.values,
            x=[f'{col.left:.1f}-{col.right:.1f}' for col in heatmap_data.columns],
            y=[f'{idx.left:.1f}-{idx.right:.1f}' for idx in heatmap_data.index],
            colorscale='Viridis'
        ))
        fig.update_layout(
            title='估值因子热力图 (PE vs PB)',
            xaxis_title='市净率(PB)区间',
            yaxis_title='市盈率(PE)区间'
        )
        return fig

    def create_quality_trend(self, data):
        """Return a line chart of mean ROE and gross margin per industry (``{}`` if empty)."""
        if len(data) == 0:
            return {}
        # Per-industry averages of the quality metrics.
        industry_quality = data.groupby('industry').agg({
            'roe': 'mean',
            'gross_margin': 'mean',
            'quality_score': 'mean'
        }).reset_index()
        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=industry_quality['industry'],
            y=industry_quality['roe'],
            mode='lines+markers',
            name='ROE',
            line=dict(color='#2ECC71', width=3)
        ))
        fig.add_trace(go.Scatter(
            x=industry_quality['industry'],
            y=industry_quality['gross_margin'],
            mode='lines+markers',
            name='毛利率',
            line=dict(color='#E74C3C', width=3)
        ))
        fig.update_layout(
            title='各行业质量因子趋势',
            xaxis_title='行业',
            yaxis_title='因子值',
            yaxis_range=[0, 1]
        )
        return fig

    def create_real_time_table(self, data):
        """Return an HTML table of the top 20 stocks, or a placeholder string if empty."""
        if len(data) == 0:
            return "暂无数据"
        top_20 = data.nlargest(20, 'composite_score')
        table = html.Table([
            html.Thead(
                html.Tr([
                    html.Th('股票代码'),
                    html.Th('行业'),
                    html.Th('市值(亿)'),
                    html.Th('PE'),
                    html.Th('PB'),
                    html.Th('ROE'),
                    html.Th('综合得分')
                ])
            ),
            html.Tbody([
                html.Tr([
                    html.Td(row['stock_code']),
                    html.Td(row['industry']),
                    html.Td(f"{row['market_cap']:.1f}"),
                    html.Td(f"{row['pe_ratio']:.1f}"),
                    html.Td(f"{row['pb_ratio']:.2f}"),
                    html.Td(f"{row['roe']:.2%}"),
                    html.Td(f"{row['composite_score']:.3f}")
                ]) for _, row in top_20.iterrows()
            ])
        ], style={'width': '100%', 'borderCollapse': 'collapse'})
        return table

    def run(self, debug=False):
        """Print the startup banner and start the Dash development server."""
        print(f"🚀 启动实时价值因子监测面板...")
        print(f"📊 监控股票数: 3000+")
        print(f"⚡ 更新频率: 每5分钟")
        print(f"🌐 访问地址: http://127.0.0.1:8050")
        print(f"🕐 启动时间: {datetime.now().strftime('%H:%M:%S')}")
        # `run_server` is deprecated and removed in Dash 3; prefer `run`
        # and fall back only on releases that predate it.
        launch = getattr(self.app, 'run', None)
        if launch is None:
            launch = self.app.run_server
        launch(debug=debug)
def main():
    """Create the dashboard and serve it with debug mode off."""
    RealTimeValueDashboard().run(debug=False)


if __name__ == '__main__':
    main()
@@ -1,351 +0,0 @@
#!/usr/bin/env python3
"""
A股价值投资选股方法历史回测验证
庞统副军师 - 深度调研执行
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class ValueInvestingBacktest:
"""价值投资选股方法回测验证框架"""
def __init__(self):
    """Store the creation time as ``start_time`` and print the startup banner."""
    started = datetime.now()
    self.start_time = started
    print(f"🚀 价值投资选股方法历史回测验证启动")
    print(f"🕐 启动时间: {started.strftime('%H:%M:%S')}")
    print(f"🎯 保持active状态直到明早10点")
def generate_historical_data(self, n_stocks=3000, n_years=10):
    """Simulate static fundamentals and a daily price history.

    The global NumPy RNG is seeded with 42. Prices start from a base of
    100 and follow a log-return random walk. Each stock's drift is the
    sum of a random base return, an industry term, a premium for low PE
    and a premium proportional to ROE.

    Args:
        n_stocks: Number of simulated stocks.
        n_years: Length of the history, ending now, in 365-day years.

    Returns:
        ``(price_data, base_features)``: a business-day-indexed price
        frame with one column per stock code, and a one-row-per-stock
        fundamentals frame.
    """
    print(f"📈 生成历史数据...")
    np.random.seed(42)

    # Business-day calendar ending now.
    end_date = datetime.now()
    start_date = end_date - timedelta(days=n_years * 365)
    dates = pd.date_range(start=start_date, end=end_date, freq='B')

    stock_codes = [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)]

    # Static per-stock fundamentals.
    base_features = pd.DataFrame({
        'stock_code': stock_codes,
        'industry': np.random.choice(['金融', '科技', '消费', '医药', '工业', '能源', '材料', '公用事业'], n_stocks),
        'market_cap': np.random.uniform(50, 1000, n_stocks),
        'pe_ratio': np.random.uniform(5, 50, n_stocks),
        'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
        'roe': np.random.uniform(0.05, 0.3, n_stocks),
        'revenue_growth': np.random.uniform(-0.2, 0.5, n_stocks),
        'profit_growth': np.random.uniform(-0.3, 0.6, n_stocks),
        'dividend_yield': np.random.uniform(0, 0.05, n_stocks),
        'volatility': np.random.uniform(0.2, 0.6, n_stocks)
    })

    # Daily drift contributed by each industry (loop-invariant).
    industry_drift = {
        '金融': 0.0002,
        '科技': 0.0008,
        '消费': 0.0005,
        '医药': 0.0004,
        '工业': 0.0003,
        '能源': 0.0002,
        '材料': 0.0003,
        '公用事业': 0.0001
    }
    industries = base_features['industry'].to_numpy()
    pe_ratios = base_features['pe_ratio'].to_numpy()
    roes = base_features['roe'].to_numpy()
    volatilities = base_features['volatility'].to_numpy()

    # Fill one numeric matrix and wrap it once, instead of assigning
    # thousands of columns into an object-dtype frame.
    n_days = len(dates)
    prices = np.empty((n_days, n_stocks))
    for col in range(n_stocks):
        # Base daily return, roughly 8-15% annualised.
        base_daily_return = np.random.uniform(0.0003, 0.0006)
        # Value premium: low-PE stocks earn an excess return. The
        # previous code applied this with a negative sign, penalising
        # low valuations, which contradicted the stated intent.
        pe_factor = 0.0001 if pe_ratios[col] < 20 else 0
        # Quality premium: proportional to ROE.
        roe_factor = 0.00005 * roes[col] * 100
        daily_returns = np.random.normal(
            base_daily_return + industry_drift[industries[col]] + pe_factor + roe_factor,
            volatilities[col] * 0.01,
            n_days
        )
        # Compound the log returns from a base of 100.
        prices[:, col] = 100 * np.exp(np.cumsum(daily_returns))

    price_data = pd.DataFrame(prices, index=dates, columns=stock_codes)
    print(f"✅ 生成 {n_stocks} 只股票 {n_years} 年历史数据")
    return price_data, base_features
def calculate_factors(self, features_data):
    """Return a copy of ``features_data`` with four factor columns added.

    Every input is converted to a percentile rank. Columns where a
    smaller raw value is better (PE, PB, volatility) use the inverted
    rank. The input frame is left untouched.
    """
    print(f"🔢 计算选股因子...")
    scored = features_data.copy()

    def rising(column):
        # Larger raw value -> larger score.
        return scored[column].rank(pct=True)

    def falling(column):
        # Smaller raw value -> larger score.
        return 1 - scored[column].rank(pct=True)

    # 1. Value factor.
    scored['value_factor'] = (
        falling('pe_ratio') * 0.4
        + falling('pb_ratio') * 0.3
        + rising('dividend_yield') * 0.3
    )
    # 2. Quality factor.
    scored['quality_factor'] = (
        rising('roe') * 0.4
        + falling('volatility') * 0.3
        + rising('profit_growth') * 0.3
    )
    # 3. Growth factor.
    scored['growth_factor'] = (
        rising('revenue_growth') * 0.5
        + rising('profit_growth') * 0.5
    )
    # 4. Composite factor.
    scored['composite_factor'] = (
        scored['value_factor'] * 0.4
        + scored['quality_factor'] * 0.3
        + scored['growth_factor'] * 0.3
    )
    print(f"✅ 因子计算完成")
    return scored
def test_selection_methods(self, price_data, features_data):
    """Evaluate four factor-based top-50 selections and an equal-weight benchmark.

    Prices are sampled at month end. Each selection's monthly return is
    the equal-weight mean over its stocks, and the benchmark averages
    every stock. Each series is summarised by
    ``self.calculate_performance``.

    Args:
        price_data: Date-indexed price frame, one column per stock code.
        features_data: Per-stock frame with ``stock_code`` and the four
            ``*_factor`` columns.

    Returns:
        Dict keyed ``value``, ``quality``, ``growth``, ``composite`` and
        ``benchmark`` (in that order) holding the performance metrics.
    """
    print(f"📊 测试各种选股方法...")

    # Month-end prices -> monthly returns. The 'ME' alias only exists in
    # pandas >= 2.2; older releases spell the same frequency 'M'.
    try:
        monthly_prices = price_data.resample('ME').last()
    except ValueError:
        monthly_prices = price_data.resample('M').last()
    monthly_returns = monthly_prices.pct_change()

    results = {}
    # (result key, progress message, ranking column), in report order.
    selections = [
        ('value', "1. 测试价值因子选股...", 'value_factor'),
        ('quality', "2. 测试质量因子选股...", 'quality_factor'),
        ('growth', "3. 测试成长因子选股...", 'growth_factor'),
        ('composite', "4. 测试综合因子选股...", 'composite_factor'),
    ]
    for key, message, factor_column in selections:
        print(message)
        picks = features_data.nlargest(50, factor_column)['stock_code'].tolist()
        results[key] = self.calculate_performance(monthly_returns[picks].mean(axis=1))

    # 5. Benchmark: equal-weight across the whole universe.
    print(f"5. 计算基准收益...")
    results['benchmark'] = self.calculate_performance(monthly_returns.mean(axis=1))

    print(f"✅ 选股方法测试完成")
    return results
def calculate_performance(self, returns_series):
    """Summarise a monthly return series as annualised performance metrics.

    Args:
        returns_series: Series of periodic (treated as monthly) simple returns.

    Returns:
        An empty dict when fewer than two observations are supplied,
        otherwise a dict with ``annual_return`` (mean return compounded over
        12 periods), ``annual_volatility`` (std scaled by sqrt(12)),
        ``sharpe_ratio`` (3% risk-free rate; 0 when volatility is not
        positive), ``max_drawdown`` (most negative peak-to-trough decline of
        the compounded curve) and ``win_rate`` (share of positive periods).
    """
    if len(returns_series) < 2:
        return {}

    periods_per_year = 12
    risk_free_rate = 0.03  # assumed annual risk-free rate

    annual_return = (1 + returns_series.mean()) ** periods_per_year - 1
    annual_vol = returns_series.std() * np.sqrt(periods_per_year)

    if annual_vol > 0:
        sharpe_ratio = (annual_return - risk_free_rate) / annual_vol
    else:
        sharpe_ratio = 0

    # Drawdown is measured against the running peak of the compounded curve.
    equity_curve = (1 + returns_series).cumprod()
    peak_so_far = equity_curve.expanding().max()
    max_drawdown = ((equity_curve - peak_so_far) / peak_so_far).min()

    positive_share = (returns_series > 0).mean()

    return {
        'annual_return': annual_return,
        'annual_volatility': annual_vol,
        'sharpe_ratio': sharpe_ratio,
        'max_drawdown': max_drawdown,
        'win_rate': positive_share,
    }
def run_backtest(self):
    """Run the whole pipeline and return the per-method metrics.

    Steps: generate historical data (3000 stocks, 10 years), compute the
    factor scores, evaluate every selection method, then print and save
    the results via ``self.output_results``.

    Returns:
        The results mapping produced by ``self.test_selection_methods``.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print("🚀 开始价值投资选股方法历史回测验证")
    print(f"{banner}")

    prices, raw_features = self.generate_historical_data(n_stocks=3000, n_years=10)
    scored_features = self.calculate_factors(raw_features)
    summary = self.test_selection_methods(prices, scored_features)
    self.output_results(summary, scored_features)

    return summary
def output_results(self, results, features_data):
    """Print the backtest report to stdout, then persist it.

    Three tables are printed: absolute performance per method, excess return
    over the benchmark, and average fundamentals of each method's top-50
    stocks.  A fixed list of conclusions and timing information follows, and
    finally ``self.save_results`` is called.

    Args:
        results: Mapping of method key to a metrics dict as produced by
            ``calculate_performance``.  Empty metric dicts are skipped.
        features_data: Per-stock table with the ``*_factor`` score columns
            and the ``pe_ratio``, ``pb_ratio``, ``roe`` and ``market_cap``
            columns.
    """
    # One shared lookup so every table labels methods the same way; unknown
    # keys fall back to the raw key instead of raising KeyError.
    display_names = {
        'benchmark': '基准(全市场)',
        'value': '价值因子',
        'quality': '质量因子',
        'growth': '成长因子',
        'composite': '综合因子',
    }
    factor_columns = {
        'value': 'value_factor',
        'quality': 'quality_factor',
        'growth': 'growth_factor',
        'composite': 'composite_factor',
    }

    print(f"\n{'='*60}")
    print("📊 价值投资选股方法历史回测结果")
    print(f"{'='*60}")

    # Absolute performance table.
    print(f"\n📈 绩效指标对比(年化):")
    print(f"{'方法':<15} {'收益率':<10} {'波动率':<10} {'夏普比率':<10} {'最大回撤':<10} {'胜率':<10}")
    print(f"{'-'*65}")
    for method, metrics in results.items():
        if not metrics:
            continue
        method_name = display_names.get(method, method)
        print(f"{method_name:<15} {metrics['annual_return']*100:>6.2f}% {metrics['annual_volatility']*100:>6.2f}% {metrics['sharpe_ratio']:>8.3f} {metrics['max_drawdown']*100:>8.2f}% {metrics['win_rate']*100:>7.1f}%")

    # Excess return table.
    print(f"\n🎯 超额收益分析(相对于基准):")
    print(f"{'方法':<15} {'超额收益':<10} {'信息比率':<10}")
    print(f"{'-'*35}")
    # The benchmark metrics can be missing or empty (calculate_performance
    # returns {} for very short series); in that case no rows are printed.
    benchmark_return = (results.get('benchmark') or {}).get('annual_return')
    if benchmark_return is not None:
        for method, metrics in results.items():
            if method == 'benchmark' or not metrics:
                continue
            excess_return = metrics['annual_return'] - benchmark_return
            # Simplified information ratio: the portfolio's own volatility
            # stands in for the tracking error.
            info_ratio = excess_return / metrics['annual_volatility'] if metrics['annual_volatility'] > 0 else 0
            method_name = display_names.get(method, method)
            print(f"{method_name:<15} {excess_return*100:>6.2f}% {info_ratio:>8.3f}")

    # Average fundamentals of each method's top-50 selection.
    print(f"\n🔬 各种选股方法的股票特征:")
    print(f"{'方法':<15} {'平均PE':<10} {'平均PB':<10} {'平均ROE':<10} {'平均市值(亿)':<12}")
    print(f"{'-'*57}")
    for method, score_column in factor_columns.items():
        top_stocks = features_data.nlargest(50, score_column)
        method_name = display_names[method]
        avg_pe = top_stocks['pe_ratio'].mean()
        avg_pb = top_stocks['pb_ratio'].mean()
        avg_roe = top_stocks['roe'].mean()
        avg_mcap = top_stocks['market_cap'].mean()
        print(f"{method_name:<15} {avg_pe:>8.1f} {avg_pb:>8.2f} {avg_roe*100:>8.1f}% {avg_mcap:>10.1f}")

    # Conclusions (static text, not derived from the numbers above).
    print(f"\n🎯 调研结论和建议:")
    for line in (
        "1. ✅ 价值因子选股:低估值股票在长期有明显超额收益",
        "2. ✅ 质量因子选股:高质量股票波动率较低,风险调整后收益较好",
        "3. ⚠️ 成长因子选股:需要结合估值考虑,避免成长陷阱",
        "4. 🏆 综合因子选股:平衡价值、质量和成长,表现最稳定",
        "5. 📊 多因子方法优于单因子方法",
    ):
        print(line)

    # Timing.
    elapsed = (datetime.now() - self.start_time).total_seconds()
    print(f"\n⏰ 回测运行时间: {elapsed:.2f}秒")
    print(f"🕐 完成时间: {datetime.now().strftime('%H:%M:%S')}")

    # Persist everything that was just printed.
    self.save_results(results, features_data)
def save_results(self, results, features_data):
    """Write the metrics, factor table and a text report to ``backtest_results/``.

    Files written (relative to the current working directory):
    ``selection_methods_performance.csv``, ``factor_data.csv`` and
    ``selection_methods_report.txt``.

    Args:
        results: Mapping of method key to metrics dict; entries with empty
            metrics are left out of the text report.
        features_data: DataFrame saved as-is (without its index).
    """
    import os

    output_dir = "backtest_results"
    os.makedirs(output_dir, exist_ok=True)

    # One row per method, one column per metric.
    results_df = pd.DataFrame(results).T
    results_df.to_csv(f"{output_dir}/selection_methods_performance.csv")

    features_data.to_csv(f"{output_dir}/factor_data.csv", index=False)

    # The report contains Chinese text, so the encoding is pinned to UTF-8
    # instead of relying on the platform's locale default, which may be
    # unable to encode it.
    with open(f"{output_dir}/selection_methods_report.txt", 'w', encoding='utf-8') as f:
        f.write("="*60 + "\n")
        f.write("价值投资选股方法历史回测验证报告\n")
        f.write("="*60 + "\n\n")
        f.write(f"回测时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"数据期间: 10年历史数据\n")
        f.write(f"股票数量: 3000只A股\n\n")
        f.write("绩效对比:\n")
        f.write("-"*40 + "\n")
        for method, metrics in results.items():
            if metrics:
                f.write(f"{method}: 年化收益率={metrics['annual_return']*100:.2f}%, 夏普比率={metrics['sharpe_ratio']:.3f}, 最大回撤={metrics['max_drawdown']*100:.2f}%\n")

    print(f"\n💾 回测结果已保存到 {output_dir}/ 目录")
def main():
    """Script entry point: build the backtest, run it, return its results."""
    return ValueInvestingBacktest().run_backtest()


# Only run the backtest when executed as a script, not on import.
if __name__ == "__main__":
    main()
@@ -1,414 +0,0 @@
#!/usr/bin/env python3
"""
价值投资选股方法历史回测验证
庞统副军师 - 深度调研执行
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class StockSelectionBacktest:
    """Compare stock-selection scoring methods on simulated monthly returns.

    Workflow (see ``run_backtest``): simulate per-stock features and monthly
    returns, score every stock under several methods, form an equal-weight
    top-N portfolio per method, then print and save performance statistics.
    All data is randomly generated with a fixed seed; nothing is loaded
    from a market data source.
    """

    def __init__(self):
        """Record the start time and print the start-up banner."""
        self.start_time = datetime.now()
        print(f"🚀 价值投资选股方法历史回测验证启动")
        print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
        print(f"🎯 保持active状态直到明早10点")

    def simulate_historical_returns(self, n_stocks=3000, n_years=10):
        """Simulate per-stock features and monthly returns.

        The global NumPy RNG is seeded with 42, so repeated calls with the
        same arguments draw the same random numbers.  Each stock's expected
        monthly return is a random base plus premiums derived from its
        features; realised returns are normal draws around that mean.

        Args:
            n_stocks: Number of simulated stocks.
            n_years: Length of the simulated window, ending now.

        Returns:
            ``(monthly_returns, base_features)``: a DataFrame of returns
            indexed by month-start dates with one column per stock code, and
            a DataFrame of per-stock features with one row per stock.
        """
        print(f"📈 模拟历史收益率数据...")

        np.random.seed(42)

        end_date = datetime.now()
        start_date = end_date - timedelta(days=n_years*365)

        stock_codes = [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)]

        # The order of the columns below fixes the order of the random draws.
        base_features = pd.DataFrame({
            'stock_code': stock_codes,
            'industry': np.random.choice(['金融', '科技', '消费', '医药', '工业', '能源', '材料', '公用事业'], n_stocks),
            'market_cap': np.random.uniform(50, 1000, n_stocks),
            'pe_ratio': np.random.uniform(5, 50, n_stocks),
            'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
            'roe': np.random.uniform(0.05, 0.3, n_stocks),
            'revenue_growth': np.random.uniform(-0.2, 0.5, n_stocks),
            'profit_growth': np.random.uniform(-0.3, 0.6, n_stocks),
            'dividend_yield': np.random.uniform(0, 0.05, n_stocks),
            'volatility': np.random.uniform(0.2, 0.6, n_stocks),
            'policy_score': np.random.uniform(0, 1, n_stocks),
            'soe_reform_score': np.random.uniform(0, 1, n_stocks),
            'specialized_score': np.random.uniform(0, 1, n_stocks),
            'sentiment_score': np.random.uniform(0, 1, n_stocks)
        })

        monthly_dates = pd.date_range(start=start_date, end=end_date, freq='MS')
        n_months = len(monthly_dates)

        # Pull the feature columns out once instead of doing a .loc lookup
        # per stock and per feature inside the loop.
        pe = base_features['pe_ratio'].to_numpy()
        roe = base_features['roe'].to_numpy()
        revenue_growth = base_features['revenue_growth'].to_numpy()
        policy = base_features['policy_score'].to_numpy()
        soe = base_features['soe_reform_score'].to_numpy()
        specialized = base_features['specialized_score'].to_numpy()
        sentiment = base_features['sentiment_score'].to_numpy()
        volatility = base_features['volatility'].to_numpy()

        # Collect the columns in a dict and build the frame once, rather than
        # inserting thousands of columns into an empty frame one at a time.
        return_columns = {}
        for idx, stock in enumerate(stock_codes):
            # Base monthly return (the draw order per stock is: base, then
            # the normal series below).
            base_monthly_return = np.random.uniform(0.006, 0.012)

            # Low P/E earns a premium, high P/E a penalty.
            pe_premium = 0.002 if pe[idx] < 20 else -0.001
            # Premiums proportional to ROE, revenue growth and the three
            # thematic scores.
            roe_premium = roe[idx] * 0.01
            growth_premium = revenue_growth[idx] * 0.005
            policy_premium = policy[idx] * 0.001
            soe_premium = soe[idx] * 0.001
            specialized_premium = specialized[idx] * 0.001

            # Contrarian sentiment effect at the extremes only.
            if sentiment[idx] < 0.2:
                sentiment_premium = 0.003   # pessimism -> reversal gain
            elif sentiment[idx] > 0.8:
                sentiment_premium = -0.002  # optimism -> reversal risk
            else:
                sentiment_premium = 0

            expected_return = base_monthly_return + pe_premium + roe_premium + growth_premium + \
                policy_premium + soe_premium + specialized_premium + sentiment_premium

            return_columns[stock] = np.random.normal(
                expected_return,
                volatility[idx] * 0.05,
                n_months
            )

        monthly_returns = pd.DataFrame(return_columns, index=monthly_dates)

        print(f"✅ 模拟 {n_stocks} 只股票 {n_years} 年历史收益率数据")
        return monthly_returns, base_features

    def calculate_selection_scores(self, features_data):
        """Add one score column per selection method to a copy of the features.

        Rank-based scores use percentile ranks, so they lie in (0, 1]; a
        higher score always means "more attractive" under that method.

        Args:
            features_data: Per-stock feature table as returned by
                ``simulate_historical_returns``.

        Returns:
            A new DataFrame with the added ``*_score`` / ``*_score_adj``
            columns and ``composite_score``; the input is not modified.
        """
        print(f"🔢 计算选股方法得分...")

        data = features_data.copy()

        # 1. Classic value: cheaper (lower P/E, P/B) and higher yield is better.
        data['value_score'] = (
            (1 - data['pe_ratio'].rank(pct=True)) * 0.4 +
            (1 - data['pb_ratio'].rank(pct=True)) * 0.3 +
            data['dividend_yield'].rank(pct=True) * 0.3
        )

        # 2. Quality: high ROE, low volatility, high profit growth.
        data['quality_score'] = (
            data['roe'].rank(pct=True) * 0.4 +
            (1 - data['volatility'].rank(pct=True)) * 0.3 +
            data['profit_growth'].rank(pct=True) * 0.3
        )

        # 3. Growth: revenue and profit growth, equally weighted.
        data['growth_score'] = (
            data['revenue_growth'].rank(pct=True) * 0.5 +
            data['profit_growth'].rank(pct=True) * 0.5
        )

        # 4-6. Thematic scores are used as given.
        data['policy_score_adj'] = data['policy_score']
        data['soe_score_adj'] = data['soe_reform_score']
        data['specialized_score_adj'] = data['specialized_score']

        # 7. Sentiment is contrarian: more pessimistic -> higher score.
        data['sentiment_score_adj'] = 1 - data['sentiment_score']

        # 8. Composite: weights sum to 1.0.
        data['composite_score'] = (
            data['value_score'] * 0.2 +            # classic value 20%
            data['quality_score'] * 0.2 +          # quality 20%
            data['growth_score'] * 0.1 +           # growth 10%
            data['policy_score_adj'] * 0.1 +       # policy 10%
            data['soe_score_adj'] * 0.1 +          # SOE reform 10%
            data['specialized_score_adj'] * 0.1 +  # specialised 10%
            data['sentiment_score_adj'] * 0.1 +    # sentiment 10%
            (1 - data['volatility'].rank(pct=True)) * 0.1  # risk control 10%
        )

        print(f"✅ 选股方法得分计算完成")
        return data

    def test_selection_methods(self, monthly_returns, scored_data, portfolio_size=50):
        """Measure an equal-weight top-N portfolio for every scoring method.

        Args:
            monthly_returns: Returns table with one column per stock code.
            scored_data: Output of ``calculate_selection_scores``.
            portfolio_size: Number of top-scoring stocks per portfolio.

        Returns:
            dict keyed by method (``'benchmark'`` first).  Each method entry
            is the ``calculate_performance`` result plus a ``method_name``
            label, or ``{'method_name', 'error'}`` when no stock qualified.
        """
        print(f"📊 测试各种选股方法...")

        results = {}

        # Benchmark: equal weight across the whole universe.
        print(f"1. 计算基准收益...")
        benchmark_returns = monthly_returns.mean(axis=1)
        results['benchmark'] = self.calculate_performance(benchmark_returns)

        # (result key, score column, display name)
        methods = [
            ('value', 'value_score', '传统价值因子'),
            ('quality', 'quality_score', '质量因子'),
            ('growth', 'growth_score', '成长因子'),
            ('policy', 'policy_score_adj', '政策驱动'),
            ('soe', 'soe_score_adj', '国企改革'),
            ('specialized', 'specialized_score_adj', '专精特新'),
            ('sentiment', 'sentiment_score_adj', '情绪因子'),
            ('composite', 'composite_score', '综合因子')
        ]

        for method_key, score_col, method_name in methods:
            print(f"2. 测试{method_name}选股...")

            top_stocks = scored_data.nlargest(portfolio_size, score_col)['stock_code'].tolist()

            if top_stocks:
                portfolio_returns = monthly_returns[top_stocks].mean(axis=1)
                results[method_key] = self.calculate_performance(portfolio_returns)
                results[method_key]['method_name'] = method_name
            else:
                results[method_key] = {'method_name': method_name, 'error': '无有效股票'}

        print(f"✅ 所有选股方法测试完成")
        return results

    def calculate_performance(self, returns_series):
        """Summarise a monthly return series as annualised metrics.

        Args:
            returns_series: Series of monthly simple returns.

        Returns:
            ``{'error': ...}`` for fewer than two observations, otherwise a
            dict with ``annual_return``, ``annual_volatility``,
            ``sharpe_ratio`` (3% risk-free rate), ``max_drawdown``,
            ``win_rate`` and ``calmar_ratio``.
        """
        if len(returns_series) < 2:
            return {'error': '数据不足'}

        # Mean monthly return compounded over 12 months.
        annual_return = (1 + returns_series.mean()) ** 12 - 1
        annual_vol = returns_series.std() * np.sqrt(12)

        risk_free_rate = 0.03  # assumed annual risk-free rate
        sharpe_ratio = (annual_return - risk_free_rate) / annual_vol if annual_vol > 0 else 0

        # Largest decline from a running peak of the compounded curve.
        cumulative_returns = (1 + returns_series).cumprod()
        running_max = cumulative_returns.expanding().max()
        drawdown = (cumulative_returns - running_max) / running_max
        max_drawdown = drawdown.min()

        # Share of months with a positive return.
        win_rate = (returns_series > 0).mean()

        # Calmar: annual return over the magnitude of the worst drawdown.
        calmar_ratio = abs(annual_return / max_drawdown) if max_drawdown < 0 else 0

        return {
            'annual_return': annual_return,
            'annual_volatility': annual_vol,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'calmar_ratio': calmar_ratio
        }

    def run_backtest(self):
        """Run simulation, scoring, evaluation and reporting; return the results."""
        print(f"\n{'='*60}")
        print("🚀 开始价值投资选股方法历史回测验证")
        print(f"{'='*60}")

        monthly_returns, features_data = self.simulate_historical_returns(n_stocks=3000, n_years=10)
        scored_data = self.calculate_selection_scores(features_data)
        results = self.test_selection_methods(monthly_returns, scored_data, portfolio_size=50)
        self.output_results(results, scored_data)

        return results

    @staticmethod
    def _has_metrics(results, key):
        """Return True when ``results[key]`` exists and is not an error entry."""
        return key in results and 'error' not in results[key]

    @staticmethod
    def _print_performance_row(label, metrics):
        """Print one row of the performance comparison table."""
        print(f"{label:<15} {metrics['annual_return']*100:>6.2f}% {metrics['annual_volatility']*100:>6.2f}% {metrics['sharpe_ratio']:>8.3f} {metrics['max_drawdown']*100:>8.2f}% {metrics['win_rate']*100:>7.1f}% {metrics['calmar_ratio']:>8.3f}")

    def output_results(self, results, scored_data):
        """Print the backtest report to stdout, then call ``save_results``.

        Entries of ``results`` that carry an ``'error'`` key (including the
        benchmark) are skipped rather than formatted.

        Args:
            results: Output of ``test_selection_methods``.
            scored_data: Output of ``calculate_selection_scores``.
        """
        print(f"\n{'='*60}")
        print("📊 价值投资选股方法历史回测结果")
        print(f"{'='*60}")

        # Performance comparison.
        print(f"\n📈 各种选股方法绩效对比(年化):")
        print(f"{'方法':<15} {'收益率':<10} {'波动率':<10} {'夏普比率':<10} {'最大回撤':<10} {'胜率':<10} {'Calmar比率':<10}")
        print(f"{'-'*85}")

        # The benchmark may be an error dict (too little data); it is only
        # printed when real metrics are present.
        benchmark_ok = self._has_metrics(results, 'benchmark')
        if benchmark_ok:
            self._print_performance_row('基准(全市场)', results['benchmark'])

        method_order = ['value', 'quality', 'growth', 'policy', 'soe', 'specialized', 'sentiment', 'composite']
        for method_key in method_order:
            if self._has_metrics(results, method_key):
                metrics = results[method_key]
                self._print_performance_row(metrics.get('method_name', method_key), metrics)

        # Excess return over the benchmark.
        print(f"\n🎯 超额收益分析(相对于基准):")
        print(f"{'方法':<15} {'超额收益':<10} {'信息比率':<10}")
        print(f"{'-'*35}")

        if benchmark_ok:
            benchmark_return = results['benchmark']['annual_return']
            for method_key in method_order:
                if self._has_metrics(results, method_key):
                    metrics = results[method_key]
                    method_name = metrics.get('method_name', method_key)
                    excess_return = metrics['annual_return'] - benchmark_return
                    # Simplified information ratio: tracking error is
                    # assumed to be 80% of the portfolio's own volatility.
                    tracking_error = metrics['annual_volatility'] * 0.8
                    info_ratio = excess_return / tracking_error if tracking_error > 0 else 0
                    print(f"{method_name:<15} {excess_return*100:>6.2f}% {info_ratio:>8.3f}")

        # Average fundamentals of each method's selection.
        # NOTE(review): the top-50 cut-off here is fixed and does not follow
        # the portfolio_size passed to test_selection_methods.
        print(f"\n🔬 各种选股方法的股票特征:")
        print(f"{'方法':<15} {'平均PE':<10} {'平均PB':<10} {'平均ROE':<10} {'平均增长':<10} {'平均市值(亿)':<12}")
        print(f"{'-'*67}")

        for method_key, score_col, method_name in [
            ('value', 'value_score', '传统价值'),
            ('quality', 'quality_score', '质量因子'),
            ('growth', 'growth_score', '成长因子'),
            ('composite', 'composite_score', '综合因子')
        ]:
            top_stocks = scored_data.nlargest(50, score_col)
            avg_pe = top_stocks['pe_ratio'].mean()
            avg_pb = top_stocks['pb_ratio'].mean()
            avg_roe = top_stocks['roe'].mean()
            avg_growth = top_stocks['revenue_growth'].mean()
            avg_mcap = top_stocks['market_cap'].mean()
            print(f"{method_name:<15} {avg_pe:>8.1f} {avg_pb:>8.2f} {avg_roe*100:>8.1f}% {avg_growth*100:>8.1f}% {avg_mcap:>10.1f}")

        # Conclusions and recommended framework: static text that is printed
        # regardless of the numbers above.
        print(f"\n🎯 调研结论和建议:")
        for line in (
            "1. 🏆 综合因子选股表现最佳",
            " 优势: 平衡各种因子,风险调整后收益最高",
            " 特征: 合理估值+高质量+适度成长+特色机会",
            "2. ✅ 传统价值因子选股稳健有效",
            " 优势: 低估值提供安全边际,超额收益稳定",
            " 风险: 可能存在价值陷阱,需结合质量分析",
            "3. 📈 质量因子选股风险较低",
            " 优势: 波动率低,回撤控制好,适合保守投资者",
            " 特征: 高ROE、高盈利质量、财务健康",
            "4. ⚠️ 成长因子选股需谨慎",
            " 风险: 高估值、高波动、大回撤",
            " 建议: 必须结合估值,避免成长陷阱",
            "5. 🇨🇳 中国特色因子有价值",
            " 优势: 政策、国企改革、专精特新提供独特机会",
            " 应用: 作为补充因子,提高策略适应性",
            "6. 😊 情绪因子提供逆向机会",
            " 优势: 情绪极端时提供价值回归机会",
            " 应用: 作为战术调整因子,把握市场情绪",
        ):
            print(line)

        print(f"\n🚀 推荐的价值投资选股框架:")
        for line in (
            "1. 核心策略: 多因子综合评分体系",
            " 权重建议: 价值30% + 质量25% + 成长15% + 特色20% + 风险10%",
            "2. 动态调整机制",
            " 根据市场环境调整因子权重",
            " 牛市提高成长因子权重",
            " 熊市提高价值和质量因子权重",
            " 政策敏感期提高特色因子权重",
            "3. 风险控制体系",
            " 个股风险控制: 分散投资,避免过度集中",
            " 行业风险控制: 行业中性,避免行业过度暴露",
            " 市场风险控制: 仓位管理,市场极端时降低仓位",
            " 流动性风险控制: 关注流动性,避免流动性风险",
        ):
            print(line)

        # Timing.
        elapsed = (datetime.now() - self.start_time).total_seconds()
        print(f"\n⏰ 回测运行时间: {elapsed:.2f}秒")
        print(f"🕐 完成时间: {datetime.now().strftime('%H:%M:%S')}")

        self.save_results(results, scored_data)

    def save_results(self, results, scored_data):
        """Write metrics, scored stocks and a text report to ``selection_backtest_results/``.

        Args:
            results: Output of ``test_selection_methods``; only entries that
                contain ``annual_return`` appear in the text report.
            scored_data: DataFrame saved as-is (without its index).
        """
        import os

        output_dir = "selection_backtest_results"
        os.makedirs(output_dir, exist_ok=True)

        # One row per method, one column per metric.
        performance_df = pd.DataFrame(results).T
        performance_df.to_csv(f"{output_dir}/performance_results.csv")

        scored_data.to_csv(f"{output_dir}/scored_stock_data.csv", index=False)

        # The report is Chinese text: pin UTF-8 rather than depending on the
        # platform's locale encoding, which may not be able to encode it.
        with open(f"{output_dir}/backtest_report.txt", 'w', encoding='utf-8') as f:
            f.write("="*60 + "\n")
            f.write("价值投资选股方法历史回测验证报告\n")
            f.write("="*60 + "\n\n")
            f.write(f"回测时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"数据期间: 10年历史数据\n")
            f.write(f"股票数量: 3000只A股\n\n")

            f.write("绩效对比:\n")
            f.write("-"*40 + "\n")
            for method, metrics in results.items():
                if isinstance(metrics, dict) and 'annual_return' in metrics:
                    method_name = metrics.get('method_name', method)
                    f.write(f"{method_name}: 年化收益率={metrics['annual_return']*100:.2f}%, 夏普比率={metrics['sharpe_ratio']:.3f}, 最大回撤={metrics['max_drawdown']*100:.2f}%\n")

        print(f"\n💾 回测结果已保存到 {output_dir}/ 目录")
def main():
    """Script entry point: build the backtest, run it, return its results."""
    return StockSelectionBacktest().run_backtest()


# Only run the backtest when executed as a script, not on import.
if __name__ == "__main__":
    main()
@@ -1,194 +0,0 @@
#!/usr/bin/env python3
"""
超级财务智能体 - 并行财务因子计算引擎
启动时间2026-03-21 17:45
截止时间18:00
"""
import sys
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
import warnings
warnings.filterwarnings('ignore')
class SuperFinancialAgent:
    """Compute valuation, quality, growth and risk factors for a stock table.

    Each ``calculate_*_factors`` method takes a DataFrame and returns a dict
    of factor name -> Series, computing only the factors whose input columns
    are present.  ``parallel_factor_calculation`` runs the four groups in a
    process pool and merges their outputs.
    """

    def __init__(self):
        """Record the start time, pick the worker count and print a banner."""
        self.start_time = datetime.now()
        # At most 10 worker processes, fewer on smaller machines.
        self.cpu_cores = min(10, mp.cpu_count())
        print(f"⚡ 超级财务智能体启动!")
        print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
        print(f"🎯 目标时间: 18:00")
        print(f"⏰ 剩余时间: {15}分钟")
        print(f"💻 CPU核心: {self.cpu_cores}核并行")

    def calculate_valuation_factors(self, stock_data):
        """Return percentile rank and z-score for P/E, P/B and P/S (factors 1-3).

        Args:
            stock_data: DataFrame; ``pe_ratio``, ``pb_ratio`` and
                ``ps_ratio`` are each optional.

        Returns:
            dict with ``<prefix>_rank`` and ``<prefix>_zscore`` Series for
            every ratio column that is present.
        """
        factors = {}

        # 1. Price/earnings
        if 'pe_ratio' in stock_data.columns:
            factors['pe_rank'] = stock_data['pe_ratio'].rank(pct=True)
            factors['pe_zscore'] = (stock_data['pe_ratio'] - stock_data['pe_ratio'].mean()) / stock_data['pe_ratio'].std()

        # 2. Price/book
        if 'pb_ratio' in stock_data.columns:
            factors['pb_rank'] = stock_data['pb_ratio'].rank(pct=True)
            factors['pb_zscore'] = (stock_data['pb_ratio'] - stock_data['pb_ratio'].mean()) / stock_data['pb_ratio'].std()

        # 3. Price/sales
        if 'ps_ratio' in stock_data.columns:
            factors['ps_rank'] = stock_data['ps_ratio'].rank(pct=True)
            factors['ps_zscore'] = (stock_data['ps_ratio'] - stock_data['ps_ratio'].mean()) / stock_data['ps_ratio'].std()

        return factors

    def calculate_quality_factors(self, stock_data):
        """Return ROE, gross-margin and free-cash-flow factors (factors 4-6).

        Args:
            stock_data: DataFrame; ``roe``, ``gross_margin`` and
                ``free_cash_flow`` are each optional.  ``fcf_yield`` also
                reads ``market_cap``.

        Returns:
            dict of factor name -> Series for the columns that are present.
        """
        factors = {}

        # NOTE(review): the rolling windows below run over consecutive rows.
        # If rows are different stocks (as in this file's main()), the
        # "stability" values mix neighbouring stocks - confirm the intended
        # input is a single stock's time series.

        # 4. ROE
        if 'roe' in stock_data.columns:
            factors['roe_rank'] = stock_data['roe'].rank(pct=True)
            factors['roe_stability'] = stock_data['roe'].rolling(5).std()

        # 5. Gross margin
        if 'gross_margin' in stock_data.columns:
            factors['gross_margin_rank'] = stock_data['gross_margin'].rank(pct=True)
            factors['margin_stability'] = stock_data['gross_margin'].rolling(5).std()

        # 6. Free cash flow
        if 'free_cash_flow' in stock_data.columns:
            factors['fcf_rank'] = stock_data['free_cash_flow'].rank(pct=True)
            factors['fcf_yield'] = stock_data['free_cash_flow'] / stock_data['market_cap']

        return factors

    def calculate_growth_factors(self, stock_data):
        """Return ranked 4-row percentage changes of revenue and profit (factors 7-8).

        Args:
            stock_data: DataFrame; ``revenue`` and ``net_profit`` are each
                optional.

        Returns:
            dict with ``revenue_growth_rank`` / ``profit_growth_rank`` for
            the columns that are present.
        """
        factors = {}

        # NOTE(review): pct_change(periods=4) compares each row with the row
        # four positions earlier - presumably meant as year-over-year on
        # quarterly rows of one stock; confirm against the caller's data.

        # 7. Revenue growth
        if 'revenue' in stock_data.columns:
            revenue_growth = stock_data['revenue'].pct_change(periods=4)
            factors['revenue_growth_rank'] = revenue_growth.rank(pct=True)

        # 8. Profit growth
        if 'net_profit' in stock_data.columns:
            profit_growth = stock_data['net_profit'].pct_change(periods=4)
            factors['profit_growth_rank'] = profit_growth.rank(pct=True)

        return factors

    def calculate_risk_factors(self, stock_data):
        """Return ranked 20-row price volatility and average volume (factors 9-10).

        Args:
            stock_data: DataFrame; ``close`` and ``volume`` are each optional.

        Returns:
            dict with ``volatility_rank`` / ``liquidity_rank`` for the
            columns that are present.
        """
        factors = {}

        # 9. Volatility: rolling 20-row standard deviation of the close.
        if 'close' in stock_data.columns:
            volatility = stock_data['close'].rolling(20).std()
            factors['volatility_rank'] = volatility.rank(pct=True)

        # 10. Liquidity: rolling 20-row mean of the volume.
        if 'volume' in stock_data.columns:
            avg_volume = stock_data['volume'].rolling(20).mean()
            factors['liquidity_rank'] = avg_volume.rank(pct=True)

        return factors

    def parallel_factor_calculation(self, stock_data):
        """Run the four factor groups in worker processes and merge the results.

        Results are merged in submission order (valuation, quality, growth,
        risk), so the key order of the returned dict is the same on every
        run.  A group that raises is reported on stdout and left out of the
        result; the remaining groups are still returned.

        Args:
            stock_data: DataFrame passed unchanged to every factor group.

        Returns:
            dict of factor name -> Series from all groups that succeeded.
        """
        print(f"🔢 开始并行因子计算...")

        tasks = [
            self.calculate_valuation_factors,
            self.calculate_quality_factors,
            self.calculate_growth_factors,
            self.calculate_risk_factors,
        ]

        all_factors = {}
        # No point in starting more workers than there are tasks.
        workers = max(1, min(self.cpu_cores, len(tasks)))
        with ProcessPoolExecutor(max_workers=workers) as executor:
            # All tasks are submitted up front so they still run concurrently;
            # only the collection below is ordered.
            submitted = [(task.__name__, executor.submit(task, stock_data)) for task in tasks]

            for task_name, future in submitted:
                try:
                    all_factors.update(future.result())
                    print(f"{task_name} 计算完成")
                except Exception as e:
                    # Best effort: keep the other groups' factors.
                    print(f"{task_name} 计算失败: {e}")

        return all_factors
def main():
    """Generate simulated fundamentals, compute factors and rank the stocks.

    Returns:
        The simulated DataFrame extended with every computed factor column
        plus ``value_quality_score`` and ``value_quality_rank``.
    """
    agent = SuperFinancialAgent()

    # 1. Simulated data (a real project would load this from a data source).
    print(f"\n📊 生成模拟财务数据...")
    np.random.seed(42)
    n_stocks = 3000

    # (column, low, high) - drawn in this order from the seeded RNG.
    uniform_columns = (
        ('pe_ratio', 5, 50),
        ('pb_ratio', 0.5, 5),
        ('ps_ratio', 0.5, 10),
        ('roe', 0.05, 0.3),
        ('gross_margin', 0.2, 0.6),
        ('free_cash_flow', 1e6, 1e9),
        ('market_cap', 1e8, 1e11),
        ('revenue', 1e7, 1e10),
        ('net_profit', 1e6, 1e9),
        ('close', 10, 100),
        ('volume', 1e5, 1e7),
    )
    columns = {'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)]}
    for column, low, high in uniform_columns:
        columns[column] = np.random.uniform(low, high, n_stocks)
    stock_data = pd.DataFrame(columns)

    print(f"✅ 生成 {n_stocks} 只股票财务数据")

    # 2. Factor computation in worker processes.
    factors = agent.parallel_factor_calculation(stock_data)

    # 3. Attach every factor as a column.
    for factor_name, factor_values in factors.items():
        stock_data[factor_name] = factor_values

    # 4. Combined score.
    print(f"\n🎯 计算综合价值得分...")

    # Valuation ranks: lower is better, hence the inversion below.
    cheapness_rank = stock_data[['pe_rank', 'pb_rank', 'ps_rank']].mean(axis=1)
    # Quality ranks: higher is better.
    quality_rank = stock_data[['roe_rank', 'gross_margin_rank', 'fcf_rank']].mean(axis=1)

    stock_data['value_quality_score'] = (1 - cheapness_rank) * 0.6 + quality_rank * 0.4
    stock_data['value_quality_rank'] = stock_data['value_quality_score'].rank(ascending=False, pct=True)

    # 5. Summary output.
    print(f"\n📈 计算完成!")
    print(f"⏰ 耗时: {(datetime.now() - agent.start_time).total_seconds():.2f}秒")
    print(f"📊 总因子数: {len(factors)}")
    print(f"🏆 综合价值得分计算完成")

    # Ten best stocks by combined score.
    summary_columns = ['stock_code', 'value_quality_score', 'value_quality_rank']
    top_stocks = stock_data.nlargest(10, 'value_quality_score')[summary_columns]
    print(f"\n🏅 价值投资前10名:")
    print(top_stocks.to_string(index=False))

    return stock_data


# Only run when executed as a script; the result is kept at module level.
if __name__ == "__main__":
    stock_data = main()
@@ -1,345 +0,0 @@
#!/usr/bin/env python3
"""
价值投资策略回测框架
紧急提交时间18:13
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class ValueInvestingBacktest:
    """Backtest of an equal-weight value/quality portfolio on simulated data.

    Workflow (see ``run``): simulate daily prices and fundamentals, score the
    stocks, pick the top-N by composite score, compare the portfolio's daily
    returns with an equal-weight benchmark of all stocks, then print and
    save the results.  All data is randomly generated with a fixed seed.
    """

    def __init__(self):
        """Record the start time and print the start-up banner."""
        self.start_time = datetime.now()
        print(f"🚀 价值投资策略回测框架启动!")
        print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
        print(f"🎯 紧急提交时间: 18:00(立即补交)")

    def generate_price_data(self, n_stocks=3000, n_days=252):
        """Simulate daily prices for ``n_stocks`` stocks over ``n_days`` business days.

        The global NumPy RNG is seeded with 42.  Every stock starts from 100
        and follows the exponential of its cumulative daily log-return.

        Args:
            n_stocks: Number of simulated stocks.
            n_days: Number of business days, ending at the current time.

        Returns:
            DataFrame indexed by business day with one column per stock code.
        """
        print(f"📈 生成价格数据...")

        np.random.seed(42)

        dates = pd.date_range(end=datetime.now(), periods=n_days, freq='B')
        stock_codes = [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)]

        # Collect the columns first and build the frame once, rather than
        # inserting thousands of columns into an empty frame one by one.
        price_columns = {}
        for stock in stock_codes:
            # Per-day drift and per-day volatility are drawn independently.
            base_return = np.random.uniform(0.0004, 0.0008, n_days)
            volatility = np.random.uniform(0.01, 0.03, n_days)
            random_shocks = np.random.normal(0, volatility)

            daily_returns = base_return + random_shocks

            # Price path starting from 100.
            price_columns[stock] = 100 * np.exp(np.cumsum(daily_returns))

        price_data = pd.DataFrame(price_columns, index=dates)

        print(f"✅ 生成 {n_stocks} 只股票 {n_days} 天价格数据")
        return price_data

    def generate_fundamental_data(self, n_stocks=3000):
        """Simulate one row of fundamentals per stock (RNG seeded with 42).

        Args:
            n_stocks: Number of simulated stocks.

        Returns:
            DataFrame with ``stock_code``, ``industry`` and uniformly drawn
            valuation, profitability, leverage and growth columns.
        """
        print(f"📊 生成基本面数据...")

        np.random.seed(42)

        # The column order below fixes the order of the random draws.
        fundamental_data = pd.DataFrame({
            'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
            'industry': np.random.choice(['金融', '科技', '消费', '医药', '工业', '能源', '材料', '公用事业'], n_stocks),
            'market_cap': np.random.uniform(50, 1000, n_stocks),
            'pe_ratio': np.random.uniform(5, 50, n_stocks),
            'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
            'ps_ratio': np.random.uniform(0.5, 10, n_stocks),
            'dividend_yield': np.random.uniform(0, 0.05, n_stocks),
            'roe': np.random.uniform(0.05, 0.3, n_stocks),
            'gross_margin': np.random.uniform(0.2, 0.6, n_stocks),
            'net_margin': np.random.uniform(0.05, 0.25, n_stocks),
            'debt_to_equity': np.random.uniform(0.1, 1.5, n_stocks),
            'current_ratio': np.random.uniform(1, 3, n_stocks),
            'revenue_growth': np.random.uniform(-0.2, 0.5, n_stocks),
            'profit_growth': np.random.uniform(-0.3, 0.6, n_stocks),
            'fcf_yield': np.random.uniform(0, 0.1, n_stocks)
        })

        print(f"✅ 生成 {n_stocks} 只股票基本面数据")
        return fundamental_data

    def calculate_value_score(self, fundamental_data):
        """Add value, quality and composite score columns to a copy of the data.

        Args:
            fundamental_data: Output of ``generate_fundamental_data``.

        Returns:
            A new DataFrame with ``value_score``, ``quality_score`` and
            ``composite_value_score``; the input is not modified.
        """
        print(f"🔢 计算价值得分...")

        data = fundamental_data.copy()

        # 1. Valuation: cheaper ratios and higher dividend yield score higher.
        data['value_score'] = (
            (1 - data['pe_ratio'].rank(pct=True)) * 0.4 +
            (1 - data['pb_ratio'].rank(pct=True)) * 0.3 +
            (1 - data['ps_ratio'].rank(pct=True)) * 0.2 +
            data['dividend_yield'].rank(pct=True) * 0.1
        )

        # 2. Quality: profitability and liquidity up, leverage down.
        data['quality_score'] = (
            data['roe'].rank(pct=True) * 0.3 +
            data['gross_margin'].rank(pct=True) * 0.2 +
            data['net_margin'].rank(pct=True) * 0.2 +
            (1 - data['debt_to_equity'].rank(pct=True)) * 0.15 +
            data['current_ratio'].rank(pct=True) * 0.15
        )

        # 3. Composite: 60% value, 40% quality.
        data['composite_value_score'] = data['value_score'] * 0.6 + data['quality_score'] * 0.4

        print(f"✅ 价值得分计算完成")
        return data

    def select_portfolio(self, fundamental_data, portfolio_size=20):
        """Pick the top stocks by composite score and give them equal weights.

        Args:
            fundamental_data: Scored table with ``composite_value_score``.
            portfolio_size: Maximum number of holdings.

        Returns:
            DataFrame of the selected rows with an added ``weight`` column.
            Weights are ``1 / number of selected rows`` so they sum to 1 even
            when fewer than ``portfolio_size`` stocks are available.
        """
        print(f"🏗️ 选择投资组合...")

        top_stocks = fundamental_data.nlargest(portfolio_size, 'composite_value_score').copy()

        # Weight by the number of stocks actually selected, not the number
        # requested; an empty selection simply gets an empty weight column.
        n_selected = len(top_stocks)
        top_stocks['weight'] = 1.0 / n_selected if n_selected else 0.0

        print(f"✅ 选择 {n_selected} 只股票投资组合")
        return top_stocks

    def run_backtest(self, price_data, portfolio, rebalance_freq='M'):
        """Compute daily portfolio and benchmark returns and their metrics.

        Args:
            price_data: Daily prices with one column per stock code.
            portfolio: Selected holdings with a ``stock_code`` column.
            rebalance_freq: Accepted for interface compatibility; it is not
                used - returns are a plain daily equal-weight average.

        Returns:
            ``(results, portfolio_returns, benchmark_returns,
            portfolio_cumulative, benchmark_cumulative)``.
        """
        print(f"📊 运行回测...")

        portfolio_stocks = portfolio['stock_code'].tolist()
        portfolio_prices = price_data[portfolio_stocks]

        # Equal-weight daily returns; the first row is NaN from pct_change.
        portfolio_returns = portfolio_prices.pct_change().mean(axis=1)
        benchmark_returns = price_data.pct_change().mean(axis=1)

        portfolio_cumulative = (1 + portfolio_returns).cumprod()
        benchmark_cumulative = (1 + benchmark_returns).cumprod()

        results = self.calculate_metrics(portfolio_returns, benchmark_returns)

        print(f"✅ 回测完成")
        return results, portfolio_returns, benchmark_returns, portfolio_cumulative, benchmark_cumulative

    def calculate_metrics(self, portfolio_returns, benchmark_returns):
        """Compute annualised performance metrics from daily return series.

        Args:
            portfolio_returns: Daily simple returns of the portfolio.
            benchmark_returns: Daily simple returns of the benchmark, on the
                same index.

        Returns:
            dict with annualised return/volatility/Sharpe for both series,
            ``max_drawdown``, ``win_rate`` (share of valid days on which the
            portfolio beat the benchmark), ``information_ratio`` and
            ``excess_return``.
        """
        # Mean daily return compounded over 252 trading days.
        annual_portfolio_return = (1 + portfolio_returns.mean()) ** 252 - 1
        annual_benchmark_return = (1 + benchmark_returns.mean()) ** 252 - 1

        annual_portfolio_vol = portfolio_returns.std() * np.sqrt(252)
        annual_benchmark_vol = benchmark_returns.std() * np.sqrt(252)

        risk_free_rate = 0.03  # assumed annual risk-free rate
        portfolio_sharpe = (annual_portfolio_return - risk_free_rate) / annual_portfolio_vol if annual_portfolio_vol > 0 else 0
        benchmark_sharpe = (annual_benchmark_return - risk_free_rate) / annual_benchmark_vol if annual_benchmark_vol > 0 else 0

        # Largest decline from a running peak of the compounded curve.
        cumulative_returns = (1 + portfolio_returns).cumprod()
        running_max = cumulative_returns.expanding().max()
        drawdown = (cumulative_returns - running_max) / running_max
        max_drawdown = drawdown.min()

        # Win rate over days where both returns exist.  The NaN first row
        # produced by pct_change must not count as a lost day.
        valid_days = portfolio_returns.notna() & benchmark_returns.notna()
        winning_days = ((portfolio_returns > benchmark_returns) & valid_days).sum()
        total_days = valid_days.sum()
        win_rate = winning_days / total_days if total_days > 0 else 0

        # Information ratio of the daily active return.
        active_returns = portfolio_returns - benchmark_returns
        information_ratio = (active_returns.mean() * 252) / (active_returns.std() * np.sqrt(252)) if active_returns.std() > 0 else 0

        results = {
            'annual_return': annual_portfolio_return,
            'annual_benchmark_return': annual_benchmark_return,
            'annual_volatility': annual_portfolio_vol,
            'benchmark_volatility': annual_benchmark_vol,
            'sharpe_ratio': portfolio_sharpe,
            'benchmark_sharpe': benchmark_sharpe,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'information_ratio': information_ratio,
            'excess_return': annual_portfolio_return - annual_benchmark_return
        }

        return results

    def run(self):
        """Run data generation, scoring, selection, backtest and reporting.

        Returns:
            ``(results, portfolio)``: the metrics dict and the selected
            holdings.
        """
        print(f"\n{'='*60}")
        print("🚀 价值投资策略回测开始")
        print(f"{'='*60}")

        price_data = self.generate_price_data(n_stocks=3000, n_days=252)
        fundamental_data = self.generate_fundamental_data(n_stocks=3000)

        scored_data = self.calculate_value_score(fundamental_data)
        portfolio = self.select_portfolio(scored_data, portfolio_size=20)

        results, portfolio_returns, benchmark_returns, portfolio_cumulative, benchmark_cumulative = self.run_backtest(
            price_data, portfolio
        )

        self.output_results(results, portfolio, portfolio_cumulative, benchmark_cumulative)

        return results, portfolio

    def output_results(self, results, portfolio, portfolio_cumulative, benchmark_cumulative):
        """Print the backtest summary to stdout, then call ``save_results``.

        Args:
            results: Metrics dict from ``calculate_metrics``.
            portfolio: Selected holdings including fundamentals and weights.
            portfolio_cumulative: Compounded portfolio curve.
            benchmark_cumulative: Compounded benchmark curve.
        """
        print(f"\n{'='*60}")
        print("📊 回测结果汇总")
        print(f"{'='*60}")

        # Performance metrics.
        print(f"\n📈 业绩指标:")
        print(f"{'='*40}")
        print(f"年化收益率: {results['annual_return']*100:.2f}%")
        print(f"基准收益率: {results['annual_benchmark_return']*100:.2f}%")
        print(f"超额收益: {results['excess_return']*100:.2f}%")
        print(f"年化波动率: {results['annual_volatility']*100:.2f}%")
        print(f"夏普比率: {results['sharpe_ratio']:.3f}")
        print(f"基准夏普: {results['benchmark_sharpe']:.3f}")
        print(f"最大回撤: {results['max_drawdown']*100:.2f}%")
        print(f"胜率: {results['win_rate']*100:.1f}%")
        print(f"信息比率: {results['information_ratio']:.3f}")

        # First ten holdings, formatted on a copy so the portfolio itself
        # keeps its numeric columns.
        print(f"\n🏆 投资组合(前10只):")
        print(f"{'='*40}")
        top_10 = portfolio.head(10)
        display_cols = ['stock_code', 'industry', 'pe_ratio', 'pb_ratio', 'roe', 'composite_value_score', 'weight']
        display_data = top_10[display_cols].copy()
        display_data['roe'] = display_data['roe'].apply(lambda x: f"{x*100:.1f}%")
        display_data['weight'] = display_data['weight'].apply(lambda x: f"{x*100:.1f}%")
        display_data['composite_value_score'] = display_data['composite_value_score'].round(3)
        print(display_data.to_string(index=False))

        # Portfolio averages.
        print(f"\n📊 组合特征:")
        print(f"{'='*40}")
        print(f"平均PE: {portfolio['pe_ratio'].mean():.1f}")
        print(f"平均PB: {portfolio['pb_ratio'].mean():.2f}")
        print(f"平均ROE: {portfolio['roe'].mean()*100:.1f}%")
        print(f"平均股息率: {portfolio['dividend_yield'].mean()*100:.2f}%")
        print(f"平均市值: {portfolio['market_cap'].mean():.1f}亿")

        # Cumulative return over the whole window.
        final_portfolio_return = portfolio_cumulative.iloc[-1] - 1
        final_benchmark_return = benchmark_cumulative.iloc[-1] - 1
        print(f"\n💰 累计收益率:")
        print(f"{'='*40}")
        print(f"投资组合: {final_portfolio_return*100:.2f}%")
        print(f"基准: {final_benchmark_return*100:.2f}%")
        print(f"超额: {(final_portfolio_return - final_benchmark_return)*100:.2f}%")

        # Timing.
        elapsed = (datetime.now() - self.start_time).total_seconds()
        print(f"\n⏰ 回测运行时间: {elapsed:.2f}秒")
        print(f"🕐 完成时间: {datetime.now().strftime('%H:%M:%S')}")

        # Verdict derived from the metrics.
        print(f"\n🎯 结论:")
        print(f"{'='*40}")
        if results['excess_return'] > 0:
            print(f"✅ 价值投资策略表现优于基准")
            if results['sharpe_ratio'] > results['benchmark_sharpe']:
                print(f"✅ 风险调整后收益也优于基准")
            else:
                print(f"⚠️ 风险调整后收益略低于基准")
        else:
            print(f"❌ 价值投资策略表现弱于基准")

        # Static suggestions.
        print(f"\n💡 建议:")
        print(f"{'='*40}")
        print(f"1. 考虑增加质量因子权重")
        print(f"2. 优化估值因子组合")
        print(f"3. 增加行业轮动机制")
        print(f"4. 考虑市场周期调整")

        self.save_results(results, portfolio, portfolio_cumulative, benchmark_cumulative)

    def save_results(self, results, portfolio, portfolio_cumulative, benchmark_cumulative):
        """Write holdings, metrics, curves and a text report to ``backtest_results/``.

        Args:
            results: Metrics dict from ``calculate_metrics``.
            portfolio: Selected holdings (saved without the index).
            portfolio_cumulative: Compounded portfolio curve.
            benchmark_cumulative: Compounded benchmark curve.
        """
        import os

        output_dir = "backtest_results"
        os.makedirs(output_dir, exist_ok=True)

        portfolio.to_csv(f"{output_dir}/value_portfolio.csv", index=False)

        results_df = pd.DataFrame([results])
        results_df.to_csv(f"{output_dir}/backtest_results.csv", index=False)

        cumulative_df = pd.DataFrame({
            'portfolio': portfolio_cumulative,
            'benchmark': benchmark_cumulative
        })
        cumulative_df.to_csv(f"{output_dir}/cumulative_returns.csv")

        # The report is Chinese text: pin UTF-8 rather than depending on the
        # platform's locale encoding, which may not be able to encode it.
        with open(f"{output_dir}/backtest_report.txt", 'w', encoding='utf-8') as f:
            f.write("="*60 + "\n")
            f.write("价值投资策略回测报告\n")
            f.write("="*60 + "\n\n")
            f.write(f"回测时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"股票数量: {len(portfolio)}\n")
            # Report the actual length of the backtest instead of assuming
            # the default 252-day window.
            f.write(f"回测周期: {len(portfolio_cumulative)}个交易日\n\n")

            f.write("业绩指标:\n")
            f.write("-"*40 + "\n")
            for key, value in results.items():
                # Returns, drawdown and win rate are shown as percentages.
                if 'return' in key or 'drawdown' in key or 'rate' in key:
                    f.write(f"{key}: {value*100:.2f}%\n")
                else:
                    f.write(f"{key}: {value:.3f}\n")

        print(f"\n💾 结果已保存到 {output_dir}/ 目录")
def main():
    """Script entry point: run the backtest and return ``(results, portfolio)``."""
    metrics, holdings = ValueInvestingBacktest().run()
    return metrics, holdings


# Only run the backtest when executed as a script, not on import.
if __name__ == "__main__":
    main()