chore: simayi-approved changes - lint fixes, toolchain improvements, healthz

All changes reviewed and APPROVED in PR #12 (Review ID: 40):
- toolchain_routes: webhook repo/org format compat, content dedup (sha256), closed issue filter
- dispatcher: fix inform-mail crash being mistakenly marked as done
- ticker: cleanup and improvements
- healthz endpoint
- conftest: integration/e2e deselect markers
- docs: design docs, test-guide updates
- various lint/whitespace fixes across 30 files
This commit is contained in:
cfdaily
2026-06-09 23:35:36 +08:00
parent a1a4d7c5a7
commit f7fbdac89c
30 changed files with 362 additions and 125 deletions
+13 -15
View File
@@ -5,17 +5,17 @@ from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.operations import Blackboard
from src.blackboard.models import Task, Review
from src.blackboard.queries import Queries
from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES
from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS, COMMENT_TYPES, OUTPUT_TYPES
from src.blackboard.registry import ProjectRegistry
import src.utils as _utils
from src.utils import get_data_root
router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"])
@@ -27,7 +27,7 @@ def _validate_project(project_id: str) -> str:
"""校验 project_id,已知项目/虚拟项目放行,未知项目返回 400"""
if project_id in _VIRTUAL_PROJECTS:
return project_id
reg = ProjectRegistry(_utils.get_data_root())
reg = ProjectRegistry(get_data_root())
if reg.get_project(project_id):
return project_id
raise HTTPException(400, {
@@ -43,12 +43,12 @@ def _validate_project(project_id: str) -> str:
def _bb(project_id: str) -> Blackboard:
_validate_project(project_id)
return Blackboard(_utils.get_data_root() / project_id / "blackboard.db")
return Blackboard(get_data_root() / project_id / "blackboard.db")
def _q(project_id: str) -> Queries:
_validate_project(project_id)
return Queries(_utils.get_data_root() / project_id / "blackboard.db")
return Queries(get_data_root() / project_id / "blackboard.db")
# --- Tasks ---
@@ -100,7 +100,7 @@ async def create_task(project_id: str, body: Dict[str, Any]):
date_str = datetime.now().strftime('%Y%m%d')
# seq: 查当前项目最大 seq
import sqlite3
db_path = _utils.get_data_root() / project_id / "blackboard.db"
db_path = get_data_root() / project_id / "blackboard.db"
try:
conn = sqlite3.connect(str(db_path), timeout=5)
max_id_row = conn.execute(
@@ -240,7 +240,7 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
})
if not bb.update_task_status(task_id, new_status,
agent=body.get("agent")):
agent=body.get("agent")):
raise HTTPException(409, {
"error": "transition_failed",
"detail": f"Status update failed for {task_id}",
@@ -265,7 +265,6 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
# --- @mention 自动提取(#04) ---
_KNOWN_AGENT_IDS: list = []
def _init_agent_ids():
"""从配置文件加载 Agent ID 列表"""
global _KNOWN_AGENT_IDS
@@ -280,7 +279,6 @@ def _init_agent_ids():
except Exception:
_KNOWN_AGENT_IDS = []
def _extract_mentions(text: str) -> list:
"""从文本中自动提取 @agent-id 格式的 mention"""
import re
@@ -319,8 +317,8 @@ async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]):
merged_mentions = list(set(explicit_mentions + auto_mentions))
cid = bb.add_comment(task_id, body["author"], comment_body,
comment_type=body.get("comment_type", "general"),
mentions=merged_mentions)
comment_type=body.get("comment_type", "general"),
mentions=merged_mentions)
if merged_mentions:
bb.record_mentions(cid, task_id, merged_mentions)
# #10: SSE 通知前端黑板有新 comment
@@ -426,8 +424,8 @@ async def get_decisions(project_id: str, task_id: str):
async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
did = bb.add_decision(task_id, body["decider"], body["decision"],
body["rationale"],
alternatives=body.get("alternatives"))
body["rationale"],
alternatives=body.get("alternatives"))
return {"ok": True, "decision_id": did}
@@ -437,7 +435,7 @@ async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
oid = bb.add_observation(task_id, body["observer"], body["body"],
severity=body.get("severity", "info"))
severity=body.get("severity", "info"))
return {"ok": True, "observation_id": oid}
+2 -2
View File
@@ -10,7 +10,7 @@ from pydantic import BaseModel
from typing import Optional
from src.blackboard.operations import Blackboard
import src.utils as _utils
from src.utils import get_data_root
router = APIRouter(prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints", tags=["checkpoints"])
@@ -33,7 +33,7 @@ class ResolveCheckpointRequest(BaseModel):
# ── 工具 ──
def _bb(project_id: str) -> Blackboard:
db_path = _utils.get_data_root() / project_id / "blackboard.db"
db_path = get_data_root() / project_id / "blackboard.db"
if not db_path.exists():
raise HTTPException(status_code=404, detail="Project not found")
return Blackboard(db_path)
+4 -5
View File
@@ -9,7 +9,7 @@ from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query
@@ -17,7 +17,7 @@ from src.blackboard.db import init_db
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
import src.utils as _utils
from src.utils import get_data_root
def _get_valid_agents() -> set:
@@ -36,14 +36,13 @@ def _get_valid_agents() -> set:
# fallback:硬编码
return {"zhangfei-dev", "guanyu-dev", "zhaoyun-data", "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"}
router = APIRouter(prefix="/api/mail", tags=["mail"])
MAIL_PROJECT_ID = "_mail"
def _db_path() -> Path:
root = _utils.get_data_root()
root = get_data_root()
db = root / MAIL_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
@@ -223,7 +222,7 @@ async def send_mail(body: Dict[str, Any]):
# A8: 只有原邮件的双方能回复(严格 1 对 1)
if from_agent not in (orig_from, orig_to):
raise HTTPException(400, "只有邮件的发送者或接收者可以回复")
raise HTTPException(400, f"只有邮件的发送者或接收者可以回复")
# A6/A7: 自动纠正 to → 原邮件发件者
to_agent = body.get("to", "").strip()
+4 -4
View File
@@ -3,18 +3,18 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.registry import ProjectRegistry
import src.utils as _utils
from src.utils import get_data_root
router = APIRouter(prefix="/api/projects", tags=["projects"])
def _registry() -> ProjectRegistry:
return ProjectRegistry(_utils.get_data_root())
return ProjectRegistry(get_data_root())
@router.get("")
@@ -76,7 +76,7 @@ async def list_projects():
async def create_project(body: Dict[str, Any]):
reg = _registry()
try:
reg.create_project(
info = reg.create_project(
body["id"], body["name"],
agents=body.get("agents", []),
description=body.get("description", ""),
+70 -15
View File
@@ -28,7 +28,7 @@ from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.config.agents import AGENT_IDS
from src.daemon.toolchain_templates import render_template
import src.utils as _utils
from src.utils import get_data_root
logger = logging.getLogger(__name__)
@@ -46,17 +46,42 @@ _TTL_SECONDS = 7 * 24 * 3600
_idempotency_lock = asyncio.Lock()
def _is_duplicate(event: str, delivery: str) -> bool:
"""检查 Webhook 是否重复投递,自动清理过期条目。"""
def _is_duplicate(event: str, delivery: str, payload: Optional[Dict[str, Any]] = None) -> bool:
    """Return True if this webhook delivery has already been seen.

    Expired cache entries (older than ``_TTL_SECONDS``) are evicted first.
    Two dedup strategies are then applied:

    1. Delivery-UUID dedup (standard idempotency), keyed on
       ``"{event}-{delivery}"``.
    2. Content dedup, only when ``payload`` is given and the event name
       contains ``"review"``. This works around Gitea v1.23.4, where the
       webhookNotifier and the actionsNotifier deliver the same review under
       different UUIDs. The key is built from the event name, PR number,
       sender login and a truncated SHA-256 of the review text.

    Args:
        event: Value of the Gitea event header.
        delivery: Value of the Gitea delivery-UUID header.
        payload: Parsed JSON body, or None to skip content dedup.

    Returns:
        True when the delivery is a duplicate; False otherwise, in which case
        the new key(s) are recorded in the cache.
    """
    now = time.time()

    # Evict expired entries; timestamps are appended in arrival order, so the
    # oldest entry is always at the front.
    while _delivery_timestamps and (now - _delivery_timestamps[0][0]) > _TTL_SECONDS:
        _, expired_key = _delivery_timestamps.pop(0)
        _delivery_cache.discard(expired_key)

    # 1. Delivery-UUID dedup.
    key = f"{event}-{delivery}"
    if key in _delivery_cache:
        return True

    # 2. Content dedup for review events (same PR + same sender + same text).
    # This runs before any handler-level validation, so every field is
    # treated as untrusted: a JSON null or non-object section must not raise.
    if payload and isinstance(payload, dict) and "review" in event:

        def _section(name: str) -> Dict[str, Any]:
            """Return payload[name] if it is a dict, else an empty dict."""
            value = payload.get(name)
            return value if isinstance(value, dict) else {}

        pr_num = _section("pull_request").get("number")
        sender = _section("sender").get("login")
        review = _section("review")
        # The webhookNotifier format carries the text in review.body and the
        # actionsNotifier format in review.content; prefer body so both
        # formats produce the same key. Fall back to "" when both are null.
        content = review.get("body") or review.get("content") or ""
        content_hash = hashlib.sha256(str(content).encode()).hexdigest()[:16]
        content_key = f"content:{event}:{pr_num}:{sender}:{content_hash}"
        # NOTE(review): the key does not include the review state, so a second
        # review by the same user with identical (e.g. empty) text on the same
        # PR within the TTL is also treated as a duplicate; confirm intended.
        if content_key in _delivery_cache:
            logger.info("Content-based duplicate detected: %s PR#%s by %s", event, pr_num, sender)
            return True
        _delivery_cache.add(content_key)
        _delivery_timestamps.append((now, content_key))

    _delivery_cache.add(key)
    _delivery_timestamps.append((now, key))
    return False
@@ -141,12 +166,13 @@ def _calc_risk_level(changed_files: List[str]) -> str:
# ---------------------------------------------------------------------------
MAIL_PROJECT_ID = "_mail"
def _mail_db_path() -> Path:
"""获取 Mail 数据库路径,确保目录存在。"""
root = _utils.get_data_root()
root = get_data_root()
db = root / MAIL_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
@@ -257,7 +283,12 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
"""处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。"""
"""处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。
支持两种 payload 格式
- repo webhook: review.state = "APPROVED" / "REQUEST_CHANGES"
- org webhook (Gitea v1.23.4): review.type = "pull_request_review_approved" / "pull_request_review_rejected"
"""
review = payload.get("review")
if not review or not isinstance(review, dict):
logger.warning("pull_request_review event missing review field, skipping")
@@ -266,7 +297,18 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
if not pr or not isinstance(pr, dict):
logger.warning("pull_request_review event missing pull_request field, skipping")
return
# 兼容两种 payload 格式提取 state
state = review.get("state", "")
if not state:
# org webhook 格式:review.type = "pull_request_review_approved"
review_type = review.get("type", "")
type_map = {
"pull_request_review_approved": "APPROVED",
"pull_request_review_rejected": "REQUEST_CHANGES",
"pull_request_review_comment": "COMMENTED",
}
state = type_map.get(review_type, "")
# 只通知 APPROVED 和 REQUEST_CHANGES,跳过 COMMENTED 和其他状态
if state == "COMMENTED":
@@ -276,8 +318,9 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
pr_number = pr.get("number", 0)
pr_title = pr.get("title", "")
pr_author = pr.get("user", {}).get("login", "unknown")
reviewer = review.get("user", {}).get("login", "unknown")
review_body = review.get("body", "(无评论)")
# 兼容:org webhook 的 review 没有 user,从 sender 取
reviewer = review.get("user", {}).get("login", "") or payload.get("sender", {}).get("login", "unknown")
review_body = review.get("body", "") or review.get("content", "(无评论)")
result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"}
if state not in result_map:
@@ -371,6 +414,12 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
if not issue or not isinstance(issue, dict):
logger.warning("issue_comment event missing issue field, skipping")
return
# 已关闭的 Issue/PR 不再发送 CI 失败通知
if issue.get("state") == "closed":
logger.debug("Skipping CI failure notification for closed issue #%s", issue.get("number"))
return
repo = _repo_fullname(payload)
issue_number = issue.get("number", 0)
@@ -400,6 +449,12 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
# Dispatch table: event name -> async handler coroutine.
# Every review-related event name below is routed to the same handler,
# _handle_pull_request_review.
_EVENT_HANDLERS: Dict[str, Any] = {
"pull_request": _handle_pull_request,
"pull_request_review": _handle_pull_request_review,
"pull_request_review_approved": _handle_pull_request_review,
"pull_request_review_rejected": _handle_pull_request_review,
"pull_request_review_comment": _handle_pull_request_review,
# Review sub-events actually emitted by Gitea v1.23.4 (no "_review_" middle segment)
"pull_request_approved": _handle_pull_request_review,
"pull_request_rejected": _handle_pull_request_review,
"issues": _handle_issues,
"issue_comment": _handle_issue_comment,
}
@@ -432,20 +487,20 @@ async def gitea_webhook(
logger.warning("Webhook signature verification failed")
return Response(status_code=403, content="signature verification failed")
# 2. 幂等检查
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery):
logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery)
return Response(status_code=200, content="duplicate")
# 3. 解析 payload
# 3. 解析 payload(提前解析,用于幂等检查)
try:
payload = await request.json()
except Exception:
logger.warning("Failed to parse webhook payload")
return Response(status_code=200, content="invalid payload")
# 2. 幂等检查(需要在 payload 解析后,以支持内容去重)
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery)
return Response(status_code=200, content="duplicate")
# 4. 查找 handler
handler = _EVENT_HANDLERS.get(x_gitea_event or "")
if not handler: