| import sys |
| from typing import Dict, Any |
|
|
| from flows.base_flows import CircularFlow |
| from flows.utils import logging |
|
|
| logging.set_verbosity_debug() |
|
|
| log = logging.get_logger(__name__) |
|
|
| from flow_modules.aiflows.ControllerExecutorFlowModule import ControllerAtomicFlow |
| from flow_modules.aiflows.VectorStoreFlowModule import ChromaDBFlow |
|
|
| class AutoGPTFlow(CircularFlow): |
| def _on_reach_max_round(self): |
| self._state_update_dict({ |
| "answer": "The maximum amount of rounds was reached before the model found an answer.", |
| "status": "unfinished" |
| }) |
|
|
| @staticmethod |
| def _get_memory_key(flow_state): |
| goal = flow_state.get("goal") |
| last_command = flow_state.get("command") |
| last_command_args = flow_state.get("command_args") |
| last_observation = flow_state.get("observation") |
| last_human_feedback = flow_state.get("human_feedback") |
|
|
| if last_command is None: |
| return "" |
|
|
| assert goal is not None, goal |
| assert last_command_args is not None, last_command_args |
| assert last_observation is not None, last_observation |
|
|
| current_context = \ |
| f""" |
| == Goal == |
| {goal} |
| |
| == Command == |
| {last_command} |
| == Args |
| {last_command_args} |
| == Result |
| {last_observation} |
| |
| == Human Feedback == |
| {last_human_feedback} |
| """ |
|
|
| return current_context |
|
|
| @CircularFlow.input_msg_payload_builder |
| def prepare_memory_read_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]: |
| """A (very) basic example implementation of how the memory retrieval could be constructed.""" |
| query = self._get_memory_key(flow_state) |
|
|
| return { |
| "operation": "read", |
| "content": query |
| } |
|
|
| @CircularFlow.output_msg_payload_processor |
| def prepare_memory_read_output(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow): |
| retrieved_memories = output_payload["retrieved"][0][1:] |
| return {"memory": "\n".join(retrieved_memories)} |
|
|
| @CircularFlow.input_msg_payload_builder |
| def prepare_memory_write_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]: |
| """An (very) example (not optimized) implementation of how the memory population could be implemented.""" |
| query = self._get_memory_key(flow_state) |
|
|
| return { |
| "operation": "write", |
| "content": str(query) |
| } |
|
|
| @CircularFlow.output_msg_payload_processor |
| def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[ |
| str, Any]: |
| command = output_payload["command"] |
| if command == "finish": |
| return { |
| "EARLY_EXIT": True, |
| "answer": output_payload["command_args"]["answer"], |
| "status": "finished" |
| } |
| else: |
| return output_payload |
|
|
| @CircularFlow.output_msg_payload_processor |
| def detect_finish_in_human_input(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[ |
| str, Any]: |
| human_feedback = output_payload["human_input"] |
| if human_feedback.strip().lower() == "q": |
| return { |
| "EARLY_EXIT": True, |
| "answer": "The user has chosen to exit before a final answer was generated.", |
| "status": "unfinished" |
| } |
|
|
| return {"human_feedback": human_feedback} |
|
|