"""CLI 工具 — blackboard.py(Agent 黑板操作)+ admin.py(管理员)""" from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import List, Optional from src.blackboard.operations import Blackboard from src.utils import get_data_root from src.blackboard.models import Task, Review from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry def _find_project_root() -> Path: return get_data_root() def _get_bb(project_id: str) -> Blackboard: root = _find_project_root() return Blackboard(root / project_id / "blackboard.db") def _get_queries(project_id: str) -> Queries: root = _find_project_root() return Queries(root / project_id / "blackboard.db") # =================================================================== # blackboard.py CLI # =================================================================== def build_blackboard_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="blackboard", description="Agent blackboard operations") sub = parser.add_subparsers(dest="command") # read p_read = sub.add_parser("read", help="Read task(s)") p_read.add_argument("--project", required=True) p_read.add_argument("--task-id") p_read.add_argument("--status") p_read.add_argument("--assignee") p_read.add_argument("--json", action="store_true", dest="as_json") # create task p_create = sub.add_parser("create", help="Create task") p_create.add_argument("--project", required=True) p_create.add_argument("--id", required=True, dest="task_id") p_create.add_argument("--title", required=True) p_create.add_argument("--description", default="") p_create.add_argument("--task-type", default="coding") p_create.add_argument("--priority", type=int, default=5) p_create.add_argument("--assignee") p_create.add_argument("--assigned-by", default="user") p_create.add_argument("--depends-on", help="JSON array of task IDs") p_create.add_argument("--risk-level", default="standard") p_create.add_argument("--must-haves", help="JSON string") # claim p_claim = sub.add_parser("claim", help="Claim a task") p_claim.add_argument("--project", required=True) p_claim.add_argument("--task-id", required=True) p_claim.add_argument("--agent", required=True) # output p_output = sub.add_parser("output", help="Write output") p_output.add_argument("--project", required=True) p_output.add_argument("--task-id", required=True) p_output.add_argument("--agent", required=True) p_output.add_argument("--type", required=True, dest="output_type") p_output.add_argument("--title", required=True) p_output.add_argument("--path", default="") p_output.add_argument("--summary", default="") # comment p_comment = sub.add_parser("comment", help="Add comment") p_comment.add_argument("--project", required=True) p_comment.add_argument("--task-id", required=True) p_comment.add_argument("--author", required=True) p_comment.add_argument("--body", required=True) p_comment.add_argument("--type", default="general", dest="comment_type") p_comment.add_argument("--mentions", help="Comma-separated agent IDs") # decide p_decide = sub.add_parser("decide", help="Record decision") p_decide.add_argument("--project", required=True) p_decide.add_argument("--task-id", required=True) p_decide.add_argument("--decider", required=True) p_decide.add_argument("--decision", required=True) p_decide.add_argument("--rationale", required=True) # observe p_observe = sub.add_parser("observe", help="Add observation") p_observe.add_argument("--project", required=True) p_observe.add_argument("--task-id", required=True) p_observe.add_argument("--observer", required=True) p_observe.add_argument("--body", required=True) p_observe.add_argument("--severity", default="info") # review p_review = sub.add_parser("review", help="Add review") p_review.add_argument("--project", required=True) p_review.add_argument("--review-id", required=True) p_review.add_argument("--task-id", required=True) p_review.add_argument("--reviewer", required=True) p_review.add_argument("--review-type", required=True) p_review.add_argument("--verdict", required=True) p_review.add_argument("--summary", required=True) p_review.add_argument("--confidence", type=float) return parser def run_blackboard_cli(args: Optional[List[str]] = None) -> int: parser = build_blackboard_parser() opts = parser.parse_args(args) if not opts.command: parser.print_help() return 1 try: if opts.command == "read": return _cmd_read(opts) elif opts.command == "create": return _cmd_create(opts) elif opts.command == "claim": return _cmd_claim(opts) elif opts.command == "output": return _cmd_output(opts) elif opts.command == "comment": return _cmd_comment(opts) elif opts.command == "decide": return _cmd_decide(opts) elif opts.command == "observe": return _cmd_observe(opts) elif opts.command == "review": return _cmd_review(opts) else: parser.print_help() return 1 except Exception as e: print(f"ERROR: {e}", file=sys.stderr) return 1 def _cmd_read(opts) -> int: bb = _get_bb(opts.project) if opts.task_id: task = bb.get_task(opts.task_id) if not task: print(f"Task not found: {opts.task_id}", file=sys.stderr) return 1 _print_tasks([task], opts.as_json) else: tasks = bb.list_tasks(status=getattr(opts, "status", None), assignee=getattr(opts, "assignee", None)) _print_tasks(tasks, opts.as_json) return 0 def _cmd_create(opts) -> int: bb = _get_bb(opts.project) task = Task( id=opts.task_id, title=opts.title, description=opts.description, task_type=opts.task_type, priority=opts.priority, assignee=opts.assignee, assigned_by=opts.assigned_by, depends_on=opts.depends_on, risk_level=opts.risk_level, must_haves=getattr(opts, "must_haves", None), ) bb.create_task(task) print(f"Created task: {task.id}") return 0 def _cmd_claim(opts) -> int: bb = _get_bb(opts.project) if bb.claim_task(opts.task_id, opts.agent): print(f"Claimed: {opts.task_id} by {opts.agent}") return 0 print(f"Claim failed: {opts.task_id}", file=sys.stderr) return 1 def _cmd_output(opts) -> int: bb = _get_bb(opts.project) oid = bb.write_output(opts.task_id, opts.agent, opts.output_type, opts.title, content_path=opts.path, summary=opts.summary) print(f"Output written: {oid}") return 0 def _cmd_comment(opts) -> int: bb = _get_bb(opts.project) mentions = opts.mentions.split(",") if opts.mentions else None cid = bb.add_comment(opts.task_id, opts.author, opts.body, comment_type=opts.comment_type, mentions=mentions) print(f"Comment added: {cid}") return 0 def _cmd_decide(opts) -> int: bb = _get_bb(opts.project) did = bb.add_decision( opts.task_id, opts.decider, opts.decision, opts.rationale) print(f"Decision recorded: {did}") return 0 def _cmd_observe(opts) -> int: bb = _get_bb(opts.project) oid = bb.add_observation(opts.task_id, opts.observer, opts.body, severity=opts.severity) print(f"Observation added: {oid}") return 0 def _cmd_review(opts) -> int: bb = _get_bb(opts.project) review = Review( id=opts.review_id, task_id=opts.task_id, reviewer=opts.reviewer, review_type=opts.review_type, verdict=opts.verdict, summary=opts.summary, confidence=opts.confidence, ) bb.add_review(review) print(f"Review added: {review.id}") return 0 def _print_tasks(tasks, as_json: bool): if as_json: data = [] for t in tasks: d = {k: v for k, v in t.__dict__.items() if v is not None} data.append(d) print(json.dumps(data, indent=2, ensure_ascii=False)) else: for t in tasks: print(f" {t.id} [{t.status:>8}] P{t.priority} {t.title}") if t.assignee: print(f" assigned: {t.assignee}") # =================================================================== # admin.py CLI # =================================================================== def build_admin_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="admin", description="Admin operations") sub = parser.add_subparsers(dest="command") # project create p_pc = sub.add_parser("project-create", help="Create project") p_pc.add_argument("--id", required=True, dest="project_id") p_pc.add_argument("--name", required=True) p_pc.add_argument("--agents", help="Comma-separated agent IDs") p_pc.add_argument("--description", default="") # project list sub.add_parser("project-list", help="List projects") # project archive p_pa = sub.add_parser("project-archive", help="Archive project") p_pa.add_argument("--id", required=True, dest="project_id") # manual tick p_tick = sub.add_parser("tick", help="Trigger manual tick") p_tick.add_argument("--project", required=True) return parser def run_admin_cli(args: Optional[List[str]] = None) -> int: parser = build_admin_parser() opts = parser.parse_args(args) if not opts.command: parser.print_help() return 1 root = _find_project_root() registry = ProjectRegistry(root) try: if opts.command == "project-create": agents = opts.agents.split(",") if opts.agents else [] info = registry.create_project(opts.project_id, opts.name, agents=agents, description=opts.description) print(f"Project created: {opts.project_id} ({opts.name})") return 0 elif opts.command == "project-list": projects = registry.list_projects() for pid, info in projects.items(): status = info.get("status", "?") agents = ",".join(info.get("agents", [])) print( f" {pid} [{status}] {info.get('name', '')} agents: {agents}") return 0 elif opts.command == "project-archive": if registry.archive_project(opts.project_id): print(f"Archived: {opts.project_id}") return 0 print(f"Project not found: {opts.project_id}", file=sys.stderr) return 1 elif opts.command == "tick": print(f"Manual tick placeholder for project: {opts.project}") return 0 else: parser.print_help() return 1 except Exception as e: print(f"ERROR: {e}", file=sys.stderr) return 1 if __name__ == "__main__": # Determine which CLI to run based on script name import os script = os.path.basename(sys.argv[0]) if "admin" in script: sys.exit(run_admin_cli()) else: sys.exit(run_blackboard_cli())