# Source: Hugging Face Spaces -- BG5 / mylist (status: Running)
# File: mylist / app.py
# Last commit: "Update app.py" (c158ca6, verified)
# Size: 2.27 kB
from flask import Flask, render_template, request, redirect, url_for, send_from_directory
from flask_socketio import SocketIO, emit
from flask_cors import CORS
from flask_uploads import UploadSet, configure_uploads, ALL
import pty
import os
import subprocess
import threading
app = Flask(__name__)
CORS(app)  # Enable CORS

# Session-signing key. Read it from the SECRET_KEY environment variable when
# set; the placeholder fallback keeps existing deployments working but should
# be overridden in any real deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your_secret_key')

# File-upload configuration
app.config['UPLOADED_FILES_DEST'] = 'uploads'  # Directory where uploaded files are saved
files = UploadSet('files', ALL)
configure_uploads(app, files)

socketio = SocketIO(app)

# Pseudo-terminal pair: the slave end is handed to the shell started in
# run_shell(); the server reads and writes the master end.
master_fd, slave_fd = pty.openpty()
def run_shell():
    """Start a bash process on the pseudo-terminal and stream its output.

    bash is attached to the slave end of the module-level pty. Everything
    read from the master end is decoded as UTF-8 and emitted to clients as
    an ``'output'`` Socket.IO event. The loop ends when the master end
    reports EOF or a read error instead of killing the thread with an
    unhandled exception.
    """
    import codecs

    process = subprocess.Popen(['bash'], stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, text=True)
    # An incremental decoder buffers a multi-byte character that is split
    # across two 1024-byte reads; decoding each chunk on its own would raise
    # UnicodeDecodeError in that case. Invalid bytes are replaced, not fatal.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
    while True:
        try:
            chunk = os.read(master_fd, 1024)
        except OSError:
            # The pty is no longer readable; stop streaming.
            break
        if not chunk:
            # EOF on the master end.
            break
        output = decoder.decode(chunk)
        if output:
            socketio.emit('output', output)
        socketio.sleep(0.1)
@app.route('/')
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/upload', methods=['POST'])
def upload_file():
    """Store the file posted in the ``file`` form field.

    Returns:
        A redirect to the index page on success, or a 400 response when the
        request carries no file.
    """
    uploaded = request.files.get('file')
    # Browsers submit an empty part with no filename when the form is sent
    # without choosing a file; treat that the same as a missing field rather
    # than passing it on to the upload set.
    if uploaded is None or not uploaded.filename:
        return 'No file uploaded', 400
    files.save(uploaded)
    return redirect(url_for('index'))
@app.route('/files', defaults={'path': ''})
@app.route('/files/<path:path>')
def list_files(path):
    """Render a listing of a directory inside the upload folder.

    Args:
        path: Directory path relative to the upload folder, taken from the
            URL; the empty string means the upload folder itself.

    Returns:
        The rendered ``files.html`` template, or a 404 response when the
        path is not a directory inside the upload folder.
    """
    base_dir = os.path.realpath(app.config['UPLOADED_FILES_DEST'])
    full_path = os.path.realpath(os.path.join(base_dir, path))
    # ``path`` comes straight from the URL: reject anything that resolves
    # outside the upload folder (``..`` segments, absolute paths, symlinks).
    if os.path.commonpath([base_dir, full_path]) != base_dir:
        return 'Directory not found', 404
    # isdir (not exists) so a path naming a regular file yields 404 instead
    # of an unhandled NotADirectoryError from os.listdir.
    if not os.path.isdir(full_path):
        return 'Directory not found', 404
    # Exclude hidden files; sort so the listing order is deterministic.
    files_and_dirs = sorted(f for f in os.listdir(full_path) if not f.startswith('.'))
    return render_template('files.html', files=files_and_dirs, current_path=path)
@app.route('/files/<path:path>/<filename>')
def download_file(path, filename):
    """Send a file from a subdirectory of the upload folder.

    Args:
        path: Subdirectory relative to the upload folder, from the URL.
        filename: Name of the file inside that subdirectory, from the URL.

    Returns:
        The file response produced by ``send_from_directory``.
    """
    # Keep the trusted upload root as the directory argument and pass the
    # whole URL-supplied remainder as the requested path, so that
    # send_from_directory's own path check covers ``path`` as well as
    # ``filename``. Joining ``path`` into the directory would bypass it.
    return send_from_directory(app.config['UPLOADED_FILES_DEST'], path + '/' + filename)
@socketio.on('input')
def handle_input(input_data):
    """Forward one line of client input to the shell's pseudo-terminal.

    Args:
        input_data: Text received with the ``'input'`` Socket.IO event. A
            newline is appended before it is written to the pty master.
            Payloads that are not strings are ignored.
    """
    # NOTE(review): whatever a client sends here is fed to the bash process
    # attached to the pty, i.e. executed as a shell command. Confirm that
    # access to this endpoint is restricted appropriately.
    if not isinstance(input_data, str):
        return
    remaining = (input_data + '\n').encode('utf-8')
    # os.write may accept fewer bytes than requested; keep writing until the
    # whole line has been delivered.
    while remaining:
        written = os.write(master_fd, remaining)
        remaining = remaining[written:]
if __name__ == '__main__':
    # Create the upload directory if needed. exist_ok avoids the
    # check-then-create race of a separate os.path.exists() test.
    os.makedirs(app.config['UPLOADED_FILES_DEST'], exist_ok=True)

    # Stream shell output in the background; a daemon thread does not keep
    # the process alive once the server stops.
    shell_thread = threading.Thread(target=run_shell, daemon=True)
    shell_thread.start()

    socketio.run(app, host='0.0.0.0', port=7860, allow_unsafe_werkzeug=True)