| | import traceback |
| | from copy import deepcopy |
| | from typing import Dict, Any |
| |
|
| | from .code_interpreters.create_code_interpreter import create_code_interpreter |
| | from aiflows.messages import FlowMessage |
| | from aiflows.base_flows import AtomicFlow |
| |
|
def truncate_output(data, max_output_chars=2000):
    """Keep only the tail of ``data`` once it exceeds ``max_output_chars``.

    When truncation happens, the result is prefixed with a banner stating how
    many characters are shown. If ``data`` already starts with that banner
    (it was truncated earlier with the same limit), the banner is removed
    first and then re-applied, so banners never accumulate.

    :param data: the text to truncate
    :type data: str
    :param max_output_chars: maximum number of trailing characters to keep;
        values below zero are treated as zero
    :type max_output_chars: int
    :return: ``data`` unchanged when it fits and carries no banner, otherwise
        the banner followed by the last ``max_output_chars`` characters
    :rtype: str
    """
    # A negative limit would turn the tail slice into a "drop the head" slice.
    limit = max(max_output_chars, 0)
    message = f'Output truncated. Showing the last {limit} characters.\n\n'

    # Strip a banner left by a previous truncation so it is not counted as data.
    needs_truncation = data.startswith(message)
    if needs_truncation:
        data = data[len(message):]

    if len(data) > limit or needs_truncation:
        # data[-0:] is the whole string, so the zero case needs an explicit
        # empty tail.
        tail = data[-limit:] if limit else ""
        data = message + tail

    return data
| |
|
| |
|
class InterpreterAtomicFlow(AtomicFlow):
    """This flow is used to run the code passed from the caller.

    *Input Interface*:
    - `code`
    - `language`

    *Output Interface*:
    - `interpreter_output`: output of the code interpreter
    - `code_runs`: whether the code runs successfully or not

    *Configuration Parameters*:
    - `max_output`: maximum number of characters of interpreter output to
      keep; it is passed to truncate_output() as the truncation limit.

    **Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter)
    for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()**

    I'm extracting the code interpreter part from open-interpreter because the litellm version of open-interpreter
    is not compatible with that of the current version of aiflows(v.0.1.7).
    """

    def __init__(self, **kwargs):
        """Read `max_output` from the flow config and start with no interpreters.

        :param kwargs: forwarded unchanged to the parent constructor
        """
        super().__init__(**kwargs)
        self.max_output = self.flow_config["max_output"]
        # One interpreter per language, created on first use and then reused.
        self._code_interpreters = {}

    def set_up_flow_state(self):
        """Class-specific flow state: `language` and `code`, which describe
        the programming language and the code to run.
        """
        super().set_up_flow_state()
        self.flow_state["language"] = None
        self.flow_state["code"] = ""

    def _state_update_add_language_and_code(self,
                                            language: str,
                                            code: str) -> None:
        """Store the language and code passed from _process_input_data
        in the flow state.

        :param language: the programming language
        :type language: str
        :param code: the code to run
        :type code: str
        """
        self.flow_state["language"] = language
        self.flow_state["code"] = code

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """Sanity check of input data.

        :param input_data: input data
        :type input_data: Dict[str, Any]
        :raises AssertionError: if `language` or `code` is missing
        """
        # Raised explicitly (instead of via `assert`) so the check is not
        # stripped under `python -O`; exception type and message are the same.
        for key in ("language", "code"):
            if key not in input_data:
                raise AssertionError(f"attribute '{key}' not in input data.")

    def _process_input_data(self, input_data: Dict[str, Any]) -> None:
        """Allocate an interpreter if needed and pass the input into flow state.

        Python code starting with "!" is run as a shell command, with the
        "!" removed. `input_data` itself is not modified.

        :param input_data: input data
        :type input_data: Dict[str, Any]
        """
        language = input_data["language"]
        code = input_data["code"]

        # Work on locals: input_data is the caller's message payload and
        # should not be rewritten as a side effect.
        if language == "python" and code.startswith("!"):
            language = "shell"
            code = code[1:]

        if language not in self._code_interpreters:
            self._code_interpreters[language] = create_code_interpreter(language)

        self._state_update_add_language_and_code(language=language, code=code)

    def _call(self):
        """Run the code stored in the flow state and collect its output.

        :return: a pair `(output, code_runs)`. On success `output` is the
            collected interpreter output, truncated to `max_output` and
            stripped; on any exception it is the stripped traceback and
            `code_runs` is False.
        :rtype: Tuple[str, bool]
        """
        try:
            code_interpreter = self._code_interpreters[self.flow_state["language"]]
            code = self.flow_state["code"]
            # Each chunk keeps its leading newline so the length seen by
            # truncate_output() stays the same as with piecewise appending.
            output = "".join(
                "\n" + line["output"]
                for line in code_interpreter.run(code)
                if "output" in line
            )
            output = truncate_output(output, self.max_output).strip()
        except Exception:
            # `Exception` rather than a bare except, so KeyboardInterrupt and
            # SystemExit still propagate instead of being reported as output.
            return traceback.format_exc().strip(), False
        return output, True

    def run(self, input_message: FlowMessage) -> None:
        """Run the code interpreter and send the result back as a reply.

        :param input_message: The input message of the flow.
        :type input_message: FlowMessage
        """
        input_data = input_message.data
        self._check_input(input_data)
        self._process_input_data(input_data)

        interpreter_output, code_runs = self._call()

        response = {
            "interpreter_output": interpreter_output,
            "code_runs": code_runs,
        }

        reply = self.package_output_message(
            input_message=input_message,
            response=response,
        )

        self.send_message(reply)
| | |
| |
|