| import time |
| import json |
| from typing import Dict, Any, List, Union, Optional |
| from pathlib import Path |
| from bson import json_util |
| from pymongo import MongoClient |
|
|
| from .database_base import DatabaseBase, DatabaseType, QueryType, DatabaseConnection |
| from .tool import Tool, Toolkit |
| from ..core.logging import logger |
|
|
|
|
class MongoDBConnection(DatabaseConnection):
    """MongoDB-specific connection management.

    Owns a single ``MongoClient`` built from ``self.connection_string`` and
    ``self.connection_params`` (both presumably populated by
    ``DatabaseConnection.__init__`` -- confirm against the base class).
    """

    def __init__(self, connection_string: str, **kwargs):
        """Record the connection settings; no client is created here.

        Args:
            connection_string: MongoDB connection string handed to ``MongoClient``.
            **kwargs: Extra parameters forwarded to the base class.
        """
        super().__init__(connection_string, **kwargs)
        self.client = None
        self.database = None

    def connect(self) -> bool:
        """Create a client and verify it with a ``ping`` command.

        Returns:
            True when the server answered the ping, False otherwise. On
            failure the half-open client is closed and ``self.client`` is
            reset to None so no unusable client is left behind.
        """
        client = None
        try:
            # The same constructor call is used for every connection-string
            # form; MongoClient itself interprets the scheme.
            client = MongoClient(self.connection_string, **self.connection_params)
            client.admin.command('ping')
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {str(e)}")
            if client is not None:
                try:
                    client.close()
                except Exception as close_error:
                    logger.debug(f"Error closing failed MongoDB client: {close_error}")
            self.client = None
            self._is_connected = False
            return False

        self.client = client
        self._is_connected = True
        logger.info("Successfully connected to MongoDB")
        return True

    def disconnect(self) -> bool:
        """Close the client, if any, and clear the connection state.

        Returns:
            True on success, False if closing the client raised.
        """
        try:
            if self.client:
                self.client.close()
                self.client = None
                self.database = None
            self._is_connected = False
            logger.info("Disconnected from MongoDB")
            return True
        except Exception as e:
            logger.error(f"Error disconnecting from MongoDB: {str(e)}")
            return False

    def test_connection(self) -> bool:
        """Return True if a client exists and answers a ``ping``, else False."""
        if not self.client:
            return False
        try:
            self.client.admin.command('ping')
        except Exception:
            # Any failure here simply means "not reachable".
            return False
        return True

    def get_database(self, database_name: str):
        """Return the database handle for ``database_name``.

        Returns None when there is no client or the name is empty.
        """
        if self.client and database_name:
            return self.client[database_name]
        return None
