SPARKNET / src /agents /executor_agent.py
MHamdan's picture
Initial commit: SPARKNET framework
a9dc537
"""
Executor Agent for SPARKNET
Handles task execution and tool usage
"""
from typing import Optional, Dict, Any
from loguru import logger
import json
import re
from .base_agent import BaseAgent, Task, Message
from ..llm.ollama_client import OllamaClient
class ExecutorAgent(BaseAgent):
"""Agent specialized in executing tasks using available tools."""
def __init__(
self,
llm_client: OllamaClient,
model: str = "llama3.1:8b",
temperature: float = 0.5,
):
system_prompt = """You are an execution agent specialized in completing tasks using available tools.
Your role is to:
1. Analyze the task requirements
2. Select and use appropriate tools
3. Execute actions to complete the task
4. Report results clearly
When you need to use a tool, format your response as:
TOOL: tool_name
PARAMETERS: {
"param1": "value1",
"param2": "value2"
}
After receiving tool results, provide a final answer starting with:
RESULT: [your analysis and conclusion]
Be precise, focused, and efficient in task completion."""
super().__init__(
name="ExecutorAgent",
description="Task execution and tool usage agent",
llm_client=llm_client,
model=model,
system_prompt=system_prompt,
temperature=temperature,
max_tokens=1024,
)
async def process_task(self, task: Task) -> Task:
"""
Process and execute a task.
Args:
task: Task to process
Returns:
Updated task with results
"""
logger.info(f"ExecutorAgent processing task: {task.id}")
task.status = "in_progress"
try:
# Create task message
task_message = Message(
role="user",
content=f"Task: {task.description}\n\nAvailable tools: {', '.join(self.get_available_tools())}",
sender="system",
)
# Clear history for fresh task processing
self.clear_history()
self.add_message(task_message)
# Iteratively execute until task is complete
max_iterations = 5
iteration = 0
final_result = None
while iteration < max_iterations:
iteration += 1
logger.debug(f"Iteration {iteration}/{max_iterations}")
# Get agent response
response = await self.call_llm(messages=self.messages)
# Add response to history
self.add_message(
Message(
role="assistant",
content=response,
sender=self.name,
)
)
# Check if agent wants to use a tool
if "TOOL:" in response:
tool_result = await self._execute_tool_from_response(response)
# Add tool result to conversation
tool_message = Message(
role="user",
content=f"Tool execution result:\nSuccess: {tool_result.success}\nOutput: {tool_result.output}\nError: {tool_result.error}",
sender="system",
)
self.add_message(tool_message)
# Check if agent provided final result
if "RESULT:" in response:
# Extract result
result_match = re.search(r"RESULT:\s*(.+)", response, re.DOTALL)
if result_match:
final_result = result_match.group(1).strip()
break
if final_result:
task.result = final_result
task.status = "completed"
logger.info(f"Task {task.id} completed successfully")
else:
task.result = "Task processing reached maximum iterations without completion"
task.status = "completed"
logger.warning(f"Task {task.id} reached max iterations")
except Exception as e:
logger.error(f"Error processing task {task.id}: {e}")
task.status = "failed"
task.error = str(e)
return task
async def _execute_tool_from_response(self, response: str) -> Any:
"""
Parse and execute tool call from agent response.
Args:
response: Agent response containing tool call
Returns:
Tool result
"""
try:
# Extract tool name
tool_match = re.search(r"TOOL:\s*(\w+)", response)
if not tool_match:
return {"success": False, "error": "Could not parse tool name"}
tool_name = tool_match.group(1)
# Extract parameters
params_match = re.search(r"PARAMETERS:\s*(\{[^}]+\})", response, re.DOTALL)
if params_match:
params_str = params_match.group(1)
# Clean up the JSON string
params_str = params_str.replace("'", '"')
params = json.loads(params_str)
else:
params = {}
logger.info(f"Executing tool {tool_name} with params: {params}")
# Execute tool
result = await self.execute_tool(tool_name, **params)
return result
except Exception as e:
logger.error(f"Error executing tool from response: {e}")
return {
"success": False,
"error": f"Tool execution error: {str(e)}",
}