Files
sanguo_moziplus_v2/docs/design/14-toolchain-event-hub.md
T
cfdaily 888e2d23c6
Deploy / ci (push) Waiting to run
Deploy / deploy (push) Blocked by required conditions
Deploy / notify-deploy-failure (push) Blocked by required conditions
auto-sync: 2026-06-07 11:13:34
2026-06-07 11:13:34 +08:00

19 KiB
Raw Blame History

工具链事件中枢设计

版本: v1.0-draft 日期: 2026-06-07 作者: 庞统(副军师)🐦 状态: 待评审 前置文档: 13-toolchain-and-dev-workflow.md §15(串联架构 v2.0 定稿)


§0. 讨论纪要

本节记录设计过程中的关键讨论,供未来回溯。

两条线的定位(主公决策)

线 方向 机制 定位
出线 Agent → Gitea Agent 直接操作 Gitea,完成后发 Mail 通知下一个 Agent Gitea = 黑板,Mail = 触发器
入线 外部 → Agent 外部事件(CI/部署/Webhook)→ 事件中枢 → Mail → Agent 事件中枢 = 入口

原则 1:Agent 之间的协作不走事件中枢。中枢只处理"外部 → Agent"。 原则 2:所有工具链留痕在 Gitea,Mail 是辅助推送。即使 Mail 失败,Agent 可主动查 Gitea 恢复上下文。 原则 3:工具链反馈 Agent 的通知统一走事件中枢,避免散乱。

关键决策记录

# 决策 理由
D1 不做第三种 task 类型 中枢产出就是 Mailfrom=system, type=inform),模板填充 description,投递复用 spawner。区别只在内容(模板化)和元数据(结构化),不需要新的数据结构
D2 提示词独立模板文件 模板和代码分离,改文案不改代码,可审查可扩展
D3 中枢是 daemon 内模块 要访问 Blackboard 创建 Task,共享进程。用独立路由模块(toolchain_routes.py)保持职责清晰
D4 同步处理 事件处理很轻(解析+模板填充+创建Task),远在 Gitea 5秒超时内。复杂事件未来用 asyncio.create_task
D5 Agent ID = Gitea 用户名 设计目标一致,代码里留 to_agent_id() 函数兜底,实际直用。待姜维确认各 Agent 在 Gitea 上的注册用户名
D6 CI/部署通知也走事件中枢 统一入口,不走 workflow 直接调 Mail API。CI workflow 写 PR comment → Gitea 触发 issue_comment Webhook → 中枢处理

§1. 定位与边界

1.1 是什么

事件中枢是 daemon 内的一个路由模块,负责将外部工具链事件翻译成 Mail 通知推送给 Agent。

1.2 不是什么

  • 不是消息队列
  • 不是 Agent 间通信通道(Agent 间协作走 Mail,不经过中枢)
  • 不是编排引擎(任务编排由 dispatcher + spawner 负责)

1.3 两条线

出线(Agent → 工具链):
  Agent 直接操作 Gitea(创建 Issue/PR/Review 等)
  → 完成后发 Mail 通知下一个 Agent
  → 下一个 Agent 去读 Gitea 继续干活

入线(工具链 → Agent):
  外部事件(CI 结果/部署结果/Webhook 事件)
  → 事件中枢
  → 模板化 Mail
  → Agent 收到通知,去 Gitea 查看详情并行动

1.4 数据流

外部事件源                    事件中枢                        Agent
─────────                    ──────                        ──────

Gitea Webhook ─────┐
                   │
CI workflow ───────┤    toolchain_routes.py          Mail Task
  (PR comment) ────┤    ┌─────────────────┐         (from: system)
                   ├──→│ 1. 接收事件      │              │
Deploy workflow ──┤    │ 2. 验签/过滤     │              ↓
  (PR comment) ────┤    │ 3. 幂等检查      │         dispatcher
                   │    │ 4. 选模板+填充   │              │
