Files

345 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
价值投资策略回测框架
紧急提交时间:18:13
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class ValueInvestingBacktest:
"""价值投资策略回测框架"""
def __init__(self):
self.start_time = datetime.now()
print(f"🚀 价值投资策略回测框架启动!")
print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
print(f"🎯 紧急提交时间: 18:00(立即补交)")
def generate_price_data(self, n_stocks=3000, n_days=252):
"""生成价格数据(模拟)"""
print(f"📈 生成价格数据...")
np.random.seed(42)
# 生成基础价格数据
dates = pd.date_range(end=datetime.now(), periods=n_days, freq='B')
stock_codes = [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)]
# 创建价格DataFrame
price_data = pd.DataFrame(index=dates, columns=stock_codes)
# 为每只股票生成价格序列
for stock in stock_codes:
# 基础收益率(年化10-20%
base_return = np.random.uniform(0.0004, 0.0008, n_days)
# 随机波动
volatility = np.random.uniform(0.01, 0.03, n_days)
random_shocks = np.random.normal(0, volatility)
# 计算日收益率
daily_returns = base_return + random_shocks
# 计算价格(从100开始)
prices = 100 * np.exp(np.cumsum(daily_returns))
price_data[stock] = prices
print(f"✅ 生成 {n_stocks} 只股票 {n_days} 天价格数据")
return price_data
def generate_fundamental_data(self, n_stocks=3000):
"""生成基本面数据(模拟)"""
print(f"📊 生成基本面数据...")
np.random.seed(42)
fundamental_data = pd.DataFrame({
'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
'industry': np.random.choice(['金融', '科技', '消费', '医药', '工业', '能源', '材料', '公用事业'], n_stocks),
'market_cap': np.random.uniform(50, 1000, n_stocks),
'pe_ratio': np.random.uniform(5, 50, n_stocks),
'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
'ps_ratio': np.random.uniform(0.5, 10, n_stocks),
'dividend_yield': np.random.uniform(0, 0.05, n_stocks),
'roe': np.random.uniform(0.05, 0.3, n_stocks),
'gross_margin': np.random.uniform(0.2, 0.6, n_stocks),
'net_margin': np.random.uniform(0.05, 0.25, n_stocks),
'debt_to_equity': np.random.uniform(0.1, 1.5, n_stocks),
'current_ratio': np.random.uniform(1, 3, n_stocks),
'revenue_growth': np.random.uniform(-0.2, 0.5, n_stocks),
'profit_growth': np.random.uniform(-0.3, 0.6, n_stocks),
'fcf_yield': np.random.uniform(0, 0.1, n_stocks)
})
print(f"✅ 生成 {n_stocks} 只股票基本面数据")
return fundamental_data
def calculate_value_score(self, fundamental_data):
"""计算价值得分"""
print(f"🔢 计算价值得分...")
data = fundamental_data.copy()
# 1. 估值因子得分(越低估值,得分越高)
data['value_score'] = (
(1 - data['pe_ratio'].rank(pct=True)) * 0.4 +
(1 - data['pb_ratio'].rank(pct=True)) * 0.3 +
(1 - data['ps_ratio'].rank(pct=True)) * 0.2 +
data['dividend_yield'].rank(pct=True) * 0.1
)
# 2. 质量因子得分(越高质量,得分越高)
data['quality_score'] = (
data['roe'].rank(pct=True) * 0.3 +
data['gross_margin'].rank(pct=True) * 0.2 +
data['net_margin'].rank(pct=True) * 0.2 +
(1 - data['debt_to_equity'].rank(pct=True)) * 0.15 +
data['current_ratio'].rank(pct=True) * 0.15
)
# 3. 综合价值得分
data['composite_value_score'] = data['value_score'] * 0.6 + data['quality_score'] * 0.4
print(f"✅ 价值得分计算完成")
return data
def select_portfolio(self, fundamental_data, portfolio_size=20):
"""选择投资组合"""
print(f"🏗️ 选择投资组合...")
# 按综合价值得分排序
top_stocks = fundamental_data.nlargest(portfolio_size, 'composite_value_score')
# 计算等权重
top_stocks['weight'] = 1.0 / portfolio_size
print(f"✅ 选择 {portfolio_size} 只股票投资组合")
return top_stocks
def run_backtest(self, price_data, portfolio, rebalance_freq='M'):
"""运行回测"""
print(f"📊 运行回测...")
# 获取投资组合股票代码
portfolio_stocks = portfolio['stock_code'].tolist()
# 只保留投资组合中的股票价格数据
portfolio_prices = price_data[portfolio_stocks]
# 计算投资组合每日收益率(等权重)
portfolio_returns = portfolio_prices.pct_change().mean(axis=1)
# 计算基准收益率(所有股票等权重)
benchmark_returns = price_data.pct_change().mean(axis=1)
# 计算累计收益率
portfolio_cumulative = (1 + portfolio_returns).cumprod()
benchmark_cumulative = (1 + benchmark_returns).cumprod()
# 计算回测指标
results = self.calculate_metrics(portfolio_returns, benchmark_returns)
print(f"✅ 回测完成")
return results, portfolio_returns, benchmark_returns, portfolio_cumulative, benchmark_cumulative
def calculate_metrics(self, portfolio_returns, benchmark_returns):
"""计算回测指标"""
# 年化收益率
annual_portfolio_return = (1 + portfolio_returns.mean()) ** 252 - 1
annual_benchmark_return = (1 + benchmark_returns.mean()) ** 252 - 1
# 年化波动率
annual_portfolio_vol = portfolio_returns.std() * np.sqrt(252)
annual_benchmark_vol = benchmark_returns.std() * np.sqrt(252)
# 夏普比率(假设无风险利率3%
risk_free_rate = 0.03
portfolio_sharpe = (annual_portfolio_return - risk_free_rate) / annual_portfolio_vol if annual_portfolio_vol > 0 else 0
benchmark_sharpe = (annual_benchmark_return - risk_free_rate) / annual_benchmark_vol if annual_benchmark_vol > 0 else 0
# 最大回撤
cumulative_returns = (1 + portfolio_returns).cumprod()
running_max = cumulative_returns.expanding().max()
drawdown = (cumulative_returns - running_max) / running_max
max_drawdown = drawdown.min()
# 胜率
winning_months = (portfolio_returns > benchmark_returns).sum()
total_months = len(portfolio_returns)
win_rate = winning_months / total_months if total_months > 0 else 0
# 信息比率
active_returns = portfolio_returns - benchmark_returns
information_ratio = (active_returns.mean() * 252) / (active_returns.std() * np.sqrt(252)) if active_returns.std() > 0 else 0
results = {
'annual_return': annual_portfolio_return,
'annual_benchmark_return': annual_benchmark_return,
'annual_volatility': annual_portfolio_vol,
'benchmark_volatility': annual_benchmark_vol,
'sharpe_ratio': portfolio_sharpe,
'benchmark_sharpe': benchmark_sharpe,
'max_drawdown': max_drawdown,
'win_rate': win_rate,
'information_ratio': information_ratio,
'excess_return': annual_portfolio_return - annual_benchmark_return
}
return results
def run(self):
"""运行完整回测"""
print(f"\n{'='*60}")
print("🚀 价值投资策略回测开始")
print(f"{'='*60}")
# 1. 生成数据
price_data = self.generate_price_data(n_stocks=3000, n_days=252)
fundamental_data = self.generate_fundamental_data(n_stocks=3000)
# 2. 计算价值得分
scored_data = self.calculate_value_score(fundamental_data)
# 3. 选择投资组合
portfolio = self.select_portfolio(scored_data, portfolio_size=20)
# 4. 运行回测
results, portfolio_returns, benchmark_returns, portfolio_cumulative, benchmark_cumulative = self.run_backtest(
price_data, portfolio
)
# 5. 输出结果
self.output_results(results, portfolio, portfolio_cumulative, benchmark_cumulative)
return results, portfolio
def output_results(self, results, portfolio, portfolio_cumulative, benchmark_cumulative):
"""输出结果"""
print(f"\n{'='*60}")
print("📊 回测结果汇总")
print(f"{'='*60}")
# 业绩指标
print(f"\n📈 业绩指标:")
print(f"{'='*40}")
print(f"年化收益率: {results['annual_return']*100:.2f}%")
print(f"基准收益率: {results['annual_benchmark_return']*100:.2f}%")
print(f"超额收益: {results['excess_return']*100:.2f}%")
print(f"年化波动率: {results['annual_volatility']*100:.2f}%")
print(f"夏普比率: {results['sharpe_ratio']:.3f}")
print(f"基准夏普: {results['benchmark_sharpe']:.3f}")
print(f"最大回撤: {results['max_drawdown']*100:.2f}%")
print(f"胜率: {results['win_rate']*100:.1f}%")
print(f"信息比率: {results['information_ratio']:.3f}")
# 投资组合
print(f"\n🏆 投资组合(前10只):")
print(f"{'='*40}")
top_10 = portfolio.head(10)
display_cols = ['stock_code', 'industry', 'pe_ratio', 'pb_ratio', 'roe', 'composite_value_score', 'weight']
display_data = top_10[display_cols].copy()
display_data['roe'] = display_data['roe'].apply(lambda x: f"{x*100:.1f}%")
display_data['weight'] = display_data['weight'].apply(lambda x: f"{x*100:.1f}%")
display_data['composite_value_score'] = display_data['composite_value_score'].round(3)
print(display_data.to_string(index=False))
# 组合特征
print(f"\n📊 组合特征:")
print(f"{'='*40}")
print(f"平均PE: {portfolio['pe_ratio'].mean():.1f}")
print(f"平均PB: {portfolio['pb_ratio'].mean():.2f}")
print(f"平均ROE: {portfolio['roe'].mean()*100:.1f}%")
print(f"平均股息率: {portfolio['dividend_yield'].mean()*100:.2f}%")
print(f"平均市值: {portfolio['market_cap'].mean():.1f}亿")
# 累计收益率
final_portfolio_return = portfolio_cumulative.iloc[-1] - 1
final_benchmark_return = benchmark_cumulative.iloc[-1] - 1
print(f"\n💰 累计收益率:")
print(f"{'='*40}")
print(f"投资组合: {final_portfolio_return*100:.2f}%")
print(f"基准: {final_benchmark_return*100:.2f}%")
print(f"超额: {(final_portfolio_return - final_benchmark_return)*100:.2f}%")
# 时间统计
elapsed = (datetime.now() - self.start_time).total_seconds()
print(f"\n⏰ 回测运行时间: {elapsed:.2f}")
print(f"🕐 完成时间: {datetime.now().strftime('%H:%M:%S')}")
# 结论
print(f"\n🎯 结论:")
print(f"{'='*40}")
if results['excess_return'] > 0:
print(f"✅ 价值投资策略表现优于基准")
if results['sharpe_ratio'] > results['benchmark_sharpe']:
print(f"✅ 风险调整后收益也优于基准")
else:
print(f"⚠️ 风险调整后收益略低于基准")
else:
print(f"❌ 价值投资策略表现弱于基准")
# 建议
print(f"\n💡 建议:")
print(f"{'='*40}")
print(f"1. 考虑增加质量因子权重")
print(f"2. 优化估值因子组合")
print(f"3. 增加行业轮动机制")
print(f"4. 考虑市场周期调整")
# 保存结果
self.save_results(results, portfolio, portfolio_cumulative, benchmark_cumulative)
def save_results(self, results, portfolio, portfolio_cumulative, benchmark_cumulative):
"""保存结果"""
import os
# 创建输出目录
output_dir = "backtest_results"
os.makedirs(output_dir, exist_ok=True)
# 保存投资组合
portfolio.to_csv(f"{output_dir}/value_portfolio.csv", index=False)
# 保存回测结果
results_df = pd.DataFrame([results])
results_df.to_csv(f"{output_dir}/backtest_results.csv", index=False)
# 保存累计收益率
cumulative_df = pd.DataFrame({
'portfolio': portfolio_cumulative,
'benchmark': benchmark_cumulative
})
cumulative_df.to_csv(f"{output_dir}/cumulative_returns.csv")
# 保存报告
with open(f"{output_dir}/backtest_report.txt", 'w') as f:
f.write("="*60 + "\n")
f.write("价值投资策略回测报告\n")
f.write("="*60 + "\n\n")
f.write(f"回测时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"股票数量: {len(portfolio)}\n")
f.write(f"回测周期: 252个交易日\n\n")
f.write("业绩指标:\n")
f.write("-"*40 + "\n")
for key, value in results.items():
if 'return' in key or 'drawdown' in key or 'rate' in key:
f.write(f"{key}: {value*100:.2f}%\n")
else:
f.write(f"{key}: {value:.3f}\n")
print(f"\n💾 结果已保存到 {output_dir}/ 目录")
def main():
"""主函数"""
backtest = ValueInvestingBacktest()
results, portfolio = backtest.run()
return results, portfolio
if __name__ == "__main__":
main()