42 KiB
moziplus v2.0 — AI 原生多Agent编排平台 架构设计
版本: v2.1(技术架构修订)
日期: 2026-05-14
作者: 庞统(副军师)
状态: 草案,待用户确认
调研基础: docs/research/shared-consciousness-research.md
变更记录: v2.1 修正了 Agent 调度方式(放弃 sessions_send/sessions_spawn,改用主 session + Daemon API)
0. 设计哲学
"可预测的骨架 + AI 驱动的填充" —— 不是纯 DAG,也不是纯 ReAct,而是混合模式。
六个核心信念:
- AI 参与每一个决策层 —— 编排/路由/渲染/异常处理/经验沉淀都有 AI 参与
- 黑板是唯一真相源 —— 所有 Agent 通过黑板共享信息,没有私下通信
- 产出物 > 消息 —— 共享产出物比共享消息更重要
- 验证才算完 —— 不验证产出不算完成
- 有界并行 —— 默认最多 4 个 Agent 并行(有学术依据)
- 闭环学习 —— 执行→经验沉淀→下次改进
1. 系统总览
1.1 宏观架构
用户(自然语言)
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 对话入口(Conversation Layer) │
│ 庞统的持久 session,用户唯一交互点 │
│ 支持:WebChat / CLI / Cron 触发 / API 调用 │
└───────────────────────┬─────────────────────────────────────┘
│
┌───────────────────────▼─────────────────────────────────────┐
│ 庞统 AI 指挥官(Control Unit) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Phase 1 │ │ Phase 2 │ │ Phase 3 │ │ Phase 4 │ │
│ │ 需求探索 │ │ 动态规划 │ │ 自主执行 │ │ 主动汇报 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 内置机制: │
│ - /goal Ralph Loop:持久目标跨 turn 保持 │
│ - Scope Reduction Detection:防偷懒 │
│ - 幻觉门控:验证产出再算完成 │
│ - Fidelity 路由:按需分发信息 │
│ - Boids 规则注入:Agent 协作行为塑造 │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Blackboard(共享意识空间) │ │
│ │ │ │
│ │ TaskCtx │ Moments │ Artifacts │ Decisions │ Plan │ │
│ │ AgentStates │ Experience │ EventLog │ │
│ └──────────────────────────────────────────────────────┘ │
└───────────────────────┬─────────────────────────────────────┘
│ Fidelity 三档读写
┌─────────────┼─────────────┐
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│ 张飞 │ │ 关羽 │ │ 赵云 │ ...
│ 编码 │ │ 风控 │ │ 数据 │
└─────────┘ └─────────┘ └─────────┘
每个 Agent: isolated session + SOUL.md + Skills
写入保护: propose → validate → commit
1.2 与 v1.0 的关系
| 维度 | v1.0 | v2.0 |
|---|---|---|
| 编排 | 固定 DAG 模板 + 确定性状态机 | 庞统 AI 动态规划 + 持续指挥 |
| 通信 | Sanguo Mail(异步邮件轮询) | Blackboard(实时共享读写) |
| 入口 | CLI + Dashboard | 自然语言对话 |
| 计划 | 一次性生成不可变 | 持续演进,可随时调整 |
| Agent 调度 | 按模板固定分配 | 按能力画像动态选择 |
| 信息可见性 | 每个 Agent 只看自己 | Fidelity 三档按需 |
| 异常处理 | Report Watcher(规则) | 庞统 AI 判断 |
| 验证 | output.md frontmatter | 幻觉门控 + AI 验证 |
| 经验 | 无闭环 | DISCOVER→DISTILL→APPLY→IMPROVE |
v2.0 独立仓库、独立代码、独立部署。v1.0 继续运行,互不干扰。
2. 核心模块详细设计
2.1 Blackboard(共享意识空间)
2.1.1 物理结构
~/.sanguo_projects/sanguo_moziplus_v2/
├── blackboard/ # 共享意识空间根目录
│ ├── tasks/ # 任务空间
│ │ └── {task-id}/ # 每个任务独立目录
│ │ ├── context.json # 任务上下文(目标/约束/状态)
│ │ ├── moments.jsonl # 原子事件流(追加写入)
│ │ ├── plan.json # 动态计划图谱
│ │ ├── decisions.jsonl # 决策记录(不可变)
│ │ ├── agents/ # 各 Agent 工作区
│ │ │ ├── {agent-id}/
│ │ │ │ ├── state.json # Agent 当前状态
│ │ │ │ ├── output/ # Agent 产出物
│ │ │ │ └── inbox/ # Agent 专属信箱(通知类)
│ │ │ └── ...
│ │ └── artifacts/ # 共享产出物索引
│ │ └── index.json # 产出物注册表
│ ├── global/ # 全局共享空间
│ │ ├── agent-registry.json # Agent 能力画像注册表
│ │ ├── experience/ # 跨任务经验库
│ │ │ ├── {domain}.jsonl # 按领域组织
│ │ │ └── index.json # 经验索引
│ │ └── templates/ # 任务模板库
│ │ └── {template-id}.json
│ ├── events/ # 不可变全局事件日志
│ │ └── {date}.jsonl # 按日期分文件
│ ├── inbox/ # 用户需求入口
│ │ └── {req-id}.json # 待处理需求
│ └── locks/ # 写入锁目录
│ └── {resource-path}.lock # 文件锁
├── daemon/ # 守护进程代码
├── skills/ # Skill 包
├── docs/ # 文档
└── config/ # 配置
2.1.2 数据结构定义
context.json — 任务上下文:
{
"task_id": "task-20260514-001",
"title": "均线策略回测",
"goal": "对双均线交叉策略在沪深300上进行5年回测",
"intent": "验证该策略在A股市场的有效性",
"end_state": "回测报告完整,包含收益曲线、最大回撤、夏普比率",
"constraints": [
"数据范围:2020-2025",
"标的:沪深300指数",
"初始资金:100万"
],
"state": "executing", // exploring → planning → executing → reviewing → completed
"phase": 3, // 当前四相阶段
"created_at": "2026-05-14T08:00:00+08:00",
"updated_at": "2026-05-14T08:15:00+08:00",
"parent_task": null, // 子任务指向父任务
"tags": ["backtest", "strategy", "moving-average"],
"confidence": 0.0 // 庞统对"需求理解程度"的自评
}
moments.jsonl — 原子事件流(每行一个 JSON):
{"type":"task_created","ts":"...","agent":"pangtong","data":{"goal":"..."}}
{"type":"requirement_clarified","ts":"...","agent":"pangtong","data":{"clarifications":[...]}}
{"type":"plan_generated","ts":"...","agent":"pangtong","data":{"plan_id":"p1","steps":5}}
{"type":"plan_approved","ts":"...","agent":"pangtong","data":{"approved_by":"user"}}
{"type":"agent_assigned","ts":"...","agent":"pangtong","data":{"agent":"zhaoyun","step":"s1"}}
{"type":"agent_started","ts":"...","agent":"zhaoyun","data":{"step":"s1"}}
{"type":"artifact_produced","ts":"...","agent":"zhaoyun","data":{"file":"data.csv","summary":"5年日线数据","confidence":0.95}}
{"type":"agent_completed","ts":"...","agent":"zhaoyun","data":{"step":"s1","status":"success"}}
{"type":"anomaly_detected","ts":"...","agent":"pangtong","data":{"type":"data_quality","severity":"warning"}}
{"type":"plan_adjusted","ts":"...","agent":"pangtong","data":{"reason":"数据质量问题","added_step":{...}}}
{"type":"consensus_reached","ts":"...","agent":"pangtong","data":{"result":"回测通过"}}
{"type":"task_completed","ts":"...","agent":"pangtong","data":{"final_state":"success"}}
plan.json — 动态计划图谱:
{
"plan_id": "p1",
"task_id": "task-20260514-001",
"version": 3,
"steps": [
{
"id": "s1",
"type": "data_fetch",
"intent": "获取沪深300的5年日线数据",
"end_state": "数据文件就绪,通过质量检查",
"constraints": ["使用AKShare", "保存为CSV"],
"agent": "zhaoyun",
"status": "completed",
"artifacts": ["data/hs300_daily.csv"],
"started_at": "...",
"completed_at": "...",
"confidence": 0.95
},
{
"id": "s2",
"type": "strategy_implementation",
"intent": "实现双均线交叉策略",
"end_state": "策略代码可运行,通过单元测试",
"constraints": ["使用vnpy框架", "参数可配置"],
"agent": "zhangfei",
"status": "executing",
"depends": ["s1"],
"started_at": "..."
},
{
"id": "s2.5",
"type": "data_cleaning",
"intent": "清洗异常数据点",
"end_state": "异常值处理完毕,数据连续无缺失",
"agent": "zhaoyun",
"status": "pending",
"depends": [],
"added_dynamically": true,
"add_reason": "s1 完成后发现数据有缺失值"
}
],
"changelog": [
{"version":1,"change":"初始计划","ts":"..."},
{"version":2,"change":"添加 s2.5 数据清洗步骤","reason":"发现数据质量问题","ts":"..."},
{"version":3,"change":"调整 s3 约束","reason":"用户要求改为vnpy框架","ts":"..."}
]
}
agent-registry.json — Agent 能力画像:
{
"agents": {
"zhaoyun-data": {
"name": "赵云",
"role": "数据总管",
"capabilities": ["data_fetch", "data_cleaning", "data_validation", "quality_check"],
"tools": ["exec", "read", "write", "web_fetch"],
"model_preference": "auto",
"max_parallel_tasks": 1,
"priority": 2,
"performance": {
"tasks_completed": 42,
"avg_confidence": 0.91,
"avg_duration_minutes": 8,
"strengths": ["data_quality", "python"],
"last_active": "..."
},
"session_key": "agent:zhaoyun-data:main"
},
"zhangfei-dev": {
"name": "张飞",
"role": "编码先锋",
"capabilities": ["coding", "backtest", "strategy_implementation", "scripting"],
"tools": ["exec", "read", "write", "edit"],
"model_preference": "auto",
"max_parallel_tasks": 1,
"priority": 1,
"performance": { ... },
"session_key": "agent:zhangfei-dev:main"
},
"guanyu-dev": {
"name": "关羽",
"role": "风控守将",
"capabilities": ["risk_check", "position_sizing", "stop_loss", "live_audit"],
"tools": ["exec", "read", "write", "edit"],
"model_preference": "auto",
"max_parallel_tasks": 1,
"priority": 3, // 风控最高优先级
"performance": { ... },
"session_key": "agent:guanyu-dev:main"
},
"simayi-challenger": {
"name": "司马懿",
"role": "质量总监",
"capabilities": ["code_review", "challenge", "final_acceptance"],
"tools": ["exec", "read", "write", "edit"],
"model_preference": "auto",
"max_parallel_tasks": 1,
"priority": 2,
"performance": { ... },
"session_key": "agent:simayi-challenger:main"
},
"jiangwei-infra": {
"name": "姜维",
"role": "平台总督",
"capabilities": ["deployment", "docker", "nas", "backtest_server", "vnpy"],
"tools": ["exec", "read", "write", "edit"],
"model_preference": "auto",
"max_parallel_tasks": 1,
"priority": 1,
"performance": { ... },
"session_key": "agent:jiangwei-infra:main"
}
}
}
decisions.jsonl — 决策记录(不可变):
{"ts":"...","agent":"pangtong","decision":"assign_s2_to_zhangfei","reason":"张飞擅长策略编码","alternatives_considered":["关羽(风控优先)"]}
{"ts":"...","agent":"pangtong","decision":"add_s2.5","reason":"数据清洗步骤缺失","trigger":"s1 confidence=0.7 低于阈值"}
2.1.3 写入保护:propose → validate → commit
借鉴 Network-AI 的三阶段原子写入:
Agent A:
1. propose: 写入 agents/{agent-id}/proposed/{change-id}.json
包含:target_path, proposed_content, priority, reason
Control Unit (庞统):
2. validate:
a. 格式校验(JSON schema)
b. 冲突检测(target 是否被其他 propose 锁定)
c. 优先级检查(是否有更高优先级的 propose)
d. 业务校验(状态流转是否合法)
3. commit:
a. 获取文件锁 (locks/{resource}.lock)
b. 原子写入(tmp → rename)
c. 追加 Moment 事件
d. 释放锁
或 abort:
a. 记录拒绝原因
b. 通知提议 Agent
简化规则:
- Agent 对自己工作区(
agents/{agent-id}/)的写入:自动 commit,不需要 propose - Agent 对共享区域(
artifacts/,plan.json,context.json)的写入:必须 propose → commit - Agent 对其他 Agent 工作区的写入:禁止(覆盖保护原则)
2.1.4 文件锁实现
借鉴 ClawTeam 的 fcntl + 原子 rename:
import fcntl, tempfile, os
from pathlib import Path
def atomic_write(path: Path, content: str):
"""原子写入:先写临时文件,再 rename"""
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=path.parent, prefix=f"{path.stem}-", suffix=".tmp")
with os.fdopen(fd, 'w') as f:
f.write(content)
Path(tmp).replace(path) # atomic on same filesystem
class BlackboardLock:
"""文件系统互斥锁"""
def __init__(self, lock_dir: Path):
self.lock_dir = lock_dir
def acquire(self, resource: str, holder: str, timeout_ms=10000) -> bool:
lock_path = self.lock_dir / f"{resource.replace('/', '_')}.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
start = time.monotonic()
while (time.monotonic() - start) * 1000 < timeout_ms:
try:
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
os.write(fd, json.dumps({"holder": holder, "acquired_at": time.time()}).encode())
os.close(fd)
return True
except FileExistsError:
# stale lock check (> 30s)
age = time.time() - lock_path.stat().st_mtime
if age > 30:
lock_path.unlink(missing_ok=True)
time.sleep(0.1)
return False
def release(self, resource: str, holder: str):
lock_path = self.lock_dir / f"{resource.replace('/', '_')}.lock"
if lock_path.exists():
data = json.loads(lock_path.read_text())
if data.get("holder") == holder:
lock_path.unlink()
2.2 Control Unit(庞统 AI 指挥官)
2.2.1 四相循环
Phase 1: 需求探索
├── 苏格拉底对话,帮用户发现真实需求
├── 歧义评分(0-1),高歧义时深入追问
├── 输出:context.json(goal/intent/end_state/constraints)
├── 自评 confidence,>0.8 才进入 Phase 2
└── 人的参与:🔴 高(全程对话)
Phase 2: 动态规划
├── 根据 context.json 生成 plan.json
├── AI 挑战:庞统自己审视计划的弱点
├── 三方共识(可选):庞统+司马懿+用户审核
├── Plan 审批:用户确认后才执行
├── 人的参与:🟡 可选(简单任务可跳过审批)
└── 输出:plan.json(version 1)
Phase 3: 自主执行
├── 按 plan.json 调度 Agent
├── 每步执行:
│ ├── 选择 Agent(能力画像匹配)
│ ├── 注入任务上下文(Fidelity 按角色)
│ ├── Agent 写入黑板
│ ├── 幻觉门控(验证产出存在)
│ ├── 异常检测(超时/质量低/Agent 崩溃)
│ └── 动态调整计划(如需)
├── /goal Ralph Loop:跨 turn 保持目标专注
├── 人的参与:🟢 几乎不参与(可随时介入 steer)
└── 输出:artifacts/ + moments 流
Phase 4: 主动汇报
├── AI 推送进展摘要(不等人查)
├── 验收:庞统自审 + 司马懿终审
├── 经验沉淀:提取关键经验写入 experience/
├── 人的参与:🔵 验收
└── 输出:最终报告 + experience 条目
2.2.2 庞统的运行模式
事件驱动 + 持久 session:
庞统 session 始终在线(OpenClaw persistent session)
触发方式:
1. 用户发消息 → 直接在 session 中处理
2. Agent 写入黑板 → cron 定期扫描黑板变化 → wake 庞统
3. Agent 完成/失败 → 写入 moments → wake 庞统
4. 异常检测 → cron 检查 → wake 庞统
5. 用户 steer(中途干预)→ 直接注入 session
空闲时:
- 不消耗资源
- 被 wake 事件唤醒后立即恢复上下文
- 通过黑板恢复状态(不需要重载全部历史)
2.2.3 信息路由(Fidelity 三档)
def route_information(target_agent: str, task_ctx: dict, moments: list) -> dict:
"""根据目标 Agent 的角色,选择合适的信息保真度"""
role = get_agent_role(target_agent)
if role == "control_unit": # 庞统自己
return {
"fidelity": "full",
"context": task_ctx,
"moments": moments, # 全量事件
"artifacts": all_artifacts, # 全部产出物
"agent_states": all_agent_states # 所有Agent状态
}
elif role in get_collaborators(task_ctx): # 同任务协作伙伴
return {
"fidelity": "summary",
"context": task_ctx, # 完整任务上下文
"relevant_steps": filter_relevant(moments, target_agent),
"artifacts": get_dependent_artifacts(target_agent),
"summary": ai_summarize(moments) # AI 压缩摘要
}
else: # 外围 Agent
return {
"fidelity": "signal",
"action_required": get_pending_actions(target_agent),
"final_result": task_ctx.get("result"),
"notification": True
}
2.2.4 Agent 选择算法
def select_agent(step: dict, registry: dict) -> str:
"""根据步骤需求和Agent能力画像选择最合适的Agent"""
required_caps = step.get("required_capabilities", infer_caps(step))
candidates = []
for agent_id, profile in registry["agents"].items():
# 能力匹配
cap_overlap = len(set(required_caps) & set(profile["capabilities"]))
if cap_overlap == 0:
continue
# 可用性检查
if profile["performance"]["tasks_in_progress"] >= profile["max_parallel_tasks"]:
continue
# 评分:能力匹配度 * 历史表现 * 当前空闲度
score = (
cap_overlap / len(required_caps) * 0.4 + # 能力匹配
profile["performance"]["avg_confidence"] * 0.3 + # 历史表现
(1 - profile["performance"]["tasks_in_progress"] /
profile["max_parallel_tasks"]) * 0.3 # 当前空闲度
)
candidates.append((agent_id, score))
if not candidates:
return None # 需要排队或调整计划
return max(candidates, key=lambda x: x[1])[0]
2.3 Agent 层
2.3.1 Agent 工作流程
1. 接收任务
├── 庞统通过 OpenClaw sessions_send 发送任务消息
├── 消息包含:step intent + end_state + constraints + 相关黑板内容
└── 不包含:完整计划、其他Agent的详情(Fidelity 控制)
2. 执行任务
├── 读取黑板中自己需要的上下文
├── 执行实际工作(编码/数据分析/风控检查等)
├── 写入产出到 agents/{agent-id}/output/
└── 追加 Moments 事件
3. 提交产出
├── propose 共享产出物到 artifacts/
├── 庞统 validate + commit
├── 自评 [confidence: 0.X]
└── 元认知:confidence < 0.6 时推荐人工审核
4. 等待下一步
├── 庞统根据执行结果决定下一步
└── Agent 进入空闲状态
2.3.2 Agent 行为注入
每个 Agent 的 SOUL.md / prompt 中注入Boids 协作规则:
## 协作规则(Boids 群体智能)
1. **Separation(不重复)**:开始工作前检查黑板,确认没有其他 Agent 在做相同的事
2. **Alignment(风格一致)**:遵循团队的编码规范、产出格式、命名约定
3. **Cohesion(主动共享)**:发现重要信息时主动写入黑板共享区域
4. **Boundary(不越界)**:只在自己的工作区和共享区域操作,不修改其他 Agent 的产出
以及元认知自评:
## 自评要求
完成任务后,标注置信度:
- `[confidence: 0.X]` 其中 X 为 0-10
- confidence < 0.6 时说明不确定之处并推荐人工审核
- 遇到专业外的问题主动上报,不硬撑
以及Auftragstaktik 任务式指挥:
## 任务执行方式
你会收到:Intent(意图)、End State(终态)、Constraints(约束)
- 自主决定如何达成目标
- 可以选择任何合理的方法
- 但必须遵守所有 Constraints
- 遇到 Constraints 阻碍目标时,上报而不是绕过
2.4 事件系统
2.4.1 Moments 事件类型
class MomentType(str, Enum):
# 任务生命周期
TASK_CREATED = "task_created"
REQUIREMENT_CLARIFIED = "requirement_clarified"
TASK_COMPLETED = "task_completed"
TASK_FAILED = "task_failed"
# 计划
PLAN_GENERATED = "plan_generated"
PLAN_APPROVED = "plan_approved"
PLAN_ADJUSTED = "plan_adjusted"
# Agent 调度
AGENT_ASSIGNED = "agent_assigned"
AGENT_STARTED = "agent_started"
AGENT_COMPLETED = "agent_completed"
AGENT_FAILED = "agent_failed"
AGENT_BLOCKED = "agent_blocked"
# 产出
ARTIFACT_PRODUCED = "artifact_produced"
ARTIFACT_VALIDATED = "artifact_validated"
# 决策
DECISION_MADE = "decision_made"
CHALLENGE_RAISED = "challenge_raised"
CHALLENGE_VERDICT = "challenge_verdict"
# 异常
ANOMALY_DETECTED = "anomaly_detected"
TIMEOUT_WARNING = "timeout_warning"
# 用户交互
USER_STEER = "user_steer"
USER_APPROVED = "user_approved"
USER_REJECTED = "user_rejected"
# 经验
EXPERIENCE_CAPTURED = "experience_captured"
2.4.2 事件驱动唤醒
# 庞统的唤醒条件
WAKE_CONDITIONS = {
# 黑板变化检测(cron 每 30s 扫描)
"blackboard_change": {
"trigger": "moments.jsonl 有新行",
"action": "wake pangtong session",
"context": "新增的 moments"
},
# Agent 完成
"agent_completed": {
"trigger": "agents/{id}/state.json status=completed",
"action": "wake pangtong",
"context": "agent_id + step_id"
},
# 超时检测
"step_timeout": {
"trigger": "step started_at + 30min < now",
"action": "wake pangtong with alert",
"context": "step_id + duration"
},
# 用户消息
"user_message": {
"trigger": "inbox/ 有新文件",
"action": "wake pangtong",
"context": "消息内容"
}
}
2.5 经验沉淀系统
2.5.1 闭环学习
DISCOVER(发现)
├── 任务执行过程中 Agent 发现好做法
├── 异常处理中发现新模式
└── 写入 blackboard/tasks/{id}/agents/{id}/discoveries.json
DISTILL(蒸馏)
├── 任务完成后庞统自动提取关键转折点
├── 从 decisions.jsonl + moments.jsonl 提炼经验
├── 压缩为经验条目:{pattern, context, outcome, applicability}
└── 写入 blackboard/global/experience/{domain}.jsonl
APPLY(应用)
├── 新任务开始时,庞统检索相关经验
├── 按任务类型+标签匹配
├── 注入 Agent prompt 作为参考
└── 标记"来自经验 X"
IMPROVE(改进)
├── 验证经验是否真的有效
├── 无效经验标记 deprecated
├── 有效经验提升 confidence
└── 定期合并相似经验
2.5.2 经验数据结构
{
"id": "exp-001",
"pattern": "数据清洗应在策略编码前完成",
"context": "量化策略开发任务",
"outcome": "减少返工率 40%",
"applicability": ["backtest", "strategy_development"],
"source_task": "task-20260514-001",
"confidence": 0.85,
"times_applied": 3,
"times_validated": 2,
"created_at": "...",
"last_validated_at": "..."
}
2.6 监控与运维
2.6.1 健康检查
class HealthChecker:
"""定期检查黑板和 Agent 健康状态"""
checks = [
# Agent 存活检测
"agent_heartbeat", # 检查 state.json 更新时间
"agent_zombie", # 运行超过 2 小时的 Agent
"agent_stale_lock", # 超过 30 秒的锁
# 任务健康
"step_timeout", # 步骤超时
"plan_stuck", # 计划卡住(所有 pending 步骤都有未完成的依赖)
"artifact_missing", # Agent 声称产出但文件不存在(幻觉门控)
# 系统健康
"blackboard_disk", # 磁盘空间
"moment_flood", # 事件洪泛检测
]
2.6.2 Token 成本治理
借鉴 Network-AI FederatedBudget + ClawTeam 成本追踪:
class TokenBudget:
"""Token 预算管理"""
def __init__(self):
self.global_ceiling = 500_000 # 每任务全局上限
self.per_agent_ceiling = 100_000 # 每 Agent 上限
self.spent = {} # agent_id → tokens used
def check(self, agent_id: str, estimated: int) -> bool:
total = sum(self.spent.values())
if total + estimated > self.global_ceiling:
return False # 全局预算不足
if self.spent.get(agent_id, 0) + estimated > self.per_agent_ceiling:
return False # Agent 预算不足
return True
3. 技术实现方案(v2.1 修订)
⚠️ v2.1 关键修正:
- ❌ 废弃 sessions_send(不稳定、timeout)
- ❌ 废弃 sessions_spawn(sub-agent 大爆炸、session 文件堆积)
- ❌ 废弃 cron wake(不稳定)
- ✅ 采用自建 Daemon HTTP API + SQLite(v1.0 已验证可靠)
- ✅ Agent 复用主 session,通过 Daemon API 回报
- ✅ 所有状态/流转/事件类型从配置文件加载,不硬编码
3.1 技术栈
| 层级 | 技术 | 说明 |
|---|---|---|
| 编排引擎 | 自建 Daemon (FastAPI + uvicorn) | HTTP API + 事件循环,PM2 管理 |
| 数据存储 | SQLite (WAL mode) | 任务/计划/事件/Agent状态/经验 |
| 文件存储 | 文件系统 (artifacts) | 产出物(代码/数据/文档),git 可追踪 |
| Agent 运行时 | OpenClaw Gateway | Agent 的主 session 管理 |
| Agent 通信 | Daemon HTTP API | Agent 回报结果、查询黑板 |
| Agent 调度 | Gateway WS API → 主 session | 发消息到 Agent 主 session(不创建 sub-agent) |
| 庞统通信 | Gateway WS API | Daemon → 庞统主 session 注入 systemEvent |
| 配置管理 | YAML/JSON 配置文件 | 状态/流转/事件/模板全部配置化 |
| 文件锁 | fcntl / O_EXCL | 零依赖,跨进程安全 |
| 前端 | OpenClaw Control Center | 对话式入口(庞统主 session) |
| 经验检索 | ripgrep + SQLite FTS | 文本搜索 |
| 同步 | sanguo_git_sync | 已有的三端 Git 同步 |
3.2 为什么不用 OpenClaw 原生调度?
| 方案 | 问题 | 结论 |
|---|---|---|
| sessions_send | 不稳定,经常 timeout | ❌ 废弃 |
| cron wake | 各种问题,不可靠 | ❌ 废弃 |
| sessions_spawn | 每次创建新 session,文件堆积(庞统 296个/354MB),sub-agent 缺少 SOUL.md | ❌ 废弃 |
| 自建 Daemon HTTP API | v1.0 已验证(FastAPI + SQLite + PM2),可靠 | ✅ 采用 |
3.3 Agent 调度方式:主 session + HTTP 回报
核心原则:不创建 sub-agent,复用 Agent 主 session。
Daemon 需要调度张飞执行编码任务:
1. Daemon 通过 Gateway WS API 发消息到 agent:zhangfei-dev:main
- 消息内容:step intent + end_state + constraints
- 张飞在自己的主 session 里收到消息
2. 张飞执行任务
- 读取 daemon API 获取需要的上下文
- 在自己的 workspace 里工作
3. 张飞通过 daemon HTTP API 回报结果
- curl POST http://localhost:8080/api/step/{id}/complete
- body: { artifacts: [...], confidence: 0.9, summary: "..." }
- daemon 做幻觉门控(验证文件存在)
- daemon 更新 SQLite 状态
- daemon 触发下一步
4. daemon 通过 Gateway WS API 通知庞统进展
- 庞统在主 session 收到 systemEvent
- 庞统决定下一步操作
为什么不用 sub-agent?
- 每个 sub-agent 产生 3-5 个磁盘文件(.jsonl + .trajectory + .path)
- 庞统已有 296 个 session 文件 354MB,姜维 227 个 1.4GB
- sub-agent 没有 SOUL.md/IDENTITY.md,行为不够可控
- cleanup: delete 只是从 UI 隐藏,文件仍然在磁盘上
主 session 的上下文膨胀怎么办?
- Agent 每完成一个任务步骤后,daemon 发 systemEvent 触发 reset
- OpenClaw 的 reset 会压缩历史,释放上下文空间
- 或者:每 N 个步骤后自动 reset 一次
3.4 用户查看进展的流程
用户: "任务进展如何?"
│
▼
庞统主 session 收到消息
│
▼
庞统调用 Daemon API:
GET http://localhost:8080/api/task/{task_id}/status
│
▼
Daemon 返回:
{
"task": { "title": "...", "state": "executing", "phase": 3 },
"plan": { "steps": [...], "completed": 3, "total": 5 },
"current_step": {
"agent": "zhangfei",
"status": "executing",
"started_at": "...",
"progress": "正在编码策略逻辑"
},
"recent_moments": [
{ "type": "agent_completed", "agent": "zhaoyun", "summary": "数据获取完成" },
{ "type": "plan_adjusted", "reason": "发现数据质量问题" }
],
"anomalies": [],
"token_budget": { "used": 120000, "total": 500000 }
}
│
▼
庞统用 AI 生成人类可读的进展汇报,回复用户
关键设计:庞统是无状态的
- 所有任务状态在 Daemon 的 SQLite 里
- 庞统 session 不保存任务状态
- 每次被问到进展,实时查询 Daemon API
- 这比 v1.0 好:v1.0 庞统需要在 session 里记住所有任务,上下文很快就爆了
3.5 配置化(零硬编码)
config/
├── states.yaml # 任务状态定义 + 合法流转
├── step-states.yaml # 步骤状态定义 + 合法流转
├── events.yaml # 事件类型定义
├── agent-registry.json # Agent 能力画像
├── templates/ # 任务模板
│ ├── backtest.yaml
│ ├── strategy-research.yaml
│ └── deployment.yaml
└── settings.yaml # 全局设置
states.yaml:
# 任务级状态定义
# Daemon 启动时加载,代码里不允许出现硬编码的状态名
states:
- name: exploring
description: "需求探索中"
phase: 1
transitions_to: [planning, cancelled]
- name: planning
description: "动态规划中"
phase: 2
transitions_to: [executing, planning, cancelled]
- name: executing
description: "自主执行中"
phase: 3
transitions_to: [reviewing, executing, cancelled]
- name: reviewing
description: "验收中"
phase: 4
transitions_to: [completed, executing, cancelled]
- name: completed
description: "任务完成"
transitions_to: [] # 终态
- name: cancelled
description: "已取消"
transitions_to: [] # 终态
step-states.yaml:
# 步骤级状态定义
step_states:
- name: pending
transitions_to: [assigned, cancelled]
- name: assigned
transitions_to: [executing, cancelled]
- name: executing
transitions_to: [completed, failed, blocked, cancelled]
- name: completed
transitions_to: []
- name: failed
transitions_to: [pending] # 可重试
max_retries: 3
- name: blocked
transitions_to: [pending, cancelled]
代码中禁止出现硬编码状态名:
# ❌ 禁止
if task.state == "executing":
# ✅ 正确
EXECUTING = config.get_state("executing")
if task.state == EXECUTING:
3.6 核心代码模块
sanguo_moziplus_v2/
├── daemon/ # 守护进程
│ ├── main.py # FastAPI 入口
│ ├── api/ # HTTP API 路由
│ │ ├── tasks.py # 任务 CRUD
│ │ ├── steps.py # 步骤 CRUD + 回报
│ │ ├── board.py # 黑板查询
│ │ ├── moments.py # 事件查询
│ │ └── agents.py # Agent 状态/心跳
│ ├── engine/ # 编排引擎
│ │ ├── orchestrator.py # 编排主循环(事件驱动)
│ │ ├── planner.py # 动态规划
│ │ ├── selector.py # Agent 选择
│ │ ├── validator.py # 产出验证(幻觉门控)
│ │ └── experience.py # 经验沉淀引擎
│ ├── gateway_client.py # Gateway WS API 客户端
│ ├── db.py # SQLite 数据层
│ ├── lock.py # 文件锁实现
│ ├── health.py # 健康检查(daemon 内部定时)
│ ├── budget.py # Token 预算管理
│ └── config_loader.py # YAML/JSON 配置加载
├── config/ # 配置文件
│ ├── states.yaml
│ ├── step-states.yaml
│ ├── events.yaml
│ ├── agent-registry.json
│ ├── templates/
│ └── settings.yaml
├── artifacts/ # 产出物目录(git 追踪)
├── skills/ # Skill 包(供 Agent 加载)
│ ├── task-bootstrap/ # Agent 任务引导 Skill
│ │ └── SKILL.md # Boids + 元认知 + Auftragstaktik
│ └── wiki-query/ # 复用已有
├── docs/
│ ├── design/
│ └── research/
├── scripts/
│ ├── create-task.sh # CLI: 创建任务
│ ├── status.sh # CLI: 查看状态
│ └── bootstrap.sh # 初始化脚本
└── README.md
3.3 关键交互流程
流程 1:完整任务生命周期
用户: "帮我做一个均线策略回测"
│
▼
庞统 Phase 1(需求探索)
├── 苏格拉底对话 2-3 轮
├── 澄清:标的?周期?资金?评价指标?
├── 写入 context.json,confidence=0.9
└── 转入 Phase 2
│
▼
庞统 Phase 2(动态规划)
├── 检索经验库 → 找到"数据清洗应先于策略编码"
├── 生成 plan.json(5步)
├── 用户确认(或跳过)
└── 转入 Phase 3
│
▼
庞统 Phase 3(自主执行)
│
├── Step s1: 数据获取 → 选择赵云(data_fetch 能力)
│ ├── sessions_send 给赵云
│ ├── 赵云执行,写入 output/hs300_daily.csv
│ ├── 赵云 propose → 庞统 validate → commit
│ ├── 幻觉门控:文件存在?大小合理? ✓
│ └── confidence=0.95 ✓
│
├── 发现:数据有缺失值(anomaly_detected)
│ └── 庞统动态添加 s1.5 数据清洗步骤
│
├── Step s1.5: 数据清洗 → 选择赵云
│ └── ... 执行并完成
│
├── Step s2: 策略编码 → 选择张飞
│ └── ... 执行并完成
│
├── Step s3: 风控审查 → 选择关羽
│ └── ... 执行并完成
│
├── Step s4: 质量评审 → 选择司马懿
│ ├── 司马懿发现问题 → challenge_raised
│ ├── 庞统裁决 → 要求张飞修正
│ ├── 张飞修正 → 重新评审
│ └── 司马懿通过 ✓
│
└── 转入 Phase 4
│
▼
庞统 Phase 4(主动汇报)
├── 生成最终报告
├── 经验沉淀:提取 3 条经验写入 experience/
├── 向用户推送完成通知
└── 用户验收
流程 2:异常处理
场景:赵云执行超时
1. health scanner 检测到 zhaoyun state.json 30分钟未更新
2. scanner 通过 cron wake 庞统
3. 庞统:
a. 检查赵云 session 是否存活(sessions_list)
b. 存活 → steer 赵云(sessions_send "进度如何?")
c. 不存活 → 标记 s1 为 failed,重新分配给其他 Agent 或调整计划
d. 记录 decision: "赵云超时,重新分配"
4. 追加 Moment: agent_failed + decision_made
流程 3:用户中途干预
场景:执行到 s2 时用户说"改成 MACD 策略"
1. 用户消息注入庞统 session
2. 庞统:
a. 暂停当前执行(通知张飞停止)
b. 修改 context.json(goal 改为 MACD 策略)
c. 重新规划 plan.json(s2 需要修改)
d. 向用户确认修改方案
e. 用户确认后继续执行
3. 追加 Moment: user_steer + plan_adjusted
3.4 与 OpenClaw 的集成点
| v2.0 功能 | OpenClaw 能力 | 集成方式 |
|---|---|---|
| Agent 调度 | sessions_send / sessions_spawn | 直接调用 |
| 指挥官持久化 | persistent session | 庞统 session 保持在线 |
| 事件唤醒 | cron wake + systemEvent | 定期扫描黑板 + wake |
| Agent 间通信 | Agent 读写黑板,不走邮件 | |
| 文件操作 | read / write / edit | 直接操作黑板文件 |
| 前端交互 | webchat | 用户通过对话入口交互 |
| 成本控制 | session_status | 获取 token 消耗 |
| 知识检索 | wiki-query skill | 复用已有 skill |
4. 开放问题(需讨论)
| # | 问题 | 选项 | 建议 |
|---|---|---|---|
| 1 | 黑板用文件系统还是 SQLite? | A) 纯文件系统 B) SQLite + 文件 | A — 简单、可 git 追踪 |
| 2 | 庞统用 persistent session 还是 cron 唤醒? | A) persistent session B) cron 事件驱动 | B — 更省资源,更稳定 |
| 3 | Agent 团队是继续用三国还是重新定义? | A) 复用三国 B) 新定义 | A — 角色映射清晰 |
| 4 | v1.0 的 Agent 是否同时服务 v2.0? | A) 共享 Agent session B) 独立 session | B — 隔离,v1.0 和 v2.0 并行运行 |
| 5 | 前端是什么? | A) 纯对话(Control Center)B) 对话 + 可视化 | 先 A,后续加 B |
| 6 | 第一版先做什么? | 见下方里程碑 | — |
5. 里程碑建议
M1: 黑板核心 + 指挥官原型
├── blackboard/ 目录结构 + 读写 API
├── lock.py 文件锁
├── moments.jsonl 追加写入
├── 庞统 cron 唤醒 + 黑板扫描
└── 端到端测试:一个简单任务的完整生命周期
M2: Agent 调度 + 动态规划
├── agent-registry.json 能力画像
├── Agent 选择算法
├── plan.json 动态计划
├── propose→validate→commit 写入保护
└── 端到端测试:多步骤任务(3个Agent协作)
M3: 四相完善 + 经验沉淀
├── Phase 1 苏格拉底对话
├── Phase 2 Plan 审批
├── Phase 4 主动汇报
├── experience/ 经验沉淀
├── 健康检查 + 幻觉门控
└── 端到端测试:异常场景(超时、Agent崩溃、用户干预)
M4: 生产化
├── 成本治理
├── 监控面板
├── 文档完善
├── 与 v1.0 并行运行验证
└── 切换:v1.0 → v2.0
附录:术语表
| 术语 | 含义 |
|---|---|
| Blackboard | 共享意识空间,所有 Agent 的唯一信息共享中枢 |
| Control Unit | AI 指挥官(庞统),负责动态规划、Agent 选择、异常处理 |
| Moment | 原子事件,任务执行过程中的最小信息单元 |
| Fidelity | 信息保真度,控制不同 Agent 看到多少信息 |
| propose→validate→commit | 三阶段原子写入,防止并发竞态 |
| Boids | 群体智能规则,让 Agent 自行涌现协作行为 |
| Auftragstaktik | 任务式指挥,只给目标不给步骤 |
| 幻觉门控 | 验证 Agent 产出是否真实存在 |
| Ralph Loop | 持久目标跨 turn 保持机制 |