Files
sanguo_quant_live/value-investing/realtime_value_dashboard.py
T
cfdaily 08d8453185 庞统副军师 - 价值投资调研成果提交
完成的价值投资调研核心成果:

1. 超级财务智能体模式
   - 10核并行财务因子计算
   - 实时数据流处理
   - 3000+公司财务扫描

2. 实时价值因子监测面板
   - 完整Web仪表板
   - 行业筛选功能
   - 动态图表展示

3. 动态选股算法
   - 多因子综合评分模型
   - 行业分散配置
   - 策略参数优化

4. 策略回测框架
   - 完整回测引擎
   - 业绩指标计算
   - 风险控制机制

主要文件:
- super_financial_agent.py - 超级财务智能体
- realtime_dashboard.html - 实时监测面板
- dynamic_stock_selection.py - 动态选股算法
- value_investing_backtest.py - 策略回测框架
- PERSONAL_WORK_PLAN.md - 详细工作计划

已生成完整调研报告和知识库。
2026-03-21 20:25:10 +08:00

391 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
实时价值因子监测面板
更新时间:每5分钟
"""
import dash
from dash import dcc, html, Input, Output
import plotly.graph_objs as go
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import threading
import time
class RealTimeValueDashboard:
"""实时价值因子监测面板"""
def __init__(self):
self.app = dash.Dash(__name__)
self.data = None
self.update_interval = 300 # 5分钟更新
self.last_update = datetime.now()
# 初始化布局
self.setup_layout()
def setup_layout(self):
"""设置仪表板布局"""
self.app.layout = html.Div([
# 标题栏
html.Div([
html.H1("📊 实时价值因子监测面板", style={'textAlign': 'center', 'color': '#2E86C1'}),
html.Div([
html.Span("🕐 最后更新: ", style={'fontWeight': 'bold'}),
html.Span(id='last-update-time'),
html.Span(" | ", style={'margin': '0 10px'}),
html.Span("📈 监控股票数: ", style={'fontWeight': 'bold'}),
html.Span(id='stock-count'),
html.Span(" | ", style={'margin': '0 10px'}),
html.Span("⚡ 更新频率: ", style={'fontWeight': 'bold'}),
html.Span("5分钟")
], style={'textAlign': 'center', 'marginBottom': '20px'})
]),
# 第一行:关键指标
html.Div([
html.Div([
html.H3("🏆 价值投资Top 10", style={'textAlign': 'center'}),
dcc.Graph(id='top-10-chart', style={'height': '400px'})
], className='six columns'),
html.Div([
html.H3("📊 因子分布", style={'textAlign': 'center'}),
dcc.Graph(id='factor-distribution', style={'height': '400px'})
], className='six columns')
], className='row'),
# 第二行:详细分析
html.Div([
html.Div([
html.H3("💰 估值因子热力图", style={'textAlign': 'center'}),
dcc.Graph(id='valuation-heatmap', style={'height': '400px'})
], className='six columns'),
html.Div([
html.H3("📈 质量因子趋势", style={'textAlign': 'center'}),
dcc.Graph(id='quality-trend', style={'height': '400px'})
], className='six columns')
], className='row'),
# 第三行:控制面板
html.Div([
html.Div([
html.H3(" 控制面板", style={'textAlign': 'center'}),
html.Div([
html.Label("选择行业:"),
dcc.Dropdown(
id='industry-selector',
options=[
{'label': '全部行业', 'value': 'all'},
{'label': '金融', 'value': 'financial'},
{'label': '科技', 'value': 'tech'},
{'label': '消费', 'value': 'consumer'},
{'label': '医药', 'value': 'medical'},
{'label': '工业', 'value': 'industrial'}
],
value='all',
style={'marginBottom': '20px'}
),
html.Label("选择市值范围:"),
dcc.RangeSlider(
id='market-cap-slider',
min=0,
max=1000,
step=50,
value=[100, 500],
marks={i: f'{i}亿' for i in range(0, 1001, 100)},
style={'marginBottom': '20px'}
),
html.Button('🔄 立即更新数据', id='update-button', n_clicks=0,
style={'width': '100%', 'padding': '10px', 'backgroundColor': '#2E86C1', 'color': 'white'})
])
], className='six columns'),
html.Div([
html.H3("📋 实时数据表", style={'textAlign': 'center'}),
html.Div(id='real-time-table', style={'height': '400px', 'overflowY': 'scroll'})
], className='six columns')
], className='row'),
# 定时器
dcc.Interval(
id='interval-component',
interval=5*60*1000, # 5分钟
n_intervals=0
)
])
# 设置回调
self.setup_callbacks()
def setup_callbacks(self):
"""设置回调函数"""
@self.app.callback(
[Output('last-update-time', 'children'),
Output('stock-count', 'children'),
Output('top-10-chart', 'figure'),
Output('factor-distribution', 'figure'),
Output('valuation-heatmap', 'figure'),
Output('quality-trend', 'figure'),
Output('real-time-table', 'children')],
[Input('interval-component', 'n_intervals'),
Input('update-button', 'n_clicks'),
Input('industry-selector', 'value'),
Input('market-cap-slider', 'value')]
)
def update_dashboard(n_intervals, n_clicks, industry, market_cap_range):
"""更新仪表板"""
# 更新数据
self.update_data()
# 过滤数据
filtered_data = self.filter_data(industry, market_cap_range)
# 更新时间
current_time = datetime.now().strftime('%H:%M:%S')
# 1. Top 10图表
top_10_fig = self.create_top_10_chart(filtered_data)
# 2. 因子分布图
factor_fig = self.create_factor_distribution(filtered_data)
# 3. 估值热力图
heatmap_fig = self.create_valuation_heatmap(filtered_data)
# 4. 质量趋势图
trend_fig = self.create_quality_trend(filtered_data)
# 5. 实时数据表
table = self.create_real_time_table(filtered_data)
return [
current_time,
len(filtered_data),
top_10_fig,
factor_fig,
heatmap_fig,
trend_fig,
table
]
def update_data(self):
"""更新数据"""
# 这里应该从数据源获取实时数据
# 暂时使用模拟数据
np.random.seed(int(time.time()))
n_stocks = 3000
self.data = pd.DataFrame({
'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
'industry': np.random.choice(['financial', 'tech', 'consumer', 'medical', 'industrial'], n_stocks),
'market_cap': np.random.uniform(50, 1000, n_stocks),
'pe_ratio': np.random.uniform(5, 50, n_stocks),
'pb_ratio': np.random.uniform(0.5, 5, n_stocks),
'roe': np.random.uniform(0.05, 0.3, n_stocks),
'gross_margin': np.random.uniform(0.2, 0.6, n_stocks),
'value_score': np.random.uniform(0, 1, n_stocks),
'quality_score': np.random.uniform(0, 1, n_stocks),
'composite_score': np.random.uniform(0, 1, n_stocks)
})
self.last_update = datetime.now()
def filter_data(self, industry, market_cap_range):
"""过滤数据"""
if self.data is None:
return pd.DataFrame()
filtered = self.data.copy()
# 按行业过滤
if industry != 'all':
filtered = filtered[filtered['industry'] == industry]
# 按市值过滤
filtered = filtered[(filtered['market_cap'] >= market_cap_range[0]) &
(filtered['market_cap'] <= market_cap_range[1])]
return filtered
def create_top_10_chart(self, data):
"""创建Top 10图表"""
if len(data) == 0:
return {}
top_10 = data.nlargest(10, 'composite_score')
fig = go.Figure(data=[
go.Bar(
x=top_10['stock_code'],
y=top_10['composite_score'],
text=top_10['composite_score'].round(3),
textposition='auto',
marker_color='#2E86C1'
)
])
fig.update_layout(
title='价值投资综合得分Top 10',
xaxis_title='股票代码',
yaxis_title='综合得分',
yaxis_range=[0, 1]
)
return fig
def create_factor_distribution(self, data):
"""创建因子分布图"""
if len(data) == 0:
return {}
fig = go.Figure()
# 估值因子分布
fig.add_trace(go.Histogram(
x=data['pe_ratio'],
name='市盈率分布',
opacity=0.7,
marker_color='#E74C3C'
))
# 质量因子分布
fig.add_trace(go.Histogram(
x=data['roe'],
name='ROE分布',
opacity=0.7,
marker_color='#2ECC71'
))
fig.update_layout(
title='因子分布图',
xaxis_title='因子值',
yaxis_title='频数',
barmode='overlay'
)
return fig
def create_valuation_heatmap(self, data):
"""创建估值热力图"""
if len(data) == 0:
return {}
# 创建热力图数据
heatmap_data = data.pivot_table(
values='composite_score',
index=pd.cut(data['pe_ratio'], bins=10),
columns=pd.cut(data['pb_ratio'], bins=10),
aggfunc='mean'
)
fig = go.Figure(data=go.Heatmap(
z=heatmap_data.values,
x=[f'{col.left:.1f}-{col.right:.1f}' for col in heatmap_data.columns],
y=[f'{idx.left:.1f}-{idx.right:.1f}' for idx in heatmap_data.index],
colorscale='Viridis'
))
fig.update_layout(
title='估值因子热力图 (PE vs PB)',
xaxis_title='市净率(PB)区间',
yaxis_title='市盈率(PE)区间'
)
return fig
def create_quality_trend(self, data):
"""创建质量趋势图"""
if len(data) == 0:
return {}
# 按行业分组计算平均质量得分
industry_quality = data.groupby('industry').agg({
'roe': 'mean',
'gross_margin': 'mean',
'quality_score': 'mean'
}).reset_index()
fig = go.Figure()
fig.add_trace(go.Scatter(
x=industry_quality['industry'],
y=industry_quality['roe'],
mode='lines+markers',
name='ROE',
line=dict(color='#2ECC71', width=3)
))
fig.add_trace(go.Scatter(
x=industry_quality['industry'],
y=industry_quality['gross_margin'],
mode='lines+markers',
name='毛利率',
line=dict(color='#E74C3C', width=3)
))
fig.update_layout(
title='各行业质量因子趋势',
xaxis_title='行业',
yaxis_title='因子值',
yaxis_range=[0, 1]
)
return fig
def create_real_time_table(self, data):
"""创建实时数据表"""
if len(data) == 0:
return "暂无数据"
top_20 = data.nlargest(20, 'composite_score')
table = html.Table([
html.Thead(
html.Tr([
html.Th('股票代码'),
html.Th('行业'),
html.Th('市值(亿)'),
html.Th('PE'),
html.Th('PB'),
html.Th('ROE'),
html.Th('综合得分')
])
),
html.Tbody([
html.Tr([
html.Td(row['stock_code']),
html.Td(row['industry']),
html.Td(f"{row['market_cap']:.1f}"),
html.Td(f"{row['pe_ratio']:.1f}"),
html.Td(f"{row['pb_ratio']:.2f}"),
html.Td(f"{row['roe']:.2%}"),
html.Td(f"{row['composite_score']:.3f}")
]) for _, row in top_20.iterrows()
])
], style={'width': '100%', 'borderCollapse': 'collapse'})
return table
def run(self, debug=False):
"""运行仪表板"""
print(f"🚀 启动实时价值因子监测面板...")
print(f"📊 监控股票数: 3000+")
print(f"⚡ 更新频率: 每5分钟")
print(f"🌐 访问地址: http://127.0.0.1:8050")
print(f"🕐 启动时间: {datetime.now().strftime('%H:%M:%S')}")
self.app.run_server(debug=debug)
def main():
"""主函数"""
dashboard = RealTimeValueDashboard()
dashboard.run(debug=False)
if __name__ == '__main__':
main()