| from abc import ABC, abstractmethod |
| from collections.abc import Mapping |
| from copy import deepcopy |
| from enum import Enum |
| from typing import TYPE_CHECKING, Any, Optional, Union |
|
|
| from pydantic import BaseModel, ConfigDict, field_validator |
| from pydantic_core.core_schema import ValidationInfo |
|
|
| from core.app.entities.app_invoke_entities import InvokeFrom |
| from core.tools.entities.tool_entities import ( |
| ToolDescription, |
| ToolIdentity, |
| ToolInvokeFrom, |
| ToolInvokeMessage, |
| ToolParameter, |
| ToolProviderType, |
| ToolRuntimeImageVariable, |
| ToolRuntimeVariable, |
| ToolRuntimeVariablePool, |
| ) |
| from core.tools.tool_file_manager import ToolFileManager |
|
|
| if TYPE_CHECKING: |
| from core.file.models import File |
|
|
|
|
class Tool(BaseModel, ABC):
    """
    Abstract base class of a tool.

    Holds the declaration of a tool (identity, parameters, description), the
    meta data of the current tool call (`runtime`) and an optional pool of
    runtime variables. Subclasses have to implement `tool_provider_type`
    and `_invoke`.
    """

    identity: Optional[ToolIdentity] = None
    parameters: Optional[list[ToolParameter]] = None
    description: Optional[ToolDescription] = None
    is_team_authorization: bool = False

    # empty protected namespaces: turn off pydantic's protected-namespace check for field names
    model_config = ConfigDict(protected_namespaces=())

    @field_validator("parameters", mode="before")
    @classmethod
    def set_parameters(cls, v, validation_info: ValidationInfo) -> list[ToolParameter]:
        """
        normalize a missing or empty `parameters` input to an empty list

        :param v: the raw input value of the field
        :param validation_info: the validation context supplied by pydantic (unused)
        :return: the input value, or an empty list if the input is falsy
        """
        return v or []

    class Runtime(BaseModel):
        """
        Meta data of a tool call processing
        """

        tenant_id: Optional[str] = None
        tool_id: Optional[str] = None
        invoke_from: Optional[InvokeFrom] = None
        tool_invoke_from: Optional[ToolInvokeFrom] = None
        credentials: Optional[dict[str, Any]] = None
        runtime_parameters: Optional[dict[str, Any]] = None

        def __init__(self, **data: Any):
            super().__init__(**data)
            # make sure runtime_parameters is always a dict after construction
            if not self.runtime_parameters:
                self.runtime_parameters = {}

    runtime: Optional[Runtime] = None
    variables: Optional[ToolRuntimeVariablePool] = None

    def __init__(self, **data: Any):
        super().__init__(**data)

    class VariableKey(str, Enum):
        """
        Well-known variable names
        """

        IMAGE = "image"
        DOCUMENT = "document"
        VIDEO = "video"
        AUDIO = "audio"
        CUSTOM = "custom"

    def fork_tool_runtime(self, runtime: dict[str, Any]) -> "Tool":
        """
        fork a new tool with meta data

        Only identity, parameters and description are carried over to the new
        tool; every other field starts from its default value.

        :param runtime: the meta data of a tool call processing, used as the
            keyword arguments of `Tool.Runtime`
        :return: the new tool
        """
        return self.__class__(
            identity=self.identity.model_copy() if self.identity else None,
            parameters=self.parameters.copy() if self.parameters else None,
            description=self.description.model_copy() if self.description else None,
            runtime=Tool.Runtime(**runtime),
        )

    @abstractmethod
    def tool_provider_type(self) -> ToolProviderType:
        """
        get the tool provider type

        :return: the tool provider type
        """

    def load_variables(self, variables: ToolRuntimeVariablePool):
        """
        attach a variable pool to the tool

        :param variables: the variable pool to use for this tool
        """
        self.variables = variables

    def set_image_variable(self, variable_name: str, image_key: str) -> None:
        """
        set an image variable

        Does nothing when no variable pool is loaded.
        NOTE: `identity` has to be set, its name is passed to the pool.

        :param variable_name: the name of the variable
        :param image_key: the key of the image, passed to the pool's `set_file`
        """
        if not self.variables:
            return

        self.variables.set_file(self.identity.name, variable_name, image_key)

    def set_text_variable(self, variable_name: str, text: str) -> None:
        """
        set a text variable

        Does nothing when no variable pool is loaded.
        NOTE: `identity` has to be set, its name is passed to the pool.

        :param variable_name: the name of the variable
        :param text: the text, passed to the pool's `set_text`
        """
        if not self.variables:
            return

        self.variables.set_text(self.identity.name, variable_name, text)

    def get_variable(self, name: Union[str, Enum]) -> Optional[ToolRuntimeVariable]:
        """
        get a variable

        :param name: the name of the variable, an Enum member is resolved to its value
        :return: the first variable in the pool with that name, None if there
            is no pool or no such variable
        """
        if not self.variables:
            return None

        wanted = name.value if isinstance(name, Enum) else name
        return next((variable for variable in self.variables.pool if variable.name == wanted), None)

    def get_default_image_variable(self) -> Optional[ToolRuntimeVariable]:
        """
        get the default image variable

        :return: the variable named `VariableKey.IMAGE`, None if not available
        """
        # get_variable already returns None when no pool is loaded
        return self.get_variable(self.VariableKey.IMAGE)

    def get_variable_file(self, name: Union[str, Enum]) -> Optional[bytes]:
        """
        get a variable file

        :param name: the name of the variable
        :return: the variable file, None if the variable does not exist, is
            not an image variable, or no file binary was found
        """
        variable = self.get_variable(name)
        # also covers a missing variable: None is not an image variable
        if not isinstance(variable, ToolRuntimeImageVariable):
            return None

        # the value of an image variable is used as the message file id
        file_binary = ToolFileManager.get_file_binary_by_message_file_id(variable.value)
        if not file_binary:
            return None

        # only the first element of the lookup result is returned
        # (presumably the raw bytes - confirm against ToolFileManager)
        return file_binary[0]

    def list_variables(self) -> list[ToolRuntimeVariable]:
        """
        list all variables

        :return: the variables of the pool (the pool's own list, not a copy),
            an empty list if no pool is loaded
        """
        if not self.variables:
            return []

        return self.variables.pool

    def list_default_image_variables(self) -> list[ToolRuntimeVariable]:
        """
        list all image variables

        :return: the variables whose name starts with the value of `VariableKey.IMAGE`
        """
        if not self.variables:
            return []

        image_prefix = self.VariableKey.IMAGE.value
        return [variable for variable in self.variables.pool if variable.name.startswith(image_prefix)]

    def invoke(self, user_id: str, tool_parameters: Mapping[str, Any]) -> list[ToolInvokeMessage]:
        """
        invoke the tool

        The runtime parameters are merged over the given tool parameters, the
        values are cast to the declared parameter types and the result is
        handed to `_invoke`.

        :param user_id: passed through to `_invoke`
        :param tool_parameters: the parameters of the call, left untouched
        :return: the messages returned by `_invoke`, always as a list
        """
        # merge into a fresh dict: the argument is only guaranteed to be a Mapping
        # (no update()), and the caller's object must not be modified
        merged_parameters: dict[str, Any] = dict(tool_parameters)

        # runtime is optional, a tool without runtime has no runtime parameters
        runtime_parameters = self.runtime.runtime_parameters if self.runtime else None
        if runtime_parameters:
            merged_parameters.update(runtime_parameters)

        # try to parse the parameters to the declared types
        typed_parameters = self._transform_tool_parameters_type(merged_parameters)

        result = self._invoke(
            user_id=user_id,
            tool_parameters=typed_parameters,
        )

        if not isinstance(result, list):
            result = [result]

        return result

    def _transform_tool_parameters_type(self, tool_parameters: Mapping[str, Any]) -> dict[str, Any]:
        """
        Transform tool parameters type

        :param tool_parameters: the raw tool parameters, left untouched
        :return: a deep copy of the parameters as a plain dict, with every
            declared parameter cast by its type's `cast_value`
        """
        # build a plain dict first so that any Mapping is accepted and the
        # annotated return type holds, then deep copy to detach the values
        result: dict[str, Any] = deepcopy(dict(tool_parameters))
        for parameter in self.parameters or []:
            if parameter.name in tool_parameters:
                result[parameter.name] = parameter.type.cast_value(tool_parameters[parameter.name])

        return result

    @abstractmethod
    def _invoke(
        self, user_id: str, tool_parameters: dict[str, Any]
    ) -> Union[ToolInvokeMessage, list[ToolInvokeMessage]]:
        """
        run the tool, to be implemented by subclasses

        :param user_id: the user id handed over by `invoke`
        :param tool_parameters: the merged and type-cast tool parameters
        :return: a single message or a list of messages
        """
        pass

    def validate_credentials(self, credentials: dict[str, Any], parameters: dict[str, Any]) -> None:
        """
        validate the credentials

        The base implementation does nothing.

        :param credentials: the credentials
        :param parameters: the parameters
        """
        pass

    def get_runtime_parameters(self) -> list[ToolParameter]:
        """
        get the runtime parameters

        interface for developer to dynamic change the parameters of a tool depends on the variables pool

        :return: the runtime parameters
        """
        return self.parameters or []

    def get_all_runtime_parameters(self) -> list[ToolParameter]:
        """
        get all runtime parameters

        Merges the result of `get_runtime_parameters` into the declared
        parameters: a runtime parameter overrides the declared parameter of
        the same name, unknown ones are appended.

        :return: all runtime parameters
        """
        parameters = list(self.parameters or [])
        user_parameters = list(self.get_runtime_parameters() or [])

        for parameter in user_parameters:
            # first parameter with the same name, including ones appended below
            existing = next((candidate for candidate in parameters if candidate.name == parameter.name), None)

            if existing is None:
                # unknown parameter, add it
                parameters.append(parameter)
                continue

            # NOTE: only the list is copied, so this overrides the fields on
            # the parameter object that is shared with `self.parameters`
            existing.type = parameter.type
            existing.form = parameter.form
            existing.required = parameter.required
            existing.default = parameter.default
            existing.options = parameter.options
            existing.llm_description = parameter.llm_description

        return parameters

    def create_image_message(self, image: str, save_as: str = "") -> ToolInvokeMessage:
        """
        create an image message

        :param image: the url of the image
        :param save_as: forwarded to the `save_as` field of the message
        :return: the image message
        """
        return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.IMAGE, message=image, save_as=save_as)

    def create_file_message(self, file: "File") -> ToolInvokeMessage:
        """
        create a file message

        :param file: the file, stored under the `file` key of the message meta
        :return: the file message, with an empty message text
        """
        return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.FILE, message="", meta={"file": file}, save_as="")

    def create_link_message(self, link: str, save_as: str = "") -> ToolInvokeMessage:
        """
        create a link message

        :param link: the url of the link
        :param save_as: forwarded to the `save_as` field of the message
        :return: the link message
        """
        return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.LINK, message=link, save_as=save_as)

    def create_text_message(self, text: str, save_as: str = "") -> ToolInvokeMessage:
        """
        create a text message

        :param text: the text
        :param save_as: forwarded to the `save_as` field of the message
        :return: the text message
        """
        return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.TEXT, message=text, save_as=save_as)

    def create_blob_message(self, blob: bytes, meta: Optional[dict] = None, save_as: str = "") -> ToolInvokeMessage:
        """
        create a blob message

        :param blob: the blob
        :param meta: optional meta data of the message
        :param save_as: forwarded to the `save_as` field of the message
        :return: the blob message
        """
        return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.BLOB, message=blob, meta=meta, save_as=save_as)

    def create_json_message(self, object: dict) -> ToolInvokeMessage:
        """
        create a json message

        :param object: the dict to use as the message
        :return: the json message
        """
        return ToolInvokeMessage(type=ToolInvokeMessage.MessageType.JSON, message=object)
|
|