|
|
|
|
class MongoDBDatabase(DatabaseBase):
    """
    MongoDB database implementation with automatic initialization.

    Handles remote connections, existing local databases, and new local
    database creation. In the two "local" modes the data still lives in a
    MongoDB server reached at ``mongodb://localhost:27017``; the directory at
    ``local_path`` only holds JSON snapshots of the collections.
    """

    # Connection string used by both local (JSON-snapshot backed) modes.
    _LOCAL_CONNECTION_STRING = "mongodb://localhost:27017"
    # Metadata file written into ``local_path``; never loaded as a collection.
    _DB_INFO_FILENAME = "db_info.json"

    def __init__(self,
                 connection_string: str = None,
                 database_name: str = None,
                 local_path: str = None,
                 auto_save: bool = True,
                 read_only: bool = False,
                 **kwargs):
        """
        Initialize MongoDB database with automatic detection and setup.

        Args:
            connection_string: MongoDB connection string (for remote)
            database_name: Name of the database
            local_path: Path for local file-based database
            auto_save: Automatically save changes to local files
            read_only: If True, only read operations are allowed (no insert, update, delete)
            **kwargs: Additional connection parameters

        Raises:
            Exception: Whatever the selected initialization path raises
                (connection or file-system errors) is logged and re-raised.
        """
        # The base class is expected to store connection_string and
        # database_name on the instance (they are read below) -- confirm
        # against DatabaseBase.
        super().__init__(connection_string=connection_string,
                         database_name=database_name,
                         **kwargs)

        self.local_path = Path(local_path) if local_path else None
        self.auto_save = auto_save
        self.read_only = read_only
        self.connection_params = kwargs

        self.is_local_database = False
        self.client = None
        self.database = None

        # Pick exactly one initialization strategy.
        if self._is_remote_connection():
            self._init_remote_database()
        elif self._is_existing_local_database():
            self._init_existing_local_database()
        else:
            self._init_new_local_database()

    # ------------------------------------------------------------------
    # Initialization helpers
    # ------------------------------------------------------------------

    def _is_remote_connection(self) -> bool:
        """Return True when ``connection_string`` should be used as given.

        A string counts when it starts with a MongoDB URI scheme or mentions
        ``localhost`` / ``127.0.0.1``. Always returns a real bool (an empty or
        missing connection string yields False).
        """
        conn = self.connection_string
        if not conn:
            return False
        return bool(
            conn.startswith(('mongodb://', 'mongodb+srv://'))
            or 'localhost' in conn
            or '127.0.0.1' in conn
        )

    def _is_existing_local_database(self) -> bool:
        """Return True when ``local_path`` exists and contains any ``*.json`` file."""
        if not self.local_path or not self.local_path.exists():
            return False
        if (self.local_path / self._DB_INFO_FILENAME).exists():
            return True
        return any(self.local_path.glob("*.json"))

    def _discard_client(self):
        """Close and forget the current client after a failed initialization."""
        client, self.client, self.database = self.client, None, None
        if client is None:
            return
        try:
            client.close()
        except Exception as close_error:
            logger.debug(f"Error closing MongoDB client: {close_error}")

    def _connect_local(self):
        """Create the localhost client and select the database for local modes.

        Falls back to the directory name of ``local_path`` when no database
        name was supplied.
        """
        self.connection_string = self._LOCAL_CONNECTION_STRING
        self.client = MongoClient(self.connection_string, **self.connection_params)
        if not self.database_name:
            self.database_name = self.local_path.name
        self.database = self.client[self.database_name]

    def _init_remote_database(self):
        """Connect using the supplied connection string and verify with a ping.

        Raises:
            Exception: Re-raises any connection error after logging it and
                closing the partially created client.
        """
        try:
            self.client = MongoClient(self.connection_string, **self.connection_params)
            self.client.admin.command('ping')

            if self.database_name:
                self.database = self.client[self.database_name]

            self._is_initialized = True
            self.is_local_database = False
            logger.info(f"Connected to remote MongoDB: {self.database_name}")
        except Exception as e:
            logger.error(f"Failed to connect to remote MongoDB: {str(e)}")
            self._is_initialized = False
            self._discard_client()
            raise

    def _init_existing_local_database(self):
        """Connect to localhost and load the JSON snapshots found in ``local_path``.

        Raises:
            Exception: Re-raises any error after logging it.
        """
        try:
            self._connect_local()
            self._load_local_collections()

            self._is_initialized = True
            self.is_local_database = True
            logger.info(f"Loaded existing local database from: {self.local_path}")
        except Exception as e:
            logger.error(f"Failed to load existing local database: {str(e)}")
            self._is_initialized = False
            self._discard_client()
            raise

    def _init_new_local_database(self):
        """Create ``local_path`` (default ``./mongodb_local``) and connect to localhost.

        Raises:
            Exception: Re-raises any error after logging it.
        """
        try:
            if not self.local_path:
                self.local_path = Path("./mongodb_local")
            self.local_path.mkdir(parents=True, exist_ok=True)

            self._connect_local()
            self._create_db_info_file()

            self._is_initialized = True
            self.is_local_database = True
            logger.info(f"Created new local database at: {self.local_path}")
        except Exception as e:
            logger.error(f"Failed to create new local database: {str(e)}")
            self._is_initialized = False
            self._discard_client()
            raise

    # ------------------------------------------------------------------
    # Local JSON snapshot handling
    # ------------------------------------------------------------------

    def _load_local_collections(self):
        """Replace each collection with the contents of its ``<name>.json`` file.

        A file may hold one document (object) or a list of documents. Files
        with any other top-level value, or with no usable documents, are
        skipped without touching the collection. Failures are logged per file
        and do not abort the remaining files.
        """
        if not self.local_path or not self.local_path.exists():
            return

        for json_file in list(self.local_path.glob("*.json")):
            if json_file.name == self._DB_INFO_FILENAME:
                continue

            collection_name = json_file.stem
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                if isinstance(data, dict):
                    documents = [data]
                elif isinstance(data, list):
                    documents = data
                else:
                    continue

                # Only dict items can be inserted; filtering before the drop
                # keeps a malformed file from wiping the collection and then
                # failing in insert_many.
                cleaned_documents = [
                    self._clean_document_for_insert(doc)
                    for doc in documents
                    if isinstance(doc, dict)
                ]
                if not cleaned_documents:
                    continue

                collection = self.database[collection_name]
                collection.drop()
                collection.insert_many(cleaned_documents)
                logger.info(f"Loaded {len(cleaned_documents)} documents into '{collection_name}'")
            except Exception as e:
                logger.warning(f"Failed to load collection from {json_file}: {str(e)}")

    def _clean_document_for_insert(self, doc: Dict) -> Dict:
        """Return a copy of ``doc`` without extended-JSON ``_id`` values.

        Any ``_id`` key whose value is a dict containing ``$oid`` is dropped
        (at every nesting level reached through dicts and lists of dicts), so
        MongoDB assigns a fresh id on insert. Non-dict input is returned as is.
        """
        if not isinstance(doc, dict):
            return doc

        cleaned = {}
        for key, value in doc.items():
            if key == '_id' and isinstance(value, dict) and '$oid' in value:
                continue
            if isinstance(value, dict):
                cleaned[key] = self._clean_document_for_insert(value)
            elif isinstance(value, list):
                cleaned[key] = [
                    self._clean_document_for_insert(item) if isinstance(item, dict) else item
                    for item in value
                ]
            else:
                cleaned[key] = value
        return cleaned

    def _create_db_info_file(self):
        """Write ``db_info.json`` describing this local database (best effort)."""
        try:
            db_info = {
                "database_name": self.database_name,
                "created_at": time.time(),
                "local_path": str(self.local_path.absolute()),
                "auto_save": self.auto_save,
                "version": "1.0"
            }

            info_file = self.local_path / self._DB_INFO_FILENAME
            with open(info_file, 'w', encoding='utf-8') as f:
                json.dump(db_info, f, indent=2, ensure_ascii=False)
        except Exception as e:
            # A missing info file is not fatal for the database itself.
            logger.warning(f"Failed to create db info file: {str(e)}")

    def _save_collection_to_file(self, collection_name: str):
        """Dump a whole collection to ``<local_path>/<collection_name>.json``.

        No-op for remote databases. ``_id`` values are written as strings and
        any other non-JSON value is stringified via ``default=str``. Failures
        are logged, not raised.
        """
        if not self.is_local_database or not self.local_path:
            return

        try:
            documents = list(self.database[collection_name].find())
            for doc in documents:
                if '_id' in doc:
                    doc['_id'] = str(doc['_id'])

            file_path = self.local_path / f"{collection_name}.json"
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(documents, f, indent=2, ensure_ascii=False, default=str)

            logger.debug(f"Saved collection '{collection_name}' to {file_path}")
        except Exception as e:
            logger.warning(f"Failed to save collection '{collection_name}': {str(e)}")

    def _auto_save_if_needed(self, collection_name: str):
        """Snapshot the collection when this is a local database with auto_save on."""
        if self.is_local_database and self.auto_save:
            self._save_collection_to_file(collection_name)

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    def _get_database_type(self) -> DatabaseType:
        """Identify this backend as MongoDB."""
        return DatabaseType.MONGODB

    def connect(self) -> bool:
        """Connection is already established in __init__; report its state."""
        return self._is_initialized

    def disconnect(self) -> bool:
        """Close the client and mark the database uninitialized.

        Returns:
            True on success, False if closing raised.
        """
        try:
            if self.client:
                self.client.close()
                self.client = None
                self.database = None
            self._is_initialized = False
            logger.info("Disconnected from MongoDB")
            return True
        except Exception as e:
            logger.error(f"Error disconnecting: {str(e)}")
            return False

    def test_connection(self) -> bool:
        """Return True if a client exists and answers a ``ping``, else False."""
        if not self.client:
            return False
        try:
            self.client.admin.command('ping')
        except Exception:
            return False
        return True

    # ------------------------------------------------------------------
    # Query execution
    # ------------------------------------------------------------------

    def execute_query(self,
                      query: Union[str, Dict, List],
                      query_type: QueryType = None,
                      collection_name: str = None,
                      **kwargs) -> Dict[str, Any]:
        """Execute a query on MongoDB with automatic result handling.

        Args:
            query: Filter/command dict, aggregation pipeline (list) or a
                simple ``field=value`` string.
            query_type: Explicit operation type; inferred from ``query`` when
                omitted.
            collection_name: Target collection (required).
            **kwargs: Forwarded to the per-operation executor (``limit`` and
                ``skip`` are honoured by find).

        Returns:
            The dict produced by ``format_query_result`` /
            ``format_error_result`` of the base class; dict results get an
            ``execution_time`` entry. Errors are reported in the result, not
            raised.
        """
        if not self._is_initialized or self.database is None:
            return self.format_error_result("Database not connected")

        if not collection_name:
            return self.format_error_result("Collection name is required")

        start_time = time.time()

        try:
            collection = self.database[collection_name]

            if not query_type:
                query_type = self._infer_query_type(query)

            write_types = (QueryType.INSERT, QueryType.UPDATE, QueryType.DELETE,
                           QueryType.CREATE, QueryType.DROP)
            if self.read_only and query_type in write_types:
                return self.format_error_result(
                    f"Write operation '{query_type.value}' is not allowed in read-only mode. "
                    "Only SELECT and AGGREGATE operations are permitted.",
                    query_type,
                    execution_time=time.time() - start_time
                )

            if query_type == QueryType.SELECT:
                result = self._execute_find(collection, query, **kwargs)
            elif query_type == QueryType.INSERT:
                result = self._execute_insert(collection, query, **kwargs)
                self._auto_save_if_needed(collection_name)
            elif query_type == QueryType.UPDATE:
                result = self._execute_update(collection, query, **kwargs)
                self._auto_save_if_needed(collection_name)
            elif query_type == QueryType.DELETE:
                result = self._execute_delete(collection, query, **kwargs)
                self._auto_save_if_needed(collection_name)
            elif query_type == QueryType.AGGREGATE:
                result = self._execute_aggregate(collection, query, **kwargs)
            else:
                return self.format_error_result(f"Unsupported query type: {query_type}")

            if isinstance(result, dict):
                result["execution_time"] = time.time() - start_time
            return result

        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(f"Error executing MongoDB query: {str(e)}")
            return self.format_error_result(str(e), query_type, execution_time=execution_time)

    def _infer_query_type(self, query: Union[str, Dict, List]) -> QueryType:
        """Infer query type from the query structure.

        Lists are aggregation pipelines. In read-only mode every other query
        is treated as a SELECT, so nothing is ever routed to a write executor.
        Otherwise dicts are classified by command-style keys and strings by
        their leading keyword; anything unrecognized is a SELECT.
        """
        if isinstance(query, list):
            return QueryType.AGGREGATE

        if self.read_only:
            return QueryType.SELECT

        if isinstance(query, dict):
            keyword_groups = (
                (("insert", "insertOne", "insertMany"), QueryType.INSERT),
                (("update", "updateOne", "updateMany"), QueryType.UPDATE),
                (("delete", "deleteOne", "deleteMany"), QueryType.DELETE),
                (("create", "createCollection"), QueryType.CREATE),
                (("drop", "dropCollection"), QueryType.DROP),
            )
            for keywords, inferred in keyword_groups:
                if any(keyword in query for keyword in keywords):
                    return inferred
            return QueryType.SELECT

        if isinstance(query, str):
            query_lower = query.lower().strip()
            if query_lower.startswith(("insert", "create")):
                return QueryType.INSERT
            if query_lower.startswith("update"):
                return QueryType.UPDATE
            if query_lower.startswith("delete"):
                return QueryType.DELETE
            if query_lower.startswith("drop"):
                return QueryType.DROP

        return QueryType.SELECT

    def _execute_find(self, collection, query: Dict, **kwargs) -> Dict[str, Any]:
        """Execute find query.

        ``query`` is either a plain filter, a dict with any of the keys
        ``filter``/``projection``/``sort``/``limit``/``skip``, or a
        ``field=value`` string (any other string matches everything).
        Documents are converted to JSON-compatible values with
        ``bson.json_util``.
        """
        try:
            if isinstance(query, str):
                if "=" in query:
                    field, value = query.split("=", 1)
                    query = {field.strip(): value.strip()}
                else:
                    query = {}

            filter_query = query.get("filter", query)
            # PyMongo < 4.0 rewrites an empty projection to {"_id": 1};
            # passing None returns whole documents on every version.
            projection = query.get("projection") or None
            sort = query.get("sort", None)
            limit = query.get("limit", kwargs.get("limit", 0))
            skip = query.get("skip", kwargs.get("skip", 0))

            cursor = collection.find(filter_query, projection)

            if sort:
                # A JSON object such as {"age": -1} arrives as a dict; a list
                # of (key, direction) pairs is the form every PyMongo
                # release accepts.
                if isinstance(sort, dict):
                    sort = list(sort.items())
                cursor = cursor.sort(sort)
            if skip:
                cursor = cursor.skip(skip)
            if limit:
                cursor = cursor.limit(limit)

            results = [json.loads(json_util.dumps(doc)) for doc in cursor]

            return self.format_query_result(
                results,
                QueryType.SELECT,
                collection_name=collection.name,
                filter_applied=filter_query
            )
        except Exception as e:
            return self.format_error_result(str(e), QueryType.SELECT)

    def _execute_insert(self, collection, query: Union[Dict, List], **kwargs) -> Dict[str, Any]:
        """Execute insert operation.

        A dict inserts one document (the value under ``document`` if present,
        else the dict itself). A list of dicts is inserted as many documents;
        a list containing non-dicts is stored as a single document under the
        ``documents`` key.
        """
        try:
            if isinstance(query, dict):
                document = query["document"] if "document" in query else query
                result = collection.insert_one(document)
                return self.format_query_result(
                    {"inserted_id": str(result.inserted_id)},
                    QueryType.INSERT,
                    collection_name=collection.name
                )

            if isinstance(query, list):
                if all(isinstance(item, dict) for item in query):
                    documents = query
                else:
                    documents = [{"documents": query}]

                result = collection.insert_many(documents)
                return self.format_query_result(
                    {"inserted_ids": [str(inserted_id) for inserted_id in result.inserted_ids]},
                    QueryType.INSERT,
                    collection_name=collection.name
                )

            return self.format_error_result("Invalid insert query format", QueryType.INSERT)
        except Exception as e:
            return self.format_error_result(str(e), QueryType.INSERT)

    def _execute_update(self, collection, query: Dict, **kwargs) -> Dict[str, Any]:
        """Execute update operation.

        Reads ``filter``, ``update``, ``upsert`` and ``multi`` from ``query``;
        ``multi`` selects ``update_many`` over ``update_one``.
        """
        try:
            filter_query = query.get("filter", {})
            update_query = query.get("update", {})
            upsert = query.get("upsert", False)

            run_update = collection.update_many if query.get("multi", False) else collection.update_one
            result = run_update(filter_query, update_query, upsert=upsert)

            return self.format_query_result(
                {
                    "matched_count": result.matched_count,
                    "modified_count": result.modified_count,
                    "upserted_id": str(result.upserted_id) if result.upserted_id else None
                },
                QueryType.UPDATE,
                collection_name=collection.name
            )
        except Exception as e:
            return self.format_error_result(str(e), QueryType.UPDATE)

    def _execute_delete(self, collection, query: Dict, **kwargs) -> Dict[str, Any]:
        """Execute delete operation.

        Uses ``query["filter"]`` when present, otherwise the whole dict as the
        filter; ``multi`` selects ``delete_many`` over ``delete_one``.
        """
        try:
            filter_query = query.get("filter", query)

            run_delete = collection.delete_many if query.get("multi", False) else collection.delete_one
            result = run_delete(filter_query)

            return self.format_query_result(
                {"deleted_count": result.deleted_count},
                QueryType.DELETE,
                collection_name=collection.name
            )
        except Exception as e:
            return self.format_error_result(str(e), QueryType.DELETE)

    def _execute_aggregate(self, collection, pipeline: List, **kwargs) -> Dict[str, Any]:
        """Execute aggregation pipeline and return JSON-compatible documents."""
        try:
            results = [
                json.loads(json_util.dumps(doc))
                for doc in collection.aggregate(pipeline)
            ]

            return self.format_query_result(
                results,
                QueryType.AGGREGATE,
                collection_name=collection.name,
                pipeline_stages=len(pipeline)
            )
        except Exception as e:
            return self.format_error_result(str(e), QueryType.AGGREGATE)

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def get_database_info(self) -> Dict[str, Any]:
        """Get MongoDB database information from ``dbStats`` and ``server_info``."""
        try:
            if not self._is_initialized or self.database is None:
                return self.format_error_result("Database not connected")

            stats = self.database.command("dbStats")
            server_info = self.client.server_info()

            info = {
                "database_name": self.database_name,
                "collections": stats.get("collections", 0),
                "data_size": stats.get("dataSize", 0),
                "storage_size": stats.get("storageSize", 0),
                "indexes": stats.get("indexes", 0),
                "index_size": stats.get("indexSize", 0),
                "server_version": server_info.get("version", "Unknown"),
                "server_type": server_info.get("type", "Unknown"),
                # NOTE(review): the raw connection string may embed
                # credentials; consider redacting before exposing it.
                "connection_string": self.connection_string,
                "is_connected": self._is_initialized
            }

            return self.format_query_result(info, QueryType.SELECT)
        except Exception as e:
            return self.format_error_result(str(e))

    def list_collections(self) -> List[str]:
        """List all collections in the database (empty list on any failure)."""
        try:
            if not self._is_initialized or self.database is None:
                return []
            return self.database.list_collection_names()
        except Exception as e:
            logger.error(f"Error listing collections: {str(e)}")
            return []

    def get_collection_info(self, collection_name: str) -> Dict[str, Any]:
        """Get statistics, indexes and up to two sample documents of a collection."""
        try:
            # PyMongo Database objects refuse truth-value testing, so the
            # handle must be compared with None explicitly.
            if not self._is_initialized or self.database is None:
                return self.format_error_result("Database not connected")

            collection = self.database[collection_name]
            stats = self.database.command("collStats", collection_name)
            indexes = list(collection.list_indexes())

            # Serialised the same way as find results so BSON-only types
            # (ObjectId, datetime, ...) do not leak into the result.
            sample_docs = [
                json.loads(json_util.dumps(doc))
                for doc in collection.find().limit(2)
            ]

            info = {
                "collection_name": collection_name,
                "document_count": stats.get("count", 0),
                "data_size": stats.get("size", 0),
                "storage_size": stats.get("storageSize", 0),
                "index_count": stats.get("nindexes", 0),
                "indexes": [{"name": idx["name"], "keys": idx["key"]} for idx in indexes],
                "sample_documents": sample_docs
            }

            return self.format_query_result(info, QueryType.SELECT)
        except Exception as e:
            return self.format_error_result(str(e))

    def get_schema(self, collection_name: str = None) -> Dict[str, Any]:
        """Get schema information for database or specific collection.

        With a collection name the schema is inferred from up to 100
        documents. Without one, the first 10 collections are inspected and
        their schemas returned keyed by collection name.
        """
        try:
            # See get_collection_info: compare the PyMongo handle with None.
            if not self._is_initialized or self.database is None:
                return self.format_error_result("Database not connected")

            if collection_name:
                sample_docs = list(self.database[collection_name].find().limit(100))

                if not sample_docs:
                    return self.format_query_result(
                        {"collection_name": collection_name, "schema": {}, "message": "No documents found"},
                        QueryType.SELECT
                    )

                return self.format_query_result(
                    {
                        "collection_name": collection_name,
                        "schema": self._infer_schema_from_documents(sample_docs),
                        "sample_count": len(sample_docs)
                    },
                    QueryType.SELECT
                )

            schemas = {}
            for coll_name in self.list_collections()[:10]:
                coll_schema = self.get_schema(coll_name)
                if coll_schema.get("success"):
                    schemas[coll_name] = coll_schema.get("data", {}).get("schema", {})

            return self.format_query_result(
                {"database_name": self.database_name, "schemas": schemas},
                QueryType.SELECT
            )
        except Exception as e:
            return self.format_error_result(str(e))

    def _infer_schema_from_documents(self, documents: List[Dict]) -> Dict[str, Any]:
        """Infer schema from a list of documents by merging each into one dict."""
        schema = {}
        for doc in documents or []:
            self._update_schema_from_document(schema, doc)
        return schema

    def _update_schema_from_document(self, schema: Dict, document: Dict, path: str = ""):
        """Recursively merge the field types of ``document`` into ``schema``.

        Keys are dotted paths. Objects get ``{"type": "object", "fields":
        {...}}``, arrays ``{"type": "array", "element_types": [...]}`` (based
        on the first three items, kept as a sorted list), and scalars
        ``{"type": <python type name>}``. A path seen with conflicting kinds
        becomes ``"mixed"``.
        """
        for key, value in document.items():
            current_path = f"{path}.{key}" if path else key

            if isinstance(value, dict):
                entry = schema.setdefault(current_path, {"type": "object", "fields": {}})
                if entry.get("type") != "object":
                    entry["type"] = "mixed"
                self._update_schema_from_document(entry.setdefault("fields", {}), value, current_path)
            elif isinstance(value, list):
                entry = schema.setdefault(current_path, {"type": "array", "element_types": []})
                if entry.get("type") != "array":
                    entry["type"] = "mixed"
                # element_types is stored as a list, so rebuild the set on
                # every document instead of calling .add on the stored value.
                element_types = set(entry.get("element_types", []))
                for item in value[:3]:
                    element_types.add("object" if isinstance(item, dict) else type(item).__name__)
                entry["element_types"] = sorted(element_types)
            else:
                type_name = type(value).__name__
                entry = schema.setdefault(current_path, {"type": type_name})
                if entry["type"] != type_name:
                    entry["type"] = "mixed"

    def get_supported_query_types(self) -> List[QueryType]:
        """Get MongoDB-specific supported query types (reads only in read-only mode)."""
        if self.read_only:
            return [
                QueryType.SELECT,
                QueryType.AGGREGATE
            ]
        return [
            QueryType.SELECT,
            QueryType.INSERT,
            QueryType.UPDATE,
            QueryType.DELETE,
            QueryType.CREATE,
            QueryType.DROP,
            QueryType.AGGREGATE,
            QueryType.INDEX
        ]

    def get_capabilities(self) -> Dict[str, Any]:
        """Get MongoDB-specific capabilities layered on the base-class ones."""
        base_capabilities = super().get_capabilities()
        base_capabilities.update({
            "supports_aggregation": True,
            "supports_full_text_search": True,
            "supports_geospatial_queries": True,
            "supports_change_streams": True,
            "supports_transactions": True,
            "supports_indexing": True,
            "document_oriented": True,
            "schema_flexible": True,
            "read_only": self.read_only,
            "write_operations_allowed": not self.read_only
        })
        return base_capabilities
