from flow_modules.aiflows.HumanStandardInputFlowModule import HumanStandardInputFlow from typing import Dict, Any from aiflows.messages import UpdateMessage_Generic from aiflows.utils import logging log = logging.get_logger(f"aiflows.{__name__}") class RunCodeAskUserFlow(HumanStandardInputFlow): """Refer to: https://huggingface.co/aiflows/ExtendLibraryFlowModule/blob/main/ExtLibAskUserFlow.py *Input Interface*: - `question`: The question asked *Expected Behaviour*: - The question is displayed, and the user gives feedback by input string. *Output Interface*: - `result`: The input of the user. - `summary`: The summary that will be written by the caller. *Configuration Parameters*: - `query_message_prompt_template`: The template of the message that is displayed to the user. - `request_multi_line_input`: Whether the user should be able to input multiple lines. Default: False. - `end_of_input_string`: The string that the user can input to indicate that he/she is done with the input. Default: EOI """ def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: """Run the flow module. :param input_data: The input data. :type input_data: Dict[str, Any] :return: The output data. :rtype: Dict[str, Any] """ query_message = self._get_message(self.query_message_prompt_template, input_data) state_update_message = UpdateMessage_Generic( created_by=self.flow_config['name'], updated_flow=self.flow_config["name"], data={"query_message": query_message}, ) self._log_message(state_update_message) log.info(query_message) human_input = self._read_input() response = {} result = f""" The following code was ran: {input_data['code_ran']} The execution result is: {input_data['interpreter_output']} The user's feedback is: {human_input} """ response["result"] = result response["summary"] = f"Coder/run_code: \n" + result return response