| | import os
|
| | import re
|
| | import time
|
| | import json
|
| | from typing import Dict, List, Any, Optional, Union
|
| |
|
class TaskExecutor:
    """Task execution engine for the autonomous AI agent.

    Drives a reasoning engine through a fixed pipeline for every task:

    1. Task analysis, decomposition into subtasks and planning
    2. Per-subtask execution and self-evaluation, with progress tracking
    3. Synthesis of a final result and a reflection on the run
    4. Error handling: failures and cancellations are returned as records

    Every finished run (completed, failed or cancelled) is stored in a
    history that is capped at ``max_history_length`` entries.
    """

    # Strips a leading "1. " or "- " list marker from a decomposition line.
    _LIST_MARKER = re.compile(r'^\d+\.\s*|^-\s*')

    class _TaskCancelled(Exception):
        """Raised internally to unwind ``execute_task`` after a cancel."""

    def __init__(self, reasoning_engine):
        """Initialize the task executor.

        Args:
            reasoning_engine: The reasoning engine to use for task planning.
                It must provide ``generate_text(prompt, max_length=...)`` and
                ``decompose_task(task_description)``.
        """
        self.reasoning_engine = reasoning_engine
        self.current_task = None
        self.task_status = "idle"
        self.task_history = []
        self.max_history_length = 10
        # Set by cancel_task(); polled by execute_task() after engine calls.
        self._cancel_requested = False

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task based on the description.

        Exceptions raised while running the pipeline are not propagated;
        they are reported through a record whose status is ``"failed"``.

        Args:
            task_description: Description of the task to execute.

        Returns:
            Dictionary containing task results and status. A completed run
            has the keys ``task``, ``plan``, ``subtasks``, ``result``,
            ``reflection``, ``status``, ``execution_time`` and ``timestamp``.
            A failed run has ``task``, ``status``, ``error``,
            ``execution_time`` and ``timestamp``. A cancelled run has
            ``task``, ``status``, ``subtasks`` (those finished before the
            cancellation), ``execution_time`` and ``timestamp``.
        """
        self.current_task = task_description
        self.task_status = "in_progress"
        self._cancel_requested = False
        start_time = time.time()
        # Defined before the try so a cancelled run can report partial work.
        subtask_results: List[Dict[str, Any]] = []

        try:
            subtasks, plan = self._plan(task_description)

            total = len(subtasks)
            for position, subtask in enumerate(subtasks, start=1):
                self.task_status = f"in_progress ({position}/{total})"
                subtask_results.append(
                    self._run_subtask(
                        task_description, subtask, position, total,
                        subtask_results,
                    )
                )

            final_result = self._synthesize(task_description, subtask_results)
            reflection = self._reflect(task_description, plan, final_result)
        except self._TaskCancelled:
            # Must precede the generic handler: a cancellation is not a failure.
            self.task_status = "cancelled"
            task_record = {
                "task": task_description,
                "status": self.task_status,
                "subtasks": subtask_results,
                "execution_time": time.time() - start_time,
                "timestamp": time.time(),
            }
        except Exception as e:
            self.task_status = "failed"
            task_record = {
                "task": task_description,
                "status": self.task_status,
                "error": str(e),
                "execution_time": time.time() - start_time,
                "timestamp": time.time(),
            }
        else:
            self.task_status = "completed"
            task_record = {
                "task": task_description,
                "plan": plan,
                "subtasks": subtask_results,
                "result": final_result,
                "reflection": reflection,
                "status": self.task_status,
                "execution_time": time.time() - start_time,
                "timestamp": time.time(),
            }

        self._remember(task_record)
        return task_record

    def get_task_status(self) -> Dict[str, Any]:
        """Get the current status of task execution.

        Returns:
            Dictionary with ``current_task`` (the most recently started task
            description, or None), ``status`` and ``history_length``.
        """
        return {
            "current_task": self.current_task,
            "status": self.task_status,
            "history_length": len(self.task_history),
        }

    def get_task_history(self) -> List[Dict[str, Any]]:
        """Get the history of executed tasks.

        Returns:
            List of task records, oldest first. This is the executor's own
            list object, not a copy.
        """
        return self.task_history

    def cancel_task(self) -> Dict[str, Any]:
        """Cancel the currently executing task.

        Cancellation is cooperative: the running ``execute_task`` call
        notices the request after its current reasoning-engine call returns
        and then stores and returns a record with status ``"cancelled"``.

        Returns:
            Dictionary containing cancellation status (``task``, ``status``
            and a human-readable ``message``).
        """
        # While subtasks run the status is "in_progress (i/n)", so match the
        # prefix rather than the exact string.
        if not self.task_status.startswith("in_progress"):
            return {
                "task": self.current_task,
                "status": self.task_status,
                "message": "No task in progress to cancel",
            }

        # The running task is not in task_history yet (records are appended
        # only when a run ends), so the history is deliberately left alone.
        self._cancel_requested = True
        self.task_status = "cancelled"
        return {
            "task": self.current_task,
            "status": self.task_status,
            "message": "Task cancelled successfully",
        }

    def _generate(self, prompt: str, max_length: int) -> str:
        """Call the reasoning engine, then honour a pending cancellation."""
        text = self.reasoning_engine.generate_text(prompt, max_length=max_length)
        if self._cancel_requested:
            raise self._TaskCancelled()
        return text

    def _plan(self, task_description: str):
        """Analyse the task, split it into subtasks and draft a plan.

        Returns:
            Tuple ``(subtasks, plan)``.
        """
        analysis_prompt = (
            "I need to analyze this task to understand its requirements and constraints:\n"
            "\n"
            f"Task: {task_description}\n"
            "\n"
            "Task Analysis:\n"
            "1. What is the main objective of this task?\n"
            "2. What are the key requirements?\n"
            "3. What constraints or limitations should I be aware of?\n"
            "4. What resources or information do I need to complete this task?\n"
            "\n"
            "Analysis:"
        )
        task_analysis = self._generate(analysis_prompt, max_length=768)

        decomposition_prompt = (
            "Based on my analysis of the task:\n"
            f"{task_analysis}\n"
            "\n"
            "I need to break down this task into smaller, manageable subtasks:\n"
            "\n"
            f"Task: {task_description}\n"
            "\n"
            "Subtasks:\n"
            "1. "
        )
        decomposition = self._generate(decomposition_prompt, max_length=1024)

        subtasks = self._parse_subtasks(decomposition)
        if not subtasks:
            # Nothing list-like in the generated text: ask the engine directly.
            subtasks = self.reasoning_engine.decompose_task(task_description)

        planning_prompt = (
            "I need to create a detailed plan to execute this task:\n"
            f"{task_description}\n"
            "\n"
            "Task Analysis:\n"
            f"{task_analysis}\n"
            "\n"
            "The task has been broken down into these subtasks:\n"
            f"{json.dumps(subtasks, indent=2)}\n"
            "\n"
            "Detailed step-by-step plan (including how to handle potential issues):\n"
            "1. "
        )
        plan = self._generate(planning_prompt, max_length=1024)
        return subtasks, plan

    @classmethod
    def _parse_subtasks(cls, decomposition: str) -> List[str]:
        """Extract list items ("1. foo", "- foo") from generated text."""
        # NOTE(review): the decomposition prompt already ends with "1. ", so
        # if generate_text returns only the continuation, the first item has
        # no marker and is skipped here -- confirm against the engine.
        subtasks = []
        for raw_line in decomposition.split('\n'):
            line = raw_line.strip()
            if not line or not (line[0].isdigit() or line.startswith('- ')):
                continue
            cleaned = cls._LIST_MARKER.sub('', line).strip()
            if cleaned:
                subtasks.append(cleaned)
        return subtasks

    def _run_subtask(self, task_description: str, subtask: str, position: int,
                     total: int,
                     previous_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute one subtask and have the engine evaluate the outcome."""
        if previous_results:
            previous = json.dumps(
                [r['result'] for r in previous_results], indent=2
            )
        else:
            previous = 'None yet'

        subtask_prompt = (
            "I am executing this subtask as part of the larger task:\n"
            "\n"
            f"Main Task: {task_description}\n"
            "\n"
            f"Current Subtask ({position}/{total}): {subtask}\n"
            "\n"
            f"Previous Results: {previous}\n"
            "\n"
            "I will now execute this subtask carefully and report the detailed results:"
        )
        result = self._generate(subtask_prompt, max_length=768)

        evaluation_prompt = (
            "I need to evaluate the quality of my execution of this subtask:\n"
            "\n"
            f"Subtask: {subtask}\n"
            "\n"
            f"Execution Result: {result}\n"
            "\n"
            "Evaluation (rate from 1-10 and explain):"
        )
        evaluation = self._generate(evaluation_prompt, max_length=256)

        return {
            "subtask": subtask,
            "result": result,
            "evaluation": evaluation,
        }

    def _synthesize(self, task_description: str,
                    subtask_results: List[Dict[str, Any]]) -> str:
        """Combine all subtask results into one final result."""
        compilation_prompt = (
            "I have executed all subtasks for the main task:\n"
            f"{task_description}\n"
            "\n"
            "Here are the results of each subtask:\n"
            f"{json.dumps(subtask_results, indent=2)}\n"
            "\n"
            "I need to synthesize these results into a coherent final result that addresses the original task completely.\n"
            "\n"
            "Final synthesized result:"
        )
        return self._generate(compilation_prompt, max_length=1024)

    def _reflect(self, task_description: str, plan: str,
                 final_result: str) -> str:
        """Ask the engine to reflect on how the task was executed."""
        reflection_prompt = (
            "I need to reflect on my execution of this task:\n"
            "\n"
            f"Task: {task_description}\n"
            "\n"
            f"My approach: {plan}\n"
            "\n"
            f"Final result: {final_result}\n"
            "\n"
            "Reflection on what went well and what could be improved:"
        )
        return self._generate(reflection_prompt, max_length=512)

    def _remember(self, task_record: Dict[str, Any]) -> None:
        """Append a record to the history and drop the oldest overflow."""
        self.task_history.append(task_record)
        # Count the overflow explicitly: history[-n:] keeps everything
        # when n == 0.
        excess = len(self.task_history) - self.max_history_length
        if excess > 0:
            self.task_history = self.task_history[excess:]