基于vue3.0和element-plus的组件库
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

542 lines
18 KiB

#!/usr/bin/env python3
"""
Multi-Agent Pipeline: Status display and formatting.
Provides:
cmd_help - Show help text
cmd_list - List worktrees and agents
cmd_summary - Summary of all tasks with agent status
cmd_detail - Detailed single-agent status
cmd_registry - Dump agent registry
Also exports shared utilities used by status_monitor:
is_running, find_agent, get_registry_file, calc_elapsed, count_modified_files
"""
from __future__ import annotations
import json
import os
import subprocess
from datetime import datetime
from pathlib import Path
from common.cli_adapter import get_cli_adapter
from common.io import read_json
from common.log import Colors
from common.developer import ensure_developer
from common.paths import (
get_repo_root,
get_tasks_dir,
)
from common.phase import get_phase_info
from common.task_queue import format_task_stats, get_task_stats
from common.tasks import iter_active_tasks
from common.worktree import get_agents_dir
# =============================================================================
# Shared Utilities
# =============================================================================
def is_running(pid: int | str | None) -> bool:
"""Check if PID is running."""
if not pid:
return False
try:
pid_int = int(pid)
os.kill(pid_int, 0)
return True
except (ProcessLookupError, ValueError, PermissionError, TypeError):
return False
def status_color(status: str) -> str:
    """Return the color used to render a task status.

    Recognized statuses are "completed", "in_progress" and "planning";
    anything else is rendered with the dim color.
    """
    palette = {
        "completed": Colors.GREEN,
        "in_progress": Colors.BLUE,
        "planning": Colors.YELLOW,
    }
    fallback = Colors.DIM
    try:
        return palette[status]
    except KeyError:
        return fallback
def get_registry_file(repo_root: Path) -> Path | None:
    """Return the path of ``registry.json`` inside the agents directory.

    Returns None when ``get_agents_dir`` yields a falsy value. The file
    itself is not checked for existence here.
    """
    base_dir = get_agents_dir(repo_root)
    return base_dir / "registry.json" if base_dir else None
def find_agent(search: str, repo_root: Path) -> dict | None:
    """Find a registered agent by exact ID or by partial task directory.

    An exact ``id`` match anywhere in the registry always wins over a
    partial ``task_dir`` match, regardless of the order of the entries.

    Args:
        search: Agent ID, or a substring of the agent's ``task_dir``.
        repo_root: Repository root used to locate the registry file.

    Returns:
        The matching agent record, or None when the registry is missing,
        unreadable/empty, or contains no match.
    """
    registry_file = get_registry_file(repo_root)
    if not registry_file or not registry_file.is_file():
        return None
    data = read_json(registry_file)
    if not data:
        return None
    # Tolerate an explicit null "agents" value in the registry.
    agents = data.get("agents") or []

    # Pass 1: exact ID match across the whole registry.
    for agent in agents:
        if agent.get("id") == search:
            return agent

    # Pass 2: first agent whose task_dir contains the search text.
    for agent in agents:
        # "or ''" guards against a task_dir stored as null.
        if search in (agent.get("task_dir") or ""):
            return agent
    return None
