# legen / app_hf.py
# RafaG's picture
# Update app_hf.py
# c512a5e verified
import gradio as gr
import os
from pathlib import Path
import subprocess
import asyncio
import threading
import signal
import sys
import re
import shutil
from typing import Optional, List, Tuple
from i18n.i18n import I18nAuto
from header import badges, description
# Translator applied to the user-facing labels of the UI.
i18n = I18nAuto()

# Handle of the LeGen subprocess that is currently running, or None when idle.
# Written by run_legen() and stop_process().
current_process: Optional[subprocess.Popen] = None

# Force UTF-8 for Python I/O; subprocesses started later inherit this variable.
os.environ["PYTHONIOENCODING"] = "utf-8"

# On Windows, switch the console to the UTF-8 code page.
if sys.platform == "win32":
    os.system('chcp 65001')

# Re-encode the standard streams as UTF-8. A stream can be None or a replacement
# object without ``reconfigure`` (e.g. when the host application swaps it out),
# so the call is skipped instead of failing at import time.
for _stream in (sys.stdout, sys.stderr):
    _reconfigure = getattr(_stream, "reconfigure", None)
    if callable(_reconfigure):
        _reconfigure(encoding='utf-8')
def ensure_directories():
    """Create the working folders under the current directory when missing.

    The folders are ``uploaded_videos``, ``softsubs_output`` and
    ``hardsubs_output``.

    Returns:
        The path of the ``uploaded_videos`` folder inside the current
        working directory.
    """
    base_dir = os.getcwd()
    for folder_name in ('uploaded_videos', 'softsubs_output', 'hardsubs_output'):
        os.makedirs(os.path.join(base_dir, folder_name), exist_ok=True)
    return os.path.join(base_dir, 'uploaded_videos')
def save_uploaded_files(files):
    """Copy the uploaded files into the ``uploaded_videos`` folder.

    Args:
        files: Uploaded items. Each item may be a path (``str`` or
            ``os.PathLike``) or an object exposing its path through a
            ``name`` attribute.
            NOTE(review): which of the two shapes the upload component
            delivers presumably depends on the installed Gradio version.

    Returns:
        A status message: ``"No files uploaded"`` when nothing was given,
        otherwise the comma-separated names of the copied files.
    """
    if not files:
        return "No files uploaded"

    upload_dir = ensure_directories()
    saved_files = []
    for file in files:
        # Paths are used as they are; only wrapper objects carry the path in
        # ``.name`` (checking this way also avoids ``pathlib.Path.name``, which
        # is just the last path component).
        if isinstance(file, (str, os.PathLike)):
            source_path = os.fspath(file)
        else:
            source_path = file.name
        filename = os.path.basename(source_path)
        shutil.copy2(source_path, os.path.join(upload_dir, filename))
        saved_files.append(filename)

    return f"Uploaded files: {', '.join(saved_files)}"
def _list_files(directory: str) -> List[str]:
    """Return the sorted paths of the regular files directly inside *directory*.

    A missing directory yields an empty list.
    """
    try:
        names = sorted(os.listdir(directory))
    except FileNotFoundError:
        return []
    candidates = (os.path.join(directory, name) for name in names)
    return [path for path in candidates if os.path.isfile(path)]


def get_output_files() -> Tuple[List[str], List[str]]:
    """List the files found in the two output folders.

    Returns:
        A ``(softsubs_files, hardsubs_files)`` tuple with the paths of the
        regular files in ``softsubs_output`` and ``hardsubs_output`` under the
        current working directory. Sub-directories are skipped; each list is
        sorted by file name and is empty when its folder does not exist.
    """
    base_dir = os.getcwd()
    return (
        _list_files(os.path.join(base_dir, 'softsubs_output')),
        _list_files(os.path.join(base_dir, 'hardsubs_output')),
    )
# ESC followed by either a single byte in the ranges @-Z / \-_ or a
# "[ ... final byte" control sequence (colors, cursor movement, ...).
_ANSI_ESCAPE_RE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


