""" Executor Agent for SPARKNET Handles task execution and tool usage """ from typing import Optional, Dict, Any from loguru import logger import json import re from .base_agent import BaseAgent, Task, Message from ..llm.ollama_client import OllamaClient class ExecutorAgent(BaseAgent): """Agent specialized in executing tasks using available tools.""" def __init__( self, llm_client: OllamaClient, model: str = "llama3.1:8b", temperature: float = 0.5, ): system_prompt = """You are an execution agent specialized in completing tasks using available tools. Your role is to: 1. Analyze the task requirements 2. Select and use appropriate tools 3. Execute actions to complete the task 4. Report results clearly When you need to use a tool, format your response as: TOOL: tool_name PARAMETERS: { "param1": "value1", "param2": "value2" } After receiving tool results, provide a final answer starting with: RESULT: [your analysis and conclusion] Be precise, focused, and efficient in task completion.""" super().__init__( name="ExecutorAgent", description="Task execution and tool usage agent", llm_client=llm_client, model=model, system_prompt=system_prompt, temperature=temperature, max_tokens=1024, ) async def process_task(self, task: Task) -> Task: """ Process and execute a task. Args: task: Task to process Returns: Updated task with results """ logger.info(f"ExecutorAgent processing task: {task.id}") task.status = "in_progress" try: # Create task message task_message = Message( role="user", content=f"Task: {task.description}\n\nAvailable tools: {', '.join(self.get_available_tools())}", sender="system", ) # Clear history for fresh task processing self.clear_history() self.add_message(task_message) # Iteratively execute until task is complete max_iterations = 5 iteration = 0 final_result = None while iteration < max_iterations: iteration += 1 logger.debug(f"Iteration {iteration}/{max_iterations}") # Get agent response response = await self.call_llm(messages=self.messages) # Add response to history self.add_message( Message( role="assistant", content=response, sender=self.name, ) ) # Check if agent wants to use a tool if "TOOL:" in response: tool_result = await self._execute_tool_from_response(response) # Add tool result to conversation tool_message = Message( role="user", content=f"Tool execution result:\nSuccess: {tool_result.success}\nOutput: {tool_result.output}\nError: {tool_result.error}", sender="system", ) self.add_message(tool_message) # Check if agent provided final result if "RESULT:" in response: # Extract result result_match = re.search(r"RESULT:\s*(.+)", response, re.DOTALL) if result_match: final_result = result_match.group(1).strip() break if final_result: task.result = final_result task.status = "completed" logger.info(f"Task {task.id} completed successfully") else: task.result = "Task processing reached maximum iterations without completion" task.status = "completed" logger.warning(f"Task {task.id} reached max iterations") except Exception as e: logger.error(f"Error processing task {task.id}: {e}") task.status = "failed" task.error = str(e) return task async def _execute_tool_from_response(self, response: str) -> Any: """ Parse and execute tool call from agent response. Args: response: Agent response containing tool call Returns: Tool result """ try: # Extract tool name tool_match = re.search(r"TOOL:\s*(\w+)", response) if not tool_match: return {"success": False, "error": "Could not parse tool name"} tool_name = tool_match.group(1) # Extract parameters params_match = re.search(r"PARAMETERS:\s*(\{[^}]+\})", response, re.DOTALL) if params_match: params_str = params_match.group(1) # Clean up the JSON string params_str = params_str.replace("'", '"') params = json.loads(params_str) else: params = {} logger.info(f"Executing tool {tool_name} with params: {params}") # Execute tool result = await self.execute_tool(tool_name, **params) return result except Exception as e: logger.error(f"Error executing tool from response: {e}") return { "success": False, "error": f"Tool execution error: {str(e)}", }