File size: 5,656 Bytes
a9dc537 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
"""
Executor Agent for SPARKNET
Handles task execution and tool usage
"""
from typing import Optional, Dict, Any
from loguru import logger
import json
import re
from .base_agent import BaseAgent, Task, Message
from ..llm.ollama_client import OllamaClient
class ExecutorAgent(BaseAgent):
    """Agent specialized in executing tasks using available tools.

    The agent drives a short conversation with the LLM: every response is
    scanned for a ``TOOL:`` / ``PARAMETERS:`` request (which is executed and
    whose outcome is fed back as a new message) and for a ``RESULT:`` marker
    (which ends the loop).
    """

    def __init__(
        self,
        llm_client: OllamaClient,
        model: str = "llama3.1:8b",
        temperature: float = 0.5,
    ):
        """
        Configure the agent with the tool-usage system prompt.

        Args:
            llm_client: Client passed through to the base agent.
            model: Model name passed through to the base agent.
            temperature: Sampling temperature passed through to the base agent.
        """
        # The prompt text is sent to the model verbatim, so its lines are kept
        # flush-left inside the literal.
        system_prompt = """You are an execution agent specialized in completing tasks using available tools.
Your role is to:
1. Analyze the task requirements
2. Select and use appropriate tools
3. Execute actions to complete the task
4. Report results clearly
When you need to use a tool, format your response as:
TOOL: tool_name
PARAMETERS: {
"param1": "value1",
"param2": "value2"
}
After receiving tool results, provide a final answer starting with:
RESULT: [your analysis and conclusion]
Be precise, focused, and efficient in task completion."""
        super().__init__(
            name="ExecutorAgent",
            description="Task execution and tool usage agent",
            llm_client=llm_client,
            model=model,
            system_prompt=system_prompt,
            temperature=temperature,
            max_tokens=1024,
        )

    async def process_task(self, task: Task) -> Task:
        """
        Process and execute a task.

        The conversation history is cleared, the task description and the
        available tool names are sent to the LLM, and the model is queried for
        at most five rounds. Tool requests found in a response are executed and
        their outcome is appended to the conversation.

        Args:
            task: Task to process

        Returns:
            The same task object. ``status`` is ``"completed"`` when a RESULT
            was extracted or the iteration limit was reached (``result`` then
            holds an explanatory message), and ``"failed"`` with ``error`` set
            when an exception was raised.
        """
        logger.info(f"ExecutorAgent processing task: {task.id}")
        task.status = "in_progress"
        try:
            # Create task message
            task_message = Message(
                role="user",
                content=f"Task: {task.description}\n\nAvailable tools: {', '.join(self.get_available_tools())}",
                sender="system",
            )
            # Clear history for fresh task processing
            self.clear_history()
            self.add_message(task_message)

            # Iteratively execute until task is complete
            max_iterations = 5
            final_result = None
            for iteration in range(1, max_iterations + 1):
                logger.debug(f"Iteration {iteration}/{max_iterations}")

                # Get agent response and record it in the history
                response = await self.call_llm(messages=self.messages)
                self.add_message(
                    Message(
                        role="assistant",
                        content=response,
                        sender=self.name,
                    )
                )

                # Check if agent wants to use a tool
                if "TOOL:" in response:
                    tool_result = await self._execute_tool_from_response(response)
                    # The helper returns a plain dict on failure and whatever
                    # execute_tool() returns on success, so read the fields in
                    # a way that works for both shapes.
                    success = self._tool_result_field(tool_result, "success")
                    output = self._tool_result_field(tool_result, "output")
                    error = self._tool_result_field(tool_result, "error")
                    self.add_message(
                        Message(
                            role="user",
                            content=f"Tool execution result:\nSuccess: {success}\nOutput: {output}\nError: {error}",
                            sender="system",
                        )
                    )

                # Check if agent provided final result.
                # NOTE: a response containing both TOOL: and RESULT: runs the
                # tool above and still ends the loop in this iteration.
                if "RESULT:" in response:
                    result_match = re.search(r"RESULT:\s*(.+)", response, re.DOTALL)
                    if result_match:
                        final_result = result_match.group(1).strip()
                        break

            if final_result:
                task.result = final_result
                task.status = "completed"
                logger.info(f"Task {task.id} completed successfully")
            else:
                task.result = "Task processing reached maximum iterations without completion"
                task.status = "completed"
                logger.warning(f"Task {task.id} reached max iterations")
        except Exception as e:
            logger.error(f"Error processing task {task.id}: {e}")
            task.status = "failed"
            task.error = str(e)
        return task

    @staticmethod
    def _tool_result_field(result: Any, field: str) -> Any:
        """Read ``field`` from a tool result that is either a dict or an object.

        Returns ``None`` when the key/attribute is missing.
        """
        if isinstance(result, dict):
            return result.get(field)
        return getattr(result, field, None)

    @staticmethod
    def _parse_tool_params(response: str) -> Dict[str, Any]:
        """Extract the JSON object that follows ``PARAMETERS:`` in ``response``.

        Returns an empty dict when no ``PARAMETERS: {`` section is present.

        Raises:
            json.JSONDecodeError: If the section is neither valid JSON nor
                valid JSON after replacing single quotes with double quotes.
        """
        marker = re.search(r"PARAMETERS:\s*(?=\{)", response)
        if marker is None:
            return {}
        # raw_decode() parses exactly one JSON value from the start of the
        # text and ignores what follows, so nested objects and braces inside
        # string values are handled (a "first closing brace" regex is not).
        candidate = response[marker.end():]
        decoder = json.JSONDecoder()
        try:
            params, _ = decoder.raw_decode(candidate)
        except json.JSONDecodeError:
            # Only rewrite quotes when strict parsing fails; doing it
            # unconditionally corrupts valid JSON containing apostrophes.
            params, _ = decoder.raw_decode(candidate.replace("'", '"'))
        return params

    async def _execute_tool_from_response(self, response: str) -> Any:
        """
        Parse and execute tool call from agent response.

        Args:
            response: Agent response containing tool call

        Returns:
            Whatever ``execute_tool`` returns on success. When the tool name or
            parameters cannot be parsed, or the tool call raises, a dict with
            ``"success": False`` and an ``"error"`` message is returned
            instead.
        """
        try:
            # Extract tool name
            tool_match = re.search(r"TOOL:\s*(\w+)", response)
            if not tool_match:
                return {"success": False, "error": "Could not parse tool name"}
            tool_name = tool_match.group(1)

            # Extract parameters
            params = self._parse_tool_params(response)
            logger.info(f"Executing tool {tool_name} with params: {params}")

            # Execute tool
            return await self.execute_tool(tool_name, **params)
        except Exception as e:
            logger.error(f"Error executing tool from response: {e}")
            return {
                "success": False,
                "error": f"Tool execution error: {str(e)}",
            }
|