|
import gradio as gr |
|
import os |
|
from pathlib import Path |
|
import subprocess |
|
import asyncio |
|
import threading |
|
import signal |
|
import sys |
|
import re |
|
import shutil |
|
from typing import Optional, List, Tuple |
|
from i18n.i18n import I18nAuto |
|
from header import badges, description |
|
i18n = I18nAuto() |
|
|
|
|
|
current_process: Optional[subprocess.Popen] = None |
|
|
|
|
|
os.environ["PYTHONIOENCODING"] = "utf-8" |
|
|
|
|
|
if sys.platform == "win32": |
|
os.system('chcp 65001') |
|
|
|
|
|
sys.stdout.reconfigure(encoding='utf-8') |
|
sys.stderr.reconfigure(encoding='utf-8') |
|
|
|
|
|
def ensure_directories(): |
|
directories = ['uploaded_videos', 'softsubs_output', 'hardsubs_output'] |
|
for directory in directories: |
|
os.makedirs(os.path.join(os.getcwd(), directory), exist_ok=True) |
|
return os.path.join(os.getcwd(), 'uploaded_videos') |
|
|
|
def save_uploaded_files(files): |
|
"""Salva os arquivos enviados na pasta uploaded_videos""" |
|
if not files: |
|
return "No files uploaded" |
|
|
|
upload_dir = ensure_directories() |
|
saved_files = [] |
|
|
|
for file in files: |
|
filename = os.path.basename(file.name) |
|
destination = os.path.join(upload_dir, filename) |
|
shutil.copy2(file.name, destination) |
|
saved_files.append(filename) |
|
|
|
return f"Uploaded files: {', '.join(saved_files)}" |
|
|
|
def get_output_files() -> Tuple[List[str], List[str]]: |
|
"""Retorna listas de arquivos nas pastas de saída""" |
|
softsubs_dir = os.path.join(os.getcwd(), 'softsubs_output') |
|
hardsubs_dir = os.path.join(os.getcwd(), 'hardsubs_output') |
|
|
|
softsubs_files = [os.path.join(softsubs_dir, f) for f in os.listdir(softsubs_dir) if os.path.isfile(os.path.join(softsubs_dir, f))] |
|
hardsubs_files = [os.path.join(hardsubs_dir, f) for f in os.listdir(hardsubs_dir) if os.path.isfile(os.path.join(hardsubs_dir, f))] |
|
|
|
return softsubs_files, hardsubs_files |
|
|
|
def clean_ansi(text: str) -> str: |
|
"""Remove códigos ANSI e limpa o texto para exibição""" |
|
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') |
|
return ansi_escape.sub('', text) |
|
|
|
def process_output(line: str, progress: gr.Progress) -> str: |
|
"""Processa uma linha de saída e atualiza o progresso""" |
|
clean_line = clean_ansi(line.strip()) |
|
|
|
if "%" in clean_line: |
|
try: |
|
progress_match = re.search(r'(\d+\.?\d*)%', clean_line) |
|
if progress_match: |
|
progress_value = float(progress_match.group(1)) / 100 |
|
progress(progress_value, desc=clean_line) |
|
except ValueError: |
|
pass |
|
|
|
return clean_line |
|
|
|
def stop_process(): |
|
global current_process |
|
if current_process: |
|
try: |
|
if os.name == 'nt': |
|
current_process.terminate() |
|
else: |
|
os.killpg(os.getpgid(current_process.pid), signal.SIGTERM) |
|
current_process.wait(timeout=5) |
|
except subprocess.TimeoutExpired: |
|
if os.name == 'nt': |
|
current_process.kill() |
|
else: |
|
os.killpg(os.getpgid(current_process.pid), signal.SIGKILL) |
|
current_process = None |
|
return "Process stopped by user" |
|
return "No process running" |
|
|
|
def run_legen( |
|
transcription_engine, |
|
transcription_model, |
|
compute_type, |
|
device, |
|
batch_size, |
|
input_lang, |
|
translate_lang, |
|
video_codec, |
|
audio_codec, |
|
normalize, |
|
overwrite, |
|
copy_files, |
|
disable_srt, |
|
disable_softsubs, |
|
disable_hardsubs, |
|
progress=gr.Progress() |
|
): |
|
global current_process |
|
|
|
input_dir = ensure_directories() |
|
if not os.path.exists(input_dir) or not os.listdir(input_dir): |
|
return "No files found in uploaded_videos directory" |
|
|
|
if not os.path.exists("legen.py"): |
|
return "legen.py not found in current directory" |
|
|
|
cmd = ["python", "legen.py", "-i", input_dir] |
|
|
|
|
|
if normalize: cmd.append("--norm") |
|
if overwrite: cmd.append("--overwrite") |
|
if copy_files: cmd.append("--copy_files") |
|
if disable_srt: cmd.append("--disable_srt") |
|
if disable_softsubs: cmd.append("--disable_softsubs") |
|
if disable_hardsubs: cmd.append("--disable_hardsubs") |
|
|
|
|
|
cmd.extend(["-ts:e", transcription_engine]) |
|
cmd.extend(["-ts:m", transcription_model]) |
|
cmd.extend(["-ts:d", device]) |
|
cmd.extend(["-ts:c", compute_type]) |
|
cmd.extend(["-ts:b", str(batch_size)]) |
|
|
|
if translate_lang != "none": |
|
cmd.extend(["--translate", translate_lang]) |
|
if input_lang != "auto": |
|
cmd.extend(["--input_lang", input_lang]) |
|
|
|
|
|
cmd.extend(["-c:v", video_codec]) |
|
cmd.extend(["-c:a", audio_codec]) |
|
|
|
|
|
cmd.extend(["-o:s", os.path.join(os.getcwd(), "softsubs_output")]) |
|
cmd.extend(["-o:h", os.path.join(os.getcwd(), "hardsubs_output")]) |
|
|
|
try: |
|
startupinfo = None |
|
if os.name == 'nt': |
|
startupinfo = subprocess.STARTUPINFO() |
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW |
|
|
|
current_process = subprocess.Popen( |
|
cmd, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT, |
|
text=True, |
|
bufsize=1, |
|
universal_newlines=True, |
|
startupinfo=startupinfo, |
|
encoding='utf-8', |
|
errors='replace', |
|
preexec_fn=None if os.name == 'nt' else os.setsid |
|
) |
|
|
|
output_lines: List[str] = [] |
|
last_progress_update = 0 |
|
|
|
while True: |
|
line = current_process.stdout.readline() |
|
|
|
if not line and current_process.poll() is not None: |
|
break |
|
|
|
if line: |
|
try: |
|
clean_line = process_output(line, progress) |
|
output_lines.append(clean_line) |
|
|
|
if len(output_lines) - last_progress_update >= 5: |
|
yield "\n".join(output_lines) |
|
last_progress_update = len(output_lines) |
|
|
|
except Exception as e: |
|
output_lines.append(f"Error processing output: {str(e)}") |
|
|
|
if current_process.poll() == 0: |
|
final_output = "Processing completed successfully!\n\n" + "\n".join(output_lines) |
|
else: |
|
final_output = f"Process ended with error code {current_process.poll()}\n\n" + "\n".join(output_lines) |
|
|
|
current_process = None |
|
return final_output |
|
|
|
except Exception as e: |
|
current_process = None |
|
return f"Error: {str(e)}" |
|
|
|
with gr.Blocks(theme=gr.themes.Soft()) as demo: |
|
gr.Markdown(badges) |
|
gr.Markdown(description) |
|
title = "LeGen" |
|
|
|
ensure_directories() |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
|
|
with gr.Group(): |
|
upload_files = gr.Files( |
|
label=i18n("Upload Videos"), |
|
file_types=["video"], |
|
file_count="multiple" |
|
) |
|
upload_button = gr.Button(i18n("Upload to processing directory")) |
|
|
|
|
|
with gr.Group(): |
|
transcription_engine = gr.Dropdown( |
|
choices=["whisperx", "whisper"], |
|
value="whisperx", |
|
label=i18n("Transcription Engine") |
|
) |
|
with gr.Row(): |
|
transcription_model = gr.Dropdown( |
|
choices=["tiny", "base", "small", "medium", "large", "large-v1", "large-v2", "large-v3", "large-v3-turbo"], |
|
value="large-v3", |
|
label=i18n("Model") |
|
) |
|
compute_type = gr.Dropdown( |
|
choices=["auto", "int8", "float16", "float32"], |
|
value="auto", |
|
label=i18n("Compute Type") |
|
) |
|
with gr.Row(): |
|
device = gr.Dropdown( |
|
choices=["auto", "cpu", "cuda"], |
|
value="auto", |
|
label=i18n("Device") |
|
) |
|
batch_size = gr.Number( |
|
value=4, |
|
label=i18n("Batch Size"), |
|
precision=0 |
|
) |
|
with gr.Row(): |
|
input_lang = gr.Dropdown( |
|
choices=["auto", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"], |
|
value="auto", |
|
label=i18n("Input Language") |
|
) |
|
translate_lang = gr.Dropdown( |
|
choices=["none", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"], |
|
value="none", |
|
label=i18n("Translate to") |
|
) |
|
|
|
with gr.Column(scale=1): |
|
|
|
with gr.Group(): |
|
with gr.Row(): |
|
video_codec = gr.Dropdown( |
|
choices=["h264", "libx264", "h264_vaapi", "h264_nvenc", "hevc", "libx265", "hevc_vaapi"], |
|
value="h264", |
|
label=i18n("Video Codec") |
|
) |
|
audio_codec = gr.Dropdown( |
|
choices=["aac", "libopus", "mp3", "vorbis"], |
|
value="aac", |
|
label=i18n("Audio Codec") |
|
) |
|
|
|
|
|
with gr.Group(): |
|
with gr.Row(): |
|
normalize = gr.Checkbox(label=i18n("Normalize folder times"), value=False) |
|
overwrite = gr.Checkbox(label=i18n("Overwrite existing files"), value=False) |
|
copy_files = gr.Checkbox(label=i18n("Copy non-video files"), value=False) |
|
with gr.Row(): |
|
disable_srt = gr.Checkbox(label=i18n("Disable SRT generation"), value=False) |
|
disable_softsubs = gr.Checkbox(label=i18n("Disable softsubs"), value=False) |
|
disable_hardsubs = gr.Checkbox(label=i18n("Disable hardsubs"), value=False) |
|
|
|
|
|
with gr.Group(): |
|
softsubs_files = gr.Files(label="Softsubs Output Files", file_count="multiple", interactive=False) |
|
hardsubs_files = gr.Files(label="Hardsubs Output Files", file_count="multiple", interactive=False) |
|
|
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
run_btn = gr.Button(i18n("Run LeGen"), variant="primary") |
|
stop_btn = gr.Button(i18n("Stop"), variant="stop") |
|
output = gr.Textbox(label=i18n("Output"), lines=2, interactive=False, elem_id="output") |
|
|
|
|
|
upload_button.click( |
|
fn=save_uploaded_files, |
|
inputs=[upload_files], |
|
outputs=[output] |
|
) |
|
|
|
def update_output_files(): |
|
softsubs_files, hardsubs_files = get_output_files() |
|
return softsubs_files, hardsubs_files |
|
|
|
|
|
run_btn.click( |
|
fn=run_legen, |
|
inputs=[ |
|
transcription_engine, |
|
transcription_model, |
|
compute_type, |
|
device, |
|
batch_size, |
|
input_lang, |
|
translate_lang, |
|
video_codec, |
|
audio_codec, |
|
normalize, |
|
overwrite, |
|
copy_files, |
|
disable_srt, |
|
disable_softsubs, |
|
disable_hardsubs |
|
], |
|
outputs=output |
|
).then( |
|
fn=update_output_files, |
|
inputs=[], |
|
outputs=[softsubs_files, hardsubs_files] |
|
) |
|
|
|
stop_btn.click( |
|
fn=stop_process, |
|
inputs=[], |
|
outputs=output |
|
) |
|
|
|
gr.Markdown(""" |
|
<center>WebUI Desenvolvida por Rafa.Godoy</center> |
|
<center>Agradecimentos ao MatheusBach por desenvolver o LeGen</center> |
|
""") |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |