CodeWriterFlowModule / CodeWriterFlow.py
Tachi67's picture
Update CodeWriterFlow.py
85ccf67 verified
from typing import Dict, Any
import os
from flow_modules.aiflows.ContentWriterFlowModule import ContentWriterFlow
from aiflows.base_flows import CircularFlow
from aiflows.utils import logging
log = logging.get_logger(__name__)
class CodeWriterFlow(ContentWriterFlow):
"""This flow inherits from ContentWriterFlow, it is used to write code in an interactive way.
In the subflow of the executor, we specify an InteractiveCodeGenFlow (https://huggingface.co/aiflows/InteractiveCodeGenFlowModule)
*Input Interface*:
- `goal`
*Output Interface*:
- `code`
- `result`
- `summary`
- `status`
*Configuration Parameters*:
- `name`: Name of the flow
- `description`: Description of the flow
- `_target_`: The instantiation target of the flow
- `input_interface`: The input to the flow. Inherited from ContentWriterFlow, in this case, it is `goal`.
- `output_interface`: The output of the flow.
- `subflows_config`: Configurations of subflows
- `early_exit_keys`: The keys that will trigger an early exit of the flow
- `topology`: Configures the topology of the subflows, please have a special look at the I/O interfaces of the subflows.
"""
def _on_reach_max_round(self):
"""This function is called when the maximum amount of rounds was reached before the model generated the code.
"""
self._state_update_dict({
"code": "The maximum amount of rounds was reached before the model generated the code.",
"status": "unfinished"
})
@CircularFlow.output_msg_payload_processor
def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
"""This function is used to detect whether the code generation process is finished or not.
It is configured in the topology of the subflows, see CodeWriterFlow.yaml for more details.
:param output_payload: The output payload of the subflow
:param src_flow: The subflow that generated the output payload
:return: The output payload of the subflow
"""
command = output_payload["command"]
if command == "finish":
# ~~~ delete the temp code file ~~~
keys_to_fetch_from_state = ["temp_code_file_location", "code"]
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
temp_code_file_location = fetched_state["temp_code_file_location"]
code_content = fetched_state["code"]
if os.path.exists(temp_code_file_location):
os.remove(temp_code_file_location)
# ~~~ return the code content ~~~
return {
"EARLY_EXIT": True,
"code": code_content,
"result": output_payload["command_args"]["summary"],
"summary": "ExtendLibrary/CodeWriter: " + output_payload["command_args"]["summary"],
"status": "finished"
}
elif command == "manual_finish":
# ~~~ delete the temp code file ~~~
keys_to_fetch_from_state = ["temp_code_file_location"]
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
temp_code_file_location = fetched_state["temp_code_file_location"]
if os.path.exists(temp_code_file_location):
os.remove(temp_code_file_location)
# ~~~ return the manual quit status ~~~
return {
"EARLY_EXIT": True,
"code": "no code was generated",
"result": "CodeWriter was terminated explicitly by the user, process is unfinished",
"summary": "ExtendLibrary/CodeWriter: CodeWriter was terminated explicitly by the user, process is unfinished",
"status": "unfinished"
}
elif command == "test":
# ~~~ fetch code string from flow state ~~~
keys_to_fetch_from_state = ["code"]
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
# ~~~ add code content to the command args (branch input data) ~~~
code_content = fetched_state["code"]
output_payload["command_args"]["code"] = code_content
return output_payload
else:
return output_payload
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""The run function of the flow.
:param input_data: The input data of the flow
:return: The output data of the flow
"""
# ~~~ sets the input_data in the flow_state dict ~~~
self._state_update_dict(update_data=input_data)
max_rounds = self.flow_config.get("max_rounds", 1)
if max_rounds is None:
log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")
self._sequential_run(max_rounds=max_rounds)
output = self._get_output_from_state()
self.reset(full_reset=True, recursive=True, src_flow=self)
return output