""" 个股+板块利好利空风险监控模块 功能:基于公开数据提前预判消息面风险,适配结构化行情 监控维度: 1. 量价异常:没有消息但突然大幅波动,可能是提前泄密 2. 融资余额变化:融资快速增减可能预示资金面变化 3. 龙虎榜/大宗交易:机构大额进出 4. 舆情频率:股吧雪球讨论热度异常变化 5. 国际联动风险:外盘、商品、国际消息对A股影响预判 6. 结构化行情风控:板块集中度风险、热点透支风险 Author: 关羽(云长) Date: 2026-03-27 """ from dataclasses import dataclass from typing import List, Dict, Optional, Tuple from enum import Enum class NewsSentiment(Enum): STRONG_BULL = 2 BULL = 1 NEUTRAL = 0 BEAR = -1 STRONG_BEAR = -2 class RiskLevel(Enum): SAFE = 0 WATCH = 1 REDUCE = 2 EXIT = 3 @dataclass class StockNewsData: """个股监控数据""" code: str name: str # 1. 量价数据 recent_vol_change: float # 近5日成交量相比20日均值变化比例,比如0.5就是放量50% recent_pct_change: float # 近5日涨跌幅(%) gap_up: bool = False # 是否高开缺口未回补 gap_down: bool = False # 是否低开缺口未回补 # 2. 融资数据 finance_balance_change: float = 0.0 # 融资余额变化比例(周) finance_pct_of_volume: float = 0.0 # 融资买入额占成交额比例 # 3. 龙虎榜/大宗 has_large_order: bool = False # 近日是否有龙虎榜大额卖出 has_bulk_discount: bool = False # 是否有大宗折价交易 bulk_discount_pct: float = 0.0 # 大宗折价幅度 # 4. 舆情数据 discussion_count_change: float = 0.0 # 讨论量相比上周变化倍数 sentiment_score: float = 0.0 # 舆情情感分 -1~1,负=利空 # 5. 国际联动数据 is_ah: bool = False # 是否A+H股 ah_hk_overnight_change: float = 0.0 # 港股H股隔夜涨跌幅% is_commodity_related: bool = False # 是否大宗商品相关(有色/化工/农业) commodity_future_overnight_change: float = 0.0 # 对应期货隔夜涨跌幅% us_index_overnight_change: float = 0.0 # 美股隔夜涨跌% has_major_international_news: bool = False # 是否有重大国际消息(加息/地缘政治等) international_news_sentiment: int = 0 # -1利空 0中性 1利好 # 6. 结构化行情数据 sector_name: str = "" # 板块名称 sector_style: str = "" # 风格名称(AI/新能源/消费等) sector_total_gain: float = 0.0 # 板块近期累计涨幅% sector_position_ratio: float = 0.0 # 当前板块仓位占总仓位比例 @dataclass class NewsRiskResult: """消息风险评估结果""" code: str name: str risk_level: RiskLevel sentiment: NewsSentiment risk_score: float # 总分 0~100,越高风险越大 bear_points: List[str] # 利空信号点 bull_points: List[str] # 利好信号点 suggestion: str # 操作建议 class PriceVolumeMonitor: """量价异常监控""" def __init__(self, vol_alert_threshold: float = 2.0, # 放量超过2倍预警 vol_crush_threshold: float = -0.5, # 缩量超过50%预警 drop_alert_threshold: float = -7.0): # 5日跌幅超过7%预警 self.vol_alert_threshold = vol_alert_threshold self.vol_crush_threshold = vol_crush_threshold self.drop_alert_threshold = drop_alert_threshold def analyze(self, data: StockNewsData) -> Tuple[int, List[str], List[str]]: """ 返回:(风险分增量, 利空点列表, 利好点列表) """ risk_score = 0 bear_points = [] bull_points = [] # 天量天价无消息,警惕利好兑现出货 if data.recent_vol_change >= self.vol_alert_threshold and data.recent_pct_change >= 5: risk_score += 15 bear_points.append(f"近5日放量{data.recent_vol_change:.1f}倍,涨幅{data.recent_pct_change:.1f}%,可能利好提前泄露,警惕出货") # 莫名其妙大跌,可能有利空提前泄露 if data.recent_pct_change <= self.drop_alert_threshold and data.recent_vol_change >= 0.5: risk_score += 20 bear_points.append(f"近5日放量下跌{data.recent_pct_change:.1f}%,无公开消息,警惕利空提前泄露") # 向下跳空缺口 if data.gap_down: risk_score += 10 bear_points.append("存在向下跳空缺口未回补,技术形态偏空") # 向上跳空缺口 if data.gap_up: bull_points.append("存在向上跳空缺口未回补,技术形态偏多") # 突然严重缩量,警惕流动性枯竭 if data.recent_vol_change <= self.vol_crush_threshold: risk_score += 10 bear_points.append(f"成交量缩量{(data.recent_vol_change*100):.0f}%,警惕流动性风险") return risk_score, bear_points, bull_points class FinanceMonitor: """融资余额监控""" def __init__(self, increase_threshold: float = 0.3, # 融资余额增加超30% decrease_threshold: float = -0.2): # 融资余额减少超20% self.increase_threshold = increase_threshold self.decrease_threshold = decrease_threshold def analyze(self, data: StockNewsData) -> Tuple[int, List[str], List[str]]: risk_score = 0 bear_points = [] bull_points = [] # 融资快速增加,看多情绪升温 if data.finance_balance_change >= self.increase_threshold: bull_points.append(f"融资余额一周增加{data.finance_balance_change:.1%},杠杆资金看多") # 融资快速减少,资金出逃,利空 if data.finance_balance_change <= self.decrease_threshold: risk_score += 15 bear_points.append(f"融资余额一周减少{data.finance_balance_change:.1%},杠杆资金快速出逃,警惕利空") # 融资买入占比过高,波动会放大 if data.finance_pct_of_volume >= 0.2: risk_score += 5 bear_points.append(f"融资买入占成交额{data.finance_pct_of_volume:.1%},杠杆比例高,波动风险大") return risk_score, bear_points, bull_points class InstitutionalMonitor: """龙虎榜/大宗交易监控""" def __init__(self, bulk_discount_threshold: float = -0.08): # 折价超过8%预警 self.bulk_discount_threshold = bulk_discount_threshold def analyze(self, data: StockNewsData) -> Tuple[int, List[str], List[str]]: risk_score = 0 bear_points = [] bull_points = [] # 龙虎榜大额机构卖出 if data.has_large_order: risk_score += 20 bear_points.append("龙虎榜出现机构大额卖出,机构出逃") # 大宗折价交易 if data.has_bulk_discount and data.bulk_discount_pct <= self.bulk_discount_threshold: risk_score += 15 bear_points.append(f"大宗交易折价{data.bulk_discount_pct:.1%},大股东折价出货") elif data.has_bulk_discount: bull_points.append("大宗交易平价/溢价成交,有机构接盘") return risk_score, bear_points, bull_points class SentimentMonitor: """舆情热度监控""" def __init__(self, hot_threshold: float = 5.0, # 讨论量增加5倍,太热预警 cold_threshold: float = -0.8): # 讨论量减少80%,太凉预警 self.hot_threshold = hot_threshold self.cold_threshold = cold_threshold def analyze(self, data: StockNewsData) -> Tuple[int, List[str], List[str]]: risk_score = 0 bear_points = [] bull_points = [] # 讨论量突然暴涨,关注度太高,往往是见顶信号 if data.discussion_count_change >= self.hot_threshold: risk_score += 10 bear_points.append(f"股吧/雪球讨论量增加{data.discussion_count_change:.1f}倍,热度异常,可能见顶") # 舆情已经明显偏空 if data.sentiment_score <= -0.5: risk_score += 10 bear_points.append(f"市场舆情偏空,情感分{data.sentiment_score:.2f}") # 舆情明显偏多 elif data.sentiment_score >= 0.5: bull_points.append(f"市场舆情偏多,情感分{data.sentiment_score:.2f}") return risk_score, bear_points, bull_points class InternationalLinkageMonitor: """国际联动风险监控""" def __init__(self, ah_change_threshold: float = -3.0, # H股隔夜跌幅超过3%预警 commodity_change_threshold: float = -4.0, # 商品期货跌幅超过4%预警 us_index_change_threshold: float = -2.0): # 美股跌幅超过2%预警 self.ah_change_threshold = ah_change_threshold self.commodity_change_threshold = commodity_change_threshold self.us_index_change_threshold = us_index_change_threshold def analyze(self, data: StockNewsData) -> Tuple[int, List[str], List[str]]: risk_score = 0 bear_points = [] bull_points = [] # 1. A+H股,H股隔夜大跌预警 if data.is_ah: if data.ah_hk_overnight_change <= self.ah_change_threshold: risk_score += 15 bear_points.append(f"A+H股,H股隔夜大跌{data.ah_hk_overnight_change:.1f}%,A股大概率跟随低开,风险预警") elif data.ah_hk_overnight_change >= 3.0: bull_points.append(f"A+H股,H股隔夜大涨{data.ah_hk_overnight_change:.1f}%,对A股有正面带动") # 2. 大宗商品相关个股,对应期货隔夜大跌预警 if data.is_commodity_related: if data.commodity_future_overnight_change <= self.commodity_change_threshold: risk_score += 15 bear_points.append(f"大宗商品股,对应期货隔夜大跌{data.commodity_future_overnight_change:.1f}%,个股承压") elif data.commodity_future_overnight_change >= 4.0: bull_points.append(f"大宗商品股,对应期货隔夜大涨{data.commodity_future_overnight_change:.1f}%,对个股有利") # 3. 美股隔夜大跌,系统性风险预警 if data.us_index_overnight_change <= self.us_index_change_threshold: risk_score += 10 bear_points.append(f"美股隔夜大跌{data.us_index_overnight_change:.1f}%,A股开盘可能承压,系统性风险") elif data.us_index_overnight_change >= 2.0: bull_points.append(f"美股隔夜大涨{data.us_index_overnight_change:.1f}%,对A股开盘有利") # 4. 重大国际消息 if data.has_major_international_news: if data.international_news_sentiment == -1: risk_score += 20 bear_points.append("重大国际利空消息(加息/地缘政治等),系统性风险上升,建议降仓") elif data.international_news_sentiment == 1: bull_points.append("重大国际利好消息,市场情绪偏向乐观") return risk_score, bear_points, bull_points class StructuralMarketRisk: """ 结构化行情风控 A股现在经常是结构化行情,少数板块上涨,其他板块不动 需要控制板块集中度,防范热点透支 """ def __init__(self, single_sector_threshold: float = 0.15, # 单板块仓位超15%预警 sector_rally_threshold: float = 50.0, # 板块累计涨幅超50%预警 hot_news_sensitivity: float = 2.0): # 热点板块消息风险放大倍数 self.single_sector_threshold = single_sector_threshold self.sector_rally_threshold = sector_rally_threshold self.hot_news_sensitivity = hot_news_sensitivity def analyze_sector_risk(self, data: StockNewsData, total_portfolio_sectors: Dict[str, float]) -> Tuple[int, List[str], List[str]]: """ 分析板块风险 total_portfolio_sectors: {板块名称: 板块仓位比例},用来计算整体板块集中度 """ risk_score = 0 bear_points = [] bull_points = [] # 1. 单板块仓位超过阈值,提醒分散 if data.sector_position_ratio > self.single_sector_threshold: extra_risk = int((data.sector_position_ratio - self.single_sector_threshold) * 100) risk_score += extra_risk bear_points.append(f"单板块仓位{data.sector_position_ratio:.1%},超过{self.single_sector_threshold:.1%}预警,建议分散减仓") # 2. 板块连续大涨,提醒风险 if data.sector_total_gain >= self.sector_rally_threshold: risk_score += 15 bear_points.append(f"板块{data.sector_name}累计涨幅{data.sector_total_gain:.1f}%,超过{self.sector_rally_threshold:.0f}%,警惕利好出尽") # 3. 冷门板块连续大跌,基本面没问题提示低吸机会 # 这里只做提示,实际需要结合基本面判断 if data.sector_total_gain <= -20 and data.sector_total_gain >= -40: bull_points.append(f"板块{data.sector_name}累计跌幅{data.sector_total_gain:.1f}%,如果基本面没问题,可考虑适度低吸") return risk_score, bear_points, bull_points def adjust_news_risk_for_hot_sector(self, base_risk: int, data: StockNewsData) -> int: """热点板块消息风险灵敏度提高,放大风险分""" if data.sector_total_gain >= 30: # 热点板块,消息更容易透支,放大风险评分 return int(base_risk * self.hot_news_sensitivity) return base_risk class StyleConcentrationRisk: """风格集中度风险控制,不要全仓押注一种风格""" def __init__(self, single_style_threshold: float = 0.4): # 单一风格超40%预警 self.single_style_threshold = single_style_threshold def analyze_style_risk(self, total_style_pos: Dict[str, float]) -> Tuple[int, List[str]]: """检查风格集中度""" risk_score = 0 bear_points = [] for style, ratio in total_style_pos.items(): if ratio >= self.single_style_threshold: extra_risk = int((ratio - self.single_style_threshold) * 50) risk_score += extra_risk bear_points.append(f"单一风格{style}仓位{ratio:.1%},超过{self.single_style_threshold:.0%},建议分散配置") return risk_score, bear_points class NewsRiskMonitor: """总消息风险监控器,整合所有监控维度,适配结构化行情""" def __init__(self): self.pv_monitor = PriceVolumeMonitor() self.fin_monitor = FinanceMonitor() self.inst_monitor = InstitutionalMonitor() self.sent_monitor = SentimentMonitor() self.intl_monitor = InternationalLinkageMonitor() self.structural_rc = StructuralMarketRisk() def analyze_stock(self, data: StockNewsData, total_portfolio_sectors: Dict[str, float] = None) -> NewsRiskResult: """综合分析个股消息风险,支持结构化板块风控""" total_portfolio_sectors = total_portfolio_sectors or {} total_risk = 0 all_bear = [] all_bull = [] # 原有各维度打分 r1, b1, bl1 = self.pv_monitor.analyze(data) r2, b2, bl2 = self.fin_monitor.analyze(data) r3, b3, bl3 = self.inst_monitor.analyze(data) r4, b4, bl4 = self.sent_monitor.analyze(data) r5, b5, bl5 = self.intl_monitor.analyze(data) total_risk = r1 + r2 + r3 + r4 + r5 all_bear = b1 + b2 + b3 + b4 + b5 all_bull = bl1 + bl2 + bl3 + bl4 + bl5 # 新增结构化行情板块风控 if data.sector_name: r6, b6, bl6 = self.structural_rc.analyze_sector_risk(data, total_portfolio_sectors) # 热点板块消息风险放大 base_news_risk = total_risk total_risk = self.structural_rc.adjust_news_risk_for_hot_sector(base_news_risk, data) + r6 all_bear.extend(b6) all_bull.extend(bl6) # 计算风险等级 # 计算风险等级 if total_risk >= 40: risk_level = RiskLevel.EXIT elif total_risk >= 25: risk_level = RiskLevel.REDUCE elif total_risk >= 10: risk_level = RiskLevel.WATCH else: risk_level = RiskLevel.SAFE # 计算整体情绪 bull_count = len(all_bull) bear_count = len(all_bear) if bull_count - bear_count >= 2: sentiment = NewsSentiment.STRONG_BULL elif bull_count - bear_count >= 1: sentiment = NewsSentiment.BULL elif bear_count - bull_count >= 2: sentiment = NewsSentiment.STRONG_BEAR elif bear_count - bull_count >= 1: sentiment = NewsSentiment.BEAR else: sentiment = NewsSentiment.NEUTRAL # 生成操作建议 suggestion = self._get_suggestion(risk_level, sentiment) return NewsRiskResult( code=data.code, name=data.name, risk_level=risk_level, sentiment=sentiment, risk_score=total_risk, bear_points=all_bear, bull_points=all_bull, suggestion=suggestion ) def analyze_stocks(self, datas: List[StockNewsData]) -> List[NewsRiskResult]: """批量分析""" return [self.analyze_stock(d) for d in datas] def _get_suggestion(self, risk_level: RiskLevel, sentiment: NewsSentiment) -> str: """根据风险和情绪给出操作建议""" if risk_level == RiskLevel.EXIT: return "⚠️ 风险较高,建议减仓或清仓离场,规避黑天鹅" elif risk_level == RiskLevel.REDUCE: return "⚠️ 存在明显利空信号,建议降低仓位,控制风险" elif risk_level == RiskLevel.WATCH: if sentiment in [NewsSentiment.BULL, NewsSentiment.STRONG_BULL]: return "👀 有少量异常信号,但整体偏多,可继续观察,谨慎加仓" else: return "👀 存在轻度风险信号,继续观察,不新开仓" else: if sentiment in [NewsSentiment.BULL, NewsSentiment.STRONG_BULL]: return "✅ 无明显风险,整体偏多,可按计划操作" else: return "✅ 无明显风险,按计划持有" def get_risk_report(self, result: NewsRiskResult) -> str: """生成风险报告""" levels = { RiskLevel.SAFE: "🟢 安全", RiskLevel.WATCH: "🟡 关注", RiskLevel.REDUCE: "🟠 减仓", RiskLevel.EXIT: "🔴 离场", } sentiments = { NewsSentiment.STRONG_BULL: "🔼 强烈看多", NewsSentiment.BULL: "▶️ 看多", NewsSentiment.NEUTRAL: "➖ 中性", NewsSentiment.BEAR: "◀️ 看空", NewsSentiment.STRONG_BEAR: "🔽 强烈看空", } lines = [] lines.append("=" * 60) lines.append(f"个股消息风险评估: {result.name}({result.code})") lines.append("=" * 60) lines.append(f"风险等级: {levels[result.risk_level]} 风险分: {result.risk_score}/100") lines.append(f"市场情绪: {sentiments[result.sentiment]}") lines.append("") if result.bull_points: lines.append("✅ 利好信号:") for point in result.bull_points: lines.append(f" • {point}") lines.append("") if result.bear_points: lines.append("⚠️ 利空信号:") for point in result.bear_points: lines.append(f" • {point}") lines.append("") lines.append(f"💡 操作建议: {result.suggestion}") lines.append("=" * 60) return "\n".join(lines) def get_risk_score_for_risk_system(self, result: NewsRiskResult) -> float: """ 获取归一化风险分,给五维风险评估体系使用 返回 0~1,越高风险越大 """ return min(result.risk_score / 50, 1.0) # 整合到原有风控体系的五维风险评估 class FiveDimensionRiskAssessment: """ 五维度风险评估 整合:流动性风险 + 估值风险 + 技术面风险 + 基本面风险 + 消息面风险 """ def __init__(self, news_monitor: NewsRiskMonitor = None): self.news_monitor = news_monitor or NewsRiskMonitor() def calculate_total_risk(self, liquidity_risk: float, # 0~1 valuation_risk: float, # 0~1 technical_risk: float, # 0~1 fundamental_risk: float, # 0~1 news_data: StockNewsData) -> dict: """ 计算五维综合风险 每个维度输入都是0~1,越高风险越大 """ # 先算消息面风险 news_result = self.news_monitor.analyze_stock(news_data) news_risk = self.news_monitor.get_risk_score_for_risk_system(news_result) # 加权平均,消息面风险权重高一些,因为黑天鹅突发危害大 total_risk = ( liquidity_risk * 0.15 + valuation_risk * 0.20 + technical_risk * 0.20 + fundamental_risk * 0.20 + news_risk * 0.25 ) return { "total_risk_score": total_risk, # 0~1 "dimension_scores": { "liquidity": liquidity_risk, "valuation": valuation_risk, "technical": technical_risk, "fundamental": fundamental_risk, "news": news_risk }, "news_result": news_result, "overall_risk_level": self._get_level(total_risk), "suggestion": self._get_suggestion(total_risk) } def _get_level(self, score: float) -> str: if score >= 0.7: return "高风险" elif score >= 0.4: return "中风险" else: return "低风险" def _get_suggestion(self, score: float) -> str: if score >= 0.7: return "不参与,规避" elif score >= 0.4: return "轻仓参与,严格止损" else: return "正常参与,按计划执行" if __name__ == "__main__": # 测试案例 print("=== 测试1: 疑似提前泄露利空的个股 + 国际利空 ===") data1 = StockNewsData( code="600XXX", name="XX股份", recent_vol_change=1.2, # 放量120% recent_pct_change=-8.5, # 5天下跌8.5% gap_down=True, finance_balance_change=-0.25, # 融资减少25% discussion_count_change=3.0, sentiment_score=-0.6, has_major_international_news=True, international_news_sentiment=-1 ) monitor = NewsRiskMonitor() result1 = monitor.analyze_stock(data1) print(monitor.get_risk_report(result1)) print("\n=== 测试2: 明显利好信号个股 ===") data2 = StockNewsData( code="002XXX", name="XX科技", recent_vol_change=0.8, recent_pct_change=4.2, gap_up=True, finance_balance_change=0.4, # 融资增加40% has_large_order=False, discussion_count_change=2.0, sentiment_score=0.5 ) result2 = monitor.analyze_stock(data2) print(monitor.get_risk_report(result2)) print("\n=== 测试2: 大宗商品股 + 商品隔夜大跌 ===") data2 = StockNewsData( code="601899", name="紫金矿业", recent_vol_change=0.5, recent_pct_change=-2.0, is_commodity_related=True, commodity_future_overnight_change=-5.2, us_index_overnight_change=-2.5 ) result2 = monitor.analyze_stock(data2) print(monitor.get_risk_report(result2)) print("\n=== 测试3: A+H股 + H股隔夜大跌 ===") data3 = StockNewsData( code="600016", name="民生银行", recent_vol_change=0.1, recent_pct_change=-1.2, is_ah=True, ah_hk_overnight_change=-4.5 ) result3 = monitor.analyze_stock(data3) print(monitor.get_risk_report(result3)) print("\n=== 测试4: 五维风险评估 ===") five_dim = FiveDimensionRiskAssessment() eval_result = five_dim.calculate_total_risk( liquidity_risk=0.2, valuation_risk=0.5, technical_risk=0.3, fundamental_risk=0.4, news_data=data1 ) print(f"五维综合风险评分: {eval_result['total_risk_score']:.2f}") print(f"风险等级: {eval_result['overall_risk_level']}") print(f"建议: {eval_result['suggestion']}") print(f"各维度分数: {eval_result['dimension_scores']}")