forked from mengyxu/noob-components
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
542 lines
18 KiB
542 lines
18 KiB
#!/usr/bin/env python3 |
|
""" |
|
Multi-Agent Pipeline: Status display and formatting. |
|
|
|
Provides: |
|
cmd_help - Show help text |
|
cmd_list - List worktrees and agents |
|
cmd_summary - Summary of all tasks with agent status |
|
cmd_detail - Detailed single-agent status |
|
cmd_registry - Dump agent registry |
|
|
|
Also exports shared utilities used by status_monitor: |
|
is_running, find_agent, get_registry_file, calc_elapsed, count_modified_files |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import json |
|
import os |
|
import subprocess |
|
from datetime import datetime |
|
from pathlib import Path |
|
|
|
from common.cli_adapter import get_cli_adapter |
|
from common.io import read_json |
|
from common.log import Colors |
|
from common.developer import ensure_developer |
|
from common.paths import ( |
|
get_repo_root, |
|
get_tasks_dir, |
|
) |
|
from common.phase import get_phase_info |
|
from common.task_queue import format_task_stats, get_task_stats |
|
from common.tasks import iter_active_tasks |
|
from common.worktree import get_agents_dir |
|
|
|
|
|
# ============================================================================= |
|
# Shared Utilities |
|
# ============================================================================= |
|
|
|
def is_running(pid: int | str | None) -> bool: |
|
"""Check if PID is running.""" |
|
if not pid: |
|
return False |
|
try: |
|
pid_int = int(pid) |
|
os.kill(pid_int, 0) |
|
return True |
|
except (ProcessLookupError, ValueError, PermissionError, TypeError): |
|
return False |
|
|
|
|
|
def status_color(status: str) -> str: |
|
"""Get status color.""" |
|
colors = { |
|
"completed": Colors.GREEN, |
|
"in_progress": Colors.BLUE, |
|
"planning": Colors.YELLOW, |
|
} |
|
return colors.get(status, Colors.DIM) |
|
|
|
|
|
def get_registry_file(repo_root: Path) -> Path | None: |
|
"""Get registry file path.""" |
|
agents_dir = get_agents_dir(repo_root) |
|
if agents_dir: |
|
return agents_dir / "registry.json" |
|
return None |
|
|
|
|
|
def find_agent(search: str, repo_root: Path) -> dict | None: |
|
"""Find agent by task name or ID.""" |
|
registry_file = get_registry_file(repo_root) |
|
if not registry_file or not registry_file.is_file(): |
|
return None |
|
|
|
data = read_json(registry_file) |
|
if not data: |
|
return None |
|
|
|
for agent in data.get("agents", []): |
|
# Exact ID match |
|
if agent.get("id") == search: |
|
return agent |
|
# Partial match on task_dir |
|
task_dir = agent.get("task_dir", "") |
|
if search in task_dir: |
|
return agent |
|
|
|
return None |
|
|
|
|
|
def calc_elapsed(started: str | None) -> str: |
|
"""Calculate elapsed time from ISO timestamp.""" |
|
if not started: |
|
return "N/A" |
|
|
|
try: |
|
# Parse ISO format |
|
if "+" in started: |
|
started = started.split("+")[0] |
|
if "T" in started: |
|
start_dt = datetime.fromisoformat(started) |
|
else: |
|
return "N/A" |
|
|
|
now = datetime.now() |
|
elapsed = (now - start_dt).total_seconds() |
|
|
|
if elapsed < 60: |
|
return f"{int(elapsed)}s" |
|
elif elapsed < 3600: |
|
mins = int(elapsed // 60) |
|
secs = int(elapsed % 60) |
|
return f"{mins}m {secs}s" |
|
else: |
|
hours = int(elapsed // 3600) |
|
mins = int((elapsed % 3600) // 60) |
|
return f"{hours}h {mins}m" |
|
except (ValueError, TypeError): |
|
return "N/A" |
|
|
|
|
|
def count_modified_files(worktree: str) -> int: |
|
"""Count modified files in worktree.""" |
|
if not Path(worktree).is_dir(): |
|
return 0 |
|
|
|
try: |
|
result = subprocess.run( |
|
["git", "status", "--short"], |
|
cwd=worktree, |
|
capture_output=True, |
|
text=True, |
|
encoding="utf-8", |
|
errors="replace", |
|
) |
|
return len([line for line in result.stdout.splitlines() if line.strip()]) |
|
except Exception: |
|
return 0 |
|
|
|
|
|
# ============================================================================= |
|
# Commands |
|
# ============================================================================= |
|
|
|
def cmd_help() -> int: |
|
"""Show help.""" |
|
print("""Multi-Agent Pipeline: Status Monitor |
|
|
|
Usage: |
|
python3 status.py Show summary of all tasks |
|
python3 status.py -a <assignee> Filter tasks by assignee |
|
python3 status.py --list List all worktrees and agents |
|
python3 status.py --detail <task> Detailed task status |
|
python3 status.py --progress <task> Quick progress view with recent activity |
|
python3 status.py --watch <task> Watch agent log in real-time |
|
python3 status.py --log <task> Show recent log entries |
|
python3 status.py --registry Show agent registry |
|
|
|
Examples: |
|
python3 status.py -a taosu |
|
python3 status.py --detail my-task |
|
python3 status.py --progress my-task |
|
python3 status.py --watch 01-16-worktree-support |
|
python3 status.py --log worktree-support |
|
""") |
|
return 0 |
|
|
|
|
|
def cmd_list(repo_root: Path) -> int: |
|
"""List worktrees and agents.""" |
|
print(f"{Colors.BLUE}=== Git Worktrees ==={Colors.NC}") |
|
print() |
|
|
|
subprocess.run(["git", "worktree", "list"], cwd=repo_root) |
|
print() |
|
|
|
print(f"{Colors.BLUE}=== Registered Agents ==={Colors.NC}") |
|
print() |
|
|
|
registry_file = get_registry_file(repo_root) |
|
if not registry_file or not registry_file.is_file(): |
|
print(" (no registry found)") |
|
return 0 |
|
|
|
data = read_json(registry_file) |
|
if not data or not data.get("agents"): |
|
print(" (no agents registered)") |
|
return 0 |
|
|
|
for agent in data["agents"]: |
|
agent_id = agent.get("id", "?") |
|
pid = agent.get("pid") |
|
wt = agent.get("worktree_path", "?") |
|
started = agent.get("started_at", "?") |
|
|
|
if is_running(pid): |
|
status_icon = f"{Colors.GREEN}●{Colors.NC}" |
|
else: |
|
status_icon = f"{Colors.RED}○{Colors.NC}" |
|
|
|
print(f" {status_icon} {agent_id} (PID: {pid})") |
|
print(f" {Colors.DIM}Worktree: {wt}{Colors.NC}") |
|
print(f" {Colors.DIM}Started: {started}{Colors.NC}") |
|
print() |
|
|
|
return 0 |
|
|
|
|
|
def cmd_summary(repo_root: Path, filter_assignee: str | None = None) -> int: |
|
"""Show summary of all tasks.""" |
|
# Import lazily to avoid circular import at module level |
|
from .status_monitor import get_last_tool, get_last_message |
|
|
|
ensure_developer(repo_root) |
|
|
|
tasks_dir = get_tasks_dir(repo_root) |
|
if not tasks_dir.is_dir(): |
|
print("No tasks directory found") |
|
return 0 |
|
|
|
registry_file = get_registry_file(repo_root) |
|
|
|
# Count running agents |
|
running_count = 0 |
|
total_agents = 0 |
|
|
|
if registry_file and registry_file.is_file(): |
|
data = read_json(registry_file) |
|
if data: |
|
agents = data.get("agents", []) |
|
total_agents = len(agents) |
|
for agent in agents: |
|
if is_running(agent.get("pid")): |
|
running_count += 1 |
|
|
|
# Task queue stats |
|
task_stats = get_task_stats(repo_root) |
|
|
|
print(f"{Colors.BLUE}=== Multi-Agent Status ==={Colors.NC}") |
|
print( |
|
f" Agents: {Colors.GREEN}{running_count}{Colors.NC} running / {total_agents} registered" |
|
) |
|
print(f" Tasks: {format_task_stats(task_stats)}") |
|
print() |
|
|
|
# Process tasks |
|
running_tasks = [] |
|
stopped_tasks = [] |
|
regular_tasks = [] |
|
|
|
registry_data = ( |
|
read_json(registry_file) |
|
if registry_file and registry_file.is_file() |
|
else None |
|
) |
|
|
|
for t in iter_active_tasks(tasks_dir): |
|
name = t.dir_name |
|
status = t.status |
|
assignee = t.assignee or "unassigned" |
|
priority = t.priority |
|
|
|
# Filter by assignee |
|
if filter_assignee and assignee != filter_assignee: |
|
continue |
|
|
|
# Check agent status |
|
agent_info = None |
|
if registry_data: |
|
for agent in registry_data.get("agents", []): |
|
if name in agent.get("task_dir", ""): |
|
agent_info = agent |
|
break |
|
|
|
if agent_info: |
|
pid = agent_info.get("pid") |
|
worktree = agent_info.get("worktree_path", "") |
|
started = agent_info.get("started_at") |
|
agent_platform = agent_info.get("platform", "claude") |
|
|
|
if is_running(pid): |
|
# Running agent |
|
task_dir_rel = agent_info.get("task_dir", "") |
|
worktree_task_json = Path(worktree) / task_dir_rel / "task.json" |
|
phase_source = t.directory / "task.json" |
|
if worktree_task_json.is_file(): |
|
phase_source = worktree_task_json |
|
|
|
phase_info_str = get_phase_info(phase_source) |
|
elapsed = calc_elapsed(started) |
|
modified = count_modified_files(worktree) |
|
|
|
worktree_data = read_json(phase_source) |
|
branch = worktree_data.get("branch", "N/A") if worktree_data else "N/A" |
|
|
|
log_file = Path(worktree) / ".agent-log" |
|
last_tool = get_last_tool(log_file, platform=agent_platform) |
|
|
|
running_tasks.append( |
|
{ |
|
"name": name, |
|
"priority": priority, |
|
"assignee": assignee, |
|
"phase_info": phase_info_str, |
|
"elapsed": elapsed, |
|
"branch": branch, |
|
"modified": modified, |
|
"last_tool": last_tool, |
|
"pid": pid, |
|
} |
|
) |
|
else: |
|
# Stopped agent |
|
task_dir_rel = agent_info.get("task_dir", "") |
|
worktree_task_json = Path(worktree) / task_dir_rel / "task.json" |
|
worktree_status = "unknown" |
|
|
|
if worktree_task_json.is_file(): |
|
wt_data = read_json(worktree_task_json) |
|
if wt_data: |
|
worktree_status = wt_data.get("status", "unknown") |
|
|
|
session_id_file = Path(worktree) / ".session-id" |
|
log_file = Path(worktree) / ".agent-log" |
|
|
|
stopped_tasks.append( |
|
{ |
|
"name": name, |
|
"worktree": worktree, |
|
"status": worktree_status, |
|
"session_id_file": session_id_file, |
|
"log_file": log_file, |
|
"platform": agent_info.get("platform", "claude"), |
|
} |
|
) |
|
else: |
|
# Regular task |
|
regular_tasks.append( |
|
{ |
|
"name": name, |
|
"status": status, |
|
"priority": priority, |
|
"assignee": assignee, |
|
} |
|
) |
|
|
|
# Output running agents |
|
if running_tasks: |
|
print(f"{Colors.CYAN}Running Agents:{Colors.NC}") |
|
for t in running_tasks: |
|
priority_color = ( |
|
Colors.RED |
|
if t["priority"] == "P0" |
|
else (Colors.YELLOW if t["priority"] == "P1" else Colors.BLUE) |
|
) |
|
print( |
|
f"{Colors.GREEN}▶{Colors.NC} {Colors.CYAN}{t['name']}{Colors.NC} {Colors.GREEN}[running]{Colors.NC} {priority_color}[{t['priority']}]{Colors.NC} @{t['assignee']}" |
|
) |
|
print(f" Phase: {t['phase_info']}") |
|
print(f" Elapsed: {t['elapsed']}") |
|
print(f" Branch: {Colors.DIM}{t['branch']}{Colors.NC}") |
|
print(f" Modified: {t['modified']} file(s)") |
|
if t["last_tool"]: |
|
print(f" Activity: {Colors.YELLOW}{t['last_tool']}{Colors.NC}") |
|
print(f" PID: {Colors.DIM}{t['pid']}{Colors.NC}") |
|
print() |
|
|
|
# Output stopped agents |
|
if stopped_tasks: |
|
print(f"{Colors.RED}Stopped Agents:{Colors.NC}") |
|
for t in stopped_tasks: |
|
if t["status"] == "completed": |
|
print( |
|
f"{Colors.GREEN}✓{Colors.NC} {t['name']} {Colors.GREEN}[completed]{Colors.NC}" |
|
) |
|
else: |
|
if t["session_id_file"].is_file(): |
|
session_id = ( |
|
t["session_id_file"].read_text(encoding="utf-8").strip() |
|
) |
|
last_msg = get_last_message(t["log_file"], 150, platform=t.get("platform", "claude")) |
|
print( |
|
f"{Colors.RED}○{Colors.NC} {t['name']} {Colors.RED}[stopped]{Colors.NC}" |
|
) |
|
if last_msg: |
|
print(f'{Colors.DIM}"{last_msg}"{Colors.NC}') |
|
# Use CLI adapter for platform-specific resume command |
|
adapter = get_cli_adapter(t.get("platform", "claude")) |
|
resume_cmd = adapter.get_resume_command_str(session_id, cwd=t["worktree"]) |
|
print(f"{Colors.YELLOW}{resume_cmd}{Colors.NC}") |
|
else: |
|
print( |
|
f"{Colors.RED}○{Colors.NC} {t['name']} {Colors.RED}[stopped]{Colors.NC} {Colors.DIM}(no session-id){Colors.NC}" |
|
) |
|
print() |
|
|
|
# Separator |
|
if (running_tasks or stopped_tasks) and regular_tasks: |
|
print(f"{Colors.DIM}───────────────────────────────────────{Colors.NC}") |
|
print() |
|
|
|
# Output regular tasks grouped by assignee |
|
if regular_tasks: |
|
# Sort by assignee, priority, status |
|
regular_tasks.sort( |
|
key=lambda x: ( |
|
x["assignee"], |
|
{"P0": 0, "P1": 1, "P2": 2, "P3": 3}.get(x["priority"], 2), |
|
{"in_progress": 0, "planning": 1, "completed": 2}.get(x["status"], 1), |
|
) |
|
) |
|
|
|
current_assignee = None |
|
for t in regular_tasks: |
|
if t["assignee"] != current_assignee: |
|
if current_assignee is not None: |
|
print() |
|
print(f"{Colors.CYAN}@{t['assignee']}:{Colors.NC}") |
|
current_assignee = t["assignee"] |
|
|
|
color = status_color(t["status"]) |
|
priority_color = ( |
|
Colors.RED |
|
if t["priority"] == "P0" |
|
else (Colors.YELLOW if t["priority"] == "P1" else Colors.BLUE) |
|
) |
|
print( |
|
f" {color}●{Colors.NC} {t['name']} ({t['status']}) {priority_color}[{t['priority']}]{Colors.NC}" |
|
) |
|
|
|
if running_tasks: |
|
print() |
|
print(f"{Colors.DIM}─────────────────────────────────────{Colors.NC}") |
|
print(f"{Colors.DIM}Use --progress <name> for quick activity view{Colors.NC}") |
|
print(f"{Colors.DIM}Use --detail <name> for more info{Colors.NC}") |
|
|
|
print() |
|
return 0 |
|
|
|
|
|
def cmd_detail(target: str, repo_root: Path) -> int: |
|
"""Show detailed task status.""" |
|
agent = find_agent(target, repo_root) |
|
if not agent: |
|
print(f"Agent not found: {target}") |
|
return 1 |
|
|
|
agent_id = agent.get("id", "?") |
|
pid = agent.get("pid") |
|
worktree = agent.get("worktree_path", "?") |
|
task_dir = agent.get("task_dir", "?") |
|
started = agent.get("started_at", "?") |
|
platform = agent.get("platform", "claude") |
|
|
|
# Check for session-id |
|
session_id = "" |
|
session_id_file = Path(worktree) / ".session-id" |
|
if session_id_file.is_file(): |
|
session_id = session_id_file.read_text(encoding="utf-8").strip() |
|
|
|
print(f"{Colors.BLUE}=== Agent Detail: {agent_id} ==={Colors.NC}") |
|
print() |
|
print(f" ID: {agent_id}") |
|
print(f" PID: {pid}") |
|
print(f" Session: {session_id or 'N/A'}") |
|
print(f" Worktree: {worktree}") |
|
print(f" Task Dir: {task_dir}") |
|
print(f" Started: {started}") |
|
print() |
|
|
|
# Status |
|
if is_running(pid): |
|
print(f" Status: {Colors.GREEN}Running{Colors.NC}") |
|
else: |
|
print(f" Status: {Colors.RED}Stopped{Colors.NC}") |
|
if session_id: |
|
print() |
|
# Use CLI adapter for platform-specific resume command |
|
adapter = get_cli_adapter(platform) |
|
resume_cmd = adapter.get_resume_command_str(session_id, cwd=worktree) |
|
print(f" {Colors.YELLOW}Resume:{Colors.NC} {resume_cmd}") |
|
|
|
# Task info |
|
task_json = repo_root / task_dir / "task.json" |
|
if task_json.is_file(): |
|
print() |
|
print(f"{Colors.BLUE}=== Task Info ==={Colors.NC}") |
|
print() |
|
data = read_json(task_json) |
|
if data: |
|
print(f" Status: {data.get('status', 'unknown')}") |
|
print(f" Branch: {data.get('branch', 'N/A')}") |
|
print(f" Base Branch: {data.get('base_branch', 'N/A')}") |
|
|
|
# Git changes |
|
if Path(worktree).is_dir(): |
|
print() |
|
print(f"{Colors.BLUE}=== Git Changes ==={Colors.NC}") |
|
print() |
|
|
|
result = subprocess.run( |
|
["git", "status", "--short"], |
|
cwd=worktree, |
|
capture_output=True, |
|
text=True, |
|
encoding="utf-8", |
|
errors="replace", |
|
) |
|
changes = result.stdout.strip() |
|
if changes: |
|
for line in changes.splitlines()[:10]: |
|
print(f" {line}") |
|
total = len(changes.splitlines()) |
|
if total > 10: |
|
print(f" ... and {total - 10} more") |
|
else: |
|
print(" (no changes)") |
|
|
|
print() |
|
return 0 |
|
|
|
|
|
def cmd_registry(repo_root: Path) -> int: |
|
"""Show agent registry.""" |
|
registry_file = get_registry_file(repo_root) |
|
|
|
print(f"{Colors.BLUE}=== Agent Registry ==={Colors.NC}") |
|
print() |
|
print(f"File: {registry_file}") |
|
print() |
|
|
|
if registry_file and registry_file.is_file(): |
|
data = read_json(registry_file) |
|
if data: |
|
print(json.dumps(data, indent=2)) |
|
else: |
|
print("(registry not found)") |
|
|
|
return 0
|
|
|