from flow_modules.aiflows.HumanStandardInputFlowModule import HumanStandardInputFlow

from typing import Dict, Any

from aiflows.messages import UpdateMessage_Generic

from aiflows.utils import logging

log = logging.get_logger(f"aiflows.{__name__}")


class CodeWriterAskUserFlow(HumanStandardInputFlow):
    """Ask the user for feedback on behalf of the controller.

    The controller uses this flow whenever it is unsure about something or
    needs a confirmation from the user.

    *Input Interface*:

    - `question`: The question asked by the controller.

    *Expected Behaviour*:

    - The question is displayed, and the user answers by typing a string.

    *Output Interface*:

    - `feedback`: The text entered by the user.
    - `code`: A fixed placeholder string, since this flow writes no code.
    """

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Show the query to the user and return their answer.

        :param input_data: The input data used to fill the query prompt template.
        :return: A dict with the user's reply under `feedback` and the fixed
            placeholder string under `code`.
        """
        # Render the prompt first; everything below reports or displays it.
        prompt = self._get_message(self.query_message_prompt_template, input_data)

        # This flow is both the creator and the target of the state update.
        flow_name = self.flow_config["name"]
        self._log_message(
            UpdateMessage_Generic(
                created_by=flow_name,
                updated_flow=flow_name,
                data={"query_message": prompt},
            )
        )

        log.info(prompt)

        # Reading the input is the last side effect, evaluated while the
        # result dict is being built.
        return {
            "feedback": self._read_input(),
            "code": "no code was written",
        }