def calc_elapsed(started: str | None) -> str:
"""Calculate elapsed time from ISO timestamp."""
if not started:
return "N/A"
try:
# Parse ISO format
if "+" in started:
started = started.split("+")[0]
if "T" in started:
start_dt = datetime.fromisoformat(started)
else:
return "N/A"
now = datetime.now()
elapsed = (now - start_dt).total_seconds()
if elapsed < 60:
return f"{int(elapsed)}s"
elif elapsed < 3600:
mins = int(elapsed // 60)
secs = int(elapsed % 60)
return f"{mins}m {secs}s"
else:
hours = int(elapsed // 3600)
mins = int((elapsed % 3600) // 60)
return f"{hours}h {mins}m"
except (ValueError, TypeError):
return "N/A"
def count_modified_files(worktree: str) -> int:
    """Count the entries reported by ``git status --short`` in a worktree.

    Args:
        worktree: Path of the worktree directory.

    Returns:
        Number of non-blank status lines, or 0 when the path is empty,
        is not a directory, or git cannot be executed there.
    """
    # An empty path would otherwise resolve to the current directory, and
    # None would crash inside Path().
    if not worktree:
        return 0
    try:
        if not Path(worktree).is_dir():
            return 0
        result = subprocess.run(
            ["git", "status", "--short"],
            cwd=worktree,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            check=False,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # Best effort: a missing git binary or an unusable path counts as
        # "no modifications" rather than aborting the status display.
        return 0
    return sum(1 for line in result.stdout.splitlines() if line.strip())
# =============================================================================
# Commands
# =============================================================================
def cmd_help() -> int:
    """Print the command-line usage text and return exit code 0."""
    usage = """Multi-Agent Pipeline: Status Monitor
Usage:
python3 status.py Show summary of all tasks
python3 status.py -a <assignee> Filter tasks by assignee
python3 status.py --list List all worktrees and agents
python3 status.py --detail <task> Detailed task status
python3 status.py --progress <task> Quick progress view with recent activity
python3 status.py --watch <task> Watch agent log in real-time
python3 status.py --log <task> Show recent log entries
python3 status.py --registry Show agent registry
Examples:
python3 status.py -a taosu
python3 status.py --detail my-task
python3 status.py --progress my-task
python3 status.py --watch 01-16-worktree-support
python3 status.py --log worktree-support
"""
    print(usage)
    return 0
def cmd_list(repo_root: Path) -> int:
    """Print the git worktree list followed by every registered agent.

    Args:
        repo_root: Repository root; used as the working directory for git
            and to locate the agent registry.

    Returns:
        Always 0, including when no registry or no agents are found.
    """
    print(f"{Colors.BLUE}=== Git Worktrees ==={Colors.NC}")
    print()
    # Output is not captured: git writes straight to the terminal.
    subprocess.run(["git", "worktree", "list"], cwd=repo_root)
    print()
    print(f"{Colors.BLUE}=== Registered Agents ==={Colors.NC}")
    print()

    registry_path = get_registry_file(repo_root)
    if not (registry_path and registry_path.is_file()):
        print(" (no registry found)")
        return 0

    registry = read_json(registry_path)
    if not (registry and registry.get("agents")):
        print(" (no agents registered)")
        return 0

    for entry in registry["agents"]:
        entry_pid = entry.get("pid")
        # Green marker for a live process, red otherwise.
        # NOTE(review): the marker holds only color codes, with no glyph
        # between them; confirm whether a symbol was intended here.
        marker_color = Colors.GREEN if is_running(entry_pid) else Colors.RED
        marker = f"{marker_color}{Colors.NC}"
        print(f" {marker} {entry.get('id', '?')} (PID: {entry_pid})")
        print(f" {Colors.DIM}Worktree: {entry.get('worktree_path', '?')}{Colors.NC}")
        print(f" {Colors.DIM}Started: {entry.get('started_at', '?')}{Colors.NC}")
        print()
    return 0
def cmd_summary(repo_root: Path, filter_assignee: str | None = None) -> int:
    """Show summary of all tasks.

    Prints a header with agent and task counts, then three sections:
    tasks whose agent process is running, tasks whose agent has stopped,
    and the remaining tasks grouped by assignee.

    Args:
        repo_root: Repository root used to locate tasks and the registry.
        filter_assignee: When set, only tasks whose assignee equals this
            value are shown (tasks without an assignee are compared as
            "unassigned").

    Returns:
        Always 0, including when the tasks directory does not exist.
    """
    # Import lazily to avoid circular import at module level
    from .status_monitor import get_last_tool, get_last_message
    # NOTE(review): ensure_developer is defined elsewhere; presumably it
    # validates/initialises the developer identity — confirm.
    ensure_developer(repo_root)
    tasks_dir = get_tasks_dir(repo_root)
    if not tasks_dir.is_dir():
        print("No tasks directory found")
        return 0
    registry_file = get_registry_file(repo_root)
    # Count running agents
    running_count = 0
    total_agents = 0
    if registry_file and registry_file.is_file():
        data = read_json(registry_file)
        if data:
            agents = data.get("agents", [])
            total_agents = len(agents)
            for agent in agents:
                if is_running(agent.get("pid")):
                    running_count += 1
    # Task queue stats
    task_stats = get_task_stats(repo_root)
    print(f"{Colors.BLUE}=== Multi-Agent Status ==={Colors.NC}")
    print(
        f" Agents: {Colors.GREEN}{running_count}{Colors.NC} running / {total_agents} registered"
    )
    print(f" Tasks: {format_task_stats(task_stats)}")
    print()
    # Process tasks
    # Each task ends up in exactly one of these three buckets.
    running_tasks = []
    stopped_tasks = []
    regular_tasks = []
    # The registry is read a second time here; the first read above is only
    # used for the header counts.
    registry_data = (
        read_json(registry_file)
        if registry_file and registry_file.is_file()
        else None
    )
    for t in iter_active_tasks(tasks_dir):
        name = t.dir_name
        status = t.status
        assignee = t.assignee or "unassigned"
        priority = t.priority
        # Filter by assignee
        if filter_assignee and assignee != filter_assignee:
            continue
        # Check agent status
        # Substring match: the first agent whose task_dir contains the task's
        # directory name is taken as this task's agent.
        agent_info = None
        if registry_data:
            for agent in registry_data.get("agents", []):
                if name in agent.get("task_dir", ""):
                    agent_info = agent
                    break
        if agent_info:
            pid = agent_info.get("pid")
            worktree = agent_info.get("worktree_path", "")
            started = agent_info.get("started_at")
            agent_platform = agent_info.get("platform", "claude")
            if is_running(pid):
                # Running agent
                task_dir_rel = agent_info.get("task_dir", "")
                worktree_task_json = Path(worktree) / task_dir_rel / "task.json"
                # Prefer the task.json inside the agent's worktree when it
                # exists; otherwise fall back to the task's own directory.
                phase_source = t.directory / "task.json"
                if worktree_task_json.is_file():
                    phase_source = worktree_task_json
                phase_info_str = get_phase_info(phase_source)
                elapsed = calc_elapsed(started)
                modified = count_modified_files(worktree)
                worktree_data = read_json(phase_source)
                branch = worktree_data.get("branch", "N/A") if worktree_data else "N/A"
                log_file = Path(worktree) / ".agent-log"
                last_tool = get_last_tool(log_file, platform=agent_platform)
                running_tasks.append(
                    {
                        "name": name,
                        "priority": priority,
                        "assignee": assignee,
                        "phase_info": phase_info_str,
                        "elapsed": elapsed,
                        "branch": branch,
                        "modified": modified,
                        "last_tool": last_tool,
                        "pid": pid,
                    }
                )
            else:
                # Stopped agent
                task_dir_rel = agent_info.get("task_dir", "")
                worktree_task_json = Path(worktree) / task_dir_rel / "task.json"
                # Status comes from the worktree copy of task.json only;
                # it stays "unknown" when that file is missing or unreadable.
                worktree_status = "unknown"
                if worktree_task_json.is_file():
                    wt_data = read_json(worktree_task_json)
                    if wt_data:
                        worktree_status = wt_data.get("status", "unknown")
                session_id_file = Path(worktree) / ".session-id"
                log_file = Path(worktree) / ".agent-log"
                stopped_tasks.append(
                    {
                        "name": name,
                        "worktree": worktree,
                        "status": worktree_status,
                        "session_id_file": session_id_file,
                        "log_file": log_file,
                        "platform": agent_info.get("platform", "claude"),
                    }
                )
        else:
            # Regular task
            regular_tasks.append(
                {
                    "name": name,
                    "status": status,
                    "priority": priority,
                    "assignee": assignee,
                }
            )
    # Output running agents
    if running_tasks:
        print(f"{Colors.CYAN}Running Agents:{Colors.NC}")
        for t in running_tasks:
            # P0 is red, P1 is yellow, every other priority is blue.
            priority_color = (
                Colors.RED
                if t["priority"] == "P0"
                else (Colors.YELLOW if t["priority"] == "P1" else Colors.BLUE)
            )
            print(
                f"{Colors.GREEN}{Colors.NC} {Colors.CYAN}{t['name']}{Colors.NC} {Colors.GREEN}[running]{Colors.NC} {priority_color}[{t['priority']}]{Colors.NC} @{t['assignee']}"
            )
            print(f" Phase: {t['phase_info']}")
            print(f" Elapsed: {t['elapsed']}")
            print(f" Branch: {Colors.DIM}{t['branch']}{Colors.NC}")
            print(f" Modified: {t['modified']} file(s)")
            if t["last_tool"]:
                print(f" Activity: {Colors.YELLOW}{t['last_tool']}{Colors.NC}")
            print(f" PID: {Colors.DIM}{t['pid']}{Colors.NC}")
            print()
    # Output stopped agents
    if stopped_tasks:
        print(f"{Colors.RED}Stopped Agents:{Colors.NC}")
        for t in stopped_tasks:
            if t["status"] == "completed":
                print(
                    f"{Colors.GREEN}{Colors.NC} {t['name']} {Colors.GREEN}[completed]{Colors.NC}"
                )
            else:
                # A saved session id makes the agent resumable: show its last
                # message and the platform-specific resume command.
                if t["session_id_file"].is_file():
                    session_id = (
                        t["session_id_file"].read_text(encoding="utf-8").strip()
                    )
                    last_msg = get_last_message(t["log_file"], 150, platform=t.get("platform", "claude"))
                    print(
                        f"{Colors.RED}{Colors.NC} {t['name']} {Colors.RED}[stopped]{Colors.NC}"
                    )
                    if last_msg:
                        print(f'{Colors.DIM}"{last_msg}"{Colors.NC}')
                    # Use CLI adapter for platform-specific resume command
                    adapter = get_cli_adapter(t.get("platform", "claude"))
                    resume_cmd = adapter.get_resume_command_str(session_id, cwd=t["worktree"])
                    print(f"{Colors.YELLOW}{resume_cmd}{Colors.NC}")
                else:
                    print(
                        f"{Colors.RED}{Colors.NC} {t['name']} {Colors.RED}[stopped]{Colors.NC} {Colors.DIM}(no session-id){Colors.NC}"
                    )
            # NOTE(review): nesting inferred — one blank line after every
            # stopped agent, mirroring the running-agents loop; confirm.
            print()
    # Separator
    if (running_tasks or stopped_tasks) and regular_tasks:
        print(f"{Colors.DIM}───────────────────────────────────────{Colors.NC}")
        print()
    # Output regular tasks grouped by assignee
    if regular_tasks:
        # Sort by assignee, priority, status
        # Unknown priorities sort alongside P2; unknown statuses sort
        # alongside "planning".
        regular_tasks.sort(
            key=lambda x: (
                x["assignee"],
                {"P0": 0, "P1": 1, "P2": 2, "P3": 3}.get(x["priority"], 2),
                {"in_progress": 0, "planning": 1, "completed": 2}.get(x["status"], 1),
            )
        )
        current_assignee = None
        for t in regular_tasks:
            # Emit a header whenever the assignee changes, separated from the
            # previous group by a blank line.
            if t["assignee"] != current_assignee:
                if current_assignee is not None:
                    print()
                print(f"{Colors.CYAN}@{t['assignee']}:{Colors.NC}")
                current_assignee = t["assignee"]
            color = status_color(t["status"])
            priority_color = (
                Colors.RED
                if t["priority"] == "P0"
                else (Colors.YELLOW if t["priority"] == "P1" else Colors.BLUE)
            )
            print(
                f" {color}{Colors.NC} {t['name']} ({t['status']}) {priority_color}[{t['priority']}]{Colors.NC}"
            )
    # NOTE(review): nesting inferred — the usage hints are shown whenever at
    # least one agent is running, independent of regular tasks; confirm.
    if running_tasks:
        print()
        print(f"{Colors.DIM}─────────────────────────────────────{Colors.NC}")
        print(f"{Colors.DIM}Use --progress <name> for quick activity view{Colors.NC}")
        print(f"{Colors.DIM}Use --detail <name> for more info{Colors.NC}")
    print()
    return 0
def cmd_detail(target: str, repo_root: Path) -> int:
    """Show detailed status for a single agent.

    Prints the registry record, whether the process is running, a resume
    command for stopped agents with a saved session, the task's status and
    branches, and up to ten lines of ``git status --short`` output.

    Args:
        target: Agent ID or partial task directory, resolved by find_agent.
        repo_root: Repository root.

    Returns:
        0 on success, 1 when no matching agent is found.
    """
    agent = find_agent(target, repo_root)
    if not agent:
        print(f"Agent not found: {target}")
        return 1
    agent_id = agent.get("id", "?")
    pid = agent.get("pid")
    worktree = agent.get("worktree_path", "?")
    task_dir = agent.get("task_dir", "?")
    started = agent.get("started_at", "?")
    platform = agent.get("platform", "claude")
    # Check for session-id
    session_id = ""
    session_id_file = Path(worktree) / ".session-id"
    if session_id_file.is_file():
        session_id = session_id_file.read_text(encoding="utf-8").strip()
    print(f"{Colors.BLUE}=== Agent Detail: {agent_id} ==={Colors.NC}")
    print()
    print(f" ID: {agent_id}")
    print(f" PID: {pid}")
    print(f" Session: {session_id or 'N/A'}")
    print(f" Worktree: {worktree}")
    print(f" Task Dir: {task_dir}")
    print(f" Started: {started}")
    print()
    # Status
    if is_running(pid):
        print(f" Status: {Colors.GREEN}Running{Colors.NC}")
    else:
        print(f" Status: {Colors.RED}Stopped{Colors.NC}")
        # NOTE(review): nesting inferred — the resume hint is shown only
        # for stopped agents that have a saved session id; confirm.
        if session_id:
            print()
            # Use CLI adapter for platform-specific resume command
            adapter = get_cli_adapter(platform)
            resume_cmd = adapter.get_resume_command_str(session_id, cwd=worktree)
            print(f" {Colors.YELLOW}Resume:{Colors.NC} {resume_cmd}")
    # Task info (read from the main repository's copy of task.json)
    task_json = repo_root / task_dir / "task.json"
    if task_json.is_file():
        print()
        print(f"{Colors.BLUE}=== Task Info ==={Colors.NC}")
        print()
        data = read_json(task_json)
        if data:
            print(f" Status: {data.get('status', 'unknown')}")
            print(f" Branch: {data.get('branch', 'N/A')}")
            print(f" Base Branch: {data.get('base_branch', 'N/A')}")
    # Git changes
    if Path(worktree).is_dir():
        print()
        print(f"{Colors.BLUE}=== Git Changes ==={Colors.NC}")
        print()
        result = subprocess.run(
            ["git", "status", "--short"],
            cwd=worktree,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
        # Do not strip the whole output: in the short format the leading
        # column of each line is part of the status code (" M" vs "M "),
        # and stripping would corrupt the first line.
        change_lines = [line for line in result.stdout.splitlines() if line.strip()]
        if change_lines:
            for line in change_lines[:10]:
                print(f" {line}")
            total = len(change_lines)
            if total > 10:
                print(f" ... and {total - 10} more")
        else:
            print(" (no changes)")
    print()
    return 0
def cmd_registry(repo_root: Path) -> int:
    """Dump the agent registry as indented JSON.

    Prints the registry path, then either the pretty-printed contents or
    a "not found" notice. Nothing further is printed when the file exists
    but reads as empty. Always returns 0.
    """
    registry_path = get_registry_file(repo_root)
    print(f"{Colors.BLUE}=== Agent Registry ==={Colors.NC}")
    print()
    print(f"File: {registry_path}")
    print()

    if not (registry_path and registry_path.is_file()):
        print("(registry not found)")
        return 0

    contents = read_json(registry_path)
    if contents:
        print(json.dumps(contents, indent=2))
    return 0