SPARKNET / src /tools /code_tools.py
MHamdan's picture
Initial commit: SPARKNET framework
a9dc537
"""
Code Execution Tools for SPARKNET
Tools for executing Python and bash code
"""
import subprocess
import sys
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
from typing import Optional
from loguru import logger
from .base_tool import BaseTool, ToolResult
class PythonExecutorTool(BaseTool):
"""Tool for executing Python code."""
def __init__(self, sandbox: bool = True):
super().__init__(
name="python_executor",
description="Execute Python code and return the output",
)
self.sandbox = sandbox
self.add_parameter("code", "str", "Python code to execute", required=True)
self.add_parameter("timeout", "int", "Execution timeout in seconds", required=False, default=30)
async def execute(self, code: str, timeout: int = 30, **kwargs) -> ToolResult:
"""
Execute Python code.
Args:
code: Python code to execute
timeout: Execution timeout
Returns:
ToolResult with execution output
"""
try:
# Capture stdout and stderr
stdout_capture = StringIO()
stderr_capture = StringIO()
# Create a restricted namespace for sandboxing
if self.sandbox:
# Limited built-ins for safety
safe_builtins = {
"print": print,
"len": len,
"range": range,
"str": str,
"int": int,
"float": float,
"bool": bool,
"list": list,
"dict": dict,
"tuple": tuple,
"set": set,
"sum": sum,
"min": min,
"max": max,
"abs": abs,
"round": round,
"enumerate": enumerate,
"zip": zip,
}
namespace = {"__builtins__": safe_builtins}
else:
namespace = {}
# Execute code
with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
exec(code, namespace)
stdout_text = stdout_capture.getvalue()
stderr_text = stderr_capture.getvalue()
output = stdout_text
if stderr_text:
output += f"\nSTDERR:\n{stderr_text}"
return ToolResult(
success=True,
output=output or "Code executed successfully (no output)",
metadata={
"sandbox": self.sandbox,
"stdout": stdout_text,
"stderr": stderr_text,
},
)
except Exception as e:
logger.error(f"Python execution error: {e}")
return ToolResult(
success=False,
output=None,
error=f"Execution error: {str(e)}",
)
class BashExecutorTool(BaseTool):
"""Tool for executing bash commands."""
def __init__(self, allowed_commands: Optional[list[str]] = None):
super().__init__(
name="bash_executor",
description="Execute bash commands and return the output",
)
self.allowed_commands = allowed_commands
self.add_parameter("command", "str", "Bash command to execute", required=True)
self.add_parameter("timeout", "int", "Execution timeout in seconds", required=False, default=30)
self.add_parameter("working_dir", "str", "Working directory", required=False, default=".")
async def execute(
self,
command: str,
timeout: int = 30,
working_dir: str = ".",
**kwargs,
) -> ToolResult:
"""
Execute bash command.
Args:
command: Bash command to execute
timeout: Execution timeout
working_dir: Working directory
Returns:
ToolResult with command output
"""
try:
# Check if command is allowed (if whitelist is set)
if self.allowed_commands:
cmd_name = command.split()[0]
if cmd_name not in self.allowed_commands:
return ToolResult(
success=False,
output=None,
error=f"Command '{cmd_name}' not allowed. Allowed: {self.allowed_commands}",
)
# Execute command
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=timeout,
cwd=working_dir,
)
output = result.stdout
if result.stderr:
output += f"\nSTDERR:\n{result.stderr}"
return ToolResult(
success=result.returncode == 0,
output=output or "(no output)",
error=None if result.returncode == 0 else f"Command failed with code {result.returncode}",
metadata={
"return_code": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"command": command,
},
)
except subprocess.TimeoutExpired:
return ToolResult(
success=False,
output=None,
error=f"Command timed out after {timeout} seconds",
)
except Exception as e:
logger.error(f"Bash execution error: {e}")
return ToolResult(
success=False,
output=None,
error=f"Execution error: {str(e)}",
)