File size: 5,125 Bytes
3e0718c
 
 
 
 
 
37dc901
85ccf67
37dc901
3e0718c
 
 
1206897
3e0718c
 
 
 
 
 
 
 
 
37dc901
 
 
 
 
 
 
 
 
 
 
3e0718c
37dc901
3e0718c
37dc901
 
3e0718c
 
 
 
 
 
 
37dc901
 
 
 
 
 
3e0718c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37dc901
 
 
 
3e0718c
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from typing import Dict, Any
import os

from flow_modules.aiflows.ContentWriterFlowModule import ContentWriterFlow
from aiflows.base_flows import CircularFlow

from aiflows.utils import logging
log = logging.get_logger(__name__)


class CodeWriterFlow(ContentWriterFlow):
    """This flow inherits from ContentWriterFlow and is used to write code in an interactive way.
    In the subflow of the executor, we specify an InteractiveCodeGenFlow (https://huggingface.co/aiflows/InteractiveCodeGenFlowModule)

    *Input Interface*:
    - `goal`

    *Output Interface*:
    - `code`
    - `result`
    - `summary`
    - `status`

    *Configuration Parameters*:
    - `name`: Name of the flow
    - `description`: Description of the flow
    - `_target_`: The instantiation target of the flow
    - `input_interface`: The input to the flow. Inherited from ContentWriterFlow, in this case, it is `goal`.
    - `output_interface`: The output of the flow.
    - `subflows_config`: Configurations of subflows
    - `early_exit_keys`: The keys that will trigger an early exit of the flow
    - `topology`: Configures the topology of the subflows, please have a special look at the I/O interfaces of the subflows.
    - `max_rounds`: Read in `run` and forwarded to `_sequential_run`; defaults to 1 when the key is absent.
      `None` means the flow runs until the early exit condition is met.

    """

    def _on_reach_max_round(self):
        """Record in the flow state that the round limit was hit before any code was produced.

        Writes an explanatory message under `code` and sets `status` to `"unfinished"`.
        NOTE(review): the call site is not in this file; presumably the parent flow machinery
        invokes this hook when `max_rounds` is exhausted -- confirm against ContentWriterFlow.
        """
        self._state_update_dict({
            "code": "The maximum amount of rounds was reached before the model generated the code.",
            "status": "unfinished"
        })

    @staticmethod
    def _remove_temp_code_file(temp_code_file_location) -> None:
        """Delete the temporary code file if it is still present.

        Shared by the `finish` and `manual_finish` branches of `detect_finish_or_continue`.

        :param temp_code_file_location: Path of the temporary code file as stored in the flow state.
            A missing (`None`) or empty location is treated as "nothing to clean up".
        """
        if not temp_code_file_location:
            # No path recorded, so there is no file to delete.
            return
        if os.path.exists(temp_code_file_location):
            try:
                os.remove(temp_code_file_location)
            except FileNotFoundError:
                # The file vanished between the existence check and the removal;
                # the desired end state (no temp file) is already reached.
                pass

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """This function is used to detect whether the code generation process is finished or not.
        It is configured in the topology of the subflows, see CodeWriterFlow.yaml for more details.

        Behavior depends on `output_payload["command"]`:

        - `"finish"`: the temp code file is removed and an early-exit payload is returned carrying the
          `code` stored in the flow state, the summary from `command_args`, and `status` `"finished"`.
        - `"manual_finish"`: the temp code file is removed and an early-exit payload is returned stating
          that the user terminated the process, with `status` `"unfinished"`.
        - `"test"`: the `code` stored in the flow state is added to `command_args` (the payload is
          modified in place) and the payload is returned.
        - anything else: the payload is returned unchanged.

        :param output_payload: The output payload of the subflow
        :param src_flow: The subflow that generated the output payload (not used by this method)
        :return: Either an early-exit payload or the (possibly augmented) output payload of the subflow
        """
        command = output_payload["command"]

        if command == "finish":
            # ~~~ delete the temp code file ~~~
            fetched_state = self._fetch_state_attributes_by_keys(
                keys=["temp_code_file_location", "code"]
            )
            code_content = fetched_state["code"]
            self._remove_temp_code_file(fetched_state["temp_code_file_location"])
            # ~~~ return the code content ~~~
            summary = output_payload["command_args"]["summary"]
            return {
                "EARLY_EXIT": True,
                "code": code_content,
                "result": summary,
                "summary": "ExtendLibrary/CodeWriter: " + summary,
                "status": "finished"
            }

        if command == "manual_finish":
            # ~~~ delete the temp code file ~~~
            fetched_state = self._fetch_state_attributes_by_keys(keys=["temp_code_file_location"])
            self._remove_temp_code_file(fetched_state["temp_code_file_location"])
            # ~~~ return the manual quit status ~~~
            return {
                "EARLY_EXIT": True,
                "code": "no code was generated",
                "result": "CodeWriter was terminated explicitly by the user, process is unfinished",
                "summary": "ExtendLibrary/CodeWriter: CodeWriter was terminated explicitly by the user, process is unfinished",
                "status": "unfinished"
            }

        if command == "test":
            # ~~~ add the code string from the flow state to the command args (branch input data) ~~~
            fetched_state = self._fetch_state_attributes_by_keys(keys=["code"])
            output_payload["command_args"]["code"] = fetched_state["code"]

        # "test" (augmented above) and every other command pass the payload through.
        return output_payload

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """The run function of the flow.

        Stores the input in the flow state, runs the subflows via `_sequential_run`, collects the
        output from the flow state, and resets the flow before returning.

        :param input_data: The input data of the flow
        :return: The output data of the flow
        """
        # ~~~ sets the input_data in the flow_state dict ~~~
        self._state_update_dict(update_data=input_data)

        # A missing key falls back to a single round; an explicit None means "no round limit".
        max_rounds = self.flow_config.get("max_rounds", 1)
        if max_rounds is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=max_rounds)

        # The output must be read before the reset below is requested.
        output = self._get_output_from_state()

        self.reset(full_reset=True, recursive=True, src_flow=self)

        return output