def clean_ansi(text: str) -> str:
    """Return *text* with ANSI escape sequences removed, ready for display."""
    return _ANSI_ESCAPE_RE.sub('', text)
# First "<number>%" occurrence in a line, e.g. "42%" or "99.5%".
_PERCENT_RE = re.compile(r'(\d+\.?\d*)%')


def process_output(line: str, progress: gr.Progress) -> str:
    """Clean one line of subprocess output and report any progress it carries.

    Args:
        line: Raw line read from the subprocess.
        progress: Callable progress tracker; invoked as
            ``progress(fraction, desc=text)`` when the line contains a
            percentage.

    Returns:
        The line stripped of surrounding whitespace and ANSI escape codes.
    """
    clean_line = clean_ansi(line.strip())
    progress_match = _PERCENT_RE.search(clean_line)
    if progress_match:
        try:
            fraction = float(progress_match.group(1)) / 100
            # Any percentage in the output is picked up, including values
            # above 100, so the fraction is capped at a full bar.
            progress(min(fraction, 1.0), desc=clean_line)
        except ValueError:
            # Progress reporting is best-effort; the cleaned line is still returned.
            pass
    return clean_line
def _signal_process(process: subprocess.Popen, force: bool) -> None:
    """Ask *process* to terminate, or kill it when *force* is true.

    On Windows only the process itself is signalled; elsewhere the signal is
    sent to the whole process group of the child.
    """
    try:
        if os.name == 'nt':
            if force:
                process.kill()
            else:
                process.terminate()
        else:
            os.killpg(
                os.getpgid(process.pid),
                signal.SIGKILL if force else signal.SIGTERM,
            )
    except ProcessLookupError:
        # The process already exited on its own; nothing left to signal.
        pass


def stop_process():
    """Stop the subprocess stored in ``current_process``, if there is one.

    The process is first asked to terminate and given 5 seconds to exit; if it
    is still alive after that it is killed.

    Returns:
        ``"Process stopped by user"`` when a process was registered,
        ``"No process running"`` otherwise.
    """
    global current_process

    process = current_process
    if process is None:
        return "No process running"

    try:
        _signal_process(process, force=False)
        process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        _signal_process(process, force=True)
        try:
            # Reap the killed child so it does not linger as a zombie.
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            pass
    finally:
        current_process = None
    return "Process stopped by user"
