auto-sync: 2026-05-16 13:43:57

This commit is contained in:
cfdaily
2026-05-16 13:43:57 +08:00
parent 74edce34ef
commit f109bcf252
+152 -56
View File
@@ -230,14 +230,42 @@ import time
from pathlib import Path
class ActiveAgentCounter:
"""线程安全的 Agent 活跃任务计数器。"""
def __init__(self, max_global: int = 5):
self._counts: dict[str, int] = {} # agent_id → 活跃任务数
self._total = 0 # 全局活跃总数
self._max_global = max_global
self._lock = threading.Lock()
def can_acquire(self, agent_id: str, max_per_agent: int = 1) -> bool:
"""检查是否可以分配(非阻塞)。"""
with self._lock:
if self._total >= self._max_global:
return False
return self._counts.get(agent_id, 0) < max_per_agent
def increment(self, agent_id: str):
with self._lock:
self._counts[agent_id] = self._counts.get(agent_id, 0) + 1
self._total += 1
def decrement(self, agent_id: str):
with self._lock:
if self._counts.get(agent_id, 0) > 0:
self._counts[agent_id] -= 1
self._total -= 1
class Daemon:
"""单进程 Daemonper-project 线程并发。"""
def __init__(self, config: DaemonConfig):
self.config = config
self.llm_semaphore = threading.Semaphore(config.max_concurrent_llm) # 默认 3
self.agent_locks: dict[str, threading.Lock] = {} # per-agent 互斥
self.agent_counter = ActiveAgentCounter(max_global=config.max_global_active) # 默认 5
self.slots: dict[str, ProjectSlot] = {}
self._slot_threads: dict[str, threading.Thread] = {} # 线程存活监控
self._shutdown = threading.Event()
def start(self):
@@ -246,20 +274,33 @@ class Daemon:
for project_id, meta in registry["projects"].items():
if meta["status"] != "active":
continue
slot = ProjectSlot(
project_id=project_id,
config=meta,
llm_semaphore=self.llm_semaphore,
agent_locks=self.agent_locks,
tick_interval=self.config.tick_interval, # 默认 30s
shutdown_event=self._shutdown,
)
self.slots[project_id] = slot
t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True)
t.start()
self._start_slot(project_id, meta)
# 主线程等待 shutdown
self._shutdown.wait()
# 主线程:监控 + 等待 shutdown
while not self._shutdown.is_set():
self._check_slot_health()
self._shutdown.wait(60) # 每 60s 检查一次线程存活
def _start_slot(self, project_id: str, meta: dict):
slot = ProjectSlot(
project_id=project_id,
config=meta,
agent_counter=self.agent_counter,
tick_interval=self.config.tick_interval,
shutdown_event=self._shutdown,
)
self.slots[project_id] = slot
t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True)
t.start()
self._slot_threads[project_id] = t
def _check_slot_health(self):
"""检查所有 ProjectSlot 线程是否存活,死亡则重启。"""
for project_id, thread in list(self._slot_threads.items()):
if not thread.is_alive():
logger.warning(f"ProjectSlot {project_id} thread died, restarting...")
meta = self.slots[project_id].config
self._start_slot(project_id, meta)
def shutdown(self):
self._shutdown.set()
@@ -268,13 +309,12 @@ class Daemon:
class ProjectSlot:
"""单项目的独立 tick 循环。"""
def __init__(self, project_id, config, llm_semaphore, agent_locks,
def __init__(self, project_id, config, agent_counter,
tick_interval=30, shutdown_event=None):
self.project_id = project_id
self.conn = sqlite_connect(Path(f"projects/{project_id}/blackboard.db"))
self.config = config
self.llm_sem = llm_semaphore # 共享:全局 LLM 信号量
self.agent_locks = agent_locks # 共享:per-agent 锁字典
self.agent_counter = agent_counter # 共享:全局 Agent 活跃计数器
self.tick_interval = tick_interval
self.shutdown = shutdown_event or threading.Event()
self.health = DaemonHealth(project_id)
@@ -286,10 +326,15 @@ class ProjectSlot:
self._tick()
except Exception as e:
logger.error(f"[{self.project_id}] tick failed: {e}")
self.shutdown.wait(self.tick_interval) # 可被 shutdown 中断的 sleep
self.shutdown.wait(self.tick_interval)
def _tick(self):
"""单次 tick:找 pending 任务,尝试分配。"""
# 先检查已完成的 Agent,释放计数器
completed = self._check_working_tasks() # 返回已完成的 agent_id 列表
for agent_id in completed:
self.agent_counter.decrement(agent_id)
pending = find_pending(self.conn)
if not pending:
self.health.record_idle()
@@ -297,75 +342,126 @@ class ProjectSlot:
for task in pending:
agent_id = task["assignee"]
lock = self.agent_locks.setdefault(agent_id, threading.Lock())
max_per_agent = self.config.get("max_active_per_agent", 1)
# 非阻塞尝试:Agent 正忙就跳过,不排队等
if not lock.acquire(blocking=False):
logger.info(f"[{self.project_id}] {agent_id} busy, skip task {task['id']}")
# 检查全局 + per-agent 并发上限
if not self.agent_counter.can_acquire(agent_id, max_per_agent):
logger.info(f"[{self.project_id}] {agent_id} at capacity, skip task {task['id']}")
continue
try:
# 等待全局 LLM 槽位(阻塞,但持有 agent_lock)
self.llm_sem.acquire()
try:
spawn_agent(self.project_id, task, self.conn)
self.health.record_change()
finally:
# LLM 调用完成后释放信号量
# 注意:Agent spawn 后 LLM 调用即完成,不需要等 Agent 执行完
self.llm_sem.release()
finally:
lock.release()
# sequential 模式:检查该 Agent 在本项目是否有 working 任务
if self.config.get("agent_parallelism") != "parallel":
if has_working_task(self.conn, agent_id):
continue
# 检查 working 任务超时等
self._check_working_tasks()
# 生成 session_id(§5.4.6 命名规则)
session_id = self._get_session_id(agent_id, task['id'])
# spawn Agent(异步,不阻塞)
spawn_agent(
project_id=self.project_id,
task=task,
session_id=session_id,
)
self.agent_counter.increment(agent_id) # +1
self.health.record_change()
def _get_session_id(self, agent_id: str, task_id: str) -> str:
"""§5.4.6 session 命名规则。"""
if self.config.get("agent_parallelism") != "parallel":
# sequential:同一项目同一 Agent 复用 session(保持上下文连续性)
return f"agent:{agent_id}:project:{self.project_id}"
else:
# parallel:每个任务独立 session
return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}"
```
#### 5.4.5 资源释放时序
#### 5.4.5 资源控制模型(v2 修订:异步计数器替代同步信号量)
**原设计问题**spawn_agent() 不是瞬时操作——Agent 执行涉及 LLM API 调用(可能多轮工具调用),如果在 spawn 完成后才释放锁/信号量,并发退化回串行;如果在 spawn 启动后立即释放,信号量没有真正限流。
**修订方案**(采纳司马懿评审建议):
| 资源 | 原方案 | 修订方案 | 原因 |
|------|--------|---------|------|
| Agent 互斥 | `threading.Lock` | 移除。改为 per-project session 命名 + `_check_working_tasks()` | 同一 Agent 可用不同 session-id 安全服务不同项目 |
| LLM 并发 | `threading.Semaphore` | `ActiveAgentCounter`(异步计数器) | spawn 是异步的,同步信号量无法精确限流 |
| 项目隔离 | per-project SQLite | 不变 | |
**新时序**
```
ProjectSlot._tick()
├── lock.acquire() # 拿到 Agent 锁
├── llm_sem.acquire() # 拿到 LLM 槽位
├── spawn_agent() # spawn Agent 子进程(LLM 调用在 spawn 瞬间完成)
├── llm_sem.release() # ✅ spawn 完立即释放 LLM 槽位
├── lock.release() # ✅ spawn 完立即释放 Agent 锁
├── 检查 ActiveAgentCounter[agent_id] < max_active
│ └── 否 → 跳过,下个 tick 再检查
├── 检查 agent_parallelism == "sequential" 且该 Agent 有 working 任务?
│ └── 是 → 跳过
├── spawn_agent(project_id, task)
│ ├── session_id = "agent:{agent_id}:project:{project_id}:task:{task_id}"
│ └── ActiveAgentCounter.increment(agent_id) # +1
└── Agent 子进程独立运行 → 完成后写 output.json → 下次 tick 检测
└── Agent 完成回调(下次 tick 检测到 output 或 webhook
└── ActiveAgentCounter.decrement(agent_id) # -1
```
**关键设计决策**`lock``llm_sem``spawn_agent()` 返回后立即释放,**不等 Agent 执行完成**。
**关键变化**
1. 不再使用 `threading.Lock``threading.Semaphore`——它们是同步原语,不适合异步 spawn 场景
2. 改用 `ActiveAgentCounter`(线程安全计数器),spawn 时 +1,Agent 完成回调时 -1
3. `_tick()` 分配前检查计数器,超过阈值就跳过
4. Agent session 按 `agent:{agent_id}:project:{project_id}:task:{task_id}` 命名,项目+任务级天然隔离
原因:
1. `spawn_agent()``subprocess.Popen``openclaw agent` CLI 调用,启动后立即返回
2. Agent 执行是异步的(子进程独立运行)
3. 如果等 Agent 执行完才释放锁,并发就退化回串行
#### 5.4.6 Agent 并行策略 + Session 隔离
**这意味着**:同一个 Agent 理论上可以同时有多个任务在跑。如果需要严格串行(一个 Agent 同一时刻只有一个任务),则改为在下次 tick 的 `_check_working_tasks()` 中检测 Agent 是否有 working 任务,有则不再分配新任务。
#### 5.4.6 Agent 并行策略配置
**并行策略配置**
```yaml
# _registry.yaml 中可配置
projects:
quant-momentum:
agent_parallelism: sequential # 同一 Agent 同一时刻只跑一个任务(默认)
max_active_per_agent: 1 # sequential 的显式写法
quant-pairs:
agent_parallelism: parallel # 同一 Agent 可同时跑多个任务
max_active_per_agent: 2 # 最多 2 个并行
```
默认 `sequential``_tick()` 分配任务前先检查该 Agent 是否有 working 任务。
**Session 命名规则**
```
格式:agent:{agent_id}:project:{project_id}:task:{task_id}
示例:agent:zhangfei-dev:project:quant-momentum:task:task-001
```
- 每个任务独立 session,任务间上下文不串
- 同一 Agent 在不同项目用不同 session,项目间上下文不串
- `sequential` 模式:同一项目同一 Agent 只有一个活跃 session(新的任务复用或新开)
- `parallel` 模式:每个任务独立 session
**sequential 模式下的 session 复用**
```python
def _get_session_id(self, agent_id: str, task_id: str) -> str:
project_config = self.config
if project_config.get("agent_parallelism") != "parallel":
# sequential:同一项目同一 Agent 复用 session(保持上下文连续性)
return f"agent:{agent_id}:project:{self.project_id}"
else:
# parallel:每个任务独立 session
return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}"
```
#### 5.4.7 并发安全保证
| 并发场景 | 风险 | 保护机制 |
|---------|------|---------|
| 两个项目同时写同一个 SQLite | 数据损坏 | 每个项目独立 `.db` 文件,不存在此场景 |
| 两个项目同时分配同一个 Agent | Agent session 上下文串 | per-agent Lock 互斥 |
| 三个项目同时调 LLM API | API 限流/超限 | 全局 Semaphore 限流 |
| ProjectSlot 线程异常退出 | 项目 tick 停止 | try/except 包裹 + Daemon 监控线程存活 |
| 两个项目同时分配同一个 Agent | Agent 资源争抢 | `ActiveAgentCounter` + `max_active_per_agent` 限制 |
| LLM API 并发超限 | API 限流/超限 | `ActiveAgentCounter` 全局计数,`_tick()` 分配前检查 |
| ProjectSlot 线程异常退出 | 项目 tick 停止 | try/except 包裹 + Daemon 监控线程存活(§5.4.8 |
| Daemon 主进程崩溃 | 所有项目停止 | PM2 自动重启 + SQLite WAL 保护数据完整性 |
| Agent 完成回调丢失 | 计数器不归零 | 超时兜底:working 任务超过 `task_timeout` 视为完成,计数器 -1 |
| _registry.yaml 并发写入 | 数据损坏 | _registry.yaml 只在 CLI 操作时读写(非 tick 热路径),tick 状态用内存 dict |
## 6. CLI 变更