Files

194 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
超级财务智能体 - 并行财务因子计算引擎
启动时间:2026-03-21 17:45
截止时间:18:00
"""
import sys
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
import warnings
warnings.filterwarnings('ignore')
class SuperFinancialAgent:
    """Factor engine that computes four factor families in worker processes.

    Every ``calculate_*`` method takes a DataFrame and returns a dict mapping
    factor name to a Series aligned with the input index.  A factor is only
    produced when the column(s) it needs are present, so partial frames are
    accepted and simply yield fewer factors.
    """

    # (source column, factor-name prefix) pairs for the valuation family.
    _VALUATION_COLUMNS = (
        ('pe_ratio', 'pe'),
        ('pb_ratio', 'pb'),
        ('ps_ratio', 'ps'),
    )

    def __init__(self):
        """Record the start time, pick the worker count and print a banner."""
        self.start_time = datetime.now()
        # Never use more than 10 processes, even on larger machines.
        self.cpu_cores = min(10, mp.cpu_count())
        print(f"⚡ 超级财务智能体启动!")
        print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}")
        print(f"🎯 目标时间: 18:00")
        # NOTE: the remaining-minutes figure is a fixed literal, not computed
        # from the clock.
        print(f"⏰ 剩余时间: {15}分钟")
        print(f"💻 CPU核心: {self.cpu_cores}核并行")

    def calculate_valuation_factors(self, stock_data: pd.DataFrame) -> dict:
        """Compute valuation factors (core factors 1-3: P/E, P/B, P/S).

        For each ratio column that exists, emits ``<prefix>_rank`` (percentile
        rank) and ``<prefix>_zscore`` (deviation from the column mean divided
        by the sample standard deviation).

        Args:
            stock_data: Frame that may contain ``pe_ratio``, ``pb_ratio`` and
                ``ps_ratio`` columns.

        Returns:
            Dict of factor name -> Series; empty if no ratio column exists.
        """
        factors = {}
        for column, prefix in self._VALUATION_COLUMNS:
            if column not in stock_data.columns:
                continue
            ratio = stock_data[column]
            factors[f'{prefix}_rank'] = ratio.rank(pct=True)
            # A constant column has zero std, which makes every z-score NaN.
            factors[f'{prefix}_zscore'] = (ratio - ratio.mean()) / ratio.std()
        return factors

    def calculate_quality_factors(self, stock_data: pd.DataFrame) -> dict:
        """Compute quality factors (core factors 4-6: ROE, margin, cash flow).

        Args:
            stock_data: Frame that may contain ``roe``, ``gross_margin``,
                ``free_cash_flow`` and ``market_cap`` columns.

        Returns:
            Dict of factor name -> Series.  ``fcf_yield`` is only included
            when both ``free_cash_flow`` and ``market_cap`` are present.
        """
        factors = {}
        # 4. Return on equity.
        if 'roe' in stock_data.columns:
            roe = stock_data['roe']
            factors['roe_rank'] = roe.rank(pct=True)
            # Rolling std over 5 consecutive rows in the frame's row order.
            # NOTE(review): main() passes one row per stock, so this window
            # spans different stocks rather than periods -- confirm intent.
            factors['roe_stability'] = roe.rolling(5).std()
        # 5. Gross margin.
        if 'gross_margin' in stock_data.columns:
            margin = stock_data['gross_margin']
            factors['gross_margin_rank'] = margin.rank(pct=True)
            factors['margin_stability'] = margin.rolling(5).std()
        # 6. Free cash flow.
        if 'free_cash_flow' in stock_data.columns:
            fcf = stock_data['free_cash_flow']
            factors['fcf_rank'] = fcf.rank(pct=True)
            # The yield needs a second column; guard it like every other
            # input instead of raising KeyError on frames without it.
            if 'market_cap' in stock_data.columns:
                # A zero market cap makes the yield undefined: use NaN
                # rather than letting the division produce +/-inf.
                market_cap = stock_data['market_cap'].replace(0, np.nan)
                factors['fcf_yield'] = fcf / market_cap
        return factors

    def calculate_growth_factors(self, stock_data: pd.DataFrame) -> dict:
        """Compute growth factors (core factors 7-8: revenue and profit).

        Growth is the percentage change against the value 4 rows earlier, so
        the first 4 rows have no growth value and rank as NaN.

        Args:
            stock_data: Frame that may contain ``revenue`` and ``net_profit``.

        Returns:
            Dict of factor name -> percentile-rank Series.
        """
        factors = {}
        # 7. Revenue growth.
        if 'revenue' in stock_data.columns:
            revenue_growth = stock_data['revenue'].pct_change(periods=4)
            factors['revenue_growth_rank'] = revenue_growth.rank(pct=True)
        # 8. Net profit growth.
        if 'net_profit' in stock_data.columns:
            profit_growth = stock_data['net_profit'].pct_change(periods=4)
            factors['profit_growth_rank'] = profit_growth.rank(pct=True)
        return factors

    def calculate_risk_factors(self, stock_data: pd.DataFrame) -> dict:
        """Compute risk factors (core factors 9-10: volatility, liquidity).

        Both use a 20-row rolling window, so the first 19 rows are NaN.

        Args:
            stock_data: Frame that may contain ``close`` and ``volume``.

        Returns:
            Dict of factor name -> percentile-rank Series.
        """
        factors = {}
        # 9. Volatility: rolling std of the close column.
        if 'close' in stock_data.columns:
            volatility = stock_data['close'].rolling(20).std()
            factors['volatility_rank'] = volatility.rank(pct=True)
        # 10. Liquidity: rolling mean of the volume column.
        if 'volume' in stock_data.columns:
            avg_volume = stock_data['volume'].rolling(20).mean()
            factors['liquidity_rank'] = avg_volume.rank(pct=True)
        return factors

    def parallel_factor_calculation(self, stock_data):
        """Run the four factor families in a process pool and merge results.

        A family that raises is reported on stdout and skipped; the factors
        from the remaining families are still returned (best effort).

        Args:
            stock_data: Frame handed unchanged to every ``calculate_*`` method.

        Returns:
            Dict of factor name -> Series merged across all successful
            families.  Insertion order follows completion order.
        """
        print(f"🔢 开始并行因子计算...")
        calculators = [
            self.calculate_valuation_factors,
            self.calculate_quality_factors,
            self.calculate_growth_factors,
            self.calculate_risk_factors,
        ]
        # More workers than tasks would only be idle processes.
        workers = max(1, min(self.cpu_cores, len(calculators)))
        all_factors = {}
        with ProcessPoolExecutor(max_workers=workers) as executor:
            # Map each future to the task name only; keeping the frame in
            # the mapping would just hold extra references to it.
            future_to_name = {
                executor.submit(calculator, stock_data): calculator.__name__
                for calculator in calculators
            }
            for future in as_completed(future_to_name):
                task_name = future_to_name[future]
                try:
                    factors = future.result()
                    all_factors.update(factors)
                    print(f"{task_name} 计算完成")
                except Exception as e:
                    # Deliberately broad: one failing family must not abort
                    # the others, so report it and carry on.
                    print(f"{task_name} 计算失败: {e}")
        return all_factors
def main():
    """Run the demo pipeline end to end and return the scored frame.

    Builds a seeded frame of 3000 simulated stocks, has a
    ``SuperFinancialAgent`` compute the factors, attaches them as columns,
    blends the valuation and quality percentile ranks into
    ``value_quality_score`` / ``value_quality_rank``, prints a summary with
    the ten highest-scoring stocks, and returns the enriched DataFrame.
    """
    agent = SuperFinancialAgent()

    # Step 1: simulated fundamentals (a real project would load these from
    # a data source).
    print(f"\n📊 生成模拟财务数据...")
    np.random.seed(42)
    n_stocks = 3000
    # (column, low, high) in draw order.  All draws come from the one seeded
    # global generator, so this order determines the generated values.
    uniform_specs = (
        ('pe_ratio', 5, 50),
        ('pb_ratio', 0.5, 5),
        ('ps_ratio', 0.5, 10),
        ('roe', 0.05, 0.3),
        ('gross_margin', 0.2, 0.6),
        ('free_cash_flow', 1e6, 1e9),
        ('market_cap', 1e8, 1e11),
        ('revenue', 1e7, 1e10),
        ('net_profit', 1e6, 1e9),
        ('close', 10, 100),
        ('volume', 1e5, 1e7),
    )
    columns = {
        'stock_code': [f'{number:06d}.XSHE' for number in range(1, n_stocks + 1)],
    }
    for column, low, high in uniform_specs:
        columns[column] = np.random.uniform(low, high, n_stocks)
    stock_data = pd.DataFrame(columns)
    print(f"✅ 生成 {n_stocks} 只股票财务数据")

    # Step 2: compute every factor family in parallel.
    factors = agent.parallel_factor_calculation(stock_data)

    # Step 3: attach each factor Series as a column of the same name.
    stock_data = stock_data.assign(**factors)

    # Step 4: blended value/quality score.
    print(f"\n🎯 计算综合价值得分...")
    # Valuation ranks: a lower rank means a cheaper stock (lower is better).
    valuation_columns = ['pe_rank', 'pb_rank', 'ps_rank']
    value_score = stock_data[valuation_columns].mean(axis=1)
    # Quality ranks: higher is better.
    quality_columns = ['roe_rank', 'gross_margin_rank', 'fcf_rank']
    quality_score = stock_data[quality_columns].mean(axis=1)
    # Flip the valuation score so that "high" is good for both parts, then
    # weight value 60% and quality 40%.
    cheapness = 1 - value_score
    blended = cheapness * 0.6 + quality_score * 0.4
    stock_data['value_quality_score'] = blended
    stock_data['value_quality_rank'] = blended.rank(ascending=False, pct=True)

    # Step 5: report.
    print(f"\n📈 计算完成!")
    elapsed_seconds = (datetime.now() - agent.start_time).total_seconds()
    print(f"⏰ 耗时: {elapsed_seconds:.2f}")
    print(f"📊 总因子数: {len(factors)}")
    print(f"🏆 综合价值得分计算完成")

    # Show the ten best-scoring stocks.
    report_columns = ['stock_code', 'value_quality_score', 'value_quality_rank']
    best_rows = stock_data.nlargest(10, 'value_quality_score')
    top_stocks = best_rows[report_columns]
    print(f"\n🏅 价值投资前10名:")
    print(top_stocks.to_string(index=False))
    return stock_data
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    # Bind the frame returned by main() to a module-level name.
    stock_data = main()