def run_legen(
    transcription_engine,
    transcription_model,
    compute_type,
    device,
    batch_size,
    input_lang,
    translate_lang,
    video_codec,
    audio_codec,
    normalize,
    overwrite,
    copy_files,
    disable_srt,
    disable_softsubs,
    disable_hardsubs,
    progress=gr.Progress()
):
    """Run ``legen.py`` on the uploaded videos and stream its console output.

    This is a generator: every yielded string is the full text to show in the
    output box. Because the value of a generator's ``return`` statement is not
    delivered to the consumer, every message (including early exits, the final
    status and errors) is yielded.

    Args:
        transcription_engine: Value for ``-ts:e``.
        transcription_model: Value for ``-ts:m``.
        compute_type: Value for ``-ts:c``.
        device: Value for ``-ts:d``.
        batch_size: Value for ``-ts:b`` (converted with ``str``).
        input_lang: Value for ``--input_lang``; omitted when ``"auto"``.
        translate_lang: Value for ``--translate``; omitted when ``"none"``.
        video_codec: Value for ``-c:v``.
        audio_codec: Value for ``-c:a``.
        normalize: Adds ``--norm`` when true.
        overwrite: Adds ``--overwrite`` when true.
        copy_files: Adds ``--copy_files`` when true.
        disable_srt: Adds ``--disable_srt`` when true.
        disable_softsubs: Adds ``--disable_softsubs`` when true.
        disable_hardsubs: Adds ``--disable_hardsubs`` when true.
        progress: Progress tracker forwarded to ``process_output``.

    Yields:
        The accumulated output after every 5 new lines, then a final message
        made of a status line followed by the complete output.
    """
    global current_process

    input_dir = ensure_directories()
    if not os.path.exists(input_dir) or not os.listdir(input_dir):
        yield "No files found in uploaded_videos directory"
        return
    if not os.path.exists("legen.py"):
        yield "legen.py not found in current directory"
        return

    # Use the interpreter running this app so the child sees the same packages.
    cmd = [sys.executable or "python", "legen.py", "-i", input_dir]

    # Boolean switches driven by the checkboxes.
    switches = (
        (normalize, "--norm"),
        (overwrite, "--overwrite"),
        (copy_files, "--copy_files"),
        (disable_srt, "--disable_srt"),
        (disable_softsubs, "--disable_softsubs"),
        (disable_hardsubs, "--disable_hardsubs"),
    )
    cmd.extend(flag for enabled, flag in switches if enabled)

    # Transcription settings.
    cmd.extend(["-ts:e", transcription_engine])
    cmd.extend(["-ts:m", transcription_model])
    cmd.extend(["-ts:d", device])
    cmd.extend(["-ts:c", compute_type])
    cmd.extend(["-ts:b", str(batch_size)])
    if translate_lang != "none":
        cmd.extend(["--translate", translate_lang])
    if input_lang != "auto":
        cmd.extend(["--input_lang", input_lang])

    # Codec settings.
    cmd.extend(["-c:v", video_codec])
    cmd.extend(["-c:a", audio_codec])

    # Fixed output folders.
    cmd.extend(["-o:s", os.path.join(os.getcwd(), "softsubs_output")])
    cmd.extend(["-o:h", os.path.join(os.getcwd(), "hardsubs_output")])

    # Local handle: stop_process() may reset the global to None while the
    # output is still being read.
    process = None
    output_lines: List[str] = []
    try:
        startupinfo = None
        if os.name == 'nt':
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
            startupinfo=startupinfo,
            encoding='utf-8',
            errors='replace',
            # Own session/process group on POSIX so stop_process() can signal
            # the whole group.
            start_new_session=(os.name != 'nt'),
        )
        current_process = process

        lines_at_last_update = 0
        for line in process.stdout:
            try:
                output_lines.append(process_output(line, progress))
            except Exception as e:
                output_lines.append(f"Error processing output: {str(e)}")
            # Refresh the UI every 5 lines rather than on every single line.
            if len(output_lines) - lines_at_last_update >= 5:
                yield "\n".join(output_lines)
                lines_at_last_update = len(output_lines)

        process.stdout.close()
        return_code = process.wait()

        if return_code == 0:
            status = "Processing completed successfully!"
        elif current_process is not process:
            # stop_process() cleared the global while this run was active.
            status = "Process stopped by user"
        else:
            status = f"Process ended with error code {return_code}"
        yield status + "\n\n" + "\n".join(output_lines)
    except Exception as e:
        yield f"Error: {str(e)}"
    finally:
        # Unregister only a finished process that is still the registered one;
        # a process left running stays reachable for stop_process().
        if (
            process is not None
            and process.poll() is not None
            and current_process is process
        ):
            current_process = None
# Browser tab title of the web UI.
title = "LeGen"

