| | """Chain that takes in an input and produces an action and action input.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | import asyncio |
| | import json |
| | import logging |
| | import time |
| | from abc import abstractmethod |
| | from pathlib import Path |
| | from typing import ( |
| | Any, |
| | AsyncIterator, |
| | Callable, |
| | Dict, |
| | Iterator, |
| | List, |
| | Optional, |
| | Sequence, |
| | Tuple, |
| | Union, |
| | cast, |
| | ) |
| |
|
| | import yaml |
| | from langchain_core._api import deprecated |
| | from langchain_core.agents import AgentAction, AgentFinish, AgentStep |
| | from langchain_core.callbacks import ( |
| | AsyncCallbackManagerForChainRun, |
| | AsyncCallbackManagerForToolRun, |
| | BaseCallbackManager, |
| | CallbackManagerForChainRun, |
| | CallbackManagerForToolRun, |
| | Callbacks, |
| | ) |
| | from langchain_core.exceptions import OutputParserException |
| | from langchain_core.language_models import BaseLanguageModel |
| | from langchain_core.messages import BaseMessage |
| | from langchain_core.output_parsers import BaseOutputParser |
| | from langchain_core.prompts import BasePromptTemplate |
| | from langchain_core.prompts.few_shot import FewShotPromptTemplate |
| | from langchain_core.prompts.prompt import PromptTemplate |
| | from langchain_core.runnables import Runnable, RunnableConfig, ensure_config |
| | from langchain_core.runnables.utils import AddableDict |
| | from langchain_core.tools import BaseTool |
| | from langchain_core.utils.input import get_color_mapping |
| | from pydantic import BaseModel, ConfigDict, model_validator |
| | from typing_extensions import Self |
| |
|
| | from langchain._api.deprecation import AGENT_DEPRECATION_WARNING |
| | from langchain.agents.agent_iterator import AgentExecutorIterator |
| | from langchain.agents.agent_types import AgentType |
| | from langchain.agents.tools import InvalidTool |
| | from langchain.chains.base import Chain |
| | from langchain.chains.llm import LLMChain |
| | from langchain.utilities.asyncio import asyncio_timeout |
| |
|
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
| |
|
| |
|
class BaseSingleActionAgent(BaseModel):
    """Base Single Action Agent class."""

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return ["output"]

    def get_allowed_tools(self) -> Optional[List[str]]:
        """Get allowed tools.

        Returns:
            Optional[List[str]]: Always ``None`` in this base implementation.
        """
        return None

    @abstractmethod
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decided what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """

    @abstractmethod
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Async given input, decided what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """

    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """

    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations.

        Args:
            early_stopping_method: Method to use for early stopping.
            intermediate_steps: Steps the LLM has taken to date,
                along with observations. Not used by this implementation.
            **kwargs: User inputs. Not used by this implementation.

        Returns:
            AgentFinish: Agent finish object.

        Raises:
            ValueError: If `early_stopping_method` is not supported.
        """
        if early_stopping_method == "force":
            # `force` just returns a constant string; the log is left empty.
            return AgentFinish(
                {"output": "Agent stopped due to iteration limit or time limit."}, ""
            )
        raise ValueError(
            f"Got unsupported early_stopping_method `{early_stopping_method}`"
        )

    @classmethod
    def from_llm_and_tools(
        cls,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        callback_manager: Optional[BaseCallbackManager] = None,
        **kwargs: Any,
    ) -> BaseSingleActionAgent:
        """Construct an agent from an LLM and tools.

        Args:
            llm: Language model to use.
            tools: Tools to use.
            callback_manager: Callback manager to use.
            kwargs: Additional arguments.

        Returns:
            BaseSingleActionAgent: Agent object.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @property
    def _agent_type(self) -> str:
        """Return Identifier of an agent type."""
        raise NotImplementedError

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent.

        The result of ``model_dump()`` is extended with a ``"_type"`` entry
        when `_agent_type` is implemented. ``**kwargs`` is accepted but is
        not forwarded to ``model_dump()``.

        Returns:
            Dict: Dictionary representation of agent.
        """
        _dict = super().model_dump()
        try:
            _type = self._agent_type
        except NotImplementedError:
            _type = None
        if isinstance(_type, AgentType):
            # Store the enum's underlying value rather than the member itself.
            _dict["_type"] = str(_type.value)
        elif _type is not None:
            _dict["_type"] = _type
        return _dict

    def save(self, file_path: Union[Path, str]) -> None:
        """Save the agent.

        Args:
            file_path: Path to file to save the agent to.

        Raises:
            NotImplementedError: If agent does not support saving.
            ValueError: If file_path is not json or yaml.

        Example:
        .. code-block:: python

            # If working with agent executor
            agent.agent.save(file_path="path/agent.yaml")
        """
        # Convert file to Path object.
        if isinstance(file_path, str):
            save_path = Path(file_path)
        else:
            save_path = file_path

        # Fetch the dictionary first, so an agent that cannot be saved is
        # rejected before anything is created on disk.
        agent_dict = self.dict()
        if "_type" not in agent_dict:
            raise NotImplementedError(f"Agent {self} does not support saving")

        directory_path = save_path.parent
        directory_path.mkdir(parents=True, exist_ok=True)

        if save_path.suffix == ".json":
            with open(save_path, "w") as f:
                json.dump(agent_dict, f, indent=4)
        elif save_path.suffix in (".yaml", ".yml"):
            with open(save_path, "w") as f:
                yaml.dump(agent_dict, f, default_flow_style=False)
        else:
            raise ValueError(f"{save_path} must be json or yaml")

    def tool_run_logging_kwargs(self) -> Dict:
        """Return logging kwargs for tool run."""
        return {}
| |
|
| |
|
class BaseMultiActionAgent(BaseModel):
    """Base Multi Action Agent class."""

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return ["output"]

    def get_allowed_tools(self) -> Optional[List[str]]:
        """Get allowed tools.

        Returns:
            Optional[List[str]]: Allowed tools.
        """
        return None

    @abstractmethod
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[List[AgentAction], AgentFinish]:
        """Given input, decided what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Actions specifying what tool to use.
        """

    @abstractmethod
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[List[AgentAction], AgentFinish]:
        """Async given input, decided what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Actions specifying what tool to use.
        """

    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """

    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations.

        Args:
            early_stopping_method: Method to use for early stopping.
            intermediate_steps: Steps the LLM has taken to date,
                along with observations. Not used by this implementation.
            **kwargs: User inputs. Not used by this implementation.

        Returns:
            AgentFinish: Agent finish object.

        Raises:
            ValueError: If `early_stopping_method` is not supported.
        """
        if early_stopping_method == "force":
            # `force` just returns a constant string; the log is left empty.
            return AgentFinish({"output": "Agent stopped due to max iterations."}, "")
        raise ValueError(
            f"Got unsupported early_stopping_method `{early_stopping_method}`"
        )

    @property
    def _agent_type(self) -> str:
        """Return Identifier of an agent type."""
        raise NotImplementedError

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent.

        The result of ``model_dump()`` is extended with a ``"_type"`` entry
        when `_agent_type` is implemented. ``**kwargs`` is accepted but is
        not forwarded to ``model_dump()``.

        Returns:
            Dict: Dictionary representation of agent.
        """
        _dict = super().model_dump()
        try:
            _type = self._agent_type
        except NotImplementedError:
            # No type identifier available: leave `_type` out entirely.
            return _dict
        if isinstance(_type, AgentType):
            # str() on an enum member yields "AgentType.NAME"; persist the
            # underlying value instead, as BaseSingleActionAgent.dict does.
            _type = _type.value
        _dict["_type"] = str(_type)
        return _dict

    def save(self, file_path: Union[Path, str]) -> None:
        """Save the agent.

        Args:
            file_path: Path to file to save the agent to.

        Raises:
            NotImplementedError: If agent does not support saving.
            ValueError: If file_path is not json or yaml.

        Example:
        .. code-block:: python

            # If working with agent executor
            agent.agent.save(file_path="path/agent.yaml")
        """
        # Convert file to Path object.
        if isinstance(file_path, str):
            save_path = Path(file_path)
        else:
            save_path = file_path

        # Fetch dictionary to save; without a `_type` entry the agent is
        # rejected before anything is created on disk.
        agent_dict = self.dict()
        if "_type" not in agent_dict:
            raise NotImplementedError(f"Agent {self} does not support saving.")

        directory_path = save_path.parent
        directory_path.mkdir(parents=True, exist_ok=True)

        if save_path.suffix == ".json":
            with open(save_path, "w") as f:
                json.dump(agent_dict, f, indent=4)
        elif save_path.suffix in (".yaml", ".yml"):
            with open(save_path, "w") as f:
                yaml.dump(agent_dict, f, default_flow_style=False)
        else:
            raise ValueError(f"{save_path} must be json or yaml")

    def tool_run_logging_kwargs(self) -> Dict:
        """Return logging kwargs for tool run."""
        return {}
