"""
Code Execution Tools for SPARKNET
Tools for executing Python and bash code
"""

import asyncio
import shlex
import subprocess
import sys
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from typing import Optional

from loguru import logger

from .base_tool import BaseTool, ToolResult


# Built-ins exposed to code run by PythonExecutorTool in sandbox mode.
# The table is copied for every execution so executed code cannot alter it
# for later runs.
_SAFE_BUILTINS = {
    "print": print,
    "len": len,
    "range": range,
    "str": str,
    "int": int,
    "float": float,
    "bool": bool,
    "list": list,
    "dict": dict,
    "tuple": tuple,
    "set": set,
    "sum": sum,
    "min": min,
    "max": max,
    "abs": abs,
    "round": round,
    "enumerate": enumerate,
    "zip": zip,
}


def _combine_output(stdout_text: str, stderr_text: str) -> str:
    """Return stdout, followed by a labelled STDERR section when stderr is non-empty."""
    if stderr_text:
        return f"{stdout_text}\nSTDERR:\n{stderr_text}"
    return stdout_text


class PythonExecutorTool(BaseTool):
    """Tool for executing Python code.

    SECURITY NOTE: the code is run in-process with ``exec``. Sandbox mode only
    narrows the available built-ins; it is NOT a security boundary (restricted
    namespaces can be escaped through object introspection). Do not feed this
    tool untrusted code without process- or OS-level isolation.
    """

    def __init__(self, sandbox: bool = True):
        """
        Register the tool and its parameters.

        Args:
            sandbox: When True, executed code only sees the built-ins listed
                in ``_SAFE_BUILTINS``; when False it gets the full built-ins.
        """
        super().__init__(
            name="python_executor",
            description="Execute Python code and return the output",
        )
        self.sandbox = sandbox
        self.add_parameter("code", "str", "Python code to execute", required=True)
        self.add_parameter("timeout", "int", "Execution timeout in seconds", required=False, default=30)

    async def execute(self, code: str, timeout: int = 30, **kwargs) -> ToolResult:
        """
        Execute Python code.

        Args:
            code: Python code to execute
            timeout: Execution timeout. NOTE: accepted for interface
                compatibility but not enforced, because the code runs
                in-process via ``exec`` and cannot be interrupted safely.

        Returns:
            ToolResult with execution output. On failure ``output`` is None,
            ``error`` carries the message, and ``metadata`` holds whatever was
            written to stdout/stderr before the failure.
        """
        stdout_capture = StringIO()
        stderr_capture = StringIO()

        # Fresh namespace per call; in sandbox mode the built-ins are replaced
        # by a private copy of the restricted table.
        namespace = {"__builtins__": dict(_SAFE_BUILTINS)} if self.sandbox else {}

        def captured_metadata() -> dict:
            return {
                "sandbox": self.sandbox,
                "stdout": stdout_capture.getvalue(),
                "stderr": stderr_capture.getvalue(),
            }

        try:
            # NOTE: the redirection is process-wide, so anything printed by the
            # host while the code runs is captured as well.
            with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
                exec(code, namespace)
        except SystemExit as exc:
            # SystemExit is not an Exception subclass; without this handler,
            # executed code calling sys.exit()/exit() would terminate the host.
            exit_code = exc.code
            if exit_code not in (None, 0):
                logger.error(f"Python execution exited with status: {exit_code}")
                return ToolResult(
                    success=False,
                    output=None,
                    error=f"Code exited with status {exit_code}",
                    metadata={**captured_metadata(), "error_type": "SystemExit"},
                )
            # A zero/None exit status is a normal termination: fall through.
        except Exception as e:
            logger.error(f"Python execution error: {e}")
            return ToolResult(
                success=False,
                output=None,
                error=f"Execution error: {str(e)}",
                # Keep partial output and the exception type for debugging.
                metadata={**captured_metadata(), "error_type": type(e).__name__},
            )

        stdout_text = stdout_capture.getvalue()
        stderr_text = stderr_capture.getvalue()
        output = _combine_output(stdout_text, stderr_text)

        return ToolResult(
            success=True,
            output=output or "Code executed successfully (no output)",
            metadata={
                "sandbox": self.sandbox,
                "stdout": stdout_text,
                "stderr": stderr_text,
            },
        )


class BashExecutorTool(BaseTool):
    """Tool for executing bash commands."""

    def __init__(self, allowed_commands: Optional[list[str]] = None):
        """
        Register the tool and its parameters.

        Args:
            allowed_commands: Optional allow-list of executable names. When it
                is None or empty, any command is run through the shell. When
                it is non-empty, only the listed executables may be run and
                the command is executed WITHOUT a shell.
        """
        super().__init__(
            name="bash_executor",
            description="Execute bash commands and return the output",
        )
        self.allowed_commands = allowed_commands
        self.add_parameter("command", "str", "Bash command to execute", required=True)
        self.add_parameter("timeout", "int", "Execution timeout in seconds", required=False, default=30)
        self.add_parameter("working_dir", "str", "Working directory", required=False, default=".")

    async def execute(
        self,
        command: str,
        timeout: int = 30,
        working_dir: str = ".",
        **kwargs,
    ) -> ToolResult:
        """
        Execute bash command.

        Args:
            command: Bash command to execute
            timeout: Execution timeout in seconds
            working_dir: Working directory

        Returns:
            ToolResult with command output. ``success`` reflects a zero return
            code; ``metadata`` carries the return code, raw stdout/stderr and
            the original command string.
        """
        try:
            if self.allowed_commands:
                # With an allow-list, checking only the first word of a string
                # that is then handed to a shell is bypassable
                # (e.g. "ls; rm -rf ~"). Tokenise the command and run it
                # without a shell so only the vetted executable is started.
                # Consequence: pipes, redirection and globbing are unavailable
                # when an allow-list is configured.
                try:
                    argv = shlex.split(command)
                except ValueError as exc:
                    return ToolResult(
                        success=False,
                        output=None,
                        error=f"Could not parse command: {exc}",
                    )

                if not argv:
                    return ToolResult(
                        success=False,
                        output=None,
                        error="Empty command",
                    )

                cmd_name = argv[0]
                if cmd_name not in self.allowed_commands:
                    return ToolResult(
                        success=False,
                        output=None,
                        error=f"Command '{cmd_name}' not allowed. Allowed: {self.allowed_commands}",
                    )
                run_target, use_shell = argv, False
            else:
                # No allow-list: unrestricted shell execution, as before.
                run_target, use_shell = command, True

            # subprocess.run blocks; run it in a worker thread so the event
            # loop stays responsive for the duration of the command.
            result = await asyncio.to_thread(
                subprocess.run,
                run_target,
                shell=use_shell,
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=working_dir,
            )

            succeeded = result.returncode == 0
            output = _combine_output(result.stdout, result.stderr)

            return ToolResult(
                success=succeeded,
                output=output or "(no output)",
                error=None if succeeded else f"Command failed with code {result.returncode}",
                metadata={
                    "return_code": result.returncode,
                    "stdout": result.stdout,
                    "stderr": result.stderr,
                    "command": command,
                },
            )

        except subprocess.TimeoutExpired:
            return ToolResult(
                success=False,
                output=None,
                error=f"Command timed out after {timeout} seconds",
            )
        except Exception as e:
            logger.error(f"Bash execution error: {e}")
            return ToolResult(
                success=False,
                output=None,
                error=f"Execution error: {str(e)}",
            )