"""CLI 工具 — blackboard.py(Agent 黑板操作)+ admin.py(管理员)""" from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import List, Optional from src.blackboard.operations import Blackboard from src.blackboard.models import Task, Comment, Output, Decision, Observation, Review, Experience from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry def _find_project_root() -> Path: """从环境变量或默认路径找项目根目录""" import os root = os.environ.get("BLACKBOARD_ROOT") if root: return Path(root) return Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2" / "projects" def _get_bb(project_id: str) -> Blackboard: root = _find_project_root() return Blackboard(root / project_id / "blackboard.db") def _get_queries(project_id: str) -> Queries: root = _find_project_root() return Queries(root / project_id / "blackboard.db") # =================================================================== # blackboard.py CLI # =================================================================== def build_blackboard_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="blackboard", description="Agent blackboard operations") sub = parser.add_subparsers(dest="command") # read p_read = sub.add_parser("read", help="Read task(s)") p_read.add_argument("--project", required=True) p_read.add_argument("--task-id") p_read.add_argument("--status") p_read.add_argument("--assignee") p_read.add_argument("--json", action="store_true", dest="as_json") # create task p_create = sub.add_parser("create", help="Create task") p_create.add_argument("--project", required=True) p_create.add_argument("--id", required=True, dest="task_id") p_create.add_argument("--title", required=True) p_create.add_argument("--description", default="") p_create.add_argument("--task-type", default="coding") p_create.add_argument("--priority", type=int, default=5) p_create.add_argument("--assignee") p_create.add_argument("--assigned-by", default="user") p_create.add_argument("--depends-on", help="JSON array of task IDs") p_create.add_argument("--risk-level", default="standard") p_create.add_argument("--must-haves", help="JSON string") # claim p_claim = sub.add_parser("claim", help="Claim a task") p_claim.add_argument("--project", required=True) p_claim.add_argument("--task-id", required=True) p_claim.add_argument("--agent", required=True) # output p_output = sub.add_parser("output", help="Write output") p_output.add_argument("--project", required=True) p_output.add_argument("--task-id", required=True) p_output.add_argument("--agent", required=True) p_output.add_argument("--type", required=True, dest="output_type") p_output.add_argument("--title", required=True) p_output.add_argument("--path", default="") p_output.add_argument("--summary", default="") # comment p_comment = sub.add_parser("comment", help="Add comment") p_comment.add_argument("--project", required=True) p_comment.add_argument("--task-id", required=True) p_comment.add_argument("--author", required=True) p_comment.add_argument("--body", required=True) p_comment.add_argument("--type", default="general", dest="comment_type") p_comment.add_argument("--mentions", help="Comma-separated agent IDs") # decide p_decide = sub.add_parser("decide", help="Record decision") p_decide.add_argument("--project", required=True) p_decide.add_argument("--task-id", required=True) p_decide.add_argument("--decider", required=True) p_decide.add_argument("--decision", required=True) p_decide.add_argument("--rationale", required=True) # observe p_observe = sub.add_parser("observe", help="Add observation") p_observe.add_argument("--project", required=True) p_observe.add_argument("--task-id", required=True) p_observe.add_argument("--observer", required=True) p_observe.add_argument("--body", required=True) p_observe.add_argument("--severity", default="info") # review p_review = sub.add_parser("review", help="Add review") p_review.add_argument("--project", required=True) p_review.add_argument("--review-id", required=True) p_review.add_argument("--task-id", required=True) p_review.add_argument("--reviewer", required=True) p_review.add_argument("--review-type", required=True) p_review.add_argument("--verdict", required=True) p_review.add_argument("--summary", required=True) p_review.add_argument("--confidence", type=float) return parser def run_blackboard_cli(args: Optional[List[str]] = None) -> int: parser = build_blackboard_parser() opts = parser.parse_args(args) if not opts.command: parser.print_help() return 1 try: if opts.command == "read": return _cmd_read(opts) elif opts.command == "create": return _cmd_create(opts) elif opts.command == "claim": return _cmd_claim(opts) elif opts.command == "output": return _cmd_output(opts) elif opts.command == "comment": return _cmd_comment(opts) elif opts.command == "decide": return _cmd_decide(opts) elif opts.command == "observe": return _cmd_observe(opts) elif opts.command == "review": return _cmd_review(opts) else: parser.print_help() return 1 except Exception as e: print(f"ERROR: {e}", file=sys.stderr) return 1 def _cmd_read(opts) -> int: bb = _get_bb(opts.project) if opts.task_id: task = bb.get_task(opts.task_id) if not task: print(f"Task not found: {opts.task_id}", file=sys.stderr) return 1 _print_tasks([task], opts.as_json) else: tasks = bb.list_tasks(status=getattr(opts, "status", None), assignee=getattr(opts, "assignee", None)) _print_tasks(tasks, opts.as_json) return 0 def _cmd_create(opts) -> int: bb = _get_bb(opts.project) task = Task( id=opts.task_id, title=opts.title, description=opts.description, task_type=opts.task_type, priority=opts.priority, assignee=opts.assignee, assigned_by=opts.assigned_by, depends_on=opts.depends_on, risk_level=opts.risk_level, must_haves=getattr(opts, "must_haves", None), ) bb.create_task(task) print(f"Created task: {task.id}") return 0 def _cmd_claim(opts) -> int: bb = _get_bb(opts.project) if bb.claim_task(opts.task_id, opts.agent): print(f"Claimed: {opts.task_id} by {opts.agent}") return 0 print(f"Claim failed: {opts.task_id}", file=sys.stderr) return 1 def _cmd_output(opts) -> int: bb = _get_bb(opts.project) oid = bb.write_output(opts.task_id, opts.agent, opts.output_type, opts.title, content_path=opts.path, summary=opts.summary) print(f"Output written: {oid}") return 0 def _cmd_comment(opts) -> int: bb = _get_bb(opts.project) mentions = opts.mentions.split(",") if opts.mentions else None cid = bb.add_comment(opts.task_id, opts.author, opts.body, comment_type=opts.comment_type, mentions=mentions) print(f"Comment added: {cid}") return 0 def _cmd_decide(opts) -> int: bb = _get_bb(opts.project) did = bb.add_decision(opts.task_id, opts.decider, opts.decision, opts.rationale) print(f"Decision recorded: {did}") return 0 def _cmd_observe(opts) -> int: bb = _get_bb(opts.project) oid = bb.add_observation(opts.task_id, opts.observer, opts.body, severity=opts.severity) print(f"Observation added: {oid}") return 0 def _cmd_review(opts) -> int: bb = _get_bb(opts.project) review = Review( id=opts.review_id, task_id=opts.task_id, reviewer=opts.reviewer, review_type=opts.review_type, verdict=opts.verdict, summary=opts.summary, confidence=opts.confidence, ) bb.add_review(review) print(f"Review added: {review.id}") return 0 def _print_tasks(tasks, as_json: bool): if as_json: data = [] for t in tasks: d = {k: v for k, v in t.__dict__.items() if v is not None} data.append(d) print(json.dumps(data, indent=2, ensure_ascii=False)) else: for t in tasks: print(f" {t.id} [{t.status:>8}] P{t.priority} {t.title}") if t.assignee: print(f" assigned: {t.assignee}") # =================================================================== # admin.py CLI # =================================================================== def build_admin_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="admin", description="Admin operations") sub = parser.add_subparsers(dest="command") # project create p_pc = sub.add_parser("project-create", help="Create project") p_pc.add_argument("--id", required=True, dest="project_id") p_pc.add_argument("--name", required=True) p_pc.add_argument("--agents", help="Comma-separated agent IDs") p_pc.add_argument("--description", default="") # project list p_pl = sub.add_parser("project-list", help="List projects") # project archive p_pa = sub.add_parser("project-archive", help="Archive project") p_pa.add_argument("--id", required=True, dest="project_id") # manual tick p_tick = sub.add_parser("tick", help="Trigger manual tick") p_tick.add_argument("--project", required=True) return parser def run_admin_cli(args: Optional[List[str]] = None) -> int: parser = build_admin_parser() opts = parser.parse_args(args) if not opts.command: parser.print_help() return 1 root = _find_project_root() registry = ProjectRegistry(root) try: if opts.command == "project-create": agents = opts.agents.split(",") if opts.agents else [] info = registry.create_project(opts.project_id, opts.name, agents=agents, description=opts.description) print(f"Project created: {opts.project_id} ({opts.name})") return 0 elif opts.command == "project-list": projects = registry.list_projects() for pid, info in projects.items(): status = info.get("status", "?") agents = ",".join(info.get("agents", [])) print(f" {pid} [{status}] {info.get('name', '')} agents: {agents}") return 0 elif opts.command == "project-archive": if registry.archive_project(opts.project_id): print(f"Archived: {opts.project_id}") return 0 print(f"Project not found: {opts.project_id}", file=sys.stderr) return 1 elif opts.command == "tick": print(f"Manual tick placeholder for project: {opts.project}") return 0 else: parser.print_help() return 1 except Exception as e: print(f"ERROR: {e}", file=sys.stderr) return 1 if __name__ == "__main__": # Determine which CLI to run based on script name import os script = os.path.basename(sys.argv[0]) if "admin" in script: sys.exit(run_admin_cli()) else: sys.exit(run_blackboard_cli())