import sys
from typing import Dict, Any

from aiflows.utils import logging

logging.set_verbosity_debug()

log = logging.get_logger(__name__)

from aiflows.interfaces import KeyInterface
from flow_modules.aiflows.ControllerExecutorFlowModule import ControllerExecutorFlow


class AutoGPTFlow(ControllerExecutorFlow):
    """ This class implements a (very basic) AutoGPT flow. It is a flow that consists of multiple sub-flows that are executed circularly. It contains the following subflows:

    - A Controller Flow: A Flow that controls which subflow of the Executor Flow to execute next.
    - A Memory Flow: A Flow used to save and retrieve messages or memories which might be useful for the Controller Flow.
    - A HumanFeedback Flow: A flow used to get feedback from the user/human.
    - An Executor Flow: A Flow that executes commands generated by the Controller Flow. Typically it's a branching flow (see BranchingFlow) and the commands are which branch to execute next.

    An illustration of the flow is as follows:

            | -------> Memory Flow -------> Controller Flow ------->|
            ^                                                       |
            |                                                       |
            |                                                       v
            | <----- HumanFeedback Flow <------- Executor Flow <----|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default is "AutoGPTFlow".
    - `description` (str): A description of the flow. Default is "An example implementation of AutoGPT with Flows."
    - `max_rounds` (int): The maximum number of rounds the circular flow can run for. Default is 30.
    - `early_exit_key` (str): The key that is used to terminate the flow early. Default is "EARLY_EXIT".
    - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default:
        - `Controller` (Dict[str,Any]): The configuration of the Controller Flow. By default the controller flow is a ControllerAtomicFlow (see ControllerExecutorFlowModule). Its default values are defined in ControllerAtomicFlow.yaml of the ControllerExecutorFlowModule. Except for the following parameters which are overwritten by the AutoGPTFlow in AutoGPTFlow.yaml:
            - `finish` (Dict[str,Any]): The configuration of the finish command (used to terminate the flow early when the controller has accomplished its goal).
                - `description` (str): The description of the command. Default is "The finish command is used to terminate the flow early when the controller has accomplished its goal."
                - `input_args` (List[str]): The list of expected keys to run the finish command. Default is ["answer"].
            - `human_message_prompt_template` (Dict[str,Any]): The prompt template used to generate the message that is shown to the user/human when the finish command is executed. Default is:
                - `template` (str): The template of the human message prompt (see AutoGPTFlow.yaml for default template)
                - `input_variables` (List[str]): The list of variables to be included in the template. Default is ["observation", "human_feedback", "memory"].
            - `input_interface_initialized` (List[str]): The input interface that Controller Flow expects except for the first time in the flow. Default is ["observation", "human_feedback", "memory"].
        - `Executor` (Dict[str,Any]): The configuration of the Executor Flow. By default the executor flow is a Branching Flow (see BranchingFlow). Its default values are the default values of the BranchingFlow. Fields to define:
            - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. The keys are the names of the subflows and the values are the configurations of the subflows. Each subflow is a branch of the branching flow.
        - `HumanFeedback` (Dict[str,Any]): The configuration of the HumanFeedback Flow. By default the human feedback flow is a HumanStandardInputFlow (see HumanStandardInputFlowModule). Its default values are specified in the README.md of HumanStandardInputFlowModule. Except for the following parameters which are overwritten by the AutoGPTFlow in AutoGPTFlow.yaml:
            - `request_multi_line_input_flag` (bool): Flag to request multi-line input. Default is False.
            - `query_message_prompt_template` (Dict[str,Any]): The prompt template presented to the user/human to request input. Default is:
                - `template` (str): The template of the query message prompt (see AutoGPTFlow.yaml for default template)
                - `input_variables` (List[str]): The list of variables to be included in the template. Default is ["goal", "command", "command_args", "observation"]
            - `input_interface_initialized` (List[str]): The input interface that HumanFeedback Flow expects except for the first time in the flow. Default is ["goal", "command", "command_args", "observation"]
        - `Memory` (Dict[str,Any]): The configuration of the Memory Flow. By default the memory flow is a ChromaDBFlow (see VectorStoreFlowModule). Its default values are defined in ChromaDBFlow.yaml of the VectorStoreFlowModule. Except for the following parameters which are overwritten by the AutoGPTFlow in AutoGPTFlow.yaml:
            - `n_results`: The number of results to retrieve from the memory. Default is 2.
    - `topology` (List[Dict[str,Any]]): The topology of the flow which is "circular". By default, the topology is the one shown in the illustration above (the topology is also described in AutoGPTFlow.yaml).

    *Input Interface*:

    - `goal` (str): The goal of the flow.

    *Output Interface*:

    - `answer` (str): The answer of the flow.
    - `status` (str): The status of the flow. It can be "finished" or "unfinished".

    :param flow_config: The configuration of the flow. Contains the parameters described above and the parameters required by the parent class (ControllerExecutorFlow).
    :type flow_config: Dict[str,Any]
    :param subflows: A list of subflows constituting the circular flow. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config).
    :type subflows: List[Flow]
    """

    def __init__(self, **kwargs) -> None:
        """ Initializes the flow and the interface used to rename the human output.

        :param kwargs: Keyword arguments forwarded unchanged to the parent class constructor.
        """
        super().__init__(**kwargs)
        # The HumanFeedback subflow answers under "human_input"; the rest of this
        # class reads the value under "human_feedback".
        self.rename_human_output_interface = KeyInterface(
            keys_to_rename={"human_input": "human_feedback"}
        )

    def set_up_flow_state(self) -> None:
        """ Sets up the parent flow state and adds the AutoGPT specific flags to it. """
        super().set_up_flow_state()
        # NOTE(review): the flag starts as True, but `controller_executor_step`
        # overwrites it before `_single_round_autogpt` reads it for the first time.
        # Confirm that no other reader relies on this initial value.
        self.flow_state["early_exit_flag"] = True
        self.flow_state["is_first_round"] = True

    def memory_read_step(self) -> Dict[str, Any]:
        """ Queries the Memory subflow with the current context and waits for its reply.

        :return: The retrieved memories, formatted by `prepare_memory_read_output`.
        :rtype: Dict[str, Any]
        """
        memory_read_input = self.prepare_memory_read_input()
        output = self.ask_subflow("Memory", memory_read_input).get_data()
        return self.prepare_memory_read_output(output)

    def memory_write_step(self) -> None:
        """ Sends the current context to the Memory subflow to be stored (no reply is read). """
        memory_write_input = self.prepare_memory_write_input()
        self.tell_subflow("Memory", memory_write_input)

    def controller_executor_step(self, output_memory_retrieval: Dict[str, Any]) -> Dict[str, Any]:
        """ Runs a single Controller/Executor round and records its results in the flow state.

        :param output_memory_retrieval: The output of the memory read step (merged into the input of the round).
        :type output_memory_retrieval: Dict[str, Any]
        :return: The output of the Controller/Executor round. It is returned as is when it signals an early exit.
        :rtype: Dict[str, Any]
        """
        if self.flow_state["is_first_round"]:
            # On the first round there is no observation or feedback yet, only the goal.
            additional_input_ctrl_ex = {
                "goal": self.flow_state["goal"],
            }
        else:
            additional_input_ctrl_ex = {
                "observation": self.flow_state["observation"],
                "human_feedback": self.flow_state["human_feedback"],
            }

        input_ctrl_ex = {"executor_reply": {**output_memory_retrieval, **additional_input_ctrl_ex}}

        output_ctrl_ex = self._single_round_controller_executor(input_ctrl_ex)

        # NOTE(review): the key is hard-coded here although the configuration
        # exposes `early_exit_key`; confirm which key the parent round reports under.
        self.flow_state["early_exit_flag"] = output_ctrl_ex.get("EARLY_EXIT", False)

        if self.flow_state["early_exit_flag"]:
            return output_ctrl_ex

        controller_reply = output_ctrl_ex["controller_reply"]
        executor_reply = output_ctrl_ex["executor_reply"]

        # Remember the last command and its result for the human feedback step
        # and for building the memory key.
        self._state_update_dict(
            {
                "command": controller_reply["command"],
                "command_args": controller_reply["command_args"],
                "observation": executor_reply["observation"],
            }
        )
        return output_ctrl_ex

    def human_feedback_step(self) -> Dict[str, Any]:
        """ Asks the HumanFeedback subflow for feedback on the last command and its observation.

        The feedback is stored in the flow state. If the feedback is "q" (ignoring case and
        surrounding whitespace), the early exit flag is raised and an early exit output is returned.

        :return: The renamed human feedback, or the early exit output if the user chose to quit.
        :rtype: Dict[str, Any]
        """
        human_feedback_input_variables = {
            "goal": self.flow_state["goal"],
            "command": self.flow_state["command"],
            "command_args": self.flow_state["command_args"],
            "observation": self.flow_state["observation"],
        }

        human_feedback = self.rename_human_output_interface(
            self.ask_subflow("HumanFeedback", human_feedback_input_variables).get_data()
        )

        self.flow_state["human_feedback"] = human_feedback["human_feedback"]

        if human_feedback["human_feedback"].strip().lower() == "q":
            self.flow_state["early_exit_flag"] = True
            return {
                "EARLY_EXIT": True,
                "answer": "The user has chosen to exit before a final answer was generated.",
                "status": "unfinished",
            }

        return human_feedback

    def _single_round_autogpt(self) -> Dict[str, Any]:
        """ Runs one full round: memory read, controller/executor, human feedback, memory write.

        The round stops at the first step that raises the early exit flag and returns
        the output of that step.

        :return: The output of the step that triggered the early exit, otherwise the merged
            outputs of the controller/executor step and the human feedback step.
        :rtype: Dict[str, Any]
        """
        # 1. Memory Retrieval
        output_memory_retrieval = self.memory_read_step()

        # 2. ControllerExecutor
        output_ctrl_ex = self.controller_executor_step(output_memory_retrieval)

        if self.flow_state["early_exit_flag"]:
            return output_ctrl_ex

        # 3. HumanFeedback
        output_human_feedback = self.human_feedback_step()

        if self.flow_state["early_exit_flag"]:
            return output_human_feedback

        # 4. Memory Write
        self.memory_write_step()

        return {**output_ctrl_ex, **output_human_feedback}

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """ Runs the flow for at most `max_rounds` rounds (read from the flow config).

        :param input_data: The input data of the flow. Must contain the key "goal".
        :type input_data: Dict[str, Any]
        :return: The output of the round that triggered an early exit. If no round did, a
            dictionary with "EARLY_EXIT" set to False, the output of the last round under
            "answer" (None if no round was run) and the status "unfinished".
        :rtype: Dict[str, Any]
        """
        self._state_update_dict({"goal": input_data["goal"]})

        # Defined up front so that a non-positive `max_rounds` yields a well-formed
        # "unfinished" result instead of an UnboundLocalError at the final return.
        output = None

        for _ in range(self.flow_config["max_rounds"]):
            output = self._single_round_autogpt()

            self.flow_state["is_first_round"] = False

            if self.flow_state["early_exit_flag"]:
                return output

        return {
            "EARLY_EXIT": False,
            "answer": output,
            "status": "unfinished"
        }

    def _get_memory_key(self) -> str:
        """ This method returns the memory key that is used to read from and write to the Memory subflow.
        The key is built from the goal, the last command, its arguments, its observation and
        the last human feedback found in the flow state.

        :return: The current context, or an empty string if no command has been executed yet.
        :rtype: str
        """
        goal = self.flow_state.get("goal")
        last_command = self.flow_state.get("command")
        last_command_args = self.flow_state.get("command_args")
        last_observation = self.flow_state.get("observation")
        last_human_feedback = self.flow_state.get("human_feedback")

        if last_command is None:
            return ""

        # Once a command is recorded, the rest of the context must be present as well.
        assert goal is not None, "`goal` is missing from the flow state"
        assert last_command_args is not None, "`command_args` is missing from the flow state"
        assert last_observation is not None, "`observation` is missing from the flow state"

        current_context = f"""
            == Goal ==
            {goal}

            == Command ==
            {last_command}
            == Args
            {last_command_args}
            == Result
            {last_observation}

            == Human Feedback ==
            {last_human_feedback}
            """

        return current_context

    def prepare_memory_read_input(self) -> Dict[str, Any]:
        """ Builds the input of the Memory subflow for a read operation.

        :return: A dictionary with the operation "read" and the current context as content.
        :rtype: Dict[str, Any]
        """
        query = self._get_memory_key()

        return {
            "operation": "read",
            "content": query
        }

    def prepare_memory_read_output(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """ Formats the reply of the Memory subflow to a read operation.

        :param data: The reply of the Memory subflow. Must contain the key "retrieved".
        :type data: Dict[str, Any]
        :return: A dictionary with the retrieved memories joined by newlines under the key "memory".
        :rtype: Dict[str, Any]
        """
        # NOTE(review): the first retrieved entry is skipped; presumably it is the
        # entry written with the same key in the previous round -- confirm.
        retrieved_memories = data["retrieved"][0][1:]
        return {"memory": "\n".join(retrieved_memories)}

    def prepare_memory_write_input(self) -> Dict[str, Any]:
        """ Builds the input of the Memory subflow for a write operation.

        :return: A dictionary with the operation "write" and the current context as content.
        :rtype: Dict[str, Any]
        """
        query = self._get_memory_key()

        return {
            "operation": "write",
            "content": str(query)
        }