345 lines
14 KiB
Python
345 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
价值投资策略回测框架
|
||
紧急提交时间:18:13
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from datetime import datetime, timedelta
|
||
import warnings
|
||
warnings.filterwarnings('ignore')
|
||
|
||
class ValueInvestingBacktest:
|
||
"""价值投资策略回测框架"""
|
||
|
||
def __init__(self):
|
||
self.start_time = datetime.now()
|
||
print(f"🚀 价值投资策略回测框架启动!")
|
||
print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
|
||
print(f"🎯 紧急提交时间: 18:00(立即补交)")
|
||
|
||
def generate_price_data(self, n_stocks=3000, n_days=252):
|
||
"""生成价格数据(模拟)"""
|
||
print(f"📈 生成价格数据...")
|
||
|
||
np.random.seed(42)
|
||
|
||
# 生成基础价格数据
|
||
dates = pd.date_range(end=datetime.now(), periods=n_days, freq='B')
|
||
stock_codes = [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)]
|
||
|
||
# 创建价格DataFrame
|
||
price_data = pd.DataFrame(index=dates, columns=stock_codes)
|
||
|
||
# 为每只股票生成价格序列
|
||
for stock in stock_codes:
|
||
# 基础收益率(年化10-20%)
|
||
base_return = np.random.uniform(0.0004, 0.0008, n_days)
|
||
|
||
# 随机波动
|
||
volatility = np.random.uniform(0.01, 0.03, n_days)
|
||
random_shocks = np.random.normal(0, volatility)
|
||
|
||
# 计算日收益率
|
||
daily_returns = base_return + random_shocks
|
||
|
||
# 计算价格(从100开始)
|
||
prices = 100 * np.exp(np.cumsum(daily_returns))
|
||
price_data[stock] = prices
|
||
|
||
print(f"✅ 生成 {n_stocks} 只股票 {n_days} 天价格数据")
|
||
return price_data
|
||
|
||
def generate_fundamental_data(self, n_stocks=3000):
|
||
"""生成基本面数据(模拟)"""
|
||
print(f"📊 生成基本面数据...")
|
||
|
||
np.random.seed(42)
|
||
|
||
fundamental_data = pd.DataFrame({
|
||
'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
|
||
'industry': np.random.choice(['金融', '科技', '消费', '医药', '工业', '能源', '材料', '公用事业'], n_stocks),
|
||
'market_cap': np.random.uniform(50, 1000, n_stocks),
|
||
'pe_ratio': np.random.uniform(5, 50, n_stocks),
|
||
'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
|
||
'ps_ratio': np.random.uniform(0.5, 10, n_stocks),
|
||
'dividend_yield': np.random.uniform(0, 0.05, n_stocks),
|
||
'roe': np.random.uniform(0.05, 0.3, n_stocks),
|
||
'gross_margin': np.random.uniform(0.2, 0.6, n_stocks),
|
||
'net_margin': np.random.uniform(0.05, 0.25, n_stocks),
|
||
'debt_to_equity': np.random.uniform(0.1, 1.5, n_stocks),
|
||
'current_ratio': np.random.uniform(1, 3, n_stocks),
|
||
'revenue_growth': np.random.uniform(-0.2, 0.5, n_stocks),
|
||
'profit_growth': np.random.uniform(-0.3, 0.6, n_stocks),
|
||
'fcf_yield': np.random.uniform(0, 0.1, n_stocks)
|
||
})
|
||
|
||
print(f"✅ 生成 {n_stocks} 只股票基本面数据")
|
||
return fundamental_data
|
||
|
||
def calculate_value_score(self, fundamental_data):
|
||
"""计算价值得分"""
|
||
print(f"🔢 计算价值得分...")
|
||
|
||
data = fundamental_data.copy()
|
||
|
||
# 1. 估值因子得分(越低估值,得分越高)
|
||
data['value_score'] = (
|
||
(1 - data['pe_ratio'].rank(pct=True)) * 0.4 +
|
||
(1 - data['pb_ratio'].rank(pct=True)) * 0.3 +
|
||
(1 - data['ps_ratio'].rank(pct=True)) * 0.2 +
|
||
data['dividend_yield'].rank(pct=True) * 0.1
|
||
)
|
||
|
||
# 2. 质量因子得分(越高质量,得分越高)
|
||
data['quality_score'] = (
|
||
data['roe'].rank(pct=True) * 0.3 +
|
||
data['gross_margin'].rank(pct=True) * 0.2 +
|
||
data['net_margin'].rank(pct=True) * 0.2 +
|
||
(1 - data['debt_to_equity'].rank(pct=True)) * 0.15 +
|
||
data['current_ratio'].rank(pct=True) * 0.15
|
||
)
|
||
|
||
# 3. 综合价值得分
|
||
data['composite_value_score'] = data['value_score'] * 0.6 + data['quality_score'] * 0.4
|
||
|
||
print(f"✅ 价值得分计算完成")
|
||
return data
|
||
|
||
def select_portfolio(self, fundamental_data, portfolio_size=20):
|
||
"""选择投资组合"""
|
||
print(f"🏗️ 选择投资组合...")
|
||
|
||
# 按综合价值得分排序
|
||
top_stocks = fundamental_data.nlargest(portfolio_size, 'composite_value_score')
|
||
|
||
# 计算等权重
|
||
top_stocks['weight'] = 1.0 / portfolio_size
|
||
|
||
print(f"✅ 选择 {portfolio_size} 只股票投资组合")
|
||
return top_stocks
|
||
|
||
def run_backtest(self, price_data, portfolio, rebalance_freq='M'):
|
||
"""运行回测"""
|
||
print(f"📊 运行回测...")
|
||
|
||
# 获取投资组合股票代码
|
||
portfolio_stocks = portfolio['stock_code'].tolist()
|
||
|
||
# 只保留投资组合中的股票价格数据
|
||
portfolio_prices = price_data[portfolio_stocks]
|
||
|
||
# 计算投资组合每日收益率(等权重)
|
||
portfolio_returns = portfolio_prices.pct_change().mean(axis=1)
|
||
|
||
# 计算基准收益率(所有股票等权重)
|
||
benchmark_returns = price_data.pct_change().mean(axis=1)
|
||
|
||
# 计算累计收益率
|
||
portfolio_cumulative = (1 + portfolio_returns).cumprod()
|
||
benchmark_cumulative = (1 + benchmark_returns).cumprod()
|
||
|
||
# 计算回测指标
|
||
results = self.calculate_metrics(portfolio_returns, benchmark_returns)
|
||
|
||
print(f"✅ 回测完成")
|
||
return results, portfolio_returns, benchmark_returns, portfolio_cumulative, benchmark_cumulative
|
||
|
||
def calculate_metrics(self, portfolio_returns, benchmark_returns):
|
||
"""计算回测指标"""
|
||
# 年化收益率
|
||
annual_portfolio_return = (1 + portfolio_returns.mean()) ** 252 - 1
|
||
annual_benchmark_return = (1 + benchmark_returns.mean()) ** 252 - 1
|
||
|
||
# 年化波动率
|
||
annual_portfolio_vol = portfolio_returns.std() * np.sqrt(252)
|
||
annual_benchmark_vol = benchmark_returns.std() * np.sqrt(252)
|
||
|
||
# 夏普比率(假设无风险利率3%)
|
||
risk_free_rate = 0.03
|
||
portfolio_sharpe = (annual_portfolio_return - risk_free_rate) / annual_portfolio_vol if annual_portfolio_vol > 0 else 0
|
||
benchmark_sharpe = (annual_benchmark_return - risk_free_rate) / annual_benchmark_vol if annual_benchmark_vol > 0 else 0
|
||
|
||
# 最大回撤
|
||
cumulative_returns = (1 + portfolio_returns).cumprod()
|
||
running_max = cumulative_returns.expanding().max()
|
||
drawdown = (cumulative_returns - running_max) / running_max
|
||
max_drawdown = drawdown.min()
|
||
|
||
# 胜率
|
||
winning_months = (portfolio_returns > benchmark_returns).sum()
|
||
total_months = len(portfolio_returns)
|
||
win_rate = winning_months / total_months if total_months > 0 else 0
|
||
|
||
# 信息比率
|
||
active_returns = portfolio_returns - benchmark_returns
|
||
information_ratio = (active_returns.mean() * 252) / (active_returns.std() * np.sqrt(252)) if active_returns.std() > 0 else 0
|
||
|
||
results = {
|
||
'annual_return': annual_portfolio_return,
|
||
'annual_benchmark_return': annual_benchmark_return,
|
||
'annual_volatility': annual_portfolio_vol,
|
||
'benchmark_volatility': annual_benchmark_vol,
|
||
'sharpe_ratio': portfolio_sharpe,
|
||
'benchmark_sharpe': benchmark_sharpe,
|
||
'max_drawdown': max_drawdown,
|
||
'win_rate': win_rate,
|
||
'information_ratio': information_ratio,
|
||
'excess_return': annual_portfolio_return - annual_benchmark_return
|
||
}
|
||
|
||
return results
|
||
|
||
def run(self):
|
||
"""运行完整回测"""
|
||
print(f"\n{'='*60}")
|
||
print("🚀 价值投资策略回测开始")
|
||
print(f"{'='*60}")
|
||
|
||
# 1. 生成数据
|
||
price_data = self.generate_price_data(n_stocks=3000, n_days=252)
|
||
fundamental_data = self.generate_fundamental_data(n_stocks=3000)
|
||
|
||
# 2. 计算价值得分
|
||
scored_data = self.calculate_value_score(fundamental_data)
|
||
|
||
# 3. 选择投资组合
|
||
portfolio = self.select_portfolio(scored_data, portfolio_size=20)
|
||
|
||
# 4. 运行回测
|
||
results, portfolio_returns, benchmark_returns, portfolio_cumulative, benchmark_cumulative = self.run_backtest(
|
||
price_data, portfolio
|
||
)
|
||
|
||
# 5. 输出结果
|
||
self.output_results(results, portfolio, portfolio_cumulative, benchmark_cumulative)
|
||
|
||
return results, portfolio
|
||
|
||
def output_results(self, results, portfolio, portfolio_cumulative, benchmark_cumulative):
|
||
"""输出结果"""
|
||
print(f"\n{'='*60}")
|
||
print("📊 回测结果汇总")
|
||
print(f"{'='*60}")
|
||
|
||
# 业绩指标
|
||
print(f"\n📈 业绩指标:")
|
||
print(f"{'='*40}")
|
||
print(f"年化收益率: {results['annual_return']*100:.2f}%")
|
||
print(f"基准收益率: {results['annual_benchmark_return']*100:.2f}%")
|
||
print(f"超额收益: {results['excess_return']*100:.2f}%")
|
||
print(f"年化波动率: {results['annual_volatility']*100:.2f}%")
|
||
print(f"夏普比率: {results['sharpe_ratio']:.3f}")
|
||
print(f"基准夏普: {results['benchmark_sharpe']:.3f}")
|
||
print(f"最大回撤: {results['max_drawdown']*100:.2f}%")
|
||
print(f"胜率: {results['win_rate']*100:.1f}%")
|
||
print(f"信息比率: {results['information_ratio']:.3f}")
|
||
|
||
# 投资组合
|
||
print(f"\n🏆 投资组合(前10只):")
|
||
print(f"{'='*40}")
|
||
top_10 = portfolio.head(10)
|
||
display_cols = ['stock_code', 'industry', 'pe_ratio', 'pb_ratio', 'roe', 'composite_value_score', 'weight']
|
||
display_data = top_10[display_cols].copy()
|
||
display_data['roe'] = display_data['roe'].apply(lambda x: f"{x*100:.1f}%")
|
||
display_data['weight'] = display_data['weight'].apply(lambda x: f"{x*100:.1f}%")
|
||
display_data['composite_value_score'] = display_data['composite_value_score'].round(3)
|
||
print(display_data.to_string(index=False))
|
||
|
||
# 组合特征
|
||
print(f"\n📊 组合特征:")
|
||
print(f"{'='*40}")
|
||
print(f"平均PE: {portfolio['pe_ratio'].mean():.1f}")
|
||
print(f"平均PB: {portfolio['pb_ratio'].mean():.2f}")
|
||
print(f"平均ROE: {portfolio['roe'].mean()*100:.1f}%")
|
||
print(f"平均股息率: {portfolio['dividend_yield'].mean()*100:.2f}%")
|
||
print(f"平均市值: {portfolio['market_cap'].mean():.1f}亿")
|
||
|
||
# 累计收益率
|
||
final_portfolio_return = portfolio_cumulative.iloc[-1] - 1
|
||
final_benchmark_return = benchmark_cumulative.iloc[-1] - 1
|
||
print(f"\n💰 累计收益率:")
|
||
print(f"{'='*40}")
|
||
print(f"投资组合: {final_portfolio_return*100:.2f}%")
|
||
print(f"基准: {final_benchmark_return*100:.2f}%")
|
||
print(f"超额: {(final_portfolio_return - final_benchmark_return)*100:.2f}%")
|
||
|
||
# 时间统计
|
||
elapsed = (datetime.now() - self.start_time).total_seconds()
|
||
print(f"\n⏰ 回测运行时间: {elapsed:.2f}秒")
|
||
print(f"🕐 完成时间: {datetime.now().strftime('%H:%M:%S')}")
|
||
|
||
# 结论
|
||
print(f"\n🎯 结论:")
|
||
print(f"{'='*40}")
|
||
if results['excess_return'] > 0:
|
||
print(f"✅ 价值投资策略表现优于基准")
|
||
if results['sharpe_ratio'] > results['benchmark_sharpe']:
|
||
print(f"✅ 风险调整后收益也优于基准")
|
||
else:
|
||
print(f"⚠️ 风险调整后收益略低于基准")
|
||
else:
|
||
print(f"❌ 价值投资策略表现弱于基准")
|
||
|
||
# 建议
|
||
print(f"\n💡 建议:")
|
||
print(f"{'='*40}")
|
||
print(f"1. 考虑增加质量因子权重")
|
||
print(f"2. 优化估值因子组合")
|
||
print(f"3. 增加行业轮动机制")
|
||
print(f"4. 考虑市场周期调整")
|
||
|
||
# 保存结果
|
||
self.save_results(results, portfolio, portfolio_cumulative, benchmark_cumulative)
|
||
|
||
def save_results(self, results, portfolio, portfolio_cumulative, benchmark_cumulative):
|
||
"""保存结果"""
|
||
import os
|
||
|
||
# 创建输出目录
|
||
output_dir = "backtest_results"
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
# 保存投资组合
|
||
portfolio.to_csv(f"{output_dir}/value_portfolio.csv", index=False)
|
||
|
||
# 保存回测结果
|
||
results_df = pd.DataFrame([results])
|
||
results_df.to_csv(f"{output_dir}/backtest_results.csv", index=False)
|
||
|
||
# 保存累计收益率
|
||
cumulative_df = pd.DataFrame({
|
||
'portfolio': portfolio_cumulative,
|
||
'benchmark': benchmark_cumulative
|
||
})
|
||
cumulative_df.to_csv(f"{output_dir}/cumulative_returns.csv")
|
||
|
||
# 保存报告
|
||
with open(f"{output_dir}/backtest_report.txt", 'w') as f:
|
||
f.write("="*60 + "\n")
|
||
f.write("价值投资策略回测报告\n")
|
||
f.write("="*60 + "\n\n")
|
||
f.write(f"回测时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||
f.write(f"股票数量: {len(portfolio)}\n")
|
||
f.write(f"回测周期: 252个交易日\n\n")
|
||
|
||
f.write("业绩指标:\n")
|
||
f.write("-"*40 + "\n")
|
||
for key, value in results.items():
|
||
if 'return' in key or 'drawdown' in key or 'rate' in key:
|
||
f.write(f"{key}: {value*100:.2f}%\n")
|
||
else:
|
||
f.write(f"{key}: {value:.3f}\n")
|
||
|
||
print(f"\n💾 结果已保存到 {output_dir}/ 目录")
|
||
|
||
def main():
|
||
"""主函数"""
|
||
backtest = ValueInvestingBacktest()
|
||
results, portfolio = backtest.run()
|
||
|
||
return results, portfolio
|
||
|
||
if __name__ == "__main__":
|
||
main() |