daemon 内部 ───────┘    │ 5. 创建 Mail    │              ↓
  (预留接口)           └─────────────────┘         spawner 投递

§2. 事件分类与处理

2.1 事件来源

来源 触发方式 事件类型
Gitea Webhook Gitea 主动 POST PR opened, Review submitted, Issue assigned, issue_comment
CI workflow CI 写 PR comment → 触发 issue_comment Webhook CI 失败
Deploy workflow Deploy 写 PR comment → 触发 issue_comment Webhook 部署成功/失败
daemon 内部 内部函数调用 预留,当前不走中枢

2.2 事件处理矩阵

事件 来源 通知谁 模板 关键信息
pull_request opened Gitea Webhook 司马懿(Review review_request.md PR号、标题、作者、分支、文件列表、风险级别
pull_request_review submitted Gitea Webhook PR 作者 review_result.md PR号、审查结论、评论、审查者
issues assigned Gitea Webhook 被指派人 issue_assigned.md Issue号、标题、标签、描述、分支名建议
issue_comment (CI失败) CI workflow → Webhook PR 作者 ci_failure.md PR号、分支、失败步骤、错误摘要
issue_comment (部署成功) Deploy workflow → Webhook 庞统 deploy_success.md 仓库、版本、commit
issue_comment (部署失败) Deploy workflow → Webhook 庞统 + 姜维 deploy_failure.md 仓库、失败原因、已回滚版本

2.3 事件过滤规则

不是所有事件都需要处理:

规则 说明
只处理白名单内的事件类型 未知的忽略 + 日志
issue_comment 需判断来源 只处理 CI/deploy workflow 写的评论(按特定前缀匹配,如 [CI][Deploy]
PR 作者/审查者必须是已知 Agent 未知的忽略 + 日志
幂等:同一事件不重复创建 Mail {event_type}-{payload_id} 去重

§3. 模板系统

3.1 模板文件组织

templates/toolchain/
├── review_request.md      # PR opened → 司马懿 Review
├── review_result.md       # Review submitted → PR 作者
├── issue_assigned.md      # Issue assigned → 开发者
├── ci_failure.md          # CI 失败 → PR 作者
├── deploy_success.md      # 部署成功 → 庞统
└── deploy_failure.md      # 部署失败 → 庞统+姜维

3.2 模板格式

模板使用 {variable} 占位符,中枢填充后生成 Mail description。

示例:review_request.md

PR Review 请求

PR: http://192.168.2.154:3000/{repo}/pulls/{pr_number}
标题: {pr_title}
作者: {pr_author}
分支: {branch}
风险级别: {risk_level}
改动文件:
{file_list}

流程:
1. 读取 PR diffGitea API: GET /repos/{repo_owner}/{repo}/pulls/{pr_number}.diff
2. 按审查清单审查(参考 code-review Skill
3. 提交 ReviewGitea API: POST /repos/{repo_owner}/{repo}/pulls/{pr_number}/reviews
4. 提交后改动者会自动收到通知

完成后回复此 Mail 确认。

3.3 模板变量提取

变量 来源 提取方式
pr_number Webhook payload event["pull_request"]["number"]
pr_title Webhook payload event["pull_request"]["title"]
pr_author Webhook payload to_agent_id(event["pull_request"]["user"]["login"])
branch Webhook payload event["pull_request"]["head"]["ref"]
file_list Webhook payload event["pull_request"]["changed_files"] 或需要额外 API 调用
risk_level daemon 计算 按文件路径规则匹配(见 §3.4
repo Webhook payload event["repository"]["full_name"] 提取

3.4 风险级别自动判定(简化版)

按改动文件路径匹配规则:

HIGH_PATTERNS = ["**/spawner*", "**/ticker*", "**/dispatcher*", 
                 "**/router*", "**/guardrails*", "**/strategy*", "**/risk*"]

def calc_risk_level(changed_files: list[str]) -> str:
    for f in changed_files:
        for pattern in HIGH_PATTERNS:
            if fnmatch(f, pattern):
                return "high"
    return "standard"

§4. 技术设计

4.1 模块结构

src/api/
├── toolchain_routes.py    # 事件中枢路由(~150行)
├── mail_routes.py         # 现有 Mail API
└── ...

src/daemon/
├── toolchain_templates.py # 模板加载+填充(~80行)
├── mail_notify.py         # 现有 Mail 失败通知
└── ...

templates/toolchain/
├── review_request.md
├── review_result.md
├── issue_assigned.md
├── ci_failure.md
├── deploy_success.md
└── deploy_failure.md

4.2 toolchain_routes.py 接口设计

# src/api/toolchain_routes.py

from fastapi import APIRouter, Request, Header, HTTPException
from src.daemon.toolchain_templates import TemplateEngine

router = APIRouter()
engine = TemplateEngine()

GITEA_WEBHOOK_SECRET = os.environ.get("GITEA_WEBHOOK_SECRET", "")

@router.post("/webhook/gitea")
async def handle_gitea_webhook(
    request: Request,
    x_gitea_event: str = Header(...),
    x_gitea_signature: str = Header(None),
    x_gitea_delivery: str = Header(None),
):
    """接收 Gitea Webhook,翻译成 Mail"""
    
    body = await request.body()
    
    # 1. 签名验证(可选)
    if GITEA_WEBHOOK_SECRET:
        expected = hmac.new(GITEA_WEBHOOK_SECRET.encode(), body, sha256).hexdigest()
        if not hmac.compare_digest(expected, (x_gitea_signature or "")):
            raise HTTPException(403, "Invalid signature")
    
    event = json.loads(body)
    
    # 2. 幂等检查
    event_key = f"{x_gitea_event}-{x_gitea_delivery}"
    if is_duplicate(event_key):
        return {"status": "duplicate"}
    
    # 3. 路由到对应处理器
    handler = HANDLERS.get(x_gitea_event)
    if not handler:
        logger.info("Ignoring unhandled event: %s", x_gitea_event)
        return {"status": "ignored"}
    
    # 4. 处理事件 → 创建 Mail
    try:
        result = await handler(engine, event)
        return {"status": "ok", "mail_id": result}
    except Exception as e:
        logger.exception("Failed to handle %s event", x_gitea_event)
        raise HTTPException(500, str(e))

4.3 事件处理器

每个事件类型一个处理函数,职责:解析 payload → 选模板 → 填充 → 创建 Mail Task。

async def handle_pr_opened(engine: TemplateEngine, event: dict) -> str:
    """PR opened → Review 请求给司马懿"""
    pr = event["pull_request"]
    
    # 提取变量
    variables = {
        "pr_number": pr["number"],
        "pr_title": pr["title"],
        "pr_author": to_agent_id(pr["user"]["login"]),
        "branch": pr["head"]["ref"],
        "repo": event["repository"]["full_name"],
        "repo_owner": event["repository"]["owner"]["login"],
        "risk_level": calc_risk_level(get_changed_files(pr)),
        "file_list": format_file_list(get_changed_files(pr)),
    }
    
    # 填充模板
    text = engine.render("review_request.md", variables)
    
    # 创建 Mail Task
    meta = {
        "source": "toolchain",
        "webhook_event": "pull_request_opened",
        "pr_number": pr["number"],
        "repo": variables["repo"],
    }
    
    return create_mail_task(
        to="simayi-challenger",
        title=f"Review 请求: PR #{pr['number']} {pr['title']}",
        text=text,
        meta=meta,
    )


async def handle_review_submitted(engine: TemplateEngine, event: dict) -> str:
    """Review submitted → 结果通知 PR 作者"""
    review = event["review"]
    pr = event["pull_request"]
    pr_author = to_agent_id(pr["user"]["login"])
    state = review["state"]  # APPROVED / REQUEST_CHANGES / COMMENTED
    
    if state == "COMMENTED":
        return None  # 普通评论不通知
    
    variables = {
        "pr_number": pr["number"],
        "pr_title": pr["title"],
        "result": "通过" if state == "APPROVED" else "不通过",
        "reviewer": to_agent_id(review["user"]["login"]),
        "review_body": review["body"] or "无",
        "repo": event["repository"]["full_name"],
        "repo_owner": event["repository"]["owner"]["login"],
    }
    
    template = "review_result.md"
    text = engine.render(template, variables)
    
    return create_mail_task(
        to=pr_author,
        title=f"Review {variables['result']}: PR #{pr['number']}",
        text=text,
        meta={"source": "toolchain", "webhook_event": "review_submitted", "pr_number": pr["number"]},
    )


async def handle_issue_comment(engine: TemplateEngine, event: dict) -> str:
    """issue_comment → 判断来源,路由到 CI/部署通知"""
    comment_body = event["comment"]["body"]
    
    if comment_body.startswith("[CI]"):
        return await handle_ci_comment(engine, event)
    elif comment_body.startswith("[Deploy]"):
        return await handle_deploy_comment(engine, event)
    
    return None  # 非 CI/部署评论不处理


# HANDLERS 注册表
HANDLERS = {
    "pull_request": handle_pr_event,          # 根据 action 分发
    "pull_request_review": handle_review_submitted,
    "issues": handle_issue_event,             # 根据 action 分发
    "issue_comment": handle_issue_comment,
}

4.4 共用函数

def to_agent_id(gitea_username: str) -> str:
    """Gitea 用户名 → Agent ID。当前设计目标一致,直用。"""
    return gitea_username

def create_mail_task(to: str, title: str, text: str, meta: dict) -> str:
    """创建 Mail Taskfrom=system, type=inform
    复用 mail_notify.py 的模式:直接通过 Blackboard 创建 Task。
    """
    # 从 mail_routes.py 提取共用逻辑,或直接复用 mail_notify 的方式
    ...

def is_duplicate(event_key: str) -> bool:
    """幂等检查:同一事件不重复创建 Mail"""
    ...

def calc_risk_level(files: list[str]) -> str:
    """按文件路径规则判定风险级别"""
    ...

def get_changed_files(pr: dict) -> list[str]:
    """从 PR payload 提取改动文件列表"""
    ...

4.5 模板引擎

# src/daemon/toolchain_templates.py

from pathlib import Path
import string

TEMPLATE_DIR = Path(__file__).parent.parent.parent / "templates" / "toolchain"

class TemplateEngine:
    def __init__(self, template_dir: Path = TEMPLATE_DIR):
        self.template_dir = template_dir
        self._cache = {}
    
    def render(self, template_name: str, variables: dict) -> str:
        """加载模板文件并填充变量"""
        if template_name not in self._cache:
            path = self.template_dir / template_name
            self._cache[template_name] = path.read_text()
        
        template = self._cache[template_name]
        return template.format_map(defaultdict(str, variables))

§5. 错误处理

场景 处理 HTTP 返回
签名验证失败 日志 warning 403
payload 解析失败 日志 error 200(不触发 Gitea 重试)
未知事件类型 忽略 + 日志 info 200
幂等检测到重复 忽略 + 日志 info 200
未知 Agent(不在映射表) 忽略 + 日志 warning 200
模板填充失败 日志 error 500(触发 Gitea 重试)
Mail 创建失败 日志 error 500(触发 Gitea 重试)

原则

  • daemon 自身能处理的错误(格式、过滤、幂等)→ 返回 200,不让 Gitea 无意义重试
  • daemon 处理不了的错误(数据库、内部异常)→ 返回 500,让 Gitea 重试

§6. CI/Deploy Workflow 配合

6.1 CI 失败通知

CI workflow 失败时写 PR comment,触发 issue_comment Webhook

# .gitea/workflows/ci.yml
- name: Report failure
  if: failure()
  run: |
    curl -s -X POST \
      "http://192.168.2.154:3000/api/v1/repos/{owner}/{repo}/issues/{pr_number}/comments" \
      -H "Authorization: token ${{ secrets.CI_TOKEN }}" \
      -H "Content-Type: application/json" \
      -d "{
        \"body\": \"[CI] CI 失败\\n\\n分支: ${{ gitea.ref_name }}\\n失败步骤: ${{ job.status }}\\n错误摘要: $(tail -20 $GITHUB_STEP_SUMMARY)\"
      }"

6.2 Deploy 结果通知

同理,deploy workflow 成功/失败时写 comment

# .gitea/workflows/deploy.yml
- name: Report success
  if: success()
  run: |
    curl -s -X POST \
      "http://192.168.2.154:3000/api/v1/repos/{owner}/{repo}/issues/{pr_number}/comments" \
      -H "Authorization: token ${{ secrets.CI_TOKEN }}" \
      -d "{\"body\": \"[Deploy:成功] 版本: $(bash scripts/deploy.sh --version)\\n请确认后关闭 Issue.\"}"

- name: Report failure
  if: failure()
  run: |
    curl -s -X POST \
      "http://192.168.2.154:3000/api/v1/repos/{owner}/{repo}/issues/{pr_number}/comments" \
      -H "Authorization: token ${{ secrets.CI_TOKEN }}" \
      -d "{\"body\": \"[Deploy:失败] 已回滚到上一版本。原因: ...\"}"

§7. 配置

环境变量 用途 默认值
GITEA_WEBHOOK_SECRET Webhook HMAC 签名密钥(可选) 空(跳过验签)
BLACKBOARD_ROOT Blackboard 数据根目录(已有) ~/.sanguo_projects/sanguo_moziplus_v2/data
TOOLCHAIN_TEMPLATES_DIR 模板文件目录(可选) templates/toolchain/

§8. 待确认项

# 负责人 说明
1 各 Agent 在 Gitea 上的注册用户名是否和 Agent ID 一致 姜维 决定是否需要映射表
2 Gitea Webhook 是否已配置 secret 姜维 当前已配未启用
3 CI workflow 是否已有写 PR comment 的 step 当前 CI workflow 可能没有
4 from=system 走 HTTP API 还是只走内部函数 mail_routes.py 当前只在内部函数支持 system
5 PR changed_files 是否包含在 Webhook payload 中 Gitea v1.23.4 可能需要额外 API 调用

§9. 实施计划

步骤 内容 依赖
1 toolchain_routes.py 骨架 + Gitea Webhook 接收 + 签名验证
2 toolchain_templates.py 模板引擎
3 6 个模板文件 步骤 2
4 4 个事件处理器(PR/Review/Issue/Comment 步骤 1+3
5 幂等检查(内存缓存或 SQLite 记录) 步骤 1
6 create_mail_task 共用函数(从 mail_routes.py 提取)
7 CI/Deploy workflow 加 comment step 步骤 1+4
8 Gitea Webhook 启用 + 测试 姜维
9 端到端测试 步骤 1-8

9.1 改动量估算

文件 行数 类型
src/api/toolchain_routes.py ~200 新增
src/daemon/toolchain_templates.py ~60 新增
templates/toolchain/*.md ~2006个文件) 新增
src/api/mail_routes.py ~20 修改(提取共用函数)
.gitea/workflows/ci.yml ~15 修改(加 comment step
.gitea/workflows/deploy.yml ~15 修改(加 comment step
总计 ~510

§10. 评审检查清单

  • §0 讨论纪要是否准确反映了决策过程
  • §1 两条线的边界是否清晰(出线不走中枢,入线统一走中枢)
  • §2 事件矩阵是否有遗漏
  • §3 模板内容是否完整,变量提取是否正确
  • §4 接口设计是否合理,共用函数提取是否恰当
  • §5 错误处理策略是否完备
  • §6 CI/Deploy workflow 配合方案是否可行
  • §8 待确认项是否完整
  • §9 实施步骤是否合理