import gradio as gr import os from pathlib import Path import subprocess import asyncio import threading import signal import sys import re import shutil from typing import Optional, List, Tuple from i18n.i18n import I18nAuto from header import badges, description i18n = I18nAuto() # Variável global para armazenar o processo atual current_process: Optional[subprocess.Popen] = None # Força o uso de UTF-8 para o Python os.environ["PYTHONIOENCODING"] = "utf-8" # Para garantir que a codificação esteja correta no terminal if sys.platform == "win32": os.system('chcp 65001') # Redefine a configuração de codificação da saída padrão sys.stdout.reconfigure(encoding='utf-8') sys.stderr.reconfigure(encoding='utf-8') # Criar diretórios necessários se não existirem def ensure_directories(): directories = ['uploaded_videos', 'softsubs_output', 'hardsubs_output'] for directory in directories: os.makedirs(os.path.join(os.getcwd(), directory), exist_ok=True) return os.path.join(os.getcwd(), 'uploaded_videos') def save_uploaded_files(files): """Salva os arquivos enviados na pasta uploaded_videos""" if not files: return "No files uploaded" upload_dir = ensure_directories() saved_files = [] for file in files: filename = os.path.basename(file.name) destination = os.path.join(upload_dir, filename) shutil.copy2(file.name, destination) saved_files.append(filename) return f"Uploaded files: {', '.join(saved_files)}" def get_output_files() -> Tuple[List[str], List[str]]: """Retorna listas de arquivos nas pastas de saída""" softsubs_dir = os.path.join(os.getcwd(), 'softsubs_output') hardsubs_dir = os.path.join(os.getcwd(), 'hardsubs_output') softsubs_files = [os.path.join(softsubs_dir, f) for f in os.listdir(softsubs_dir) if os.path.isfile(os.path.join(softsubs_dir, f))] hardsubs_files = [os.path.join(hardsubs_dir, f) for f in os.listdir(hardsubs_dir) if os.path.isfile(os.path.join(hardsubs_dir, f))] return softsubs_files, hardsubs_files def clean_ansi(text: str) -> str: """Remove códigos ANSI e limpa o texto para exibição""" ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') return ansi_escape.sub('', text) def process_output(line: str, progress: gr.Progress) -> str: """Processa uma linha de saída e atualiza o progresso""" clean_line = clean_ansi(line.strip()) if "%" in clean_line: try: progress_match = re.search(r'(\d+\.?\d*)%', clean_line) if progress_match: progress_value = float(progress_match.group(1)) / 100 progress(progress_value, desc=clean_line) except ValueError: pass return clean_line def stop_process(): global current_process if current_process: try: if os.name == 'nt': current_process.terminate() else: os.killpg(os.getpgid(current_process.pid), signal.SIGTERM) current_process.wait(timeout=5) except subprocess.TimeoutExpired: if os.name == 'nt': current_process.kill() else: os.killpg(os.getpgid(current_process.pid), signal.SIGKILL) current_process = None return "Process stopped by user" return "No process running" def run_legen( transcription_engine, transcription_model, compute_type, device, batch_size, input_lang, translate_lang, video_codec, audio_codec, normalize, overwrite, copy_files, disable_srt, disable_softsubs, disable_hardsubs, progress=gr.Progress() ): global current_process input_dir = ensure_directories() if not os.path.exists(input_dir) or not os.listdir(input_dir): return "No files found in uploaded_videos directory" if not os.path.exists("legen.py"): return "legen.py not found in current directory" cmd = ["python", "legen.py", "-i", input_dir] # Adiciona as flags baseadas nos checkboxes if normalize: cmd.append("--norm") if overwrite: cmd.append("--overwrite") if copy_files: cmd.append("--copy_files") if disable_srt: cmd.append("--disable_srt") if disable_softsubs: cmd.append("--disable_softsubs") if disable_hardsubs: cmd.append("--disable_hardsubs") # Adiciona configurações de transcrição cmd.extend(["-ts:e", transcription_engine]) cmd.extend(["-ts:m", transcription_model]) cmd.extend(["-ts:d", device]) cmd.extend(["-ts:c", compute_type]) cmd.extend(["-ts:b", str(batch_size)]) if translate_lang != "none": cmd.extend(["--translate", translate_lang]) if input_lang != "auto": cmd.extend(["--input_lang", input_lang]) # Adiciona configurações de codec cmd.extend(["-c:v", video_codec]) cmd.extend(["-c:a", audio_codec]) # Adiciona caminhos de saída fixos cmd.extend(["-o:s", os.path.join(os.getcwd(), "softsubs_output")]) cmd.extend(["-o:h", os.path.join(os.getcwd(), "hardsubs_output")]) try: startupinfo = None if os.name == 'nt': startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW current_process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, startupinfo=startupinfo, encoding='utf-8', errors='replace', preexec_fn=None if os.name == 'nt' else os.setsid ) output_lines: List[str] = [] last_progress_update = 0 while True: line = current_process.stdout.readline() if not line and current_process.poll() is not None: break if line: try: clean_line = process_output(line, progress) output_lines.append(clean_line) if len(output_lines) - last_progress_update >= 5: yield "\n".join(output_lines) last_progress_update = len(output_lines) except Exception as e: output_lines.append(f"Error processing output: {str(e)}") if current_process.poll() == 0: final_output = "Processing completed successfully!\n\n" + "\n".join(output_lines) else: final_output = f"Process ended with error code {current_process.poll()}\n\n" + "\n".join(output_lines) current_process = None return final_output except Exception as e: current_process = None return f"Error: {str(e)}" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown(badges) gr.Markdown(description) title = "LeGen" ensure_directories() # Garante que os diretórios existam ao iniciar with gr.Row(): with gr.Column(scale=1): # Upload Section with gr.Group(): upload_files = gr.Files( label=i18n("Upload Videos"), file_types=["video"], file_count="multiple" ) upload_button = gr.Button(i18n("Upload to processing directory")) # Transcription Settings with gr.Group(): transcription_engine = gr.Dropdown( choices=["whisperx", "whisper"], value="whisperx", label=i18n("Transcription Engine") ) with gr.Row(): transcription_model = gr.Dropdown( choices=["tiny", "base", "small", "medium", "large", "large-v1", "large-v2", "large-v3", "large-v3-turbo"], value="large-v3", label=i18n("Model") ) compute_type = gr.Dropdown( choices=["auto", "int8", "float16", "float32"], value="auto", label=i18n("Compute Type") ) with gr.Row(): device = gr.Dropdown( choices=["auto", "cpu", "cuda"], value="auto", label=i18n("Device") ) batch_size = gr.Number( value=4, label=i18n("Batch Size"), precision=0 ) with gr.Row(): input_lang = gr.Dropdown( choices=["auto", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"], value="auto", label=i18n("Input Language") ) translate_lang = gr.Dropdown( choices=["none", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"], value="none", label=i18n("Translate to") ) with gr.Column(scale=1): # Output Settings with gr.Group(): with gr.Row(): video_codec = gr.Dropdown( choices=["h264", "libx264", "h264_vaapi", "h264_nvenc", "hevc", "libx265", "hevc_vaapi"], value="h264", label=i18n("Video Codec") ) audio_codec = gr.Dropdown( choices=["aac", "libopus", "mp3", "vorbis"], value="aac", label=i18n("Audio Codec") ) # Options with gr.Group(): with gr.Row(): normalize = gr.Checkbox(label=i18n("Normalize folder times"), value=False) overwrite = gr.Checkbox(label=i18n("Overwrite existing files"), value=False) copy_files = gr.Checkbox(label=i18n("Copy non-video files"), value=False) with gr.Row(): disable_srt = gr.Checkbox(label=i18n("Disable SRT generation"), value=False) disable_softsubs = gr.Checkbox(label=i18n("Disable softsubs"), value=False) disable_hardsubs = gr.Checkbox(label=i18n("Disable hardsubs"), value=False) # Output Files Display with gr.Group(): softsubs_files = gr.Files(label="Softsubs Output Files", file_count="multiple", interactive=False) hardsubs_files = gr.Files(label="Hardsubs Output Files", file_count="multiple", interactive=False) # Run Button, Stop Button and Output with gr.Row(): with gr.Column(scale=1): run_btn = gr.Button(i18n("Run LeGen"), variant="primary") stop_btn = gr.Button(i18n("Stop"), variant="stop") output = gr.Textbox(label=i18n("Output"), lines=2, interactive=False, elem_id="output") # Event handlers upload_button.click( fn=save_uploaded_files, inputs=[upload_files], outputs=[output] ) def update_output_files(): softsubs_files, hardsubs_files = get_output_files() return softsubs_files, hardsubs_files # Connect the run button to the processing function run_btn.click( fn=run_legen, inputs=[ transcription_engine, transcription_model, compute_type, device, batch_size, input_lang, translate_lang, video_codec, audio_codec, normalize, overwrite, copy_files, disable_srt, disable_softsubs, disable_hardsubs ], outputs=output ).then( fn=update_output_files, inputs=[], outputs=[softsubs_files, hardsubs_files] ) stop_btn.click( fn=stop_process, inputs=[], outputs=output ) gr.Markdown("""
WebUI Desenvolvida por Rafa.Godoy
Agradecimentos ao MatheusBach por desenvolver o LeGen
""") if __name__ == "__main__": demo.launch()