auto-sync: 2026-05-16 13:39:32
This commit is contained in:
@@ -2,8 +2,9 @@
|
||||
|
||||
> **日期**: 2026-05-16
|
||||
> **作者**: 庞统(副军师)🐦
|
||||
> **状态**: 初稿待确认
|
||||
> **状态**: v2(并发调度模型重设计,待评审)
|
||||
> **前置**: 课题1-4、课题6 已完成设计
|
||||
> **变更**: v2 新增 §5.4 并发调度模型(per-project 线程 + 全局资源信号量),替代原串行 tick
|
||||
|
||||
---
|
||||
|
||||
@@ -141,21 +142,11 @@ class ProjectManager:
|
||||
return global_guardrails
|
||||
```
|
||||
|
||||
### 5.2 Tick 逻辑变更
|
||||
### 5.2 ~~Tick 逻辑变更~~(已废弃,见 §5.4)
|
||||
|
||||
```python
|
||||
# Daemon 主循环
|
||||
def tick(self):
|
||||
registry = load_registry()
|
||||
for project_id, project_meta in registry["projects"].items():
|
||||
if project_meta["status"] != "active":
|
||||
continue
|
||||
conn = self.project_manager.get_connection(project_id)
|
||||
config = self.project_manager.get_config(project_id)
|
||||
self._tick_project(project_id, conn, config) # 原有的 tick 逻辑
|
||||
```
|
||||
|
||||
每个项目的 tick 逻辑完全相同,只是操作的 SQLite 连接不同。
|
||||
> **原设计**:Daemon 主循环串行遍历所有项目 tick。每个项目 tick 完再 tick 下一个。
|
||||
> **问题**:所有项目/任务一起排队,项目 A 的长任务阻塞项目 B。
|
||||
> **新设计**:见 §5.4 per-project 并发调度。
|
||||
|
||||
### 5.3 Daemon 逻辑健康自检
|
||||
|
||||
@@ -176,6 +167,206 @@ class DaemonHealth:
|
||||
return self._tick_state_changes[project_id] >= STALE_TICK_THRESHOLD
|
||||
```
|
||||
|
||||
### 5.4 并发调度模型(v2 新增)
|
||||
|
||||
#### 5.4.1 问题
|
||||
|
||||
原设计中 Daemon 主循环串行 tick 所有项目:
|
||||
|
||||
```
|
||||
Tick → Project A(30s)→ Project B(等A完成)→ Project C(等B完成)
|
||||
```
|
||||
|
||||
问题:
|
||||
1. **项目间互相阻塞**——Project A 有一个长任务在执行,Project B 的独立任务必须等
|
||||
2. **响应延迟**——3 个项目 tick 一次可能要 90s+,Project C 要等 60s 才被检查
|
||||
3. **不符合业界实践**——调研 7 个项目(Hermes/open-multi-agent/Wanman/Google ADK/Microsoft AutoGen/AgentScope/GSD),没有一个用全局串行排队
|
||||
|
||||
#### 5.4.2 业界并发模型调研
|
||||
|
||||
| 项目 | 并发模型 | 核心机制 |
|
||||
|------|---------|---------|
|
||||
| **open-multi-agent** | AgentPool + Semaphore | 全局 `maxConcurrency=5`,per-agent 互斥锁,`Promise.allSettled` 并行执行独立任务 |
|
||||
| **Wanman** | per-agent 进程 | 每个 Agent 独立进程+独立 runLoop,Supervisor 通过消息总线协调 |
|
||||
| **Google ADK** | asyncio.TaskGroup | `ParallelAgent` 用 `TaskGroup` 并行执行子 Agent |
|
||||
| **Microsoft AutoGen** | Pregel Superstep | 每个 superstep 内所有激活 Executor 并行执行 |
|
||||
| **Hermes** | 单线程 tick + flock | **单项目设计**,tick 内只有几个 cron job,不需要并发 |
|
||||
|
||||
**关键发现**:open-multi-agent 的 `AgentPool + Semaphore + per-agent Lock` 是最成熟、最可借鉴的模型。
|
||||
|
||||
#### 5.4.3 设计:per-project 线程 + 全局资源信号量
|
||||
|
||||
```
|
||||
Daemon 主进程(轻量路由器 + 资源管控)
|
||||
│
|
||||
├── 全局 LLM Semaphore(max_concurrent=3)
|
||||
├── per-agent Lock(张飞不能同时在两个项目里跑)
|
||||
│
|
||||
├── ProjectSlot A(独立线程)
|
||||
│ └── 自己的 SQLite 连接
|
||||
│ └── 自己的 tick 循环(30s)
|
||||
│ └── spawn Agent 时:acquire agent_lock → acquire llm_semaphore
|
||||
│
|
||||
├── ProjectSlot B(独立线程)
|
||||
│ └── (同上)
|
||||
│
|
||||
└── ProjectSlot C(独立线程)
|
||||
└── (同上)
|
||||
```
|
||||
|
||||
**三层资源控制**:
|
||||
|
||||
| 层级 | 控制对象 | 机制 | 原因 |
|
||||
|------|---------|------|------|
|
||||
| L1: 项目隔离 | SQLite 连接 | per-project 独立连接 | 数据物理隔离,无竞争 |
|
||||
| L2: Agent 互斥 | 同一 Agent 不能并行 | `threading.Lock` per-agent | Agent session 不是线程安全的,张飞同一时刻只能服务一个任务 |
|
||||
| L3: 全局资源 | LLM API 调用并发 | `threading.Semaphore(max_concurrent)` | 防止 API 限流、控制成本 |
|
||||
|
||||
#### 5.4.4 核心代码
|
||||
|
||||
```python
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class Daemon:
|
||||
"""单进程 Daemon,per-project 线程并发。"""
|
||||
|
||||
def __init__(self, config: DaemonConfig):
|
||||
self.config = config
|
||||
self.llm_semaphore = threading.Semaphore(config.max_concurrent_llm) # 默认 3
|
||||
self.agent_locks: dict[str, threading.Lock] = {} # per-agent 互斥
|
||||
self.slots: dict[str, ProjectSlot] = {}
|
||||
self._shutdown = threading.Event()
|
||||
|
||||
def start(self):
|
||||
"""启动所有 active 项目的独立线程。"""
|
||||
registry = load_registry()
|
||||
for project_id, meta in registry["projects"].items():
|
||||
if meta["status"] != "active":
|
||||
continue
|
||||
slot = ProjectSlot(
|
||||
project_id=project_id,
|
||||
config=meta,
|
||||
llm_semaphore=self.llm_semaphore,
|
||||
agent_locks=self.agent_locks,
|
||||
tick_interval=self.config.tick_interval, # 默认 30s
|
||||
shutdown_event=self._shutdown,
|
||||
)
|
||||
self.slots[project_id] = slot
|
||||
t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True)
|
||||
t.start()
|
||||
|
||||
# 主线程等待 shutdown
|
||||
self._shutdown.wait()
|
||||
|
||||
def shutdown(self):
|
||||
self._shutdown.set()
|
||||
|
||||
|
||||
class ProjectSlot:
|
||||
"""单项目的独立 tick 循环。"""
|
||||
|
||||
def __init__(self, project_id, config, llm_semaphore, agent_locks,
|
||||
tick_interval=30, shutdown_event=None):
|
||||
self.project_id = project_id
|
||||
self.conn = sqlite_connect(Path(f"projects/{project_id}/blackboard.db"))
|
||||
self.config = config
|
||||
self.llm_sem = llm_semaphore # 共享:全局 LLM 信号量
|
||||
self.agent_locks = agent_locks # 共享:per-agent 锁字典
|
||||
self.tick_interval = tick_interval
|
||||
self.shutdown = shutdown_event or threading.Event()
|
||||
self.health = DaemonHealth(project_id)
|
||||
|
||||
def run_loop(self):
|
||||
"""独立线程的主循环。"""
|
||||
while not self.shutdown.is_set():
|
||||
try:
|
||||
self._tick()
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.project_id}] tick failed: {e}")
|
||||
self.shutdown.wait(self.tick_interval) # 可被 shutdown 中断的 sleep
|
||||
|
||||
def _tick(self):
|
||||
"""单次 tick:找 pending 任务,尝试分配。"""
|
||||
pending = find_pending(self.conn)
|
||||
if not pending:
|
||||
self.health.record_idle()
|
||||
return
|
||||
|
||||
for task in pending:
|
||||
agent_id = task["assignee"]
|
||||
lock = self.agent_locks.setdefault(agent_id, threading.Lock())
|
||||
|
||||
# 非阻塞尝试:Agent 正忙就跳过,不排队等
|
||||
if not lock.acquire(blocking=False):
|
||||
logger.info(f"[{self.project_id}] {agent_id} busy, skip task {task['id']}")
|
||||
continue
|
||||
|
||||
try:
|
||||
# 等待全局 LLM 槽位(阻塞,但持有 agent_lock)
|
||||
self.llm_sem.acquire()
|
||||
try:
|
||||
spawn_agent(self.project_id, task, self.conn)
|
||||
self.health.record_change()
|
||||
finally:
|
||||
# LLM 调用完成后释放信号量
|
||||
# 注意:Agent spawn 后 LLM 调用即完成,不需要等 Agent 执行完
|
||||
self.llm_sem.release()
|
||||
finally:
|
||||
lock.release()
|
||||
|
||||
# 检查 working 任务超时等
|
||||
self._check_working_tasks()
|
||||
```
|
||||
|
||||
#### 5.4.5 资源释放时序
|
||||
|
||||
```
|
||||
ProjectSlot._tick()
|
||||
│
|
||||
├── lock.acquire() # 拿到 Agent 锁
|
||||
├── llm_sem.acquire() # 拿到 LLM 槽位
|
||||
├── spawn_agent() # spawn Agent 子进程(LLM 调用在 spawn 瞬间完成)
|
||||
├── llm_sem.release() # ✅ spawn 完立即释放 LLM 槽位
|
||||
├── lock.release() # ✅ spawn 完立即释放 Agent 锁
|
||||
│
|
||||
└── Agent 子进程独立运行 → 完成后写 output.json → 下次 tick 检测
|
||||
```
|
||||
|
||||
**关键设计决策**:`lock` 和 `llm_sem` 在 `spawn_agent()` 返回后立即释放,**不等 Agent 执行完成**。
|
||||
|
||||
原因:
|
||||
1. `spawn_agent()` 是 `subprocess.Popen` 或 `openclaw agent` CLI 调用,启动后立即返回
|
||||
2. Agent 执行是异步的(子进程独立运行)
|
||||
3. 如果等 Agent 执行完才释放锁,并发就退化回串行
|
||||
|
||||
**这意味着**:同一个 Agent 理论上可以同时有多个任务在跑。如果需要严格串行(一个 Agent 同一时刻只有一个任务),则改为在下次 tick 的 `_check_working_tasks()` 中检测 Agent 是否有 working 任务,有则不再分配新任务。
|
||||
|
||||
#### 5.4.6 Agent 并行策略配置
|
||||
|
||||
```yaml
|
||||
# _registry.yaml 中可配置
|
||||
projects:
|
||||
quant-momentum:
|
||||
agent_parallelism: sequential # 同一 Agent 同一时刻只跑一个任务(默认)
|
||||
quant-pairs:
|
||||
agent_parallelism: parallel # 同一 Agent 可同时跑多个任务
|
||||
```
|
||||
|
||||
默认 `sequential`:`_tick()` 分配任务前先检查该 Agent 是否有 working 任务。
|
||||
|
||||
#### 5.4.7 并发安全保证
|
||||
|
||||
| 并发场景 | 风险 | 保护机制 |
|
||||
|---------|------|---------|
|
||||
| 两个项目同时写同一个 SQLite | 数据损坏 | 每个项目独立 `.db` 文件,不存在此场景 |
|
||||
| 两个项目同时分配同一个 Agent | Agent session 上下文串 | per-agent Lock 互斥 |
|
||||
| 三个项目同时调 LLM API | API 限流/超限 | 全局 Semaphore 限流 |
|
||||
| ProjectSlot 线程异常退出 | 项目 tick 停止 | try/except 包裹 + Daemon 监控线程存活 |
|
||||
| Daemon 主进程崩溃 | 所有项目停止 | PM2 自动重启 + SQLite WAL 保护数据完整性 |
|
||||
|
||||
## 6. CLI 变更
|
||||
|
||||
### 6.1 项目管理命令
|
||||
|
||||
Reference in New Issue
Block a user