# RAM / agent_tasks.py
# anu151105's picture
# Upload 6 files
# a3738dd verified
import os
import re
import time
import json
from typing import Dict, List, Any, Optional, Union
class TaskExecutor:
    """Task execution engine for the autonomous AI agent.

    Drives a reasoning engine through a fixed sequence of stages for each
    task: analysis, decomposition into subtasks, planning, per-subtask
    execution with self-evaluation, synthesis of a final result, and
    reflection. Around that pipeline the class provides:

    1. Progress tracking (``task_status`` / ``get_task_status``)
    2. Cooperative cancellation (``cancel_task``)
    3. Error capture (failures are returned as records, not raised)
    4. A bounded history of task records (``get_task_history``)
    """

    # Leading "1." / "1)" numbering or "-" bullet on a decomposition line.
    _LIST_MARKER = re.compile(r'^\d+[.)]\s*|^-\s*')

    def __init__(self, reasoning_engine):
        """Initialize the task executor.

        Args:
            reasoning_engine: The reasoning engine to use for task planning.
                It is called as ``generate_text(prompt, max_length=...)`` and,
                as a fallback, ``decompose_task(task_description)``.
        """
        self.reasoning_engine = reasoning_engine
        self.current_task = None
        self.task_status = "idle"
        self.task_history = []
        self.max_history_length = 10
        # Set by cancel_task(); polled by execute_task() between steps.
        self._cancel_requested = False

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task based on the description.

        Cancellation requested through ``cancel_task`` is honoured before
        each subtask, before synthesis, and before the task is marked
        completed; work already in flight inside the reasoning engine is not
        interrupted.

        Args:
            task_description: Description of the task to execute.

        Returns:
            The task record that was appended to the history. On success it
            holds ``task``, ``plan``, ``subtasks``, ``result``,
            ``reflection``, ``status`` ("completed"), ``execution_time`` and
            ``timestamp``. On cancellation it holds ``task``, ``plan``,
            ``subtasks`` (those finished so far), ``status`` ("cancelled"),
            ``execution_time`` and ``timestamp``. On any exception it holds
            ``task``, ``status`` ("failed"), ``error`` and ``timestamp``.
        """
        self.current_task = task_description
        self.task_status = "in_progress"
        self._cancel_requested = False
        # Monotonic clock: elapsed time must not jump if the wall clock does.
        started = time.monotonic()
        plan = None
        subtask_results: List[Dict[str, Any]] = []

        try:
            # First, analyze the task to understand its requirements and
            # constraints.
            analysis_prompt = (
                "I need to analyze this task to understand its requirements and constraints:\n"
                f"Task: {task_description}\n"
                "Task Analysis:\n"
                "1. What is the main objective of this task?\n"
                "2. What are the key requirements?\n"
                "3. What constraints or limitations should I be aware of?\n"
                "4. What resources or information do I need to complete this task?\n"
                "Analysis:"
            )
            task_analysis = self.reasoning_engine.generate_text(
                analysis_prompt, max_length=768
            )

            # Decompose the task into subtasks with the analysis in mind.
            decomposition_prompt = (
                "Based on my analysis of the task:\n"
                f"{task_analysis}\n"
                "I need to break down this task into smaller, manageable subtasks:\n"
                f"Task: {task_description}\n"
                "Subtasks:\n"
                "1. "
            )
            decomposition = self.reasoning_engine.generate_text(
                decomposition_prompt, max_length=1024
            )
            subtasks = self._parse_subtasks(decomposition)

            # If parsing found nothing, fall back to the reasoning engine's
            # own decompose_task method.
            if not subtasks:
                subtasks = self.reasoning_engine.decompose_task(task_description)

            # Generate a detailed plan for executing the task.
            planning_prompt = (
                "I need to create a detailed plan to execute this task:\n"
                f"{task_description}\n"
                "Task Analysis:\n"
                f"{task_analysis}\n"
                "The task has been broken down into these subtasks:\n"
                f"{json.dumps(subtasks, indent=2)}\n"
                "Detailed step-by-step plan (including how to handle potential issues):\n"
                "1. "
            )
            plan = self.reasoning_engine.generate_text(
                planning_prompt, max_length=1024
            )

            # Execute each subtask, tracking progress in task_status.
            total = len(subtasks)
            for position, subtask in enumerate(subtasks, start=1):
                # Check the flag BEFORE overwriting the status, otherwise a
                # pending cancellation would be silently erased.
                if self._cancel_requested:
                    break
                self.task_status = f"in_progress ({position}/{total})"
                subtask_results.append(
                    self._run_subtask(
                        task_description, subtask, position, total, subtask_results
                    )
                )

            if self._cancel_requested:
                return self._record(
                    self._cancelled_record(
                        task_description, plan, subtask_results, started
                    )
                )

            # Compile the final results with synthesis.
            compilation_prompt = (
                "I have executed all subtasks for the main task:\n"
                f"{task_description}\n"
                "Here are the results of each subtask:\n"
                f"{json.dumps(subtask_results, indent=2)}\n"
                "I need to synthesize these results into a coherent final result "
                "that addresses the original task completely.\n"
                "Final synthesized result:"
            )
            final_result = self.reasoning_engine.generate_text(
                compilation_prompt, max_length=1024
            )

            # Self-reflection on the task execution.
            reflection_prompt = (
                "I need to reflect on my execution of this task:\n"
                f"Task: {task_description}\n"
                f"My approach: {plan}\n"
                f"Final result: {final_result}\n"
                "Reflection on what went well and what could be improved:"
            )
            reflection = self.reasoning_engine.generate_text(
                reflection_prompt, max_length=512
            )

            # A cancellation that arrived during synthesis/reflection was
            # already acknowledged to the caller, so it must win here.
            if self._cancel_requested:
                return self._record(
                    self._cancelled_record(
                        task_description, plan, subtask_results, started
                    )
                )

            self.task_status = "completed"
            return self._record({
                "task": task_description,
                "plan": plan,
                "subtasks": subtask_results,
                "result": final_result,
                "reflection": reflection,
                "status": self.task_status,
                "execution_time": time.monotonic() - started,
                "timestamp": time.time(),
            })
        except Exception as e:
            # Top-level boundary: callers receive a "failed" record instead
            # of an exception.
            self.task_status = "failed"
            return self._record({
                "task": task_description,
                "status": self.task_status,
                "error": str(e),
                "timestamp": time.time(),
            })

    def get_task_status(self) -> Dict[str, Any]:
        """Get the current status of task execution.

        Returns:
            Dictionary with ``current_task``, ``status`` and
            ``history_length``.
        """
        return {
            "current_task": self.current_task,
            "status": self.task_status,
            "history_length": len(self.task_history),
        }

    def get_task_history(self) -> List[Dict[str, Any]]:
        """Get the history of executed tasks.

        Returns:
            List of task records, oldest first (the internal list itself,
            not a copy).
        """
        return self.task_history

    def cancel_task(self) -> Dict[str, Any]:
        """Request cancellation of the currently executing task.

        The request is cooperative: ``execute_task`` notices it at its next
        checkpoint and records the task as cancelled. Records of earlier
        tasks in the history are left untouched.

        Returns:
            Dictionary with ``task``, ``status`` and a ``message`` saying
            whether there was a task in progress to cancel.
        """
        # The status reads "in_progress" or "in_progress (i/n)" while running.
        if not self.task_status.startswith("in_progress"):
            return {
                "task": self.current_task,
                "status": self.task_status,
                "message": "No task in progress to cancel",
            }

        self._cancel_requested = True
        self.task_status = "cancelled"
        return {
            "task": self.current_task,
            "status": self.task_status,
            "message": "Task cancelled successfully",
        }

    @classmethod
    def _parse_subtasks(cls, decomposition: str) -> List[str]:
        """Extract numbered or "- " bulleted lines from *decomposition*."""
        subtasks = []
        for raw_line in decomposition.split('\n'):
            line = raw_line.strip()
            if line and (line[0].isdigit() or line.startswith('- ')):
                # Remove numbering or bullet points.
                cleaned = cls._LIST_MARKER.sub('', line).strip()
                if cleaned:
                    subtasks.append(cleaned)
        return subtasks

    def _run_subtask(self, task_description, subtask, position, total, previous):
        """Execute one subtask, self-evaluate it, and return its result dict."""
        if previous:
            previous_text = json.dumps([r['result'] for r in previous], indent=2)
        else:
            previous_text = 'None yet'

        subtask_prompt = (
            "I am executing this subtask as part of the larger task:\n"
            f"Main Task: {task_description}\n"
            f"Current Subtask ({position}/{total}): {subtask}\n"
            f"Previous Results: {previous_text}\n"
            "I will now execute this subtask carefully and report the detailed results:"
        )
        result = self.reasoning_engine.generate_text(subtask_prompt, max_length=768)

        # Evaluate the quality of the result.
        evaluation_prompt = (
            "I need to evaluate the quality of my execution of this subtask:\n"
            f"Subtask: {subtask}\n"
            f"Execution Result: {result}\n"
            "Evaluation (rate from 1-10 and explain):"
        )
        evaluation = self.reasoning_engine.generate_text(
            evaluation_prompt, max_length=256
        )
        return {"subtask": subtask, "result": result, "evaluation": evaluation}

    def _cancelled_record(self, task_description, plan, subtask_results, started):
        """Build the history record for a task that stopped on cancellation."""
        self.task_status = "cancelled"
        return {
            "task": task_description,
            "plan": plan,
            "subtasks": subtask_results,
            "status": self.task_status,
            "execution_time": time.monotonic() - started,
            "timestamp": time.time(),
        }

    def _record(self, task_record: Dict[str, Any]) -> Dict[str, Any]:
        """Append *task_record* to the history, trim it, and return the record."""
        self.task_history.append(task_record)
        # Trim by explicit excess count: a "[-n:]" slice keeps everything
        # when n is 0.
        excess = len(self.task_history) - self.max_history_length
        if excess > 0:
            self.task_history = self.task_history[excess:]
        return task_record