| | import traceback |
| | from copy import deepcopy |
| | from typing import Dict, Any |
| |
|
| | from .code_interpreters.create_code_interpreter import create_code_interpreter |
| |
|
| | from aiflows.base_flows import AtomicFlow |
| |
|
def truncate_output(data, max_output_chars=2000):
    """Keep only the tail of ``data``, prefixed with a truncation banner.

    If ``data`` already begins with the banner for this ``max_output_chars``
    (i.e. it was truncated by an earlier call with the same limit), the banner
    is removed before measuring and then re-added, so it is never duplicated
    nor counted against the limit.

    Args:
        data: The text to truncate.
        max_output_chars: Maximum number of trailing characters of ``data`` to
            keep. A value of 0 (or less) keeps no characters at all.

    Returns:
        ``data`` unchanged when it fits within the limit and carried no
        banner; otherwise the banner followed by at most the last
        ``max_output_chars`` characters of ``data``.
    """
    message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n'

    # Remove a banner left by a previous truncation pass.
    already_truncated = data.startswith(message)
    if already_truncated:
        data = data[len(message):]

    if len(data) > max_output_chars or already_truncated:
        # Compute the start index explicitly: `data[-0:]` is the WHOLE string,
        # so a plain negative slice would leak everything when the limit is 0.
        start = max(len(data) - max_output_chars, 0)
        data = message + data[start:]

    return data
| |
|
| |
|
class InterpreterAtomicFlow(AtomicFlow):
    """This flow is used to run the code passed from the caller.

    *Expected Input*:
    - `code`
    - `language`

    *Expected Output*:
    - `interpreter_output`: output of the code interpreter

    A `python` snippet whose code starts with `!` is executed as `shell`
    code, with the leading `!` removed.

    **Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter)
    for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()**

    I'm extracting the code interpreter part from open-interpreter because the litellm version of open-interpreter
    is not compatible with that of the current version of aiflows(v.0.1.7).
    """

    def __init__(self, max_output=2000, **kwargs):
        """Create the flow.

        Args:
            max_output: Maximum number of characters of interpreter output
                kept by `_call` (passed to `truncate_output`).
            **kwargs: Forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        self.max_output = max_output
        # One interpreter per language, created on first use and then reused.
        self._code_interpreters = {}

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow from `config`; the config is deep-copied, not shared."""
        flow_config = deepcopy(config)
        kwargs = {"flow_config": flow_config}
        return cls(**kwargs)

    def set_up_flow_state(self):
        """
        class-specific flow state: language and code,
        which describes the programming language and the code to run.
        """
        super().set_up_flow_state()
        self.flow_state["language"] = None
        self.flow_state["code"] = ""

    def _state_update_add_language_and_code(self,
                                            language: str,
                                            code: str) -> None:
        """
        updates the language and code passed from _process_input_data
        to the flow state
        """
        self.flow_state["language"] = language
        self.flow_state["code"] = code

    def _check_input(self, input_data: Dict[str, Any]):
        """
        Sanity check of input data.

        Raises:
            AssertionError: if `language` or `code` is missing.
        """
        # Explicit raises instead of `assert` statements: asserts are stripped
        # when Python runs with -O, which would silently disable validation.
        # AssertionError is kept so existing callers see the same exception.
        for key in ("language", "code"):
            if key not in input_data:
                raise AssertionError(f"attribute '{key}' not in input data.")

    def _process_input_data(self, input_data: Dict[str, Any]):
        """
        Allocate interpreter if any, pass input data into flow state.

        `input_data` itself is left unmodified.
        """
        language = input_data["language"]
        code = input_data["code"]

        # A leading "!" in python code means "run this as a shell command".
        # Work on local variables so the caller's dict is not mutated.
        if language == "python" and code.startswith("!"):
            language = "shell"
            code = code[1:]

        # Lazily create the interpreter for this language and cache it.
        if language not in self._code_interpreters:
            self._code_interpreters[language] = create_code_interpreter(language)

        self._state_update_add_language_and_code(
            language=language,
            code=code
        )

    def _call(self):
        """Run the code stored in the flow state and return its output.

        Returns:
            The collected interpreter output, truncated to `self.max_output`
            characters and stripped of surrounding whitespace. If running the
            code raises, the formatted traceback is returned instead.
        """
        try:
            code_interpreter = self._code_interpreters[self.flow_state["language"]]
            code = self.flow_state["code"]
            # Each item produced by the interpreter that carries an "output"
            # entry contributes one newline-prefixed piece of text
            # (items are presumably dicts -- confirm in code_interpreters).
            output = "".join(
                "\n" + chunk["output"]
                for chunk in code_interpreter.run(code)
                if "output" in chunk
            )
            output = truncate_output(output, self.max_output)
        except Exception:
            # Failures are reported as output rather than raised. Only
            # `Exception` is caught so KeyboardInterrupt / SystemExit still
            # propagate instead of being swallowed.
            output = traceback.format_exc()
        return output.strip()

    def run(
            self,
            input_data: Dict[str, Any]):
        """Validate the input, execute the code and return the result.

        Args:
            input_data: Must contain the keys `language` and `code`.

        Returns:
            A dict with the single key `interpreter_output`.
        """
        self._check_input(input_data)
        self._process_input_data(input_data)
        response = self._call()
        return {"interpreter_output": response}
| |
|