|
import os |
|
import time |
|
from typing import Dict, Any |
|
import subprocess |
|
from flow_modules.aiflows.InterpreterFlowModule import InterpreterAtomicFlow |
|
|
|
|
|
class ExecuteCodeAtomicFlow(InterpreterAtomicFlow): |
|
"""This class inherits from InterpreterAtomicFlow and is used to execute code in a file. |
|
It opens up the file in VSCode and waits for the user to save and close the file. Once the file is saved and closed, it reads the code |
|
from the file and executes it. It then returns the output of the execution. |
|
|
|
*Input Interface*: |
|
- temp_code_file_location: The location of the file containing the code to be executed. |
|
- language: The language of the code to be executed. |
|
|
|
*Output Interface*: |
|
- interpreter_output: The output of the execution of the code. |
|
- code_ran: The code that was executed. |
|
|
|
*Configuration Parameters*: |
|
- `input_interface`: The input interface of the atomic flow. |
|
- `output_interface`: The output interface of the atomic flow. |
|
|
|
""" |
|
def _prepare_code(self, input_data: Dict[str, Any]): |
|
""" |
|
This method reads the code from the file and stores it in the input_data dictionary. |
|
:param input_data: The input_data dictionary. |
|
:type input_data: Dict[str, Any] |
|
:return: None |
|
""" |
|
file_location = input_data["temp_code_file_location"] |
|
start_marker = "# Code:\n" |
|
end_marker = "############" |
|
code_started = False |
|
code_str = "" |
|
with open(file_location, 'r') as file: |
|
for line in file: |
|
if line.strip() == start_marker.strip(): |
|
code_started = True |
|
continue |
|
if line.strip() == end_marker.strip(): |
|
break |
|
if code_started: |
|
code_str += line |
|
input_data["code"] = code_str |
|
|
|
def _check_input(self, input_data: Dict[str, Any]): |
|
""" |
|
This method checks if the input_data dictionary contains the required keys. |
|
:param input_data: The input_data dictionary. |
|
:type input_data: Dict[str, Any] |
|
:raises AssertionError: If the input_data dictionary does not contain the required keys. |
|
:return: None |
|
""" |
|
assert "temp_code_file_location" in input_data, "temp_code_file_location not passed to ExecuteCodeAtomicFlow" |
|
assert "language" in input_data, "language not passed to ExecuteCodeAtomicFlow" |
|
|
|
def _delete_file(self, file_location): |
|
""" |
|
This method deletes the file at the given location. |
|
:param file_location: The location of the file to be deleted. |
|
:type file_location: str |
|
:return: None |
|
""" |
|
if os.path.exists(file_location): |
|
os.remove(file_location) |
|
|
|
def _open_file_and_wait_for_upd(self, file_location): |
|
""" |
|
This method opens the file at the given location in VSCode and waits for the user to save the file. |
|
:param file_location: The location of the file to be opened. |
|
:type file_location: str |
|
:return: None |
|
""" |
|
process = subprocess.Popen(["code", "--wait", file_location]) |
|
while True: |
|
if process.poll() is not None: |
|
break |
|
time.sleep(1) |
|
|
|
def run( |
|
self, |
|
input_data: Dict[str, Any]): |
|
""" |
|
This method is called when the atomic flow is called. |
|
:param input_data: The input_data dictionary. |
|
:type input_data: Dict[str, Any] |
|
:return: The output of the execution of the code. |
|
:rtype: Dict[str, Any] |
|
""" |
|
self._check_input(input_data) |
|
file_loc = input_data["temp_code_file_location"] |
|
self._open_file_and_wait_for_upd(file_loc) |
|
self._prepare_code(input_data) |
|
self._process_input_data(input_data) |
|
execution_output = self._call() |
|
self._delete_file(file_loc) |
|
response = {"interpreter_output": execution_output, "code_ran": input_data['code']} |
|
return response |
|
|