|
|
|
|
class MongoDBExecuteQueryTool(Tool):
    """Tool that forwards a query to ``MongoDBDatabase.execute_query``."""

    name: str = "mongodb_execute_query"
    description: str = "Execute MongoDB queries including find and aggregation pipelines (read-only operations)"
    inputs: Dict[str, Dict[str, str]] = {
        "query": {
            "type": "string",
            "description": "MongoDB query (JSON string for find, array for aggregation pipeline)"
        },
        "query_type": {
            "type": "string",
            "description": "Type of query (select, aggregate) - auto-detected if not provided"
        },
        "collection_name": {
            "type": "string",
            "description": "Collection name (required for all operations)"
        }
    }
    required: Optional[List[str]] = ["query", "collection_name"]

    def __init__(self, database: MongoDBDatabase = None):
        """Bind the tool to the database it should query."""
        super().__init__()
        self.database = database

    def __call__(self, query: str, query_type: str = None, collection_name: str = None) -> Dict[str, Any]:
        """Execute a MongoDB query.

        Args:
            query: JSON text (object or array), a plain string, or an
                already-parsed dict/list.
            query_type: Optional ``QueryType`` value name (case-insensitive).
            collection_name: Collection to run against.

        Returns:
            The database result dict, or ``{"success": False, "error": ...,
            "data": None}`` when the tool itself fails.
        """
        try:
            if not self.database:
                return {"success": False, "error": "MongoDB database not initialized", "data": None}

            parsed_query = self._parse_query(query)

            query_type_enum = None
            if query_type:
                try:
                    query_type_enum = QueryType(query_type.lower())
                except ValueError:
                    return {"success": False, "error": f"Invalid query type: {query_type}", "data": None}

            result = self.database.execute_query(
                query=parsed_query,
                query_type=query_type_enum,
                collection_name=collection_name
            )

            if result["success"]:
                logger.info(f"Successfully executed MongoDB query on collection {collection_name}")
            else:
                logger.error(f"Failed to execute MongoDB query: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"Error in mongodb_execute_query tool: {str(e)}")
            return {"success": False, "error": str(e), "data": None}

    def _parse_query(self, query: str) -> Union[str, Dict, List]:
        """Parse query string into appropriate format.

        JSON text is decoded; text that is not valid JSON is returned
        unchanged. Values that are not strings (for example a dict or list a
        caller already decoded) are passed through instead of raising.
        """
        if not isinstance(query, str):
            return query
        try:
            return json.loads(query)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            return query
|
|
|
|
class MongoDBFindTool(Tool):
    """Tool that runs a find (SELECT) query through ``MongoDBDatabase``."""

    name: str = "mongodb_find"
    description: str = "Find documents in a MongoDB collection with filtering, projection, sorting, and pagination"
    inputs: Dict[str, Dict[str, str]] = {
        "collection_name": {
            "type": "string",
            "description": "Collection name to query"
        },
        "filter": {
            "type": "string",
            "description": "MongoDB filter query (JSON string, e.g., '{\"age\": {\"$gt\": 18}}')"
        },
        "projection": {
            "type": "string",
            "description": "Fields to include/exclude (JSON string, e.g., '{\"name\": 1, \"_id\": 0}')"
        },
        "sort": {
            "type": "string",
            "description": "Sort criteria (JSON string, e.g., '{\"age\": -1}')"
        },
        "limit": {
            "type": "integer",
            "description": "Maximum number of documents to return"
        },
        "skip": {
            "type": "integer",
            "description": "Number of documents to skip"
        }
    }
    required: Optional[List[str]] = ["collection_name"]

    def __init__(self, database: MongoDBDatabase = None):
        """Bind the tool to the database it should query."""
        super().__init__()
        self.database = database

    @staticmethod
    def _load_json(value, default=None):
        """Decode a JSON argument.

        Empty/None values yield ``default``; strings are decoded with
        ``json.loads``; anything else (an already-parsed dict or list) is
        returned unchanged.
        """
        if value is None or value == "":
            return default
        if isinstance(value, str):
            return json.loads(value)
        return value

    def __call__(self, collection_name: str, filter: str = "{}", projection: str = "{}",
                 sort: str = None, limit: int = 0, skip: int = 0) -> Dict[str, Any]:
        """Find documents in MongoDB collection.

        Args:
            collection_name: Collection to query.
            filter: Filter as JSON text (or an already-parsed dict).
            projection: Projection as JSON text (or dict); empty means
                "return whole documents".
            sort: Sort criteria as JSON text (or dict/list), optional.
            limit: Maximum number of documents; 0 means no limit.
            skip: Number of documents to skip.

        Returns:
            The database result dict, or ``{"success": False, "error": ...,
            "data": None}`` when an argument cannot be decoded or the call
            fails.
        """
        try:
            if not self.database:
                return {"success": False, "error": "MongoDB database not initialized", "data": None}

            filter_dict = self._load_json(filter, {})
            # An empty projection is sent as None: PyMongo < 4.0 turns {}
            # into {"_id": 1}, which would hide every other field.
            projection_dict = self._load_json(projection, None) or None
            sort_spec = self._load_json(sort, None)

            query = {
                "filter": filter_dict,
                "projection": projection_dict,
                "limit": limit or 0,
                "skip": skip or 0
            }
            if sort_spec:
                query["sort"] = sort_spec

            result = self.database.execute_query(
                query=query,
                query_type=QueryType.SELECT,
                collection_name=collection_name
            )

            if result["success"]:
                logger.info(f"Successfully found documents in collection {collection_name}")
            else:
                logger.error(f"Failed to find documents: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"Error in mongodb_find tool: {str(e)}")
            return {"success": False, "error": str(e), "data": None}
|
|
|
|
class MongoDBUpdateTool(Tool):
    """Tool that runs an UPDATE query through ``MongoDBDatabase``."""

    name: str = "mongodb_update"
    description: str = "Update documents in a MongoDB collection"
    inputs: Dict[str, Dict[str, str]] = {
        "collection_name": {
            "type": "string",
            "description": "Collection name to update"
        },
        "filter": {
            "type": "string",
            "description": "Filter to match documents to update (JSON string)"
        },
        "update": {
            "type": "string",
            "description": "Update operations (JSON string, e.g., '{\"$set\": {\"status\": \"active\"}}')"
        },
        "upsert": {
            "type": "boolean",
            "description": "Create document if it doesn't exist"
        },
        "multi": {
            "type": "boolean",
            "description": "Update multiple documents (default: false)"
        }
    }
    required: Optional[List[str]] = ["collection_name", "filter", "update"]

    def __init__(self, database: MongoDBDatabase = None):
        """Bind the tool to the database it should modify."""
        super().__init__()
        self.database = database

    @staticmethod
    def _load_json(value):
        """Decode JSON text; return already-parsed (non-string) values unchanged."""
        if isinstance(value, str):
            return json.loads(value)
        return value

    def __call__(self, collection_name: str, filter: str, update: str,
                 upsert: bool = False, multi: bool = False) -> Dict[str, Any]:
        """Update documents in MongoDB collection.

        Args:
            collection_name: Collection to update.
            filter: Match filter as JSON text (or an already-parsed dict).
            update: Update operations as JSON text (or dict).
            upsert: Insert a document when nothing matches.
            multi: Update every match instead of only the first.

        Returns:
            The database result dict, or ``{"success": False, "error": ...,
            "data": None}`` when an argument cannot be decoded or the call
            fails.
        """
        try:
            if not self.database:
                return {"success": False, "error": "MongoDB database not initialized", "data": None}

            query = {
                "filter": self._load_json(filter),
                "update": self._load_json(update),
                "upsert": upsert,
                "multi": multi
            }

            result = self.database.execute_query(
                query=query,
                query_type=QueryType.UPDATE,
                collection_name=collection_name
            )

            if result["success"]:
                logger.info(f"Successfully updated documents in collection {collection_name}")
            else:
                logger.error(f"Failed to update documents: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"Error in mongodb_update tool: {str(e)}")
            return {"success": False, "error": str(e), "data": None}
|
|
|
|
class MongoDBDeleteTool(Tool):
    """Tool that runs a DELETE query through ``MongoDBDatabase``."""

    name: str = "mongodb_delete"
    description: str = "Delete documents from a MongoDB collection"
    inputs: Dict[str, Dict[str, str]] = {
        "collection_name": {
            "type": "string",
            "description": "Collection name to delete from"
        },
        "filter": {
            "type": "string",
            "description": "Filter to match documents to delete (JSON string)"
        },
        "multi": {
            "type": "boolean",
            "description": "Delete multiple documents (default: false)"
        }
    }
    required: Optional[List[str]] = ["collection_name", "filter"]

    def __init__(self, database: MongoDBDatabase = None):
        """Bind the tool to the database it should modify."""
        super().__init__()
        self.database = database

    @staticmethod
    def _load_json(value):
        """Decode JSON text; return already-parsed (non-string) values unchanged."""
        if isinstance(value, str):
            return json.loads(value)
        return value

    def __call__(self, collection_name: str, filter: str, multi: bool = False) -> Dict[str, Any]:
        """Delete documents from MongoDB collection.

        Args:
            collection_name: Collection to delete from.
            filter: Match filter as JSON text (or an already-parsed dict).
            multi: Delete every match instead of only the first.

        Returns:
            The database result dict, or ``{"success": False, "error": ...,
            "data": None}`` when the filter cannot be decoded or the call
            fails.
        """
        try:
            if not self.database:
                return {"success": False, "error": "MongoDB database not initialized", "data": None}

            # The filter is always sent under the "filter" key, so the
            # "multi" flag is never mistaken for part of the match filter.
            query = {
                "filter": self._load_json(filter),
                "multi": multi
            }

            result = self.database.execute_query(
                query=query,
                query_type=QueryType.DELETE,
                collection_name=collection_name
            )

            if result["success"]:
                logger.info(f"Successfully deleted documents from collection {collection_name}")
            else:
                logger.error(f"Failed to delete documents: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"Error in mongodb_delete tool: {str(e)}")
            return {"success": False, "error": str(e), "data": None}
|
|
|
|
class MongoDBInfoTool(Tool):
    """Tool that reports MongoDB database, collection, schema or capability info.

    Each supported ``info_type`` is dispatched to the matching method on
    ``self.database``.
    """

    name: str = "mongodb_info"
    description: str = "Get MongoDB database and collection information"
    inputs: Dict[str, Dict[str, str]] = {
        "info_type": {
            "type": "string",
            "description": "Type of information (database, collections, collection, schema, capabilities)"
        },
        "collection_name": {
            "type": "string",
            "description": "Collection name for collection-specific info (optional)"
        }
    }
    required: Optional[List[str]] = []

    def __init__(self, database: MongoDBDatabase = None):
        """Bind the tool to the database wrapper it will query for information."""
        super().__init__()
        self.database = database

    def __call__(self, info_type: str = "database", collection_name: str = None) -> Dict[str, Any]:
        """Get MongoDB information.

        Args:
            info_type: One of "database", "collections", "collection",
                "schema" or "capabilities" (case-insensitive). ``None`` or an
                empty string falls back to "database".
            collection_name: Required for "collection"; passed through to
                ``get_schema`` for "schema".

        Returns:
            A result dict with a "success" flag. Failures have the form
            ``{"success": False, "error": <message>, "data": None}``. This
            method never raises.
        """
        try:
            if not self.database:
                return {"success": False, "error": "MongoDB database not initialized", "data": None}

            # Normalise the selector; a missing value means the default "database".
            info_type = (info_type or "database").strip().lower()

            if info_type == "database":
                result = self.database.get_database_info()
            elif info_type == "collections":
                collections = self.database.list_collections()
                result = {"success": True, "data": collections, "collection_count": len(collections)}
            elif info_type == "collection":
                # "collection" is a valid type; a missing name is its own error
                # and must not be reported as an invalid info type.
                if not collection_name:
                    return {
                        "success": False,
                        "error": "collection_name is required when info_type is 'collection'",
                        "data": None,
                    }
                result = self.database.get_collection_info(collection_name)
            elif info_type == "schema":
                result = self.database.get_schema(collection_name)
            elif info_type == "capabilities":
                result = {"success": True, "data": self.database.get_capabilities()}
            else:
                return {"success": False, "error": f"Invalid info type: {info_type}", "data": None}

            if result.get("success"):
                logger.info(f"Successfully retrieved {info_type} information")
            else:
                logger.error(f"Failed to retrieve {info_type} information: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            # Tool boundary: report the failure in the result dict instead of raising.
            logger.error(f"Error in mongodb_info tool: {str(e)}")
            return {"success": False, "error": str(e), "data": None}
|
|
|
|
class MongoDBToolkit(Toolkit):
    """
    MongoDB-specific toolkit with simplified design.
    Automatically handles remote, local file-based, or new database creation.
    """

    def __init__(self,
                 name: str = "MongoDBToolkit",
                 connection_string: str = None,
                 database_name: str = None,
                 local_path: str = None,
                 auto_save: bool = True,
                 read_only: bool = False,
                 **kwargs):
        """
        Initialize the MongoDB toolkit.

        Args:
            name: Name of the toolkit
            connection_string: MongoDB connection string (for remote/existing)
            database_name: Name of the database to use
            local_path: Path for local file-based database
            auto_save: Automatically save changes to local files
            read_only: If True, the update and delete tools are not exposed;
                the flag is also passed to MongoDBDatabase
            **kwargs: Additional parameters forwarded to MongoDBDatabase
        """
        database = MongoDBDatabase(
            connection_string=connection_string,
            database_name=database_name,
            local_path=local_path,
            auto_save=auto_save,
            read_only=read_only,
            **kwargs
        )

        # All tools share the one database instance. The write tools are
        # inserted between the read tools and the info tool so the tool order
        # is the same as before in both modes.
        # NOTE(review): the raw query tool stays exposed in read-only mode;
        # presumably MongoDBDatabase enforces read_only for it - confirm.
        tools = [
            MongoDBExecuteQueryTool(database=database),
            MongoDBFindTool(database=database),
        ]
        if not read_only:
            tools.append(MongoDBUpdateTool(database=database))
            tools.append(MongoDBDeleteTool(database=database))
        tools.append(MongoDBInfoTool(database=database))

        super().__init__(name=name, tools=tools)

        self.database = database
        self.connection_string = connection_string
        self.database_name = database_name
        self.local_path = local_path
        self.auto_save = auto_save

        # Persist local data and close the connection at interpreter exit.
        import atexit
        atexit.register(self._cleanup)

    def _cleanup(self):
        """Cleanup function called when program exits.

        Saves every collection of an auto-saving local database, then
        disconnects. Errors are logged as warnings and never raised.
        """
        try:
            database = self.database
            # Check for a missing database before touching any attribute of it.
            if not database:
                return

            if database.is_local_database and database.auto_save:
                logger.info("Auto-saving local database before exit...")
                for collection_name in database.list_collections():
                    database._save_collection_to_file(collection_name)

            database.disconnect()
            logger.info("Disconnected from MongoDB database")

        except Exception as e:
            logger.warning(f"Error during cleanup: {str(e)}")

    def get_capabilities(self) -> Dict[str, Any]:
        """Get MongoDB-specific capabilities.

        Returns:
            The dict from the database's ``get_capabilities()`` with the
            local-storage and read-only settings added, or an error dict
            when no database is set.
        """
        if self.database:
            capabilities = self.database.get_capabilities()
            capabilities.update({
                "is_local_database": self.database.is_local_database,
                "local_path": str(self.database.local_path) if self.database.local_path else None,
                "auto_save": self.database.auto_save,
                "read_only": self.database.read_only
            })
            return capabilities
        return {"error": "MongoDB database not initialized"}

    def connect(self) -> bool:
        """Connect to MongoDB; returns False when no database is set."""
        return self.database.connect() if self.database else False

    def disconnect(self) -> bool:
        """Disconnect from MongoDB; returns False when no database is set."""
        return self.database.disconnect() if self.database else False

    def test_connection(self) -> bool:
        """Test MongoDB connection; returns False when no database is set."""
        return self.database.test_connection() if self.database else False

    def get_database(self) -> MongoDBDatabase:
        """Get the underlying MongoDB database instance"""
        return self.database

    def get_local_info(self) -> Dict[str, Any]:
        """Get information about local database setup.

        Returns:
            A dict with the local-storage settings, database name and
            connection string, or an error dict when no database is set.
        """
        if not self.database:
            return {"error": "Database not initialized"}

        # NOTE(review): connection_string is returned verbatim and may embed
        # credentials - confirm callers do not log or expose this dict.
        return {
            "is_local_database": self.database.is_local_database,
            "local_path": str(self.database.local_path) if self.database.local_path else None,
            "auto_save": self.database.auto_save,
            "read_only": self.database.read_only,
            "database_name": self.database_name,
            "connection_string": self.connection_string
        }