File size: 6,014 Bytes
"""WorktreeTool - Git worktree management for Stack 2.9"""
import json
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from .base import BaseTool, ToolResult
from .registry import tool_registry
# JSON file under the user's home directory that holds the worktree registry.
WORKTREES_FILE = Path.home().joinpath(".stack-2.9", "worktrees.json")
def _load_worktrees() -> Dict[str, Any]:
    """Load the persisted worktree registry.

    Creates the directory that holds ``WORKTREES_FILE`` if needed, then reads
    and decodes the file as JSON.

    Returns:
        The decoded registry, or a fresh ``{"worktrees": []}`` when the file
        does not exist or contains only whitespace.

    Raises:
        json.JSONDecodeError: If the file has non-empty content that is not
            valid JSON. This is intentionally not swallowed, so that a damaged
            registry is noticed instead of being replaced by an empty one.
    """
    WORKTREES_FILE.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Read directly instead of checking exists() first: the file may
        # disappear between the check and the read.
        raw = WORKTREES_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {"worktrees": []}
    if not raw.strip():
        # An empty file is treated the same as "nothing stored yet".
        return {"worktrees": []}
    return json.loads(raw)
def _save_worktrees(data: Dict[str, Any]) -> None:
    """Persist the worktree registry to ``WORKTREES_FILE``.

    The registry is serialized as JSON with a two-space indent and written as
    UTF-8. The parent directory is created when missing, so saving works even
    if nothing has created the state directory beforehand.

    Args:
        data: Registry to store; must be JSON-serializable.

    Raises:
        TypeError: If ``data`` contains values ``json.dumps`` cannot encode.
    """
    # Serialize first so a serialization error cannot leave a truncated file.
    payload = json.dumps(data, indent=2)
    WORKTREES_FILE.parent.mkdir(parents=True, exist_ok=True)
    WORKTREES_FILE.write_text(payload, encoding="utf-8")
def _run_git(args: list) -> tuple:
"""Run git command."""
try:
result = subprocess.run(
["git"] + args,
capture_output=True,
text=True,
timeout=30
)
return result.stdout, result.stderr, result.returncode
except Exception as e:
return "", str(e), 1
class EnterWorktreeTool(BaseTool):
    """Register a git worktree in the local registry, optionally creating it."""

    name = "enter_worktree"
    description = "Enter or create a git worktree"
    input_schema = {
        "type": "object",
        "properties": {
            "worktree_path": {"type": "string", "description": "Path for the worktree"},
            "branch": {"type": "string", "description": "Branch name (optional, will use current if not specified)"},
            "create": {"type": "boolean", "default": False, "description": "Create worktree if it doesn't exist"}
        },
        "required": ["worktree_path"]
    }

    @staticmethod
    def _next_worktree_id(entries: list) -> str:
        """Return a ``wt_<n>`` id that no entry in ``entries`` currently uses.

        Uses the highest numeric suffix found plus one. Counting the entries
        instead would hand out an id that is still in use once an earlier
        entry has been removed from the registry.
        """
        highest = 0
        for entry in entries:
            entry_id = entry.get("id")
            if isinstance(entry_id, str) and entry_id.startswith("wt_"):
                suffix = entry_id[len("wt_"):]
                if suffix.isdecimal():
                    highest = max(highest, int(suffix))
        return f"wt_{highest + 1}"

    async def execute(self, worktree_path: str, branch: Optional[str] = None, create: bool = False) -> ToolResult:
        """Enter a worktree, creating it with ``git worktree add`` on request.

        Args:
            worktree_path: Path of the worktree. It is compared to registered
                paths as a plain string and stored exactly as given.
            branch: Branch to check out when creating. When omitted and
                ``create`` is true, the current branch is used, falling back
                to ``"main"`` if git reports none.
            create: Run ``git worktree add`` before registering.

        Returns:
            A failed ``ToolResult`` if ``create`` is true and the path already
            exists, or if ``git worktree add`` fails. Otherwise a successful
            result with ``worktree_id``, ``path``, ``branch`` and a ``status``
            of ``"already_registered"`` or ``"entered"``. With ``create``
            false, the path is registered without checking that it exists.
        """
        if create and Path(worktree_path).exists():
            return ToolResult(success=False, error=f"Worktree path exists and create=true: {worktree_path}")

        data = _load_worktrees()
        entries = data.setdefault("worktrees", [])

        # A path that is already registered is returned as-is, not duplicated.
        for existing in entries:
            if existing.get("path") == worktree_path:
                return ToolResult(success=True, data={
                    "worktree_id": existing.get("id"),
                    "path": worktree_path,
                    "branch": existing.get("branch"),
                    "status": "already_registered"
                })

        if create:
            if not branch:
                # NOTE(review): git normally refuses to check out a branch
                # that is already checked out in another worktree, so falling
                # back to the current branch may fail -- confirm intent.
                current, _, _ = _run_git(["branch", "--show-current"])
                branch = current.strip() or "main"

            _, stderr, code = _run_git(["worktree", "add", worktree_path, branch])
            if code != 0:
                return ToolResult(success=False, error=f"Failed to create worktree: {stderr}")

        wt_id = self._next_worktree_id(entries)
        entries.append({
            "id": wt_id,
            "path": worktree_path,
            "branch": branch,
            "created_at": datetime.now().isoformat(),
            "status": "active"
        })
        _save_worktrees(data)

        return ToolResult(success=True, data={
            "worktree_id": wt_id,
            "path": worktree_path,
            "branch": branch,
            "status": "entered"
        })
