| from typing import Dict, Any |
|
|
| from flows.base_flows import CircularFlow |
| from flows.utils import logging |
|
|
| from .ControllerAtomicFlow import ControllerAtomicFlow |
|
|
# Module-level logging setup, executed as a side effect of importing this module.
# NOTE(review): `set_verbosity_debug()` presumably switches the `flows` logging
# utilities to debug verbosity for the whole process - confirm that a library
# module is meant to do this on import.
# `log` is this module's logger; it is not used in the visible part of the file.
| logging.set_verbosity_debug() |
| log = logging.get_logger(__name__) |
|
|
|
|
class ControllerExecutorFlow(CircularFlow):
    """Circular flow that stops as soon as a "finish" command is seen.

    Output payloads are inspected by ``detect_finish_or_continue``; a payload
    whose ``"command"`` is ``"finish"`` is replaced by a final-answer payload,
    any other payload is passed through untouched.
    """

    def _on_reach_max_round(self):
        """Store an "unfinished" answer and status in the flow state.

        NOTE(review): judging by the name, the base class invokes this hook
        when the round limit is hit without a "finish" command - confirm
        against ``CircularFlow``.
        """
        unfinished_update = {
            "answer": "The maximum amount of rounds was reached before the model found an answer.",
            "status": "unfinished",
        }
        self._state_update_dict(unfinished_update)

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[str, Any]:
        """Turn a "finish" command into a final-answer payload.

        Args:
            output_payload: Payload to inspect. Must contain ``"command"``;
                when that command is ``"finish"`` it must also contain
                ``"command_args"`` with an ``"answer"`` entry.
            src_flow: The flow the payload originates from (not used here).

        Returns:
            For a ``"finish"`` command, a new dict with ``"EARLY_EXIT"`` set to
            ``True``, the extracted ``"answer"`` and ``"status"`` set to
            ``"finished"``. Otherwise ``output_payload`` itself, unchanged.

        Raises:
            KeyError: If one of the keys listed above is missing.
        """
        # Guard clause: anything other than "finish" is forwarded as-is.
        if output_payload["command"] != "finish":
            return output_payload

        final_answer = output_payload["command_args"]["answer"]
        # NOTE(review): "EARLY_EXIT" is presumably the key CircularFlow checks
        # to leave its loop - confirm against the base class.
        return {
            "EARLY_EXIT": True,
            "answer": final_answer,
            "status": "finished",
        }
|
|