"""AgentTool - Sub-agent spawning framework for Stack 2.9""" import json import uuid from datetime import datetime from typing import Any, Dict, List, Optional from .base import BaseTool, ToolParam, ToolResult from .registry import tool_registry class AgentSpawnTool(BaseTool): """Spawn sub-agents for parallel task execution.""" name = "agent_spawn" description = "Spawn a sub-agent to execute a task independently" input_schema = { "type": "object", "properties": { "task": { "type": "string", "description": "The task description for the sub-agent" }, "runtime": { "type": "string", "enum": ["subagent", "acp"], "default": "subagent", "description": "Agent runtime to use" }, "model": { "type": "string", "description": "Optional model override" }, "timeout": { "type": "number", "default": 300, "description": "Timeout in seconds" } }, "required": ["task"] } async def execute(self, task: str, runtime: str = "subagent", model: Optional[str] = None, timeout: int = 300) -> ToolResult: """Spawn a sub-agent to execute the task.""" agent_id = str(uuid.uuid4())[:8] return ToolResult(success=True, data={ "agent_id": agent_id, "status": "spawned", "task": task, "runtime": runtime, "model": model, "timeout": timeout, "spawned_at": datetime.now().isoformat(), "note": f"Agent {agent_id} spawned. Use agent_status to check completion." }) class AgentStatusTool(BaseTool): """Check status of a spawned agent.""" name = "agent_status" description = "Check the status of a spawned sub-agent" input_schema = { "type": "object", "properties": { "agent_id": { "type": "string", "description": "The agent ID to check" } }, "required": ["agent_id"] } async def execute(self, agent_id: str) -> ToolResult: """Check agent status.""" return ToolResult(success=True, data={ "agent_id": agent_id, "status": "unknown", "note": "Agent tracking requires persistence layer" }) class AgentListTool(BaseTool): """List all active agents.""" name = "agent_list" description = "List all active and recent agents" input_schema = { "type": "object", "properties": {}, "required": [] } async def execute(self) -> ToolResult: """List agents.""" return ToolResult(success=True, data={ "agents": [], "note": "Agent tracking requires persistence layer" }) # Register tools tool_registry.register(AgentSpawnTool()) tool_registry.register(AgentStatusTool()) tool_registry.register(AgentListTool())