File size: 3,941 Bytes
5dc5419 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 | """TeamDeleteTool - Delete/disband a team for Stack 2.9"""
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict
from .base import BaseTool, ToolResult
from .registry import tool_registry
TEAMS_FILE = Path.home() / ".stack-2.9" / "teams.json"
def _load_teams() -> Dict[str, Any]:
"""Load teams from disk."""
TEAMS_FILE.parent.mkdir(parents=True, exist_ok=True)
if TEAMS_FILE.exists():
return json.loads(TEAMS_FILE.read_text())
return {"teams": []}
def _save_teams(data: Dict[str, Any]) -> None:
"""Save teams to disk."""
TEAMS_FILE.write_text(json.dumps(data, indent=2))
class TeamDeleteTool(BaseTool):
"""Delete or disband a team."""
name = "team_delete"
description = "Delete and disband a team"
input_schema = {
"type": "object",
"properties": {
"team_id": {"type": "string", "description": "Team ID to delete"},
"force": {"type": "boolean", "default": False, "description": "Force delete even if tasks pending"}
},
"required": ["team_id"]
}
async def execute(self, team_id: str, force: bool = False) -> ToolResult:
"""Delete team."""
data = _load_teams()
# Find team
team = None
for t in data.get("teams", []):
if t.get("id") == team_id:
team = t
break
if not team:
return ToolResult(success=False, error=f"Team {team_id} not found")
# Check for pending tasks
if not force and team.get("status") == "active":
pending_tasks = [a for a in team.get("agents", []) if a.get("status") == "active"]
if pending_tasks:
return ToolResult(success=False, error=f"Team has {len(pending_tasks)} active agents. Use force=true to delete anyway.")
# Archive team before deletion
archive_dir = Path.home() / ".stack-2.9" / "archives"
archive_dir.mkdir(parents=True, exist_ok=True)
archive_file = archive_dir / f"team_{team_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
archive_file.write_text(json.dumps(team, indent=2))
# Remove from teams list
data["teams"] = [t for t in data["teams"] if t.get("id") != team_id]
_save_teams(data)
return ToolResult(success=True, data={
"team_id": team_id,
"team_name": team.get("name"),
"status": "deleted",
"archived_to": str(archive_file)
})
class TeamLeaveTool(BaseTool):
"""Leave a team (for agents)."""
name = "team_leave"
description = "Leave a team"
input_schema = {
"type": "object",
"properties": {
"team_id": {"type": "string", "description": "Team ID"},
"agent_name": {"type": "string", "description": "Agent name to remove"}
},
"required": ["team_id", "agent_name"]
}
async def execute(self, team_id: str, agent_name: str) -> ToolResult:
"""Leave team."""
data = _load_teams()
for team in data.get("teams", []):
if team.get("id") == team_id:
agents = team.get("agents", [])
original_count = len(agents)
agents = [a for a in agents if a.get("name") != agent_name]
if len(agents) == original_count:
return ToolResult(success=False, error=f"Agent {agent_name} not found in team")
team["agents"] = agents
_save_teams(data)
return ToolResult(success=True, data={
"team_id": team_id,
"agent_removed": agent_name,
"status": "removed"
})
return ToolResult(success=False, error=f"Team {team_id} not found")
# Register tools
tool_registry.register(TeamDeleteTool())
tool_registry.register(TeamLeaveTool())
|