| | from copy import deepcopy |
| | from typing import Any, Dict |
| |
|
| | from flows.base_flows import SequentialFlow |
| | from flows.utils import logging |
| |
|
| | logging.set_verbosity_debug() |
| | log = logging.get_logger(__name__) |
| |
|
| | class InteractivePlanGenFlow(SequentialFlow): |
| | REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology", "memory_files"] |
| |
|
| | def __init__( |
| | self, |
| | memory_files: Dict[str, Any], |
| | **kwargs |
| | ): |
| | super().__init__(**kwargs) |
| | self.memory_files = memory_files |
| |
|
| | @classmethod |
| | def instantiate_from_config(cls, config): |
| | flow_config = deepcopy(config) |
| |
|
| | kwargs = {"flow_config": flow_config} |
| |
|
| | |
| | memory_files = flow_config["memory_files"] |
| | kwargs.update({"memory_files": memory_files}) |
| |
|
| | |
| | kwargs.update({"subflows": cls._set_up_subflows(flow_config)}) |
| |
|
| | |
| | return cls(**kwargs) |
| |
|
| | def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: |
| | |
| | self._state_update_dict(update_data=input_data) |
| |
|
| | |
| | self._state_update_dict(update_data={"memory_files": self.memory_files}) |
| |
|
| | max_rounds = self.flow_config.get("max_rounds", 1) |
| | if max_rounds is None: |
| | log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.") |
| |
|
| | self._sequential_run(max_rounds=max_rounds) |
| |
|
| | output = self._get_output_from_state() |
| |
|
| | return output |
| |
|