#!/usr/bin/env python3 """ 超级财务智能体 - 并行财务因子计算引擎 启动时间:2026-03-21 17:45 截止时间:18:00 """ import sys import os import pandas as pd import numpy as np from datetime import datetime, timedelta import multiprocessing as mp from concurrent.futures import ProcessPoolExecutor, as_completed import warnings warnings.filterwarnings('ignore') class SuperFinancialAgent: """超级财务智能体""" def __init__(self): self.start_time = datetime.now() self.cpu_cores = min(10, mp.cpu_count()) print(f"⚡ 超级财务智能体启动!") print(f"🕐 启动时间: {self.start_time.strftime('%H:%M:%S')}") print(f"🎯 目标时间: 18:00") print(f"⏰ 剩余时间: {15}分钟") print(f"💻 CPU核心: {self.cpu_cores}核并行") def calculate_valuation_factors(self, stock_data): """计算估值因子(核心1-3)""" factors = {} # 1. 市盈率因子 if 'pe_ratio' in stock_data.columns: factors['pe_rank'] = stock_data['pe_ratio'].rank(pct=True) factors['pe_zscore'] = (stock_data['pe_ratio'] - stock_data['pe_ratio'].mean()) / stock_data['pe_ratio'].std() # 2. 市净率因子 if 'pb_ratio' in stock_data.columns: factors['pb_rank'] = stock_data['pb_ratio'].rank(pct=True) factors['pb_zscore'] = (stock_data['pb_ratio'] - stock_data['pb_ratio'].mean()) / stock_data['pb_ratio'].std() # 3. 市销率因子 if 'ps_ratio' in stock_data.columns: factors['ps_rank'] = stock_data['ps_ratio'].rank(pct=True) factors['ps_zscore'] = (stock_data['ps_ratio'] - stock_data['ps_ratio'].mean()) / stock_data['ps_ratio'].std() return factors def calculate_quality_factors(self, stock_data): """计算质量因子(核心4-6)""" factors = {} # 4. ROE因子 if 'roe' in stock_data.columns: factors['roe_rank'] = stock_data['roe'].rank(pct=True) factors['roe_stability'] = stock_data['roe'].rolling(5).std() # 5. 毛利率因子 if 'gross_margin' in stock_data.columns: factors['gross_margin_rank'] = stock_data['gross_margin'].rank(pct=True) factors['margin_stability'] = stock_data['gross_margin'].rolling(5).std() # 6. 现金流因子 if 'free_cash_flow' in stock_data.columns: factors['fcf_rank'] = stock_data['free_cash_flow'].rank(pct=True) factors['fcf_yield'] = stock_data['free_cash_flow'] / stock_data['market_cap'] return factors def calculate_growth_factors(self, stock_data): """计算成长因子(核心7-8)""" factors = {} # 7. 营收增长因子 if 'revenue' in stock_data.columns: revenue_growth = stock_data['revenue'].pct_change(periods=4) factors['revenue_growth_rank'] = revenue_growth.rank(pct=True) # 8. 盈利增长因子 if 'net_profit' in stock_data.columns: profit_growth = stock_data['net_profit'].pct_change(periods=4) factors['profit_growth_rank'] = profit_growth.rank(pct=True) return factors def calculate_risk_factors(self, stock_data): """计算风险因子(核心9-10)""" factors = {} # 9. 波动率因子 if 'close' in stock_data.columns: volatility = stock_data['close'].rolling(20).std() factors['volatility_rank'] = volatility.rank(pct=True) # 10. 流动性因子 if 'volume' in stock_data.columns: avg_volume = stock_data['volume'].rolling(20).mean() factors['liquidity_rank'] = avg_volume.rank(pct=True) return factors def parallel_factor_calculation(self, stock_data): """并行计算所有因子""" print(f"🔢 开始并行因子计算...") # 准备任务 tasks = [ (self.calculate_valuation_factors, stock_data), (self.calculate_quality_factors, stock_data), (self.calculate_growth_factors, stock_data), (self.calculate_risk_factors, stock_data) ] # 并行计算 all_factors = {} with ProcessPoolExecutor(max_workers=self.cpu_cores) as executor: future_to_task = {executor.submit(func, data): (func.__name__, data) for func, data in tasks} for future in as_completed(future_to_task): task_name, _ = future_to_task[future] try: factors = future.result() all_factors.update(factors) print(f"✅ {task_name} 计算完成") except Exception as e: print(f"❌ {task_name} 计算失败: {e}") return all_factors def main(): """主函数""" agent = SuperFinancialAgent() # 1. 生成模拟数据(实际项目中从数据源获取) print(f"\n📊 生成模拟财务数据...") np.random.seed(42) n_stocks = 3000 stock_data = pd.DataFrame({ 'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)], 'pe_ratio': np.random.uniform(5, 50, n_stocks), 'pb_ratio': np.random.uniform(0.5, 5, n_stocks), 'ps_ratio': np.random.uniform(0.5, 10, n_stocks), 'roe': np.random.uniform(0.05, 0.3, n_stocks), 'gross_margin': np.random.uniform(0.2, 0.6, n_stocks), 'free_cash_flow': np.random.uniform(1e6, 1e9, n_stocks), 'market_cap': np.random.uniform(1e8, 1e11, n_stocks), 'revenue': np.random.uniform(1e7, 1e10, n_stocks), 'net_profit': np.random.uniform(1e6, 1e9, n_stocks), 'close': np.random.uniform(10, 100, n_stocks), 'volume': np.random.uniform(1e5, 1e7, n_stocks) }) print(f"✅ 生成 {n_stocks} 只股票财务数据") # 2. 并行计算因子 factors = agent.parallel_factor_calculation(stock_data) # 3. 合并因子数据 for factor_name, factor_values in factors.items(): stock_data[factor_name] = factor_values # 4. 计算综合价值得分 print(f"\n🎯 计算综合价值得分...") # 价值因子(越低越好) value_factors = ['pe_rank', 'pb_rank', 'ps_rank'] value_score = stock_data[value_factors].mean(axis=1) # 质量因子(越高越好) quality_factors = ['roe_rank', 'gross_margin_rank', 'fcf_rank'] quality_score = stock_data[quality_factors].mean(axis=1) # 综合得分:价值得分(高) + 质量得分(高) stock_data['value_quality_score'] = (1 - value_score) * 0.6 + quality_score * 0.4 stock_data['value_quality_rank'] = stock_data['value_quality_score'].rank(ascending=False, pct=True) # 5. 输出结果 print(f"\n📈 计算完成!") print(f"⏰ 耗时: {(datetime.now() - agent.start_time).total_seconds():.2f}秒") print(f"📊 总因子数: {len(factors)}个") print(f"🏆 综合价值得分计算完成") # 显示前10名 top_stocks = stock_data.nlargest(10, 'value_quality_score')[['stock_code', 'value_quality_score', 'value_quality_rank']] print(f"\n🏅 价值投资前10名:") print(top_stocks.to_string(index=False)) return stock_data if __name__ == "__main__": stock_data = main()