from typing import Dict, Any
import os

from flow_modules.aiflows.ContentWriterFlowModule import ContentWriterFlow
from aiflows.base_flows import CircularFlow
from aiflows.utils import logging

log = logging.get_logger(f"aiflows.{__name__}")


class CodeWriterFlow(ContentWriterFlow):
    """This flow inherits from ContentWriterFlow, it is used to write code in an interactive way.
    In the subflow of the executor, we specify an InteractiveCodeGenFlow
    (https://huggingface.co/aiflows/InteractiveCodeGenFlowModule)

    *Input Interface*:
    - `goal`

    *Output Interface*:
    - `code`
    - `result`
    - `summary`
    - `status`

    *Configuration Parameters*:
    - `name`: Name of the flow
    - `description`: Description of the flow
    - `_target_`: The instantiation target of the flow
    - `input_interface`: The input to the flow. Inherited from ContentWriterFlow, in this case, it is `goal`.
    - `output_interface`: The output of the flow.
    - `subflows_config`: Configurations of subflows
    - `early_exit_keys`: The keys that will trigger an early exit of the flow
    - `topology`: Configures the topology of the subflows, please have a special look at the I/O interfaces
      of the subflows.
    """

    def _on_reach_max_round(self):
        """Record an "unfinished" outcome in the flow state.

        This function is called when the maximum amount of rounds was reached before the
        model generated the code. All four keys of the output interface (`code`, `result`,
        `summary`, `status`) are written so that this exit path has the same shape as the
        `finish` / `manual_finish` exits produced by `detect_finish_or_continue`.
        """
        message = "The maximum amount of rounds was reached before the model generated the code."
        self._state_update_dict({
            "code": message,
            "result": message,
            "summary": f"ExtendLibrary/CodeWriter: {message}",
            "status": "unfinished",
        })

    def _remove_temp_code_file(self) -> None:
        """Delete the temp code file whose path is stored in the flow state, if it exists."""
        fetched_state = self._fetch_state_attributes_by_keys(keys=["temp_code_file_location"])
        temp_code_file_location = fetched_state["temp_code_file_location"]

        # Nothing to clean up when no location was recorded (None / empty string).
        if not temp_code_file_location:
            return

        # EAFP: try the removal directly instead of checking `os.path.exists` first, so a file
        # that vanishes between the check and the removal cannot raise.
        try:
            os.remove(temp_code_file_location)
        except FileNotFoundError:
            pass

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """This function is used to detect whether the code generation process is finished or not.
        It is configured in the topology of the subflows, see CodeWriterFlow.yaml for more details.

        Behavior per `output_payload["command"]`:
        - `"finish"`: removes the temp code file and returns an early-exit payload carrying the
          code stored in the flow state and the summary from `command_args`.
        - `"manual_finish"`: removes the temp code file and returns an early-exit payload that
          marks the process as unfinished.
        - `"test"`: injects the code stored in the flow state into `command_args["code"]`
          (the payload is modified in place) and returns the payload.
        - anything else: the payload is returned untouched.

        :param output_payload: The output payload of the subflow
        :param src_flow: The subflow that generated the output payload
        :return: The output payload of the subflow
        """
        command = output_payload["command"]

        if command == "finish":
            # ~~~ fetch the code before cleaning up, so a missing `code` leaves the file in place ~~~
            fetched_state = self._fetch_state_attributes_by_keys(keys=["code"])
            code_content = fetched_state["code"]

            # ~~~ delete the temp code file ~~~
            self._remove_temp_code_file()

            # ~~~ return the code content ~~~
            summary = output_payload["command_args"]["summary"]
            return {
                "EARLY_EXIT": True,
                "code": code_content,
                "result": summary,
                "summary": f"ExtendLibrary/CodeWriter: {summary}",
                "status": "finished",
            }

        if command == "manual_finish":
            # ~~~ delete the temp code file ~~~
            self._remove_temp_code_file()

            # ~~~ return the manual quit status ~~~
            return {
                "EARLY_EXIT": True,
                "code": "no code was generated",
                "result": "CodeWriter was terminated explicitly by the user, process is unfinished",
                "summary": "ExtendLibrary/CodeWriter: CodeWriter was terminated explicitly by the user, process is unfinished",
                "status": "unfinished",
            }

        if command == "test":
            # ~~~ fetch code string from flow state ~~~
            fetched_state = self._fetch_state_attributes_by_keys(keys=["code"])

            # ~~~ add code content to the command args (branch input data) ~~~
            output_payload["command_args"]["code"] = fetched_state["code"]

        return output_payload

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """The run function of the flow.

        Stores the input in the flow state, runs the subflows for at most `max_rounds` rounds
        (read from the flow config, defaulting to 1), collects the output from the flow state
        and finally resets the flow.

        :param input_data: The input data of the flow
        :return: The output data of the flow
        """
        # ~~~ sets the input_data in the flow_state dict ~~~
        self._state_update_dict(update_data=input_data)

        max_rounds = self.flow_config.get("max_rounds", 1)
        if max_rounds is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=max_rounds)

        # The output must be read before the reset below is issued.
        output = self._get_output_from_state()

        self.reset(full_reset=True, recursive=True, src_flow=self)

        return output