class ExitWorktreeTool(BaseTool):
    """Remove a worktree from the local registry, archiving its entry."""

    name = "exit_worktree"
    description = "Exit and optionally remove a worktree"
    input_schema = {
        "type": "object",
        "properties": {
            "worktree_id": {"type": "string", "description": "Worktree ID to exit"},
            "cleanup": {"type": "boolean", "default": False, "description": "Remove the worktree directory"}
        },
        "required": ["worktree_id"]
    }

    async def execute(self, worktree_id: str, cleanup: bool = False) -> ToolResult:
        """Exit a registered worktree.

        The registry entry is written to a timestamped JSON file under
        ``~/.stack-2.9/archives`` and then dropped from the registry. With
        ``cleanup`` the worktree is first removed via ``git worktree remove``;
        if that fails, nothing is archived or deregistered.

        Args:
            worktree_id: Id of the registry entry to exit.
            cleanup: Also remove the worktree through git.

        Returns:
            A failed ``ToolResult`` when the id is unknown, the entry has no
            path to clean up, or git removal fails. Otherwise a successful
            result with ``worktree_id``, ``path``, ``status``, ``cleaned_up``
            and ``archived_to``.
        """
        data = _load_worktrees()
        entries = data.get("worktrees", [])

        worktree = next((wt for wt in entries if wt.get("id") == worktree_id), None)
        if worktree is None:
            return ToolResult(success=False, error=f"Worktree {worktree_id} not found")

        if cleanup:
            target = worktree.get("path")
            if not target:
                # Report a malformed entry instead of raising KeyError.
                return ToolResult(success=False, error="Failed to remove worktree: registry entry has no path")
            _, stderr, code = _run_git(["worktree", "remove", target])
            if code != 0:
                return ToolResult(success=False, error=f"Failed to remove worktree: {stderr}")

        # Keep a copy of the entry before it leaves the registry.
        archive_dir = Path.home() / ".stack-2.9" / "archives"
        archive_dir.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        archive_file = archive_dir / f"worktree_{worktree_id}_{stamp}.json"
        archive_file.write_text(json.dumps(worktree, indent=2))

        data["worktrees"] = [w for w in entries if w.get("id") != worktree_id]
        _save_worktrees(data)

        return ToolResult(success=True, data={
            "worktree_id": worktree_id,
            "path": worktree.get("path"),
            "status": "exited",
            "cleaned_up": cleanup,
            # The archive is written on every exit, so always report it.
            "archived_to": str(archive_file)
        })
class ListWorktreesTool(BaseTool):
    """Report every worktree currently held in the registry."""

    name = "list_worktrees"
    description = "List all git worktrees"
    input_schema = {
        "type": "object",
        "properties": {},
        "required": []
    }

    async def execute(self) -> ToolResult:
        """Return the registered worktree entries together with their count."""
        registered = _load_worktrees().get("worktrees", [])
        summary = {
            "worktrees": registered,
            "count": len(registered)
        }
        return ToolResult(success=True, data=summary)
# Instantiate each worktree tool and add it to the shared registry on import.
for _tool_cls in (EnterWorktreeTool, ExitWorktreeTool, ListWorktreesTool):
    tool_registry.register(_tool_cls())
del _tool_cls  # keep the loop variable out of the module namespace
|