| | |
| |
|
| | """ |
| | File System Tools Examples for EvoAgentX |
| | |
| | This module provides comprehensive examples for: |
| | - StorageToolkit: Comprehensive file storage operations with flexible storage backends |
| | - CMDToolkit: Command-line execution capabilities with timeout handling |
| | |
| | The examples demonstrate various file operations and system interactions including: |
| | - File save, read, append, list, delete, and existence checks |
| | - Command execution with working directory and timeout support |
| | - Cross-platform command handling |
| | - Storage handler management |
| | """ |
| |
|
| | import os |
| | import sys |
| | from pathlib import Path |
| |
|
| | |
| | sys.path.append(str(Path(__file__).parent.parent)) |
| |
|
| | from evoagentx.tools import ( |
| | StorageToolkit, |
| | CMDToolkit |
| | ) |
| |
|
| |
|
| | def run_file_tool_example(): |
| | """ |
| | Run an example using the StorageToolkit to read and write files with the new storage handler system. |
| | """ |
| | print("\n===== STORAGE TOOL EXAMPLE =====\n") |
| | |
| | try: |
| | |
| | storage_toolkit = StorageToolkit(name="DemoStorageToolkit") |
| | |
| | |
| | save_tool = storage_toolkit.get_tool("save") |
| | read_tool = storage_toolkit.get_tool("read") |
| | append_tool = storage_toolkit.get_tool("append") |
| | list_tool = storage_toolkit.get_tool("list_files") |
| | exists_tool = storage_toolkit.get_tool("exists") |
| | |
| | |
| | sample_text = """This is a sample text document created using the StorageToolkit. |
| | This tool provides comprehensive file operations with automatic format detection. |
| | It supports various file types including text, JSON, CSV, YAML, XML, Excel, and more.""" |
| | |
| | sample_json = { |
| | "name": "Sample Document", |
| | "type": "test", |
| | "content": "This is a JSON document for testing", |
| | "metadata": { |
| | "created": "2024-01-01", |
| | "version": "1.0" |
| | } |
| | } |
| | |
| | |
| | print("1. Testing file save operations...") |
| | |
| | |
| | text_result = save_tool( |
| | file_path="sample_document.txt", |
| | content=sample_text |
| | ) |
| | print("Text file save result:") |
| | print("-" * 30) |
| | print(text_result) |
| | print("-" * 30) |
| | |
| | |
| | |
| | |
| | json_result = save_tool( |
| | file_path="sample_data.json", |
| | content=sample_json |
| | ) |
| | print("JSON file save result:") |
| | print("-" * 30) |
| | print(json_result) |
| | print("-" * 30) |
| | |
| | |
| | print("\n2. Testing file read operations...") |
| | |
| | |
| | text_read_result = read_tool(file_path="sample_document.txt") |
| | print("Text file read result:") |
| | print("-" * 30) |
| | print(text_read_result) |
| | print("-" * 30) |
| | |
| | |
| | json_read_result = read_tool(file_path="sample_data.json") |
| | print("JSON file read result:") |
| | print("-" * 30) |
| | print(json_read_result) |
| | print("-" * 30) |
| | |
| | |
| | print("\n3. Testing file append operations...") |
| | |
| | |
| | append_text_result = append_tool( |
| | file_path="sample_document.txt", |
| | content="\n\nThis content was appended to the text file." |
| | ) |
| | print("Text file append result:") |
| | print("-" * 30) |
| | print(append_text_result) |
| | print("-" * 30) |
| | |
| | |
| | |
| | |
| | updated_json_data = {**sample_json, "additional": "data", "timestamp": "2024-01-01T12:00:00Z"} |
| | update_json_result = save_tool( |
| | file_path="sample_data.json", |
| | content=updated_json_data |
| | ) |
| | print("JSON file update result:") |
| | print("-" * 30) |
| | print(update_json_result) |
| | print("-" * 30) |
| | |
| | |
| | print("\n4. Testing file listing...") |
| | list_result = list_tool(path=".", max_depth=2, include_hidden=False) |
| | print("File listing result:") |
| | print("-" * 30) |
| | print(list_result) |
| | print("-" * 30) |
| | |
| | |
| | print("\n5. Testing file existence...") |
| | exists_result = exists_tool(path="sample_document.txt") |
| | print("File existence check result:") |
| | print("-" * 30) |
| | print(exists_result) |
| | print("-" * 30) |
| | |
| | |
| | print("\n6. Testing supported formats...") |
| | formats_tool = storage_toolkit.get_tool("list_supported_formats") |
| | formats_result = formats_tool() |
| | print("Supported formats result:") |
| | print("-" * 30) |
| | print(formats_result) |
| | print("-" * 30) |
| | |
| | print("\nβ StorageToolkit test completed successfully!") |
| | print("β All file operations working with default storage handler") |
| | print("β Automatic format detection working") |
| | print("β File operations working (including JSON updates)") |
| | print("β File listing and existence checks working") |
| | |
| | except Exception as e: |
| | print(f"Error running storage tool example: {str(e)}") |
| |
|
| |
|
| | def run_cmd_tool_example(): |
| | """Simple example using CMDToolkit for command line operations.""" |
| | print("\n===== CMD TOOL EXAMPLE =====\n") |
| | |
| | try: |
| | |
| | cmd_toolkit = CMDToolkit(name="DemoCMDToolkit") |
| | execute_tool = cmd_toolkit.get_tool("execute_command") |
| | |
| | print("β CMDToolkit initialized") |
| | |
| | |
| | print("1. Testing basic command execution...") |
| | result = execute_tool(command="echo 'Hello from CMD toolkit'") |
| | |
| | if result.get("success"): |
| | print("β Command executed successfully") |
| | print(f"Output: {result.get('stdout', 'No output')}") |
| | else: |
| | print(f"β Command failed: {result.get('error', 'Unknown error')}") |
| | |
| | |
| | print("\n2. Testing system information commands...") |
| | |
| | |
| | pwd_result = execute_tool(command="pwd") |
| | if pwd_result.get("success"): |
| | print(f"β Current directory: {pwd_result.get('stdout', '').strip()}") |
| | |
| | |
| | if os.name == 'posix': |
| | uname_result = execute_tool(command="uname -a") |
| | if uname_result.get("success"): |
| | print(f"β System info: {uname_result.get('stdout', '').strip()}") |
| | else: |
| | ver_result = execute_tool(command="ver") |
| | if ver_result.get("success"): |
| | print(f"β System info: {ver_result.get('stdout', '').strip()}") |
| | |
| | |
| | print("\n3. Testing file listing...") |
| | if os.name == 'posix': |
| | ls_result = execute_tool(command="ls -la", working_directory=".") |
| | else: |
| | ls_result = execute_tool(command="dir", working_directory=".") |
| | |
| | if ls_result.get("success"): |
| | print("β File listing successful") |
| | print(f"Output length: {len(ls_result.get('stdout', ''))} characters") |
| | else: |
| | print(f"β File listing failed: {ls_result.get('error', 'Unknown error')}") |
| | |
| | |
| | print("\n4. Testing command timeout...") |
| | timeout_result = execute_tool(command="sleep 5", timeout=12) |
| | if not timeout_result.get("success"): |
| | print("β Timeout working correctly (command was interrupted)") |
| | else: |
| | print("β Timeout may not be working as expected") |
| | |
| | print("\nβ CMDToolkit test completed") |
| | |
| | except Exception as e: |
| | print(f"Error: {str(e)}") |
| |
|
| |
|
| | def run_storage_handler_examples(): |
| | """ |
| | Run examples demonstrating different storage handlers and configurations. |
| | """ |
| | print("\n===== STORAGE HANDLER EXAMPLES =====\n") |
| | |
| | try: |
| | |
| | print("1. Testing custom base path storage...") |
| | custom_storage_toolkit = StorageToolkit( |
| | name="CustomPathStorageToolkit", |
| | storage_handler=None, |
| | base_path="./custom_storage" |
| | ) |
| | |
| | |
| | custom_save_tool = custom_storage_toolkit.get_tool("save") |
| | custom_result = custom_save_tool( |
| | file_path="custom_test.txt", |
| | content="This file is stored in a custom location" |
| | ) |
| | |
| | if custom_result.get("success"): |
| | print("β Custom path storage working") |
| | print(f"File saved to: {custom_result.get('file_path')}") |
| | else: |
| | print(f"β Custom path storage failed: {custom_result.get('error')}") |
| | |
| | |
| | custom_read_tool = custom_storage_toolkit.get_tool("read") |
| | custom_read_result = custom_read_tool(file_path="custom_test.txt") |
| | |
| | if custom_read_result.get("success"): |
| | print("β Custom path file reading working") |
| | print(f"Content: {custom_read_result.get('content', '')[:50]}...") |
| | |
| | |
| | custom_list_tool = custom_storage_toolkit.get_tool("list_files") |
| | custom_list_result = custom_list_tool(path=".", max_depth=1, include_hidden=False) |
| | |
| | if custom_list_result.get("success"): |
| | print("β Custom path file listing working") |
| | files = custom_list_result.get("files", []) |
| | print(f"Found {len(files)} files in custom location") |
| | |
| | print("\nβ Storage handler examples completed") |
| | |
| | except Exception as e: |
| | print(f"Error running storage handler examples: {str(e)}") |
| |
|
| |
|
| | def run_advanced_file_operations(): |
| | """ |
| | Run examples demonstrating advanced file operations and format handling. |
| | """ |
| | print("\n===== ADVANCED FILE OPERATIONS =====\n") |
| | |
| | try: |
| | |
| | storage_toolkit = StorageToolkit() |
| | save_tool = storage_toolkit.get_tool("save") |
| | read_tool = storage_toolkit.get_tool("read") |
| | |
| | |
| | print("1. Testing CSV file operations...") |
| | csv_content = """name,age,city |
| | John Doe,30,New York |
| | Jane Smith,25,Los Angeles |
| | Bob Johnson,35,Chicago""" |
| | |
| | csv_result = save_tool( |
| | file_path="sample_data.csv", |
| | content=csv_content |
| | ) |
| | |
| | if csv_result.get("success"): |
| | print("β CSV file saved successfully") |
| | |
| | |
| | csv_read_result = read_tool(file_path="sample_data.csv") |
| | |
| | if csv_read_result.get("success"): |
| | print("β CSV file read successfully") |
| | print(f"Content: {csv_read_result.get('content', '')[:100]}...") |
| | else: |
| | print(f"β Failed to read CSV file: {csv_read_result.get('error')}") |
| | else: |
| | print(f"β Failed to save CSV file: {csv_result.get('error')}") |
| | |
| | |
| | print("\n2. Testing YAML file operations...") |
| | yaml_content = """name: Sample YAML |
| | version: 1.0 |
| | features: |
| | - feature1 |
| | - feature2 |
| | metadata: |
| | author: Test User |
| | date: 2024-01-01""" |
| | |
| | yaml_result = save_tool( |
| | file_path="sample_config.yaml", |
| | content=yaml_content |
| | ) |
| | |
| | if yaml_result.get("success"): |
| | print("β YAML file saved successfully") |
| | |
| | |
| | yaml_read_result = read_tool(file_path="sample_config.yaml") |
| | |
| | if yaml_read_result.get("success"): |
| | print("β YAML file read successfully") |
| | print(f"Content: {yaml_read_result.get('content', '')[:100]}...") |
| | else: |
| | print(f"β Failed to read YAML file: {yaml_read_result.get('error')}") |
| | else: |
| | print(f"β Failed to save YAML file: {yaml_result.get('error')}") |
| |
|
| | |
| | print("\n3. Testing PDF file operations...") |
| | |
| | |
| | pdf_content = """Test PDF Document |
| | |
| | This is a test PDF created by EvoAgentX. |
| | |
| | Features: |
| | β’ PDF creation from text |
| | β’ Automatic formatting |
| | β’ Professional layout |
| | |
| | This demonstrates the storage system's PDF capabilities.""" |
| | |
| | pdf_result = save_tool( |
| | file_path="test_pdf.pdf", |
| | content=pdf_content |
| | ) |
| | |
| | if pdf_result.get("success"): |
| | print("β PDF file created successfully") |
| | else: |
| | print(f"β Failed to create PDF file: {pdf_result.get('error')}") |
| | |
| | |
| | pdf_read_result = read_tool(file_path="test_pdf.pdf") |
| | |
| | if pdf_read_result.get("success"): |
| | print("β PDF file read successfully") |
| | print(f"Content: {pdf_read_result.get('content', '')[:100]}...") |
| | else: |
| | print(f"β Failed to read PDF file: {pdf_read_result.get('error')}") |
| | |
| | |
| | print("\n4. Testing file deletion...") |
| | delete_tool = storage_toolkit.get_tool("delete") |
| | |
| | |
| | test_files = ["sample_document.txt", "sample_data.json", "custom_test.txt"] |
| | for test_file in test_files: |
| | if os.path.exists(test_file): |
| | delete_result = delete_tool(path=test_file) |
| | if delete_result.get("success"): |
| | print(f"β Deleted {test_file}") |
| | else: |
| | print(f"β Failed to delete {test_file}: {delete_result.get('error')}") |
| | |
| | print("\nβ Advanced file operations completed") |
| | |
| | except Exception as e: |
| | print(f"Error running advanced file operations: {str(e)}") |
| |
|
| |
|
| | def main(): |
| | """Main function to run all file system examples""" |
| | print("===== FILE SYSTEM TOOLS EXAMPLES =====") |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | run_advanced_file_operations() |
| | |
| | print("\n===== ALL FILE SYSTEM EXAMPLES COMPLETED =====") |
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|