diff --git a/src/blackboard/operations.py b/src/blackboard/operations.py index de4ce20..2d75f3e 100644 --- a/src/blackboard/operations.py +++ b/src/blackboard/operations.py @@ -935,3 +935,85 @@ class Blackboard: return row["round_count"] if row else 0 finally: conn.close() + + # =================================================================== + # Mention Queue (v2.9 #01) + # =================================================================== + + def record_mentions(self, comment_id: int, task_id: str, + mentioned_agents: List[str]) -> int: + """写入 mention_queue 记录(comment 创建时调用)""" + if not mentioned_agents: + return 0 + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + count = 0 + for agent_id in mentioned_agents: + exists = conn.execute( + "SELECT id FROM mention_queue WHERE comment_id=? AND mentioned_agent=?", + (comment_id, agent_id) + ).fetchone() + if not exists: + conn.execute( + "INSERT INTO mention_queue (comment_id, task_id, mentioned_agent, status) " + "VALUES (?,?,?,'pending')", + (comment_id, task_id, agent_id) + ) + count += 1 + conn.commit() + return count + finally: + conn.close() + + def get_pending_mentions(self, max_retries: int = 5) -> List[Dict[str, Any]]: + """获取所有 pending 且未超过重试上限的 mentions""" + conn = self._conn() + try: + rows = conn.execute( + """SELECT mq.*, c.body as comment_body, c.author as comment_author + FROM mention_queue mq + JOIN comments c ON mq.comment_id = c.id + WHERE mq.status = 'pending' AND mq.retry_count < ? + ORDER BY mq.created_at ASC""", + (max_retries,) + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + def mark_mention_notified(self, mention_id: int) -> bool: + """标记 mention 为已通知""" + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + "UPDATE mention_queue SET status='notified', notified_at=datetime('now') WHERE id=?", + (mention_id,) + ) + conn.commit() + return True + finally: + conn.close() + + def mark_mention_retry(self, mention_id: int) -> bool: + """递增 mention 重试计数""" + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute("UPDATE mention_queue SET retry_count=retry_count+1 WHERE id=?", (mention_id,)) + conn.commit() + return True + finally: + conn.close() + + def mark_mention_failed(self, mention_id: int) -> bool: + """标记 mention 为失败(超过重试上限)""" + conn = self._conn() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute("UPDATE mention_queue SET status='failed' WHERE id=?", (mention_id,)) + conn.commit() + return True + finally: + conn.close()