# Files
#
# 391 lines
# 14 KiB
# Python

#!/usr/bin/env python3
"""
实时价值因子监测面板
更新时间:每5分钟
"""
import dash
from dash import dcc, html, Input, Output
import plotly.graph_objs as go
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import threading
import time
class RealTimeValueDashboard:
    """Dash dashboard showing value-factor metrics for a universe of stocks.

    The page consists of a header, four charts, a filter panel and a table.
    One callback re-renders everything. The underlying data set is only
    regenerated when the periodic timer fires, when the refresh button is
    pressed, or when no data has been loaded yet; changing a filter merely
    re-filters the data already held in ``self.data``.

    Data is currently mock data produced by ``update_data``.
    """

    # Ids of the inputs that ask for fresh data (as opposed to a re-filter).
    _REFRESH_TRIGGER_IDS = frozenset({'interval-component', 'update-button'})

    def __init__(self):
        """Create the Dash app, initialise state and build the layout."""
        self.app = dash.Dash(__name__)
        self.data = None
        self.update_interval = 300  # refresh period in seconds (5 minutes)
        self.last_update = datetime.now()
        # Build the layout (this also registers the callbacks).
        self.setup_layout()

    @staticmethod
    def _graph_panel(title, graph_id):
        """Return a half-width column holding a centred title and a graph."""
        return html.Div([
            html.H3(title, style={'textAlign': 'center'}),
            dcc.Graph(id=graph_id, style={'height': '400px'}),
        ], className='six columns')

    def setup_layout(self):
        """Assign ``self.app.layout`` and then register the callbacks."""
        # Derived from ``update_interval`` so the label and the timer agree.
        refresh_minutes = self.update_interval // 60
        bold = {'fontWeight': 'bold'}
        separator_style = {'margin': '0 10px'}

        header = html.Div([
            html.H1("📊 实时价值因子监测面板",
                    style={'textAlign': 'center', 'color': '#2E86C1'}),
            html.Div([
                html.Span("🕐 最后更新: ", style=bold),
                html.Span(id='last-update-time'),
                html.Span(" | ", style=separator_style),
                html.Span("📈 监控股票数: ", style=bold),
                html.Span(id='stock-count'),
                html.Span(" | ", style=separator_style),
                html.Span("⚡ 更新频率: ", style=bold),
                html.Span(f"{refresh_minutes}分钟"),
            ], style={'textAlign': 'center', 'marginBottom': '20px'}),
        ])

        # Row 1: key indicators.
        indicators_row = html.Div([
            self._graph_panel("🏆 价值投资Top 10", 'top-10-chart'),
            self._graph_panel("📊 因子分布", 'factor-distribution'),
        ], className='row')

        # Row 2: detailed analysis.
        analysis_row = html.Div([
            self._graph_panel("💰 估值因子热力图", 'valuation-heatmap'),
            self._graph_panel("📈 质量因子趋势", 'quality-trend'),
        ], className='row')

        controls = html.Div([
            html.H3("⚙️ 控制面板", style={'textAlign': 'center'}),
            html.Div([
                html.Label("选择行业:"),
                dcc.Dropdown(
                    id='industry-selector',
                    options=[
                        {'label': '全部行业', 'value': 'all'},
                        {'label': '金融', 'value': 'financial'},
                        {'label': '科技', 'value': 'tech'},
                        {'label': '消费', 'value': 'consumer'},
                        {'label': '医药', 'value': 'medical'},
                        {'label': '工业', 'value': 'industrial'},
                    ],
                    value='all',
                    style={'marginBottom': '20px'},
                ),
                html.Label("选择市值范围:"),
                # dcc.RangeSlider does not define a ``style`` property and
                # rejects unknown keyword arguments, so the bottom margin is
                # applied to a wrapper element instead.
                html.Div(
                    dcc.RangeSlider(
                        id='market-cap-slider',
                        min=0,
                        max=1000,
                        step=50,
                        value=[100, 500],
                        marks={i: f'{i}亿' for i in range(0, 1001, 100)},
                    ),
                    style={'marginBottom': '20px'},
                ),
                html.Button(
                    '🔄 立即更新数据',
                    id='update-button',
                    n_clicks=0,
                    style={'width': '100%', 'padding': '10px',
                           'backgroundColor': '#2E86C1', 'color': 'white'},
                ),
            ]),
        ], className='six columns')

        table_panel = html.Div([
            html.H3("📋 实时数据表", style={'textAlign': 'center'}),
            html.Div(id='real-time-table',
                     style={'height': '400px', 'overflowY': 'scroll'}),
        ], className='six columns')

        # Row 3: control panel and live table.
        controls_row = html.Div([controls, table_panel], className='row')

        # Periodic refresh timer (dcc.Interval expects milliseconds).
        timer = dcc.Interval(
            id='interval-component',
            interval=self.update_interval * 1000,
            n_intervals=0,
        )

        self.app.layout = html.Div([
            header,
            indicators_row,
            analysis_row,
            controls_row,
            timer,
        ])
        self.setup_callbacks()

    def setup_callbacks(self):
        """Register the single callback that re-renders the whole page."""
        @self.app.callback(
            [Output('last-update-time', 'children'),
             Output('stock-count', 'children'),
             Output('top-10-chart', 'figure'),
             Output('factor-distribution', 'figure'),
             Output('valuation-heatmap', 'figure'),
             Output('quality-trend', 'figure'),
             Output('real-time-table', 'children')],
            [Input('interval-component', 'n_intervals'),
             Input('update-button', 'n_clicks'),
             Input('industry-selector', 'value'),
             Input('market-cap-slider', 'value')]
        )
        def update_dashboard(n_intervals, n_clicks, industry, market_cap_range):
            """Refresh data when requested, then rebuild every output."""
            # Component ids that fired this invocation. On the initial page
            # load none of the refresh triggers is reported.
            fired = {
                item['prop_id'].rsplit('.', 1)[0]
                for item in dash.callback_context.triggered
            }
            # Only the timer, the button, or a missing data set cause a
            # reload; a filter change must not replace the data under the
            # user's feet.
            if self.data is None or fired & self._REFRESH_TRIGGER_IDS:
                self.update_data()

            filtered_data = self.filter_data(industry, market_cap_range)
            return [
                # Time of the data refresh, not of this re-render.
                self.last_update.strftime('%H:%M:%S'),
                len(filtered_data),
                self.create_top_10_chart(filtered_data),
                self.create_factor_distribution(filtered_data),
                self.create_valuation_heatmap(filtered_data),
                self.create_quality_trend(filtered_data),
                self.create_real_time_table(filtered_data),
            ]

    def update_data(self):
        """Regenerate ``self.data`` with mock values and stamp ``last_update``.

        A real data source should replace this. The generator is a local
        ``RandomState`` seeded with the current Unix time, so the process-wide
        NumPy random state is left untouched.
        """
        rng = np.random.RandomState(int(time.time()))
        n_stocks = 3000
        industries = ['financial', 'tech', 'consumer', 'medical', 'industrial']
        self.data = pd.DataFrame({
            'stock_code': [f'{i:06d}.XSHE' for i in range(1, n_stocks + 1)],
            'industry': rng.choice(industries, n_stocks),
            'market_cap': rng.uniform(50, 1000, n_stocks),
            'pe_ratio': rng.uniform(5, 50, n_stocks),
            'pb_ratio': rng.uniform(0.5, 5, n_stocks),
            'roe': rng.uniform(0.05, 0.3, n_stocks),
            'gross_margin': rng.uniform(0.2, 0.6, n_stocks),
            'value_score': rng.uniform(0, 1, n_stocks),
            'quality_score': rng.uniform(0, 1, n_stocks),
            'composite_score': rng.uniform(0, 1, n_stocks),
        })
        self.last_update = datetime.now()

    def filter_data(self, industry, market_cap_range):
        """Return the rows of ``self.data`` matching the selected filters.

        Args:
            industry: Industry code to keep. ``'all'`` or ``None`` (the value
                Dash sends when the dropdown is cleared) keeps every industry.
            market_cap_range: ``[low, high]`` inclusive bounds on
                ``market_cap``. ``None`` disables the market-cap filter.

        Returns:
            A new DataFrame; empty when no data has been loaded yet.
        """
        if self.data is None:
            return pd.DataFrame()

        mask = pd.Series(True, index=self.data.index)
        if industry is not None and industry != 'all':
            mask = mask & (self.data['industry'] == industry)
        if market_cap_range is not None:
            low, high = market_cap_range
            # ``between`` is inclusive on both ends by default.
            mask = mask & self.data['market_cap'].between(low, high)
        return self.data[mask]

    def create_top_10_chart(self, data):
        """Return a bar chart of the ten highest ``composite_score`` rows.

        Returns an empty dict (rendered by Dash as a blank figure) when
        ``data`` has no rows.
        """
        if data.empty:
            return {}

        top_10 = data.nlargest(10, 'composite_score')
        fig = go.Figure(data=[
            go.Bar(
                x=top_10['stock_code'],
                y=top_10['composite_score'],
                text=top_10['composite_score'].round(3),
                textposition='auto',
                marker_color='#2E86C1',
            )
        ])
        fig.update_layout(
            title='价值投资综合得分Top 10',
            xaxis_title='股票代码',
            yaxis_title='综合得分',
            yaxis_range=[0, 1],
        )
        return fig

    def create_factor_distribution(self, data):
        """Return overlaid histograms of ``pe_ratio`` and ``roe``.

        Returns an empty dict when ``data`` has no rows.
        """
        if data.empty:
            return {}

        fig = go.Figure()
        # Valuation factor.
        fig.add_trace(go.Histogram(
            x=data['pe_ratio'],
            name='市盈率分布',
            opacity=0.7,
            marker_color='#E74C3C',
        ))
        # Quality factor.
        fig.add_trace(go.Histogram(
            x=data['roe'],
            name='ROE分布',
            opacity=0.7,
            marker_color='#2ECC71',
        ))
        fig.update_layout(
            title='因子分布图',
            xaxis_title='因子值',
            yaxis_title='频数',
            barmode='overlay',
        )
        return fig

    def create_valuation_heatmap(self, data):
        """Return a heatmap of mean ``composite_score`` per PE/PB bin.

        ``pe_ratio`` and ``pb_ratio`` are each cut into ten equal-width bins.
        Returns an empty dict when ``data`` has no rows.
        """
        if data.empty:
            return {}

        heatmap_data = data.pivot_table(
            values='composite_score',
            index=pd.cut(data['pe_ratio'], bins=10),
            columns=pd.cut(data['pb_ratio'], bins=10),
            aggfunc='mean',
            # Explicit so the result does not depend on the pandas default
            # for categorical groupers.
            observed=False,
        )
        fig = go.Figure(data=go.Heatmap(
            z=heatmap_data.values,
            x=[f'{col.left:.1f}-{col.right:.1f}' for col in heatmap_data.columns],
            y=[f'{idx.left:.1f}-{idx.right:.1f}' for idx in heatmap_data.index],
            colorscale='Viridis',
        ))
        fig.update_layout(
            title='估值因子热力图 (PE vs PB)',
            xaxis_title='市净率(PB)区间',
            yaxis_title='市盈率(PE)区间',
        )
        return fig

    def create_quality_trend(self, data):
        """Return a line chart of mean ``roe`` and ``gross_margin`` by industry.

        Returns an empty dict when ``data`` has no rows.
        """
        if data.empty:
            return {}

        industry_quality = (
            data.groupby('industry')[['roe', 'gross_margin']]
            .mean()
            .reset_index()
        )
        fig = go.Figure()
        for column, label, colour in (
            ('roe', 'ROE', '#2ECC71'),
            ('gross_margin', '毛利率', '#E74C3C'),
        ):
            fig.add_trace(go.Scatter(
                x=industry_quality['industry'],
                y=industry_quality[column],
                mode='lines+markers',
                name=label,
                line=dict(color=colour, width=3),
            ))
        fig.update_layout(
            title='各行业质量因子趋势',
            xaxis_title='行业',
            yaxis_title='因子值',
            yaxis_range=[0, 1],
        )
        return fig

    def create_real_time_table(self, data):
        """Return an HTML table of the 20 highest ``composite_score`` rows.

        Returns the placeholder string ``"暂无数据"`` when ``data`` has no rows.
        """
        if data.empty:
            return "暂无数据"

        top_20 = data.nlargest(20, 'composite_score')
        header = html.Thead(
            html.Tr([
                html.Th('股票代码'),
                html.Th('行业'),
                html.Th('市值(亿)'),
                html.Th('PE'),
                html.Th('PB'),
                html.Th('ROE'),
                html.Th('综合得分'),
            ])
        )
        body = html.Tbody([
            html.Tr([
                html.Td(row.stock_code),
                html.Td(row.industry),
                html.Td(f"{row.market_cap:.1f}"),
                html.Td(f"{row.pe_ratio:.1f}"),
                html.Td(f"{row.pb_ratio:.2f}"),
                html.Td(f"{row.roe:.2%}"),
                html.Td(f"{row.composite_score:.3f}"),
            ])
            for row in top_20.itertuples(index=False)
        ])
        return html.Table(
            [header, body],
            style={'width': '100%', 'borderCollapse': 'collapse'},
        )

    def run(self, debug=False):
        """Print a start-up banner and start the Dash development server.

        Args:
            debug: Forwarded to Dash's server start method.
        """
        print("🚀 启动实时价值因子监测面板...")
        print("📊 监控股票数: 3000+")
        print(f"⚡ 更新频率: 每{self.update_interval // 60}分钟")
        print("🌐 访问地址: http://127.0.0.1:8050")
        print(f"🕐 启动时间: {datetime.now().strftime('%H:%M:%S')}")
        # ``run_server`` is gone in Dash 3; prefer ``run`` and fall back only
        # on older releases that lack it.
        serve = getattr(self.app, 'run', None) or self.app.run_server
        serve(debug=debug)
def main():
    """Build the dashboard and serve it with debug mode switched off."""
    RealTimeValueDashboard().run(debug=False)
# Start the dashboard only when executed as a script, not on import.
if __name__ == "__main__":
    main()