# NOTE(review): the nesting of the layout containers below was reconstructed
# from the order of the statements; confirm it against the intended layout.
with gr.Blocks(theme=gr.themes.Soft(), title=title) as demo:
    gr.Markdown(badges)
    gr.Markdown(description)

    ensure_directories()  # Make sure the working directories exist at startup

    with gr.Row():
        with gr.Column(scale=1):
            # Upload Section
            with gr.Group():
                upload_files = gr.Files(
                    label=i18n("Upload Videos"),
                    file_types=["video"],
                    file_count="multiple"
                )
                upload_button = gr.Button(i18n("Upload to processing directory"))

            # Transcription Settings
            with gr.Group():
                transcription_engine = gr.Dropdown(
                    choices=["whisperx", "whisper"],
                    value="whisperx",
                    label=i18n("Transcription Engine")
                )
                with gr.Row():
                    transcription_model = gr.Dropdown(
                        choices=["tiny", "base", "small", "medium", "large", "large-v1", "large-v2", "large-v3", "large-v3-turbo"],
                        value="large-v3",
                        label=i18n("Model")
                    )
                    compute_type = gr.Dropdown(
                        choices=["auto", "int8", "float16", "float32"],
                        value="auto",
                        label=i18n("Compute Type")
                    )
                with gr.Row():
                    device = gr.Dropdown(
                        choices=["auto", "cpu", "cuda"],
                        value="auto",
                        label=i18n("Device")
                    )
                    batch_size = gr.Number(
                        value=4,
                        label=i18n("Batch Size"),
                        precision=0
                    )
                with gr.Row():
                    input_lang = gr.Dropdown(
                        choices=["auto", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"],
                        value="auto",
                        label=i18n("Input Language")
                    )
                    translate_lang = gr.Dropdown(
                        choices=["none", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"],
                        value="none",
                        label=i18n("Translate to")
                    )

        with gr.Column(scale=1):
            # Output Settings
            with gr.Group():
                with gr.Row():
                    video_codec = gr.Dropdown(
                        choices=["h264", "libx264", "h264_vaapi", "h264_nvenc", "hevc", "libx265", "hevc_vaapi"],
                        value="h264",
                        label=i18n("Video Codec")
                    )
                    audio_codec = gr.Dropdown(
                        choices=["aac", "libopus", "mp3", "vorbis"],
                        value="aac",
                        label=i18n("Audio Codec")
                    )

            # Options
            with gr.Group():
                with gr.Row():
                    normalize = gr.Checkbox(label=i18n("Normalize folder times"), value=False)
                    overwrite = gr.Checkbox(label=i18n("Overwrite existing files"), value=False)
                    copy_files = gr.Checkbox(label=i18n("Copy non-video files"), value=False)
                with gr.Row():
                    disable_srt = gr.Checkbox(label=i18n("Disable SRT generation"), value=False)
                    disable_softsubs = gr.Checkbox(label=i18n("Disable softsubs"), value=False)
                    disable_hardsubs = gr.Checkbox(label=i18n("Disable hardsubs"), value=False)

            # Output Files Display
            # NOTE(review): these two labels are the only ones not passed
            # through i18n(); left as they are.
            with gr.Group():
                softsubs_files = gr.Files(label="Softsubs Output Files", file_count="multiple", interactive=False)
                hardsubs_files = gr.Files(label="Hardsubs Output Files", file_count="multiple", interactive=False)

    # Run Button, Stop Button and Output
    with gr.Row():
        with gr.Column(scale=1):
            run_btn = gr.Button(i18n("Run LeGen"), variant="primary")
            stop_btn = gr.Button(i18n("Stop"), variant="stop")
        output = gr.Textbox(label=i18n("Output"), lines=2, interactive=False, elem_id="output")

    # Event handlers
    upload_button.click(
        fn=save_uploaded_files,
        inputs=[upload_files],
        outputs=[output]
    )

    def update_output_files():
        """Return the current (softsubs, hardsubs) file lists for the two file panels."""
        return get_output_files()

    # Run LeGen, then refresh the two output file panels once it has finished.
    run_btn.click(
        fn=run_legen,
        inputs=[
            transcription_engine,
            transcription_model,
            compute_type,
            device,
            batch_size,
            input_lang,
            translate_lang,
            video_codec,
            audio_codec,
            normalize,
            overwrite,
            copy_files,
            disable_srt,
            disable_softsubs,
            disable_hardsubs
        ],
        outputs=output
    ).then(
        fn=update_output_files,
        inputs=[],
        outputs=[softsubs_files, hardsubs_files]
    )

    stop_btn.click(
        fn=stop_process,
        inputs=[],
        outputs=output
    )

    gr.Markdown("""
<center>WebUI Desenvolvida por Rafa.Godoy</center>
<center>Agradecimentos ao MatheusBach por desenvolver o LeGen</center>
""")

if __name__ == "__main__":
    demo.launch()