| |
|
| |
|
class AgentOutputParser(BaseOutputParser[Union[AgentAction, AgentFinish]]):
    """Base class for parsing agent output into agent action/finish."""

    @abstractmethod
    def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
        """Turn raw text into either an agent action or an agent finish.

        Args:
            text: Text to parse.

        Returns:
            Union[AgentAction, AgentFinish]: Agent action or agent finish.
        """
| |
|
| |
|
class MultiActionAgentOutputParser(
    BaseOutputParser[Union[List[AgentAction], AgentFinish]]
):
    """Base class for parsing agent output into agent actions/finish.

    This is used for agents that can return multiple actions.
    """

    @abstractmethod
    def parse(self, text: str) -> Union[List[AgentAction], AgentFinish]:
        """Turn raw text into a list of agent actions or an agent finish.

        Args:
            text: Text to parse.

        Returns:
            Union[List[AgentAction], AgentFinish]:
                Either the list of agent actions, or the agent finish.
        """
| |
|
| |
|
class RunnableAgent(BaseSingleActionAgent):
    """Agent powered by Runnables."""

    runnable: Runnable[dict, Union[AgentAction, AgentFinish]]
    """Runnable to call to get agent action."""
    input_keys_arg: List[str] = []
    return_keys_arg: List[str] = []
    stream_runnable: bool = True
    """Whether to stream from the runnable or not.

    If True then underlying LLM is invoked in a streaming fashion to make it possible
    to get access to the individual LLM tokens when using stream_log with the Agent
    Executor. If False then LLM is invoked in a non-streaming fashion and
    individual LLM tokens will not be available in stream_log.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent (the configured `return_keys_arg`)."""
        return self.return_keys_arg

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys (the configured `input_keys_arg`)."""
        return self.input_keys_arg

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Decide the next step from the history so far and the current inputs.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        # `intermediate_steps` is merged last, so it wins over a same-named kwarg.
        inputs = {**kwargs, "intermediate_steps": intermediate_steps}
        run_config = {"callbacks": callbacks}

        if not self.stream_runnable:
            return self.runnable.invoke(inputs, config=run_config)

        # Streaming path: fold the chunks together with `+=`. The result stays
        # None if the stream yields nothing.
        accumulated: Any = None
        for chunk in self.runnable.stream(inputs, config=run_config):
            if accumulated is None:
                accumulated = chunk
            else:
                accumulated += chunk
        return accumulated

    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[
        AgentAction,
        AgentFinish,
    ]:
        """Async variant: decide the next step from history and current inputs.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        # `intermediate_steps` is merged last, so it wins over a same-named kwarg.
        inputs = {**kwargs, "intermediate_steps": intermediate_steps}
        run_config = {"callbacks": callbacks}

        if not self.stream_runnable:
            return await self.runnable.ainvoke(inputs, config=run_config)

        # Streaming path: fold the chunks together with `+=`. The result stays
        # None if the stream yields nothing.
        accumulated: Any = None
        async for chunk in self.runnable.astream(inputs, config=run_config):
            if accumulated is None:
                accumulated = chunk
            else:
                accumulated += chunk
        return accumulated
| |
|
| |
|
class RunnableMultiActionAgent(BaseMultiActionAgent):
    """Agent powered by Runnables."""

    runnable: Runnable[dict, Union[List[AgentAction], AgentFinish]]
    """Runnable to call to get agent actions."""
    input_keys_arg: List[str] = []
    return_keys_arg: List[str] = []
    stream_runnable: bool = True
    """Whether to stream from the runnable or not.

    If True then underlying LLM is invoked in a streaming fashion to make it possible
    to get access to the individual LLM tokens when using stream_log with the Agent
    Executor. If False then LLM is invoked in a non-streaming fashion and
    individual LLM tokens will not be available in stream_log.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent (the configured `return_keys_arg`)."""
        return self.return_keys_arg

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys (the configured `input_keys_arg`).

        Returns:
            List of input keys.
        """
        return self.input_keys_arg

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[
        List[AgentAction],
        AgentFinish,
    ]:
        """Decide the next step(s) from the history so far and current inputs.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        # `intermediate_steps` is merged last, so it wins over a same-named kwarg.
        inputs = {**kwargs, "intermediate_steps": intermediate_steps}
        run_config = {"callbacks": callbacks}

        if not self.stream_runnable:
            return self.runnable.invoke(inputs, config=run_config)

        # Streaming path: fold the chunks together with `+=`. The result stays
        # None if the stream yields nothing.
        accumulated: Any = None
        for chunk in self.runnable.stream(inputs, config=run_config):
            if accumulated is None:
                accumulated = chunk
            else:
                accumulated += chunk
        return accumulated

    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[
        List[AgentAction],
        AgentFinish,
    ]:
        """Async variant: decide the next step(s) from history and inputs.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        # `intermediate_steps` is merged last, so it wins over a same-named kwarg.
        inputs = {**kwargs, "intermediate_steps": intermediate_steps}
        run_config = {"callbacks": callbacks}

        if not self.stream_runnable:
            return await self.runnable.ainvoke(inputs, config=run_config)

        # Streaming path: fold the chunks together with `+=`. The result stays
        # None if the stream yields nothing.
        accumulated: Any = None
        async for chunk in self.runnable.astream(inputs, config=run_config):
            if accumulated is None:
                accumulated = chunk
            else:
                accumulated += chunk
        return accumulated
| |
|
| |
|
@deprecated(
    "0.1.0",
    message=AGENT_DEPRECATION_WARNING,
    removal="1.0",
)
class LLMSingleActionAgent(BaseSingleActionAgent):
    """Base class for single action agents."""

    llm_chain: LLMChain
    """LLMChain to use for agent."""
    output_parser: AgentOutputParser
    """Output parser to use for agent."""
    stop: List[str]
    """List of strings to stop on."""

    @property
    def input_keys(self) -> List[str]:
        """Return the chain's input keys, minus `intermediate_steps`.

        Returns:
            List of input keys.
        """
        chain_keys = set(self.llm_chain.input_keys)
        return list(chain_keys - {"intermediate_steps"})

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent, without the output parser."""
        serialized = super().dict()
        # Raises KeyError if the entry is missing, just as `del` would.
        serialized.pop("output_parser")
        return serialized

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Run the LLM chain and parse its output into the next step.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        raw_text = self.llm_chain.run(
            intermediate_steps=intermediate_steps,
            stop=self.stop,
            callbacks=callbacks,
            **kwargs,
        )
        return self.output_parser.parse(raw_text)

    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Async variant: run the LLM chain and parse its output.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        raw_text = await self.llm_chain.arun(
            intermediate_steps=intermediate_steps,
            stop=self.stop,
            callbacks=callbacks,
            **kwargs,
        )
        # Parsing is synchronous here, even on the async path.
        return self.output_parser.parse(raw_text)

    def tool_run_logging_kwargs(self) -> Dict:
        """Return logging kwargs for tool run."""
        # The first stop string, if any, is used as the observation prefix.
        observation_prefix = self.stop[0] if self.stop else ""
        return {
            "llm_prefix": "",
            "observation_prefix": observation_prefix,
        }
| |
|
| |
|
@deprecated(
    "0.1.0",
    message=AGENT_DEPRECATION_WARNING,
    removal="1.0",
)
class Agent(BaseSingleActionAgent):
    """Agent that calls the language model and decides the action.

    This is driven by a LLMChain. The prompt in the LLMChain MUST include
    a variable called "agent_scratchpad" where the agent can put its
    intermediary work.
    """

    llm_chain: LLMChain
    """LLMChain to use for agent."""
    output_parser: AgentOutputParser
    """Output parser to use for agent."""
    allowed_tools: Optional[List[str]] = None
    """Allowed tools for the agent. If None, all tools are allowed."""

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent, without the output parser."""
        _dict = super().dict()
        del _dict["output_parser"]
        return _dict

    def get_allowed_tools(self) -> Optional[List[str]]:
        """Get allowed tools."""
        return self.allowed_tools

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return ["output"]

    def _fix_text(self, text: str) -> str:
        """Fix the text.

        Args:
            text: Text to fix.

        Returns:
            str: Fixed text.

        Raises:
            ValueError: Always, in this base implementation.
        """
        raise ValueError("fix_text not implemented for this agent.")

    @property
    def _stop(self) -> List[str]:
        """Stop sequences built from the right-stripped observation prefix.

        Two variants are returned: the prefix preceded by a newline, and by
        a newline plus a tab.
        """
        return [
            f"\n{self.observation_prefix.rstrip()}",
            f"\n\t{self.observation_prefix.rstrip()}",
        ]

    def _construct_scratchpad(
        self, intermediate_steps: List[Tuple[AgentAction, str]]
    ) -> Union[str, List[BaseMessage]]:
        """Construct the scratchpad that lets the agent continue its thought process."""
        thoughts = ""
        for action, observation in intermediate_steps:
            thoughts += action.log
            thoughts += f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
        return thoughts

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decided what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
        full_output = self.llm_chain.predict(callbacks=callbacks, **full_inputs)
        return self.output_parser.parse(full_output)

    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Async given input, decided what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
        full_output = await self.llm_chain.apredict(callbacks=callbacks, **full_inputs)
        agent_output = await self.output_parser.aparse(full_output)
        return agent_output

    def get_full_inputs(
        self, intermediate_steps: List[Tuple[AgentAction, str]], **kwargs: Any
    ) -> Dict[str, Any]:
        """Create the full inputs for the LLMChain from intermediate steps.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            **kwargs: User inputs.

        Returns:
            Dict[str, Any]: Full inputs for the LLMChain.
        """
        thoughts = self._construct_scratchpad(intermediate_steps)
        new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop}
        # The generated entries override same-named user inputs.
        full_inputs = {**kwargs, **new_inputs}
        return full_inputs

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """
        return list(set(self.llm_chain.input_keys) - {"agent_scratchpad"})

    @model_validator(mode="after")
    def validate_prompt(self) -> Self:
        """Validate that prompt matches format.

        If `agent_scratchpad` is missing from the prompt's input variables,
        a warning is logged and the variable is appended to both the input
        variables and the end of the template (or few-shot suffix).

        Returns:
            Self: This agent, with its prompt possibly amended in place.

        Raises:
            ValueError: If `agent_scratchpad` is not in prompt.input_variables
                and prompt is not a FewShotPromptTemplate or a PromptTemplate.
        """
        prompt = self.llm_chain.prompt
        if "agent_scratchpad" in prompt.input_variables:
            return self

        logger.warning(
            "`agent_scratchpad` should be a variable in prompt.input_variables."
            " Did not find it, so adding it at the end."
        )
        # Reject unsupported prompt types before touching the prompt. Otherwise
        # a failed validation would leave `agent_scratchpad` in input_variables,
        # and re-validating the same prompt object would silently pass.
        if not isinstance(prompt, (PromptTemplate, FewShotPromptTemplate)):
            raise ValueError(f"Got unexpected prompt type {type(prompt)}")

        prompt.input_variables.append("agent_scratchpad")
        if isinstance(prompt, PromptTemplate):
            prompt.template += "\n{agent_scratchpad}"
        else:
            prompt.suffix += "\n{agent_scratchpad}"
        return self

    @property
    @abstractmethod
    def observation_prefix(self) -> str:
        """Prefix to append the observation with."""

    @property
    @abstractmethod
    def llm_prefix(self) -> str:
        """Prefix to append the LLM call with."""

    @classmethod
    @abstractmethod
    def create_prompt(cls, tools: Sequence[BaseTool]) -> BasePromptTemplate:
        """Create a prompt for this class.

        Args:
            tools: Tools to use.

        Returns:
            BasePromptTemplate: Prompt template.
        """

    @classmethod
    def _validate_tools(cls, tools: Sequence[BaseTool]) -> None:
        """Validate that appropriate tools are passed in.

        The base implementation accepts any tools.

        Args:
            tools: Tools to use.
        """
        pass

    @classmethod
    @abstractmethod
    def _get_default_output_parser(cls, **kwargs: Any) -> AgentOutputParser:
        """Get default output parser for this class."""

    @classmethod
    def from_llm_and_tools(
        cls,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        callback_manager: Optional[BaseCallbackManager] = None,
        output_parser: Optional[AgentOutputParser] = None,
        **kwargs: Any,
    ) -> Agent:
        """Construct an agent from an LLM and tools.

        Args:
            llm: Language model to use.
            tools: Tools to use.
            callback_manager: Callback manager to use.
            output_parser: Output parser to use. Falls back to the class
                default when not given.
            kwargs: Additional arguments.

        Returns:
            Agent: Agent object.
        """
        cls._validate_tools(tools)
        llm_chain = LLMChain(
            llm=llm,
            prompt=cls.create_prompt(tools),
            callback_manager=callback_manager,
        )
        tool_names = [tool.name for tool in tools]
        _output_parser = output_parser or cls._get_default_output_parser()
        return cls(
            llm_chain=llm_chain,
            allowed_tools=tool_names,
            output_parser=_output_parser,
            **kwargs,
        )

    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations.

        Args:
            early_stopping_method: Method to use for early stopping.
            intermediate_steps: Steps the LLM has taken to date,
                along with observations.
            **kwargs: User inputs.

        Returns:
            AgentFinish: Agent finish object.

        Raises:
            ValueError: If `early_stopping_method` is not in ['force', 'generate'].
        """
        if early_stopping_method == "force":
            # `force` just returns a constant string; the log is left empty.
            return AgentFinish(
                {"output": "Agent stopped due to iteration limit or time limit."}, ""
            )
        elif early_stopping_method == "generate":
            # `generate` makes one final LLM call over the steps taken so far.
            thoughts = ""
            for action, observation in intermediate_steps:
                thoughts += action.log
                thoughts += (
                    f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
                )
            # Ask the LLM explicitly for a final answer.
            thoughts += (
                "\n\nI now need to return a final answer based on the previous steps:"
            )
            new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop}
            full_inputs = {**kwargs, **new_inputs}
            full_output = self.llm_chain.predict(**full_inputs)
            # Try to extract a final answer from the output.
            parsed_output = self.output_parser.parse(full_output)
            if isinstance(parsed_output, AgentFinish):
                return parsed_output
            else:
                # The parser produced something other than a finish: return
                # the raw LLM output as the answer instead.
                return AgentFinish({"output": full_output}, full_output)
        else:
            raise ValueError(
                "early_stopping_method should be one of `force` or `generate`, "
                f"got {early_stopping_method}"
            )

    def tool_run_logging_kwargs(self) -> Dict:
        """Return logging kwargs for tool run."""
        return {
            "llm_prefix": self.llm_prefix,
            "observation_prefix": self.observation_prefix,
        }
| |
|
| |
|
class ExceptionTool(BaseTool):
    """Tool that just returns the query."""

    name: str = "_Exception"
    """Name of the tool."""
    description: str = "Exception tool"
    """Description of the tool."""

    # Synchronous entry point: hands `query` back unchanged.
    # `run_manager` is accepted but not used.
    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        return query

    # Async counterpart of `_run`: same echo behaviour, nothing is awaited.
    async def _arun(
        self,
        query: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        return query
| |
|
| |
|
# Alias for a list mixing AgentFinish, AgentAction and AgentStep items.
# NOTE(review): presumably the result of one executor step - usage is outside
# this view, confirm against the executor code.
NextStepOutput = List[Union[AgentFinish, AgentAction, AgentStep]]
# Alias covering both runnable-backed agent classes defined in this module.
RunnableAgentType = Union[RunnableAgent, RunnableMultiActionAgent]
| |
|
| |
|
class AgentExecutor(Chain):
    """Agent that is using tools."""

    agent: Union[BaseSingleActionAgent, BaseMultiActionAgent, Runnable]
    """The agent to run for creating a plan and determining actions
    to take at each step of the execution loop."""
    tools: Sequence[BaseTool]
    """The valid tools the agent can call."""
    return_intermediate_steps: bool = False
    """Whether to return the agent's trajectory of intermediate steps
    at the end in addition to the final output."""
    max_iterations: Optional[int] = 15
    """The maximum number of steps to take before ending the execution
    loop.

    Setting to `None` removes the limit and could lead to an infinite loop."""
    max_execution_time: Optional[float] = None
    """The maximum amount of wall clock time, in seconds, to spend in the
    execution loop. `None` means no time limit.
    """
    early_stopping_method: str = "force"
    """The method to use for early stopping if the agent never
    returns `AgentFinish`. Either 'force' or 'generate'.

    `"force"` returns a string saying that it stopped because it met a
    time or iteration limit.

    `"generate"` calls the agent's LLM Chain one final time to generate
    a final answer based on the previous steps.
    """
    handle_parsing_errors: Union[bool, str, Callable[[OutputParserException], str]] = (
        False
    )
    """How to handle errors raised by the agent's output parser.
    Defaults to `False`, which raises the error (wrapped in a `ValueError`).
    If `True`, an observation is sent back to the LLM: the exception's own
    `observation` when the exception has `send_to_llm` set, otherwise the
    fixed text "Invalid or incomplete response".
    If a string, the string itself will be sent to the LLM as an observation.
    If a callable function, the function will be called with the exception
    as an argument, and the result of that function will be passed to the agent
    as an observation.
    """
    trim_intermediate_steps: Union[
        int, Callable[[List[Tuple[AgentAction, str]]], List[Tuple[AgentAction, str]]]
    ] = -1
    """How to trim the intermediate steps before they are handed to the agent
    for planning the next step.
    A positive integer `n` keeps only the last `n` steps.
    A callable receives the full list of steps and returns the list to use.
    Defaults to -1, which means no trimming (as does any non-positive integer).
    """
| |
|
@classmethod
def from_agent_and_tools(
    cls,
    agent: Union[BaseSingleActionAgent, BaseMultiActionAgent, Runnable],
    tools: Sequence[BaseTool],
    callbacks: Callbacks = None,
    **kwargs: Any,
) -> AgentExecutor:
    """Build an executor from an agent and the tools it may call.

    Args:
        agent: Agent to use.
        tools: Tools to use.
        callbacks: Callbacks to use.
        kwargs: Additional keyword arguments forwarded to the constructor.

    Returns:
        AgentExecutor: Agent executor object.
    """
    # The three named parameters can never also appear in ``kwargs``, so
    # merging them into one mapping is equivalent to passing them explicitly.
    constructor_kwargs = {
        "agent": agent,
        "tools": tools,
        "callbacks": callbacks,
        **kwargs,
    }
    return cls(**constructor_kwargs)
| |
|
@model_validator(mode="after")
def validate_tools(self) -> Self:
    """Validate that the provided tools are compatible with the agent.

    Runs after field validation, so it operates on the constructed instance.
    The check is skipped when the agent reports no restriction, i.e. its
    ``get_allowed_tools()`` returns ``None``.

    Returns:
        Self: This instance, unchanged.

    Raises:
        ValueError: If allowed tools are different than provided tools.
    """
    allowed_tools = self.agent.get_allowed_tools()
    if allowed_tools is None:
        return self

    # Build the name list once; it is used for the comparison and the message.
    provided_tools = [tool.name for tool in self.tools]
    # Compared as sets: order and duplicates are irrelevant.
    if set(allowed_tools) != set(provided_tools):
        raise ValueError(
            f"Allowed tools ({allowed_tools}) different than "
            f"provided tools ({provided_tools})"
        )
    return self
| |
|
@model_validator(mode="before")
@classmethod
def validate_runnable_agent(cls, values: Dict) -> Any:
    """Wrap a raw Runnable passed as ``agent`` in a runnable-agent adapter.

    Args:
        values: Raw constructor values, before field validation.

    Returns:
        Dict: The same ``values`` mapping. When ``agent`` was a Runnable it
        now holds a ``RunnableAgent`` or ``RunnableMultiActionAgent``, and
        the ``stream_runnable`` key has been removed.
    """
    candidate = values.get("agent")
    if not candidate or not isinstance(candidate, Runnable):
        return values

    # A runnable whose declared output type is exactly
    # ``Union[List[AgentAction], AgentFinish]`` is a multi-action agent.
    # If the output type cannot be determined, assume single-action.
    try:
        declared_output = candidate.OutputType
    except Exception:
        is_multi_action = False
    else:
        is_multi_action = declared_output == Union[List[AgentAction], AgentFinish]

    stream_runnable = values.pop("stream_runnable", True)
    wrapper_cls = RunnableMultiActionAgent if is_multi_action else RunnableAgent
    values["agent"] = wrapper_cls(
        runnable=candidate, stream_runnable=stream_runnable
    )
    return values
| |
|
@property
def _action_agent(self) -> Union[BaseSingleActionAgent, BaseMultiActionAgent]:
    """Return ``self.agent`` typed as an action agent.

    If ``agent`` was supplied as a Runnable, the ``validate_runnable_agent``
    validator has already replaced it with one of ``RunnableAgentType``.
    The cast only tells the type checker about that replacement; the same
    object is returned in either case.
    """
    agent = self.agent
    return cast(RunnableAgentType, agent) if isinstance(agent, Runnable) else agent
| |
|
def save(self, file_path: Union[Path, str]) -> None:
    """Always raise: agent executors cannot be saved.

    Args:
        file_path: Path to save to (ignored).

    Raises:
        ValueError: Unconditionally; the message points to ``save_agent``.
    """
    message = (
        "Saving not supported for agent executors. "
        "If you are trying to save the agent, please use the "
        "`.save_agent(...)`"
    )
    raise ValueError(message)
| |
|
def save_agent(self, file_path: Union[Path, str]) -> None:
    """Save the underlying agent by delegating to its own ``save``.

    Args:
        file_path: Path to save to.
    """
    underlying_agent = self._action_agent
    return underlying_agent.save(file_path)
| |
|
def iter(
    self,
    inputs: Any,
    callbacks: Callbacks = None,
    *,
    include_run_info: bool = False,
    async_: bool = False,
) -> AgentExecutorIterator:
    """Create an iterator over the steps taken to reach the final output.

    Args:
        inputs: Inputs to the agent.
        callbacks: Callbacks to run.
        include_run_info: Whether to include run info.
        async_: Whether to run async. (Ignored)

    Returns:
        AgentExecutorIterator: Agent executor iterator object.
    """
    step_iterator = AgentExecutorIterator(
        self,
        inputs,
        callbacks,
        tags=self.tags,
        include_run_info=include_run_info,
    )
    return step_iterator
| |
|
@property
def input_keys(self) -> List[str]:
    """Return the input keys, as declared by the underlying agent.

    :meta private:
    """
    underlying_agent = self._action_agent
    return underlying_agent.input_keys
| |
|
@property
def output_keys(self) -> List[str]:
    """Return the output keys.

    These are the agent's return values, plus ``"intermediate_steps"`` when
    ``return_intermediate_steps`` is enabled.

    :meta private:
    """
    include_steps = self.return_intermediate_steps
    agent_keys = self._action_agent.return_values
    # ``+`` builds a new list, so the agent's own list is never modified.
    return agent_keys + ["intermediate_steps"] if include_steps else agent_keys
| |
|
def lookup_tool(self, name: str) -> BaseTool:
    """Lookup tool by name.

    Args:
        name: Name of tool.

    Returns:
        BaseTool: Tool object. If several tools share the name, the last
        one in ``self.tools`` is returned.

    Raises:
        KeyError: If no tool has the given name.
    """
    registry: Dict[str, BaseTool] = {}
    for candidate in self.tools:
        registry[candidate.name] = candidate
    return registry[name]
| |
|
def _should_continue(self, iterations: int, time_elapsed: float) -> bool:
    """Tell whether the execution loop may take another step.

    Args:
        iterations: Number of steps completed so far.
        time_elapsed: Time spent in the loop so far, in the same unit as
            ``max_execution_time``.

    Returns:
        ``False`` once ``max_iterations`` or ``max_execution_time`` has been
        reached; ``True`` otherwise. A limit set to ``None`` is not enforced.
    """
    iteration_cap = self.max_iterations
    if iteration_cap is not None and iterations >= iteration_cap:
        return False

    time_cap = self.max_execution_time
    return not (time_cap is not None and time_elapsed >= time_cap)
| |
|
def _return(
    self,
    output: AgentFinish,
    intermediate_steps: list,
    run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
    """Fire the agent-finish callback and build the chain's final output.

    Args:
        output: The finishing result whose ``return_values`` form the output.
        intermediate_steps: Steps taken so far; included in the result only
            when ``return_intermediate_steps`` is enabled.
        run_manager: Optional callback manager to notify.

    Returns:
        ``output.return_values`` itself, or, when intermediate steps are
        requested, a new dict with an extra ``"intermediate_steps"`` entry.
    """
    if run_manager:
        run_manager.on_agent_finish(output, color="green", verbose=self.verbose)
    if not self.return_intermediate_steps:
        return output.return_values
    # Build a new dict instead of writing into ``output.return_values``: the
    # AgentFinish was already handed to callbacks and may be retained or
    # reused by the agent, so it must not be altered here.
    return {**output.return_values, "intermediate_steps": intermediate_steps}
| |
|
async def _areturn(
    self,
    output: AgentFinish,
    intermediate_steps: list,
    run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
    """Fire the async agent-finish callback and build the final output.

    Args:
        output: The finishing result whose ``return_values`` form the output.
        intermediate_steps: Steps taken so far; included in the result only
            when ``return_intermediate_steps`` is enabled.
        run_manager: Optional async callback manager to notify.

    Returns:
        ``output.return_values`` itself, or, when intermediate steps are
        requested, a new dict with an extra ``"intermediate_steps"`` entry.
    """
    if run_manager:
        await run_manager.on_agent_finish(
            output, color="green", verbose=self.verbose
        )
    if not self.return_intermediate_steps:
        return output.return_values
    # Build a new dict instead of writing into ``output.return_values``: the
    # AgentFinish was already handed to callbacks and may be retained or
    # reused by the agent, so it must not be altered here.
    return {**output.return_values, "intermediate_steps": intermediate_steps}
| |
|
def _consume_next_step(
    self, values: NextStepOutput
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
    """Reduce the items produced by one step into the step's result.

    Args:
        values: Everything yielded by a single pass of the step iterator.

    Returns:
        The ``AgentFinish`` when the step ended the run; otherwise a list of
        ``(action, observation)`` pairs, one per ``AgentStep`` in ``values``
        (bare ``AgentAction`` items are dropped). An empty ``values``, for
        example when the agent planned no actions, gives an empty list.
    """
    # Guard the ``values[-1]`` lookup: a step that yielded nothing would
    # otherwise fail with an IndexError.
    if not values:
        return []

    last_item = values[-1]
    if isinstance(last_item, AgentFinish):
        # Internal invariant: a finishing step yields the finish and nothing else.
        assert len(values) == 1
        return last_item
    return [
        (item.action, item.observation)
        for item in values
        if isinstance(item, AgentStep)
    ]
| |
|
def _take_next_step(
    self,
    name_to_tool_map: Dict[str, BaseTool],
    color_mapping: Dict[str, str],
    inputs: Dict[str, str],
    intermediate_steps: List[Tuple[AgentAction, str]],
    run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
    """Run one full step and return its consolidated result.

    Drains ``_iter_next_step`` into a list and hands that list to
    ``_consume_next_step``.

    Args:
        name_to_tool_map: Tools keyed by name.
        color_mapping: Logging color per tool name.
        inputs: The chain inputs.
        intermediate_steps: ``(action, observation)`` pairs gathered so far.
        run_manager: Optional callback manager.

    Returns:
        Whatever ``_consume_next_step`` returns for the collected items.
    """
    step_items = list(
        self._iter_next_step(
            name_to_tool_map,
            color_mapping,
            inputs,
            intermediate_steps,
            run_manager,
        )
    )
    return self._consume_next_step(step_items)
| |
|
def _iter_next_step(
    self,
    name_to_tool_map: Dict[str, BaseTool],
    color_mapping: Dict[str, str],
    inputs: Dict[str, str],
    intermediate_steps: List[Tuple[AgentAction, str]],
    run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]:
    """Take a single step in the thought-action-observation loop.

    Override this to take control of how the agent makes and acts on choices.

    Args:
        name_to_tool_map: Tools keyed by name.
        color_mapping: Logging color per tool name.
        inputs: The chain inputs, forwarded to the agent's ``plan``.
        intermediate_steps: ``(action, observation)`` pairs gathered so far;
            trimmed via ``_prepare_intermediate_steps`` before planning.
        run_manager: Optional callback manager.

    Yields:
        One of three sequences: a single ``AgentFinish``; a single
        ``AgentStep`` for the ``"_Exception"`` action when a parsing error
        was handled; or every planned ``AgentAction`` followed by one
        ``AgentStep`` per action, each tool being run as its step is
        requested.

    Raises:
        ValueError: If the output parser failed and ``handle_parsing_errors``
            is ``False`` (chained from the parser exception), or if
            ``handle_parsing_errors`` has an unsupported type.
    """
    try:
        intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)

        # Ask the agent what to do next.
        output = self._action_agent.plan(
            intermediate_steps,
            callbacks=run_manager.get_child() if run_manager else None,
            **inputs,
        )
    except OutputParserException as e:
        handler = self.handle_parsing_errors
        # Only the literal ``False`` asks for the error to be raised; strings
        # and callables always produce an observation instead.
        if isinstance(handler, bool) and not handler:
            raise ValueError(
                "An output parsing error occurred. "
                "In order to pass this error back to the agent and have it try "
                "again, pass `handle_parsing_errors=True` to the AgentExecutor. "
                f"This is the error: {str(e)}"
            ) from e
        text = str(e)
        if isinstance(handler, bool):
            if e.send_to_llm:
                observation = str(e.observation)
                text = str(e.llm_output)
            else:
                observation = "Invalid or incomplete response"
        elif isinstance(handler, str):
            observation = handler
        elif callable(handler):
            observation = handler(e)
        else:
            raise ValueError("Got unexpected type of `handle_parsing_errors`")
        # Report the failure as a pseudo-action so it shows up in callbacks
        # and in the intermediate steps like any other tool call.
        output = AgentAction("_Exception", observation, text)
        if run_manager:
            run_manager.on_agent_action(output, color="green")
        tool_run_kwargs = self._action_agent.tool_run_logging_kwargs()
        observation = ExceptionTool().run(
            output.tool_input,
            verbose=self.verbose,
            color=None,
            callbacks=run_manager.get_child() if run_manager else None,
            **tool_run_kwargs,
        )
        yield AgentStep(action=output, observation=observation)
        return

    # The agent decided to finish: emit the finish and stop.
    if isinstance(output, AgentFinish):
        yield output
        return

    # Normalize single-action and multi-action agents to a list of actions.
    actions: List[AgentAction]
    if isinstance(output, AgentAction):
        actions = [output]
    else:
        actions = output
    # Announce every planned action before any tool is executed.
    for agent_action in actions:
        yield agent_action
    for agent_action in actions:
        yield self._perform_agent_action(
            name_to_tool_map, color_mapping, agent_action, run_manager
        )
| |
|
def _perform_agent_action(
    self,
    name_to_tool_map: Dict[str, BaseTool],
    color_mapping: Dict[str, str],
    agent_action: AgentAction,
    run_manager: Optional[CallbackManagerForChainRun] = None,
) -> AgentStep:
    """Run the tool requested by ``agent_action`` and wrap the result.

    Args:
        name_to_tool_map: Tools keyed by name.
        color_mapping: Logging color per tool name.
        agent_action: The action chosen by the agent.
        run_manager: Optional callback manager.

    Returns:
        An ``AgentStep`` pairing the action with the tool's observation. If
        the requested tool is unknown, the observation comes from
        ``InvalidTool`` instead.
    """
    if run_manager:
        run_manager.on_agent_action(agent_action, color="green")

    # Unknown tool name: let InvalidTool produce the observation.
    if agent_action.tool not in name_to_tool_map:
        run_kwargs = self._action_agent.tool_run_logging_kwargs()
        observation = InvalidTool().run(
            {
                "requested_tool_name": agent_action.tool,
                "available_tool_names": list(name_to_tool_map.keys()),
            },
            verbose=self.verbose,
            color=None,
            callbacks=run_manager.get_child() if run_manager else None,
            **run_kwargs,
        )
        return AgentStep(action=agent_action, observation=observation)

    # Known tool: call it with the agent-supplied input.
    tool = name_to_tool_map[agent_action.tool]
    returns_directly = tool.return_direct
    tool_color = color_mapping[agent_action.tool]
    run_kwargs = self._action_agent.tool_run_logging_kwargs()
    if returns_directly:
        # For a return-direct tool the ``llm_prefix`` logging kwarg is blanked.
        run_kwargs["llm_prefix"] = ""
    observation = tool.run(
        agent_action.tool_input,
        verbose=self.verbose,
        color=tool_color,
        callbacks=run_manager.get_child() if run_manager else None,
        **run_kwargs,
    )
    return AgentStep(action=agent_action, observation=observation)
| |
|
async def _atake_next_step(
    self,
    name_to_tool_map: Dict[str, BaseTool],
    color_mapping: Dict[str, str],
    inputs: Dict[str, str],
    intermediate_steps: List[Tuple[AgentAction, str]],
    run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
    """Run one full step asynchronously and return its consolidated result.

    Drains ``_aiter_next_step`` into a list and hands that list to
    ``_consume_next_step``.

    Args:
        name_to_tool_map: Tools keyed by name.
        color_mapping: Logging color per tool name.
        inputs: The chain inputs.
        intermediate_steps: ``(action, observation)`` pairs gathered so far.
        run_manager: Optional async callback manager.

    Returns:
        Whatever ``_consume_next_step`` returns for the collected items.
    """
    step_items: NextStepOutput = []
    async for item in self._aiter_next_step(
        name_to_tool_map,
        color_mapping,
        inputs,
        intermediate_steps,
        run_manager,
    ):
        step_items.append(item)
    return self._consume_next_step(step_items)
| |
|
async def _aiter_next_step(
    self,
    name_to_tool_map: Dict[str, BaseTool],
    color_mapping: Dict[str, str],
    inputs: Dict[str, str],
    intermediate_steps: List[Tuple[AgentAction, str]],
    run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> AsyncIterator[Union[AgentFinish, AgentAction, AgentStep]]:
    """Take a single step in the thought-action-observation loop.

    Override this to take control of how the agent makes and acts on choices.

    Args:
        name_to_tool_map: Tools keyed by name.
        color_mapping: Logging color per tool name.
        inputs: The chain inputs, forwarded to the agent's ``aplan``.
        intermediate_steps: ``(action, observation)`` pairs gathered so far;
            trimmed via ``_prepare_intermediate_steps`` before planning.
        run_manager: Optional async callback manager.

    Yields:
        One of three sequences: a single ``AgentFinish``; a single
        ``AgentStep`` for the ``"_Exception"`` action when a parsing error
        was handled; or every planned ``AgentAction`` followed by one
        ``AgentStep`` per action, after all tools have been run concurrently.

    Raises:
        ValueError: If the output parser failed and ``handle_parsing_errors``
            is ``False`` (chained from the parser exception), or if
            ``handle_parsing_errors`` has an unsupported type.
    """
    try:
        intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)

        # Ask the agent what to do next.
        output = await self._action_agent.aplan(
            intermediate_steps,
            callbacks=run_manager.get_child() if run_manager else None,
            **inputs,
        )
    except OutputParserException as e:
        handler = self.handle_parsing_errors
        # Only the literal ``False`` asks for the error to be raised; strings
        # and callables always produce an observation instead.
        if isinstance(handler, bool) and not handler:
            raise ValueError(
                "An output parsing error occurred. "
                "In order to pass this error back to the agent and have it try "
                "again, pass `handle_parsing_errors=True` to the AgentExecutor. "
                f"This is the error: {str(e)}"
            ) from e
        text = str(e)
        if isinstance(handler, bool):
            if e.send_to_llm:
                observation = str(e.observation)
                text = str(e.llm_output)
            else:
                observation = "Invalid or incomplete response"
        elif isinstance(handler, str):
            observation = handler
        elif callable(handler):
            observation = handler(e)
        else:
            raise ValueError("Got unexpected type of `handle_parsing_errors`")
        # Report the failure as a pseudo-action so it shows up in callbacks
        # and in the intermediate steps like any other tool call.
        output = AgentAction("_Exception", observation, text)
        # Notify callbacks of the pseudo-action, as the synchronous
        # ``_iter_next_step`` does on this path.
        if run_manager:
            await run_manager.on_agent_action(output, color="green")
        tool_run_kwargs = self._action_agent.tool_run_logging_kwargs()
        observation = await ExceptionTool().arun(
            output.tool_input,
            verbose=self.verbose,
            color=None,
            callbacks=run_manager.get_child() if run_manager else None,
            **tool_run_kwargs,
        )
        yield AgentStep(action=output, observation=observation)
        return

    # The agent decided to finish: emit the finish and stop.
    if isinstance(output, AgentFinish):
        yield output
        return

    # Normalize single-action and multi-action agents to a list of actions.
    actions: List[AgentAction]
    if isinstance(output, AgentAction):
        actions = [output]
    else:
        actions = output
    # Announce every planned action before any tool is executed.
    for agent_action in actions:
        yield agent_action

    # Run all tool calls concurrently; gather keeps results in action order.
    result = await asyncio.gather(
        *[
            self._aperform_agent_action(
                name_to_tool_map, color_mapping, agent_action, run_manager
            )
            for agent_action in actions
        ],
    )

    # TODO: results could be yielded as each one becomes available.
    for chunk in result:
        yield chunk
| |
|
async def _aperform_agent_action(
    self,
    name_to_tool_map: Dict[str, BaseTool],
    color_mapping: Dict[str, str],
    agent_action: AgentAction,
    run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> AgentStep:
    """Asynchronously run the tool requested by ``agent_action``.

    Args:
        name_to_tool_map: Tools keyed by name.
        color_mapping: Logging color per tool name.
        agent_action: The action chosen by the agent.
        run_manager: Optional async callback manager.

    Returns:
        An ``AgentStep`` pairing the action with the tool's observation. If
        the requested tool is unknown, the observation comes from
        ``InvalidTool`` instead.
    """
    if run_manager:
        await run_manager.on_agent_action(
            agent_action, verbose=self.verbose, color="green"
        )

    # Unknown tool name: let InvalidTool produce the observation.
    if agent_action.tool not in name_to_tool_map:
        run_kwargs = self._action_agent.tool_run_logging_kwargs()
        observation = await InvalidTool().arun(
            {
                "requested_tool_name": agent_action.tool,
                "available_tool_names": list(name_to_tool_map.keys()),
            },
            verbose=self.verbose,
            color=None,
            callbacks=run_manager.get_child() if run_manager else None,
            **run_kwargs,
        )
        return AgentStep(action=agent_action, observation=observation)

    # Known tool: call it with the agent-supplied input.
    tool = name_to_tool_map[agent_action.tool]
    returns_directly = tool.return_direct
    tool_color = color_mapping[agent_action.tool]
    run_kwargs = self._action_agent.tool_run_logging_kwargs()
    if returns_directly:
        # For a return-direct tool the ``llm_prefix`` logging kwarg is blanked.
        run_kwargs["llm_prefix"] = ""
    observation = await tool.arun(
        agent_action.tool_input,
        verbose=self.verbose,
        color=tool_color,
        callbacks=run_manager.get_child() if run_manager else None,
        **run_kwargs,
    )
    return AgentStep(action=agent_action, observation=observation)
| |
|
def _call(
    self,
    inputs: Dict[str, str],
    run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
    """Run text through and get agent response.

    Repeats single steps until the agent finishes, a return-direct tool
    produces the answer, or ``_should_continue`` reports that an iteration
    or time limit was reached. In the last case the agent's
    ``return_stopped_response`` supplies the output.

    Args:
        inputs: The chain inputs.
        run_manager: Optional callback manager.

    Returns:
        The final output dictionary built by ``_return``.
    """
    # Tools by name, for quick lookup while executing actions.
    tools_by_name = {tool.name: tool for tool in self.tools}
    # One logging color per tool; green and red are excluded from the palette.
    tool_colors = get_color_mapping(
        [tool.name for tool in self.tools], excluded_colors=["green", "red"]
    )
    steps_so_far: List[Tuple[AgentAction, str]] = []
    # Loop bookkeeping: step count and elapsed wall-clock time.
    iterations = 0
    time_elapsed = 0.0
    started_at = time.time()

    while self._should_continue(iterations, time_elapsed):
        step_result = self._take_next_step(
            tools_by_name,
            tool_colors,
            inputs,
            steps_so_far,
            run_manager=run_manager,
        )
        if isinstance(step_result, AgentFinish):
            return self._return(step_result, steps_so_far, run_manager=run_manager)

        steps_so_far.extend(step_result)
        if len(step_result) == 1:
            # A single tool call may be flagged to return its result directly.
            direct_finish = self._get_tool_return(step_result[0])
            if direct_finish is not None:
                return self._return(
                    direct_finish, steps_so_far, run_manager=run_manager
                )
        iterations += 1
        time_elapsed = time.time() - started_at

    # A limit was hit without the agent finishing.
    stopped = self._action_agent.return_stopped_response(
        self.early_stopping_method, steps_so_far, **inputs
    )
    return self._return(stopped, steps_so_far, run_manager=run_manager)
| |
|
async def _acall(
    self,
    inputs: Dict[str, str],
    run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
    """Async run text through and get agent response.

    Repeats single steps until the agent finishes, a return-direct tool
    produces the answer, or a limit is reached. The whole loop runs under
    ``asyncio_timeout(self.max_execution_time)``, so a step that is still in
    flight when the time limit expires is interrupted. Whenever a limit stops
    the loop, the agent's ``return_stopped_response`` supplies the output.

    Args:
        inputs: The chain inputs.
        run_manager: Optional async callback manager.

    Returns:
        The final output dictionary built by ``_areturn``.
    """
    # Tools by name, for quick lookup while executing actions.
    name_to_tool_map = {tool.name: tool for tool in self.tools}
    # One logging color per tool. The same colors are excluded as in the
    # synchronous ``_call`` so both paths color tools identically.
    color_mapping = get_color_mapping(
        [tool.name for tool in self.tools], excluded_colors=["green", "red"]
    )
    intermediate_steps: List[Tuple[AgentAction, str]] = []
    # Loop bookkeeping: step count and elapsed wall-clock time.
    iterations = 0
    time_elapsed = 0.0
    start_time = time.time()

    try:
        async with asyncio_timeout(self.max_execution_time):
            while self._should_continue(iterations, time_elapsed):
                next_step_output = await self._atake_next_step(
                    name_to_tool_map,
                    color_mapping,
                    inputs,
                    intermediate_steps,
                    run_manager=run_manager,
                )
                if isinstance(next_step_output, AgentFinish):
                    return await self._areturn(
                        next_step_output,
                        intermediate_steps,
                        run_manager=run_manager,
                    )

                intermediate_steps.extend(next_step_output)
                if len(next_step_output) == 1:
                    next_step_action = next_step_output[0]
                    # A single tool call may be flagged to return directly.
                    tool_return = self._get_tool_return(next_step_action)
                    if tool_return is not None:
                        return await self._areturn(
                            tool_return, intermediate_steps, run_manager=run_manager
                        )

                iterations += 1
                time_elapsed = time.time() - start_time
            # A limit was hit between steps without the agent finishing.
            output = self._action_agent.return_stopped_response(
                self.early_stopping_method, intermediate_steps, **inputs
            )
            return await self._areturn(
                output, intermediate_steps, run_manager=run_manager
            )
    except (TimeoutError, asyncio.TimeoutError):
        # The timeout interrupted a step in progress: stop early and answer
        # with the steps completed so far.
        output = self._action_agent.return_stopped_response(
            self.early_stopping_method, intermediate_steps, **inputs
        )
        return await self._areturn(
            output, intermediate_steps, run_manager=run_manager
        )
| |
|
def _get_tool_return(
    self, next_step_output: Tuple[AgentAction, str]
) -> Optional[AgentFinish]:
    """Check if the tool is a returning tool.

    Args:
        next_step_output: An ``(action, observation)`` pair from one step.

    Returns:
        An ``AgentFinish`` carrying the observation when the action's tool is
        known and flagged ``return_direct``; ``None`` otherwise. The
        observation is stored under the agent's first return-value key, or
        under ``"output"`` if the agent declares none.
    """
    agent_action, observation = next_step_output
    tools_by_name = {tool.name: tool for tool in self.tools}
    declared_keys = self._action_agent.return_values
    return_value_key = declared_keys[0] if len(declared_keys) > 0 else "output"

    if agent_action.tool not in tools_by_name:
        return None
    if not tools_by_name[agent_action.tool].return_direct:
        return None
    return AgentFinish(
        {return_value_key: observation},
        "",
    )
| |
|
def _prepare_intermediate_steps(
    self, intermediate_steps: List[Tuple[AgentAction, str]]
) -> List[Tuple[AgentAction, str]]:
    """Apply ``trim_intermediate_steps`` to the given steps.

    Args:
        intermediate_steps: ``(action, observation)`` pairs gathered so far.

    Returns:
        The last ``n`` steps when the setting is a positive integer ``n``;
        the callable's result when the setting is callable; otherwise the
        input list itself, untouched.
    """
    trim = self.trim_intermediate_steps
    keep_last_n = isinstance(trim, int) and trim > 0
    if keep_last_n:
        return intermediate_steps[-trim:]
    if callable(trim):
        return trim(intermediate_steps)
    return intermediate_steps
| |
|
def stream(
    self,
    input: Union[Dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    **kwargs: Any,
) -> Iterator[AddableDict]:
    """Enables streaming over steps taken to reach final output.

    Builds an ``AgentExecutorIterator`` (with ``yield_actions=True``) from the
    callbacks, tags, metadata, run name and run id in ``config`` and relays
    every item it produces.

    Args:
        input: Input to the agent.
        config: Config to use.
        kwargs: Additional arguments, forwarded to the iterator.

    Yields:
        AddableDict: Addable dictionary.
    """
    resolved_config = ensure_config(config)
    step_iterator = AgentExecutorIterator(
        self,
        input,
        resolved_config.get("callbacks"),
        tags=resolved_config.get("tags"),
        metadata=resolved_config.get("metadata"),
        run_name=resolved_config.get("run_name"),
        run_id=resolved_config.get("run_id"),
        yield_actions=True,
        **kwargs,
    )
    for chunk in step_iterator:
        yield chunk
| |
|
async def astream(
    self,
    input: Union[Dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    **kwargs: Any,
) -> AsyncIterator[AddableDict]:
    """Async enables streaming over steps taken to reach final output.

    Builds an ``AgentExecutorIterator`` (with ``yield_actions=True``) from the
    callbacks, tags, metadata, run name and run id in ``config`` and relays
    every item it produces.

    Args:
        input: Input to the agent.
        config: Config to use.
        kwargs: Additional arguments, forwarded to the iterator.

    Yields:
        AddableDict: Addable dictionary.
    """
    resolved_config = ensure_config(config)
    step_iterator = AgentExecutorIterator(
        self,
        input,
        resolved_config.get("callbacks"),
        tags=resolved_config.get("tags"),
        metadata=resolved_config.get("metadata"),
        run_name=resolved_config.get("run_name"),
        run_id=resolved_config.get("run_id"),
        yield_actions=True,
        **kwargs,
    )
    async for chunk in step_iterator:
        yield chunk
| |
|