194 lines
7.4 KiB
Python
194 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
超级财务智能体 - 并行财务因子计算引擎
|
||
启动时间:2026-03-21 17:45
|
||
截止时间:18:00
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import pandas as pd
|
||
import numpy as np
|
||
from datetime import datetime, timedelta
|
||
import multiprocessing as mp
|
||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||
import warnings
|
||
warnings.filterwarnings('ignore')
|
||
|
||
class SuperFinancialAgent:
|
||
"""超级财务智能体"""
|
||
|
||
def __init__(self):
|
||
self.start_time = datetime.now()
|
||
self.cpu_cores = min(10, mp.cpu_count())
|
||
print(f"⚡ 超级财务智能体启动!")
|
||
print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
|
||
print(f"🎯 目标时间: 18:00")
|
||
print(f"⏰ 剩余时间: {15}分钟")
|
||
print(f"💻 CPU核心: {self.cpu_cores}核并行")
|
||
|
||
def calculate_valuation_factors(self, stock_data):
|
||
"""计算估值因子(核心1-3)"""
|
||
factors = {}
|
||
|
||
# 1. 市盈率因子
|
||
if 'pe_ratio' in stock_data.columns:
|
||
factors['pe_rank'] = stock_data['pe_ratio'].rank(pct=True)
|
||
factors['pe_zscore'] = (stock_data['pe_ratio'] - stock_data['pe_ratio'].mean()) / stock_data['pe_ratio'].std()
|
||
|
||
# 2. 市净率因子
|
||
if 'pb_ratio' in stock_data.columns:
|
||
factors['pb_rank'] = stock_data['pb_ratio'].rank(pct=True)
|
||
factors['pb_zscore'] = (stock_data['pb_ratio'] - stock_data['pb_ratio'].mean()) / stock_data['pb_ratio'].std()
|
||
|
||
# 3. 市销率因子
|
||
if 'ps_ratio' in stock_data.columns:
|
||
factors['ps_rank'] = stock_data['ps_ratio'].rank(pct=True)
|
||
factors['ps_zscore'] = (stock_data['ps_ratio'] - stock_data['ps_ratio'].mean()) / stock_data['ps_ratio'].std()
|
||
|
||
return factors
|
||
|
||
def calculate_quality_factors(self, stock_data):
|
||
"""计算质量因子(核心4-6)"""
|
||
factors = {}
|
||
|
||
# 4. ROE因子
|
||
if 'roe' in stock_data.columns:
|
||
factors['roe_rank'] = stock_data['roe'].rank(pct=True)
|
||
factors['roe_stability'] = stock_data['roe'].rolling(5).std()
|
||
|
||
# 5. 毛利率因子
|
||
if 'gross_margin' in stock_data.columns:
|
||
factors['gross_margin_rank'] = stock_data['gross_margin'].rank(pct=True)
|
||
factors['margin_stability'] = stock_data['gross_margin'].rolling(5).std()
|
||
|
||
# 6. 现金流因子
|
||
if 'free_cash_flow' in stock_data.columns:
|
||
factors['fcf_rank'] = stock_data['free_cash_flow'].rank(pct=True)
|
||
factors['fcf_yield'] = stock_data['free_cash_flow'] / stock_data['market_cap']
|
||
|
||
return factors
|
||
|
||
def calculate_growth_factors(self, stock_data):
|
||
"""计算成长因子(核心7-8)"""
|
||
factors = {}
|
||
|
||
# 7. 营收增长因子
|
||
if 'revenue' in stock_data.columns:
|
||
revenue_growth = stock_data['revenue'].pct_change(periods=4)
|
||
factors['revenue_growth_rank'] = revenue_growth.rank(pct=True)
|
||
|
||
# 8. 盈利增长因子
|
||
if 'net_profit' in stock_data.columns:
|
||
profit_growth = stock_data['net_profit'].pct_change(periods=4)
|
||
factors['profit_growth_rank'] = profit_growth.rank(pct=True)
|
||
|
||
return factors
|
||
|
||
def calculate_risk_factors(self, stock_data):
|
||
"""计算风险因子(核心9-10)"""
|
||
factors = {}
|
||
|
||
# 9. 波动率因子
|
||
if 'close' in stock_data.columns:
|
||
volatility = stock_data['close'].rolling(20).std()
|
||
factors['volatility_rank'] = volatility.rank(pct=True)
|
||
|
||
# 10. 流动性因子
|
||
if 'volume' in stock_data.columns:
|
||
avg_volume = stock_data['volume'].rolling(20).mean()
|
||
factors['liquidity_rank'] = avg_volume.rank(pct=True)
|
||
|
||
return factors
|
||
|
||
def parallel_factor_calculation(self, stock_data):
|
||
"""并行计算所有因子"""
|
||
print(f"🔢 开始并行因子计算...")
|
||
|
||
# 准备任务
|
||
tasks = [
|
||
(self.calculate_valuation_factors, stock_data),
|
||
(self.calculate_quality_factors, stock_data),
|
||
(self.calculate_growth_factors, stock_data),
|
||
(self.calculate_risk_factors, stock_data)
|
||
]
|
||
|
||
# 并行计算
|
||
all_factors = {}
|
||
with ProcessPoolExecutor(max_workers=self.cpu_cores) as executor:
|
||
future_to_task = {executor.submit(func, data): (func.__name__, data) for func, data in tasks}
|
||
|
||
for future in as_completed(future_to_task):
|
||
task_name, _ = future_to_task[future]
|
||
try:
|
||
factors = future.result()
|
||
all_factors.update(factors)
|
||
print(f"✅ {task_name} 计算完成")
|
||
except Exception as e:
|
||
print(f"❌ {task_name} 计算失败: {e}")
|
||
|
||
return all_factors
|
||
|
||
def main():
|
||
"""主函数"""
|
||
agent = SuperFinancialAgent()
|
||
|
||
# 1. 生成模拟数据(实际项目中从数据源获取)
|
||
print(f"\n📊 生成模拟财务数据...")
|
||
np.random.seed(42)
|
||
n_stocks = 3000
|
||
|
||
stock_data = pd.DataFrame({
|
||
'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
|
||
'pe_ratio': np.random.uniform(5, 50, n_stocks),
|
||
'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
|
||
'ps_ratio': np.random.uniform(0.5, 10, n_stocks),
|
||
'roe': np.random.uniform(0.05, 0.3, n_stocks),
|
||
'gross_margin': np.random.uniform(0.2, 0.6, n_stocks),
|
||
'free_cash_flow': np.random.uniform(1e6, 1e9, n_stocks),
|
||
'market_cap': np.random.uniform(1e8, 1e11, n_stocks),
|
||
'revenue': np.random.uniform(1e7, 1e10, n_stocks),
|
||
'net_profit': np.random.uniform(1e6, 1e9, n_stocks),
|
||
'close': np.random.uniform(10, 100, n_stocks),
|
||
'volume': np.random.uniform(1e5, 1e7, n_stocks)
|
||
})
|
||
|
||
print(f"✅ 生成 {n_stocks} 只股票财务数据")
|
||
|
||
# 2. 并行计算因子
|
||
factors = agent.parallel_factor_calculation(stock_data)
|
||
|
||
# 3. 合并因子数据
|
||
for factor_name, factor_values in factors.items():
|
||
stock_data[factor_name] = factor_values
|
||
|
||
# 4. 计算综合价值得分
|
||
print(f"\n🎯 计算综合价值得分...")
|
||
|
||
# 价值因子(越低越好)
|
||
value_factors = ['pe_rank', 'pb_rank', 'ps_rank']
|
||
value_score = stock_data[value_factors].mean(axis=1)
|
||
|
||
# 质量因子(越高越好)
|
||
quality_factors = ['roe_rank', 'gross_margin_rank', 'fcf_rank']
|
||
quality_score = stock_data[quality_factors].mean(axis=1)
|
||
|
||
# 综合得分:价值得分(高) + 质量得分(高)
|
||
stock_data['value_quality_score'] = (1 - value_score) * 0.6 + quality_score * 0.4
|
||
stock_data['value_quality_rank'] = stock_data['value_quality_score'].rank(ascending=False, pct=True)
|
||
|
||
# 5. 输出结果
|
||
print(f"\n📈 计算完成!")
|
||
print(f"⏰ 耗时: {(datetime.now() - agent.start_time).total_seconds():.2f}秒")
|
||
print(f"📊 总因子数: {len(factors)}个")
|
||
print(f"🏆 综合价值得分计算完成")
|
||
|
||
# 显示前10名
|
||
top_stocks = stock_data.nlargest(10, 'value_quality_score')[['stock_code', 'value_quality_score', 'value_quality_rank']]
|
||
print(f"\n🏅 价值投资前10名:")
|
||
print(top_stocks.to_string(index=False))
|
||
|
||
return stock_data
|
||
|
||
if __name__ == "__main__":
|
||
stock_data = main() |