|
from flask import Flask, request, jsonify, render_template, send_from_directory |
|
import os |
|
import subprocess |
|
import tempfile |
|
import shutil |
|
import sys |
|
import google.generativeai as genai |
|
|
|
app = Flask(__name__) |
|
|
|
|
|
temp_dir = tempfile.mkdtemp() |
|
current_dir = temp_dir |
|
|
|
|
|
genai.configure(api_key=os.environ["GEMINI_API_KEY"]) |
|
|
|
generation_config = { |
|
"temperature": 0.9, |
|
"top_p": 1, |
|
"top_k": 40, |
|
"max_output_tokens": 1024, |
|
} |
|
|
|
model = genai.GenerativeModel( |
|
model_name="gemini-1.5-pro", |
|
generation_config=generation_config, |
|
) |
|
|
|
system_instruction = """ |
|
You are an AI-powered terminal assistant. Your role is to interpret user requests and execute appropriate terminal commands or actions. Follow these guidelines: |
|
|
|
1. Understand and execute various commands, including but not limited to: |
|
- pip install: For installing Python packages |
|
- git clone: For cloning repositories |
|
- cd: For changing directories |
|
- Python script execution: For running .py files |
|
- File creation and editing: For creating new files and adding content |
|
|
|
2. Provide the exact command(s) to be executed or action(s) to be taken, without any explanation or additional text. |
|
|
|
3. For pip install requests, always prefix the command with '!python -m' to ensure proper execution. |
|
|
|
4. For git clone requests, provide the full 'git clone' command with the repository URL. |
|
|
|
5. For cd requests, simply provide the 'cd' command with the specified directory. |
|
|
|
6. For Python script execution, provide the command to run the script (e.g., '!python script_name.py'). |
|
|
|
7. For file creation and editing requests: |
|
- If asked to create a new file, respond with: "CREATE_FILE:filename.py" |
|
- If asked to edit a file with specific content, respond with: "EDIT_FILE:filename.py:file_content" |
|
Replace 'filename.py' with the actual filename and 'file_content' with the requested content. |
|
|
|
8. If a request requires multiple commands, provide them as a list of commands, each on a new line, prefixed with 'CMD:'. |
|
|
|
9. If a request is unclear or doesn't match any known command type, respond with "Unclear request. Please provide more details." |
|
|
|
Always respond with ONLY the command(s) to be executed or action(s) to be taken, nothing else. |
|
""" |
|
|
|
chat = model.start_chat(history=[]) |
|
|
|
def execute_command(command, cwd=None): |
|
"""Executes a command and returns the output.""" |
|
process = subprocess.Popen( |
|
command, |
|
shell=True, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
text=True, |
|
cwd=cwd or current_dir |
|
) |
|
stdout, stderr = process.communicate() |
|
return stdout + stderr |
|
|
|
def read_knowledge_base(): |
|
knowledge_file = os.path.join(os.path.dirname(__file__), 'knowledge.txt') |
|
with open(knowledge_file, 'r') as file: |
|
return file.read() |
|
|
|
@app.route("/") |
|
def index(): |
|
return render_template("index.html") |
|
|
|
@app.route("/execute", methods=["POST"]) |
|
def execute_code(): |
|
global current_dir |
|
command = request.json.get("code", "").strip() |
|
if not command: |
|
return jsonify({"result": "Error: No command provided."}) |
|
|
|
try: |
|
if command.lower().startswith("ai:"): |
|
|
|
ai_command = command[3:].strip() |
|
knowledge_base = read_knowledge_base() |
|
response = chat.send_message(f"{system_instruction}\n\nKnowledge Base:\n{knowledge_base}\n\nUser request: {ai_command}") |
|
ai_result = response.text.strip() |
|
|
|
|
|
commands = [cmd.strip() for cmd in ai_result.split('CMD:') if cmd.strip()] |
|
results = [] |
|
|
|
for cmd in commands: |
|
if cmd.startswith("CREATE_FILE:"): |
|
filename = cmd.split(":")[1] |
|
filepath = os.path.join(current_dir, filename) |
|
with open(filepath, 'w') as f: |
|
pass |
|
results.append(f"Created new file: {filename}") |
|
elif cmd.startswith("EDIT_FILE:"): |
|
_, filename, content = cmd.split(":", 2) |
|
filepath = os.path.join(current_dir, filename) |
|
with open(filepath, 'w') as f: |
|
f.write(content) |
|
results.append(f"File {filename} created and edited successfully.") |
|
elif cmd.startswith("!"): |
|
results.append(execute_command(cmd[1:])) |
|
elif cmd.startswith("show files"): |
|
files = os.listdir(current_dir) |
|
results.append("Files in current directory:\n" + "\n".join(files)) |
|
elif cmd.startswith("cd "): |
|
new_dir = os.path.join(current_dir, cmd[3:]) |
|
if os.path.isdir(new_dir): |
|
current_dir = os.path.abspath(new_dir) |
|
results.append(f"Changed directory to: {current_dir}") |
|
else: |
|
results.append(f"Error: Directory not found: {new_dir}") |
|
else: |
|
results.append(execute_command(cmd)) |
|
|
|
return jsonify({"result": f"AI Executed:\n{ai_result}\n\nOutput:\n" + "\n".join(results)}) |
|
elif command == "show files": |
|
files = os.listdir(current_dir) |
|
return jsonify({"result": "Files in current directory:\n" + "\n".join(files)}) |
|
elif command == "hide files": |
|
return jsonify({"result": "Files hidden."}) |
|
elif command.startswith("new file "): |
|
filename = command[9:].strip() |
|
filepath = os.path.join(current_dir, filename) |
|
with open(filepath, 'w') as f: |
|
pass |
|
return jsonify({"result": f"Created new file: {filename}"}) |
|
elif command.startswith("edit "): |
|
filename = command[5:].strip() |
|
filepath = os.path.join(current_dir, filename) |
|
if os.path.exists(filepath): |
|
return jsonify({"result": "Enter code:", "action": "edit", "filename": filename}) |
|
else: |
|
return jsonify({"result": f"Error: File {filename} not found."}) |
|
elif command.startswith("cd "): |
|
new_dir = os.path.join(current_dir, command[3:]) |
|
if os.path.isdir(new_dir): |
|
current_dir = os.path.abspath(new_dir) |
|
return jsonify({"result": f"Changed directory to: {current_dir}"}) |
|
else: |
|
return jsonify({"result": f"Error: Directory not found: {new_dir}"}) |
|
elif command.startswith("!"): |
|
result = execute_command(command[1:]) |
|
elif command.startswith("pip install"): |
|
result = execute_command(f"{sys.executable} -m {command}") |
|
elif command.startswith("git "): |
|
result = execute_command(command) |
|
else: |
|
if command.endswith(".py"): |
|
result = execute_command(f"{sys.executable} {command}") |
|
else: |
|
result = execute_command(f"{sys.executable} -c \"{command}\"") |
|
return jsonify({"result": result}) |
|
except Exception as e: |
|
return jsonify({"result": f"Error: {str(e)}"}) |
|
|
|
@app.route("/save_file", methods=["POST"]) |
|
def save_file(): |
|
filename = request.json.get("filename") |
|
content = request.json.get("content") |
|
filepath = os.path.join(current_dir, filename) |
|
with open(filepath, 'w') as f: |
|
f.write(content) |
|
return jsonify({"result": f"File {filename} saved successfully."}) |
|
|
|
@app.route("/cleanup", methods=["POST"]) |
|
def cleanup(): |
|
global temp_dir, current_dir |
|
if os.path.exists(temp_dir): |
|
shutil.rmtree(temp_dir) |
|
temp_dir = tempfile.mkdtemp() |
|
current_dir = temp_dir |
|
return jsonify({"result": "Temporary files cleaned up."}) |
|
|
|
@app.route("/list_files", methods=["GET"]) |
|
def list_files(): |
|
files = os.listdir(current_dir) |
|
return jsonify({"files": files}) |
|
|
|
@app.route("/download/<path:filename>", methods=["GET"]) |
|
def download_file(filename): |
|
return send_from_directory(current_dir, filename, as_attachment=True) |
|
|
|
if __name__ == "__main__": |
|
app.run(host="0.0.0.0", port=7860) |
|
|
|
|