Files
sanguo_quant_live/strategies/structured-dynamic-factors-20260327/utils/dynamic_weight.py
T
2026-03-28 00:14:34 +08:00

106 lines
3.2 KiB
Python

"""
动态加权 - 根据滚动IC自动调整因子权重
IC信息系数越高,因子权重越大
"""
import pandas as pd
import numpy as np
from typing import Dict, List
from scipy.stats import spearmanr
class DynamicICWeightAdjuster:
"""
动态IC权重调整器
每月更新因子IC,根据IC均值调整权重
"""
def __init__(
self,
factor_names: List[str],
window_size: int = 12, # 滚动窗口月数
min_ic: float = -0.02,
base_weight: float = 0.02 # 基础权重,保证每个因子都有暴露
):
self.factor_names = factor_names
self.window_size = window_size
self.min_ic = min_ic
self.base_weight = base_weight
# 历史IC记录
self.ic_history: Dict[str, List[float]] = {n: [] for n in factor_names}
def update_monthly_ic(
self,
factor_df: pd.DataFrame,
forward_returns: pd.Series
) -> Dict[str, float]:
"""
更新月度IC
参数:
factor_df: 当期因子得分,每列一个因子
forward_returns: 下期收益率(要预测的目标)
"""
current_ic = {}
# 对齐数据
combined = pd.concat([factor_df, forward_returns], axis=1).dropna()
for name in self.factor_names:
if name not in combined.columns:
continue
# 计算Spearman秩相关系数IC
ic, _ = spearmanr(combined[name], combined[forward_returns.name])
current_ic[name] = ic
self.ic_history[name].append(ic)
# 保持窗口大小
if len(self.ic_history[name]) > self.window_size:
self.ic_history[name].pop(0)
return current_ic
def calculate_weights(self) -> Dict[str, float]:
"""根据滚动IC计算新权重"""
# 计算每个因子平均IC
avg_ic = {}
for name in self.factor_names:
ics = self.ic_history[name]
if len(ics) > 0:
avg_ic[name] = np.mean(ics)
else:
avg_ic[name] = 0
# IC转权重: IC越大权重越大,IC小于最小值只保留基础权重
weights = {}
for name in self.factor_names:
ic = avg_ic[name]
if ic < self.min_ic:
weights[name] = self.base_weight
else:
weights[name] = self.base_weight + max(0, ic)
# 归一化总和为1
total = sum(weights.values())
if total > 0:
weights = {k: v / total for k, v in weights.items()}
else:
# 全不好就等权重
n = len(self.factor_names)
weights = {k: 1.0 / n for k in self.factor_names}
return weights
def get_avg_ic(self) -> Dict[str, float]:
"""获取当前滚动平均IC"""
avg = {}
for name, ics in self.ic_history.items():
if len(ics) > 0:
avg[name] = np.mean(ics)
else:
avg[name] = 0
return avg
def get_ic_history(self) -> Dict[str, List[float]]:
return self.ic_history.copy()