forked from mengyxu/noob-components
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
335 lines
8.6 KiB
335 lines
8.6 KiB
#!/usr/bin/env python3 |
|
""" |
|
Registry utility functions for multi-agent pipeline. |
|
|
|
Provides: |
|
registry_get_file - Get registry file path |
|
registry_get_agent_by_id - Find agent by ID |
|
registry_get_agent_by_worktree - Find agent by worktree path |
|
registry_get_task_dir - Get task dir for a worktree |
|
registry_remove_by_id - Remove agent by ID |
|
registry_remove_by_worktree - Remove agent by worktree path |
|
registry_add_agent - Add agent to registry |
|
registry_search_agent - Search agent by ID or task_dir |
|
registry_list_agents - List all agents |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
from datetime import datetime |
|
from pathlib import Path |
|
|
|
from .io import read_json, write_json |
|
from .paths import get_repo_root |
|
from .worktree import get_agents_dir |
|
|
|
|
|
# ============================================================================= |
|
# Internal Helpers |
|
# ============================================================================= |
|
|
|
def _load_registry( |
|
repo_root: Path | None = None, |
|
) -> tuple[Path | None, dict | None]: |
|
"""Load registry file and data in one step. |
|
|
|
Returns: |
|
(registry_file_path, data_dict) — either may be None. |
|
""" |
|
if repo_root is None: |
|
repo_root = get_repo_root() |
|
|
|
registry_file = registry_get_file(repo_root) |
|
if not registry_file or not registry_file.is_file(): |
|
return registry_file, None |
|
|
|
data = read_json(registry_file) |
|
return registry_file, data |
|
|
|
|
|
# ============================================================================= |
|
# Registry File Access |
|
# ============================================================================= |
|
|
|
def registry_get_file(repo_root: Path | None = None) -> Path | None: |
|
"""Get registry file path. |
|
|
|
Args: |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
|
|
Returns: |
|
Path to registry.json, or None if agents dir not found. |
|
""" |
|
if repo_root is None: |
|
repo_root = get_repo_root() |
|
|
|
agents_dir = get_agents_dir(repo_root) |
|
if agents_dir: |
|
return agents_dir / "registry.json" |
|
return None |
|
|
|
|
|
def _ensure_registry(repo_root: Path | None = None) -> Path | None: |
|
"""Ensure registry file exists with valid structure. |
|
|
|
Args: |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
|
|
Returns: |
|
Path to registry file, or None if cannot create. |
|
""" |
|
if repo_root is None: |
|
repo_root = get_repo_root() |
|
|
|
registry_file = registry_get_file(repo_root) |
|
if not registry_file: |
|
return None |
|
|
|
agents_dir = registry_file.parent |
|
|
|
try: |
|
agents_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
if not registry_file.exists(): |
|
write_json(registry_file, {"agents": []}) |
|
|
|
return registry_file |
|
except (OSError, IOError): |
|
return None |
|
|
|
|
|
# ============================================================================= |
|
# Agent Lookup |
|
# ============================================================================= |
|
|
|
def registry_get_agent_by_id( |
|
agent_id: str, |
|
repo_root: Path | None = None |
|
) -> dict | None: |
|
"""Get agent by ID. |
|
|
|
Args: |
|
agent_id: Agent ID. |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
|
|
Returns: |
|
Agent dict, or None if not found. |
|
""" |
|
_, data = _load_registry(repo_root) |
|
if not data: |
|
return None |
|
|
|
for agent in data.get("agents", []): |
|
if agent.get("id") == agent_id: |
|
return agent |
|
|
|
return None |
|
|
|
|
|
def registry_get_agent_by_worktree( |
|
worktree_path: str, |
|
repo_root: Path | None = None |
|
) -> dict | None: |
|
"""Get agent by worktree path. |
|
|
|
Args: |
|
worktree_path: Worktree path. |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
|
|
Returns: |
|
Agent dict, or None if not found. |
|
""" |
|
_, data = _load_registry(repo_root) |
|
if not data: |
|
return None |
|
|
|
for agent in data.get("agents", []): |
|
if agent.get("worktree_path") == worktree_path: |
|
return agent |
|
|
|
return None |
|
|
|
|
|
def registry_search_agent( |
|
search: str, |
|
repo_root: Path | None = None |
|
) -> dict | None: |
|
"""Search agent by ID or task_dir containing search term. |
|
|
|
Args: |
|
search: Search term. |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
|
|
Returns: |
|
First matching agent dict, or None if not found. |
|
""" |
|
_, data = _load_registry(repo_root) |
|
if not data: |
|
return None |
|
|
|
for agent in data.get("agents", []): |
|
# Exact ID match |
|
if agent.get("id") == search: |
|
return agent |
|
# Partial match on task_dir |
|
task_dir = agent.get("task_dir", "") |
|
if search in task_dir: |
|
return agent |
|
|
|
return None |
|
|
|
|
|
def registry_get_task_dir( |
|
worktree_path: str, |
|
repo_root: Path | None = None |
|
) -> str | None: |
|
"""Get task directory for a worktree. |
|
|
|
Args: |
|
worktree_path: Worktree path. |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
|
|
Returns: |
|
Task directory path, or None if not found. |
|
""" |
|
_, data = _load_registry(repo_root) |
|
if not data: |
|
return None |
|
|
|
for agent in data.get("agents", []): |
|
if agent.get("worktree_path") == worktree_path: |
|
return agent.get("task_dir") |
|
|
|
return None |
|
|
|
|
|
# ============================================================================= |
|
# Agent Modification |
|
# ============================================================================= |
|
|
|
def registry_remove_by_id(agent_id: str, repo_root: Path | None = None) -> bool: |
|
"""Remove agent by ID. |
|
|
|
Args: |
|
agent_id: Agent ID. |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
|
|
Returns: |
|
True on success. |
|
""" |
|
registry_file, data = _load_registry(repo_root) |
|
if not registry_file or not data: |
|
return True # Nothing to remove |
|
|
|
agents = data.get("agents", []) |
|
data["agents"] = [a for a in agents if a.get("id") != agent_id] |
|
|
|
return write_json(registry_file, data) |
|
|
|
|
|
def registry_remove_by_worktree( |
|
worktree_path: str, |
|
repo_root: Path | None = None |
|
) -> bool: |
|
"""Remove agent by worktree path. |
|
|
|
Args: |
|
worktree_path: Worktree path. |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
|
|
Returns: |
|
True on success. |
|
""" |
|
registry_file, data = _load_registry(repo_root) |
|
if not registry_file or not data: |
|
return True # Nothing to remove |
|
|
|
agents = data.get("agents", []) |
|
data["agents"] = [a for a in agents if a.get("worktree_path") != worktree_path] |
|
|
|
return write_json(registry_file, data) |
|
|
|
|
|
def registry_add_agent( |
|
agent_id: str, |
|
worktree_path: str, |
|
pid: int, |
|
task_dir: str, |
|
repo_root: Path | None = None, |
|
platform: str = "claude", |
|
) -> bool: |
|
"""Add agent to registry (replaces if same ID exists). |
|
|
|
Args: |
|
agent_id: Agent ID. |
|
worktree_path: Worktree path. |
|
pid: Process ID. |
|
task_dir: Task directory path. |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
platform: Platform used (e.g., 'claude', 'opencode', 'codex', 'kiro', 'antigravity'). Defaults to 'claude'. |
|
|
|
Returns: |
|
True on success. |
|
""" |
|
if repo_root is None: |
|
repo_root = get_repo_root() |
|
|
|
registry_file = _ensure_registry(repo_root) |
|
if not registry_file: |
|
return False |
|
|
|
data = read_json(registry_file) |
|
if not data: |
|
data = {"agents": []} |
|
|
|
# Remove existing agent with same ID |
|
agents = data.get("agents", []) |
|
agents = [a for a in agents if a.get("id") != agent_id] |
|
|
|
# Create new agent record |
|
started_at = datetime.now().isoformat() |
|
new_agent = { |
|
"id": agent_id, |
|
"worktree_path": worktree_path, |
|
"pid": pid, |
|
"started_at": started_at, |
|
"task_dir": task_dir, |
|
"platform": platform, |
|
} |
|
|
|
agents.append(new_agent) |
|
data["agents"] = agents |
|
|
|
return write_json(registry_file, data) |
|
|
|
|
|
def registry_list_agents(repo_root: Path | None = None) -> list[dict]: |
|
"""List all agents. |
|
|
|
Args: |
|
repo_root: Repository root path. Defaults to auto-detected. |
|
|
|
Returns: |
|
List of agent dicts. |
|
""" |
|
_, data = _load_registry(repo_root) |
|
if not data: |
|
return [] |
|
|
|
return data.get("agents", []) |
|
|
|
|
|
# ============================================================================= |
|
# Main Entry (for testing) |
|
# ============================================================================= |
|
|
|
if __name__ == "__main__": |
|
import json as json_mod |
|
|
|
repo = get_repo_root() |
|
print(f"Repository root: {repo}") |
|
print(f"Registry file: {registry_get_file(repo)}") |
|
print() |
|
print("Agents:") |
|
agents = registry_list_agents(repo) |
|
print(json_mod.dumps(agents, indent=2))
|
|
|