Files
sanguo_moziplus_v2/src/cli/blackboard.py
T
cfdaily f7fbdac89c chore: simayi-approved changes - lint fixes, toolchain improvements, healthz
All changes reviewed and APPROVED in PR #12 (Review ID: 40):
- toolchain_routes: webhook repo/org format compat, content dedup (sha256), closed issue filter
- dispatcher: inform mail crash 误标 done 修复
- ticker: cleanup and improvements
- healthz endpoint
- conftest: integration/e2e deselect markers
- docs: design docs, test-guide updates
- various lint/whitespace fixes across 30 files
2026-06-09 23:41:53 +08:00

333 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""CLI 工具 — blackboard.pyAgent 黑板操作)+ admin.py(管理员)"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import List, Optional
from src.blackboard.operations import Blackboard
from src.utils import get_data_root
from src.blackboard.models import Task, Comment, Output, Decision, Observation, Review, Experience
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
def _find_project_root() -> Path:
return get_data_root()
def _get_bb(project_id: str) -> Blackboard:
root = _find_project_root()
return Blackboard(root / project_id / "blackboard.db")
def _get_queries(project_id: str) -> Queries:
root = _find_project_root()
return Queries(root / project_id / "blackboard.db")
# ===================================================================
# blackboard.py CLI
# ===================================================================
def build_blackboard_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="blackboard", description="Agent blackboard operations")
sub = parser.add_subparsers(dest="command")
# read
p_read = sub.add_parser("read", help="Read task(s)")
p_read.add_argument("--project", required=True)
p_read.add_argument("--task-id")
p_read.add_argument("--status")
p_read.add_argument("--assignee")
p_read.add_argument("--json", action="store_true", dest="as_json")
# create task
p_create = sub.add_parser("create", help="Create task")
p_create.add_argument("--project", required=True)
p_create.add_argument("--id", required=True, dest="task_id")
p_create.add_argument("--title", required=True)
p_create.add_argument("--description", default="")
p_create.add_argument("--task-type", default="coding")
p_create.add_argument("--priority", type=int, default=5)
p_create.add_argument("--assignee")
p_create.add_argument("--assigned-by", default="user")
p_create.add_argument("--depends-on", help="JSON array of task IDs")
p_create.add_argument("--risk-level", default="standard")
p_create.add_argument("--must-haves", help="JSON string")
# claim
p_claim = sub.add_parser("claim", help="Claim a task")
p_claim.add_argument("--project", required=True)
p_claim.add_argument("--task-id", required=True)
p_claim.add_argument("--agent", required=True)
# output
p_output = sub.add_parser("output", help="Write output")
p_output.add_argument("--project", required=True)
p_output.add_argument("--task-id", required=True)
p_output.add_argument("--agent", required=True)
p_output.add_argument("--type", required=True, dest="output_type")
p_output.add_argument("--title", required=True)
p_output.add_argument("--path", default="")
p_output.add_argument("--summary", default="")
# comment
p_comment = sub.add_parser("comment", help="Add comment")
p_comment.add_argument("--project", required=True)
p_comment.add_argument("--task-id", required=True)
p_comment.add_argument("--author", required=True)
p_comment.add_argument("--body", required=True)
p_comment.add_argument("--type", default="general", dest="comment_type")
p_comment.add_argument("--mentions", help="Comma-separated agent IDs")
# decide
p_decide = sub.add_parser("decide", help="Record decision")
p_decide.add_argument("--project", required=True)
p_decide.add_argument("--task-id", required=True)
p_decide.add_argument("--decider", required=True)
p_decide.add_argument("--decision", required=True)
p_decide.add_argument("--rationale", required=True)
# observe
p_observe = sub.add_parser("observe", help="Add observation")
p_observe.add_argument("--project", required=True)
p_observe.add_argument("--task-id", required=True)
p_observe.add_argument("--observer", required=True)
p_observe.add_argument("--body", required=True)
p_observe.add_argument("--severity", default="info")
# review
p_review = sub.add_parser("review", help="Add review")
p_review.add_argument("--project", required=True)
p_review.add_argument("--review-id", required=True)
p_review.add_argument("--task-id", required=True)
p_review.add_argument("--reviewer", required=True)
p_review.add_argument("--review-type", required=True)
p_review.add_argument("--verdict", required=True)
p_review.add_argument("--summary", required=True)
p_review.add_argument("--confidence", type=float)
return parser
def run_blackboard_cli(args: Optional[List[str]] = None) -> int:
parser = build_blackboard_parser()
opts = parser.parse_args(args)
if not opts.command:
parser.print_help()
return 1
try:
if opts.command == "read":
return _cmd_read(opts)
elif opts.command == "create":
return _cmd_create(opts)
elif opts.command == "claim":
return _cmd_claim(opts)
elif opts.command == "output":
return _cmd_output(opts)
elif opts.command == "comment":
return _cmd_comment(opts)
elif opts.command == "decide":
return _cmd_decide(opts)
elif opts.command == "observe":
return _cmd_observe(opts)
elif opts.command == "review":
return _cmd_review(opts)
else:
parser.print_help()
return 1
except Exception as e:
print(f"ERROR: {e}", file=sys.stderr)
return 1
def _cmd_read(opts) -> int:
bb = _get_bb(opts.project)
if opts.task_id:
task = bb.get_task(opts.task_id)
if not task:
print(f"Task not found: {opts.task_id}", file=sys.stderr)
return 1
_print_tasks([task], opts.as_json)
else:
tasks = bb.list_tasks(status=getattr(opts, "status", None),
assignee=getattr(opts, "assignee", None))
_print_tasks(tasks, opts.as_json)
return 0
def _cmd_create(opts) -> int:
bb = _get_bb(opts.project)
task = Task(
id=opts.task_id, title=opts.title, description=opts.description,
task_type=opts.task_type, priority=opts.priority,
assignee=opts.assignee, assigned_by=opts.assigned_by,
depends_on=opts.depends_on, risk_level=opts.risk_level,
must_haves=getattr(opts, "must_haves", None),
)
bb.create_task(task)
print(f"Created task: {task.id}")
return 0
def _cmd_claim(opts) -> int:
bb = _get_bb(opts.project)
if bb.claim_task(opts.task_id, opts.agent):
print(f"Claimed: {opts.task_id} by {opts.agent}")
return 0
print(f"Claim failed: {opts.task_id}", file=sys.stderr)
return 1
def _cmd_output(opts) -> int:
bb = _get_bb(opts.project)
oid = bb.write_output(opts.task_id, opts.agent, opts.output_type,
opts.title, content_path=opts.path,
summary=opts.summary)
print(f"Output written: {oid}")
return 0
def _cmd_comment(opts) -> int:
bb = _get_bb(opts.project)
mentions = opts.mentions.split(",") if opts.mentions else None
cid = bb.add_comment(opts.task_id, opts.author, opts.body,
comment_type=opts.comment_type, mentions=mentions)
print(f"Comment added: {cid}")
return 0
def _cmd_decide(opts) -> int:
bb = _get_bb(opts.project)
did = bb.add_decision(opts.task_id, opts.decider, opts.decision, opts.rationale)
print(f"Decision recorded: {did}")
return 0
def _cmd_observe(opts) -> int:
bb = _get_bb(opts.project)
oid = bb.add_observation(opts.task_id, opts.observer, opts.body,
severity=opts.severity)
print(f"Observation added: {oid}")
return 0
def _cmd_review(opts) -> int:
bb = _get_bb(opts.project)
review = Review(
id=opts.review_id, task_id=opts.task_id, reviewer=opts.reviewer,
review_type=opts.review_type, verdict=opts.verdict,
summary=opts.summary, confidence=opts.confidence,
)
bb.add_review(review)
print(f"Review added: {review.id}")
return 0
def _print_tasks(tasks, as_json: bool):
if as_json:
data = []
for t in tasks:
d = {k: v for k, v in t.__dict__.items() if v is not None}
data.append(d)
print(json.dumps(data, indent=2, ensure_ascii=False))
else:
for t in tasks:
print(f" {t.id} [{t.status:>8}] P{t.priority} {t.title}")
if t.assignee:
print(f" assigned: {t.assignee}")
# ===================================================================
# admin.py CLI
# ===================================================================
def build_admin_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="admin", description="Admin operations")
sub = parser.add_subparsers(dest="command")
# project create
p_pc = sub.add_parser("project-create", help="Create project")
p_pc.add_argument("--id", required=True, dest="project_id")
p_pc.add_argument("--name", required=True)
p_pc.add_argument("--agents", help="Comma-separated agent IDs")
p_pc.add_argument("--description", default="")
# project list
p_pl = sub.add_parser("project-list", help="List projects")
# project archive
p_pa = sub.add_parser("project-archive", help="Archive project")
p_pa.add_argument("--id", required=True, dest="project_id")
# manual tick
p_tick = sub.add_parser("tick", help="Trigger manual tick")
p_tick.add_argument("--project", required=True)
return parser
def run_admin_cli(args: Optional[List[str]] = None) -> int:
parser = build_admin_parser()
opts = parser.parse_args(args)
if not opts.command:
parser.print_help()
return 1
root = _find_project_root()
registry = ProjectRegistry(root)
try:
if opts.command == "project-create":
agents = opts.agents.split(",") if opts.agents else []
info = registry.create_project(opts.project_id, opts.name,
agents=agents,
description=opts.description)
print(f"Project created: {opts.project_id} ({opts.name})")
return 0
elif opts.command == "project-list":
projects = registry.list_projects()
for pid, info in projects.items():
status = info.get("status", "?")
agents = ",".join(info.get("agents", []))
print(f" {pid} [{status}] {info.get('name', '')} agents: {agents}")
return 0
elif opts.command == "project-archive":
if registry.archive_project(opts.project_id):
print(f"Archived: {opts.project_id}")
return 0
print(f"Project not found: {opts.project_id}", file=sys.stderr)
return 1
elif opts.command == "tick":
print(f"Manual tick placeholder for project: {opts.project}")
return 0
else:
parser.print_help()
return 1
except Exception as e:
print(f"ERROR: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
# Determine which CLI to run based on script name
import os
script = os.path.basename(sys.argv[0])
if "admin" in script:
sys.exit(run_admin_cli())
else:
sys.exit(run_blackboard_cli())