import gradio as gr import os from pathlib import Path import subprocess import asyncio import threading import signal import sys import re from typing import Optional, List from i18n.i18n import I18nAuto from header import badges, description i18n = I18nAuto() # Variável global para armazenar o processo atual current_process: Optional[subprocess.Popen] = None # Força o uso de UTF-8 para o Python os.environ["PYTHONIOENCODING"] = "utf-8" # Para garantir que a codificação esteja correta no terminal if sys.platform == "win32": os.system('chcp 65001') # Redefine a configuração de codificação da saída padrão sys.stdout.reconfigure(encoding='utf-8') sys.stderr.reconfigure(encoding='utf-8') def clean_ansi(text: str) -> str: """Remove códigos ANSI e limpa o texto para exibição""" ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') return ansi_escape.sub('', text) def process_output(line: str, progress: gr.Progress) -> str: """Processa uma linha de saída e atualiza o progresso""" clean_line = clean_ansi(line.strip()) # Procura por padrões de progresso if "%" in clean_line: try: # Extrai o número do progresso progress_match = re.search(r'(\d+\.?\d*)%', clean_line) if progress_match: progress_value = float(progress_match.group(1)) / 100 progress(progress_value, desc=clean_line) except ValueError: pass return clean_line def stop_process(): global current_process if current_process: try: if os.name == 'nt': current_process.terminate() else: os.killpg(os.getpgid(current_process.pid), signal.SIGTERM) current_process.wait(timeout=5) # Espera até 5 segundos pelo término except subprocess.TimeoutExpired: if os.name == 'nt': current_process.kill() else: os.killpg(os.getpgid(current_process.pid), signal.SIGKILL) current_process = None return "Process stopped by user" return "No process running" def run_legen( input_path, transcription_engine, transcription_model, compute_type, device, batch_size, input_lang, translate_lang, video_codec, audio_codec, output_softsubs_path, output_hardsubs_path, normalize, overwrite, copy_files, disable_srt, disable_softsubs, disable_hardsubs, progress=gr.Progress() ): global current_process if not input_path: return "Please provide an input path" if not os.path.exists(input_path): return "Input path does not exist" if not os.path.exists("legen.py"): return "legen.py not found in current directory" cmd = ["python", "legen.py", "-i", input_path] # Adiciona as flags baseadas nos checkboxes if normalize: cmd.append("--norm") if overwrite: cmd.append("--overwrite") if copy_files: cmd.append("--copy_files") if disable_srt: cmd.append("--disable_srt") if disable_softsubs: cmd.append("--disable_softsubs") if disable_hardsubs: cmd.append("--disable_hardsubs") # Adiciona configurações de transcrição cmd.extend(["-ts:e", transcription_engine]) cmd.extend(["-ts:m", transcription_model]) cmd.extend(["-ts:d", device]) cmd.extend(["-ts:c", compute_type]) cmd.extend(["-ts:b", str(batch_size)]) if translate_lang != "none": cmd.extend(["--translate", translate_lang]) if input_lang != "auto": cmd.extend(["--input_lang", input_lang]) # Adiciona configurações de codec cmd.extend(["-c:v", video_codec]) cmd.extend(["-c:a", audio_codec]) # Adiciona caminhos de saída if output_softsubs_path: cmd.extend(["-o:s", output_softsubs_path]) if output_hardsubs_path: cmd.extend(["-o:h", output_hardsubs_path]) try: startupinfo = None if os.name == 'nt': startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW current_process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Combina stderr com stdout text=True, bufsize=1, universal_newlines=True, startupinfo=startupinfo, encoding='utf-8', errors='replace', preexec_fn=None if os.name == 'nt' else os.setsid ) output_lines: List[str] = [] last_progress_update = 0 while True: line = current_process.stdout.readline() if not line and current_process.poll() is not None: break if line: try: clean_line = process_output(line, progress) output_lines.append(clean_line) # Atualiza a saída periodicamente if len(output_lines) - last_progress_update >= 5: yield "\n".join(output_lines) last_progress_update = len(output_lines) except Exception as e: output_lines.append(f"Error processing output: {str(e)}") if current_process.poll() == 0: final_output = "Processing completed successfully!\n\n" + "\n".join(output_lines) else: final_output = f"Process ended with error code {current_process.poll()}\n\n" + "\n".join(output_lines) current_process = None return final_output except Exception as e: current_process = None return f"Error: {str(e)}" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown(badges) gr.Markdown(description) title="LeGen" with gr.Row(): with gr.Column(): # Input Section with gr.Group(): input_path = gr.Textbox(label=i18n("Input Path"), placeholder=i18n("Enter path to file or folder")) # Transcription Settings with gr.Group(): transcription_engine = gr.Dropdown( choices=["whisperx", "whisper"], value="whisperx", label=i18n("Transcription Engine") ) with gr.Row(): transcription_model = gr.Dropdown( choices=["tiny", "base", "small", "medium", "large", "large-v1", "large-v2", "large-v3", "large-v3-turbo",], value="large-v3", label=i18n("Model") ) compute_type = gr.Dropdown( choices=["auto", "int8", "float16", "float32"], value="auto", label=i18n("Compute Type") ) with gr.Row(): device = gr.Dropdown( choices=["auto", "cpu", "cuda"], value="auto", label=i18n("Device") ) batch_size = gr.Number( value=4, label=i18n("Batch Size"), precision=0 ) with gr.Row(): input_lang = gr.Dropdown( choices=["auto", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"], value="auto", label=i18n("Input Language") ) translate_lang = gr.Dropdown( choices=["none", "en", "es", "pt", "fr", "de", "it", "ja", "ko", "zh"], value="none", label=i18n("Translate to") ) with gr.Column(): # Output Settings with gr.Group(): with gr.Row(): video_codec = gr.Dropdown( choices=["h264", "libx264", "h264_vaapi", "h264_nvenc", "hevc", "libx265", "hevc_vaapi"], value="h264", label=i18n("Video Codec") ) audio_codec = gr.Dropdown( choices=["aac", "libopus", "mp3", "vorbis"], value="aac", label=i18n("Audio Codec") ) output_softsubs_path = gr.Textbox( value="softsubs_output", label=i18n("Softsubs Output Path") ) output_hardsubs_path = gr.Textbox( value="hardsubs_output", label=i18n("Hardsubs Output Path") ) # Options with gr.Group(): with gr.Row(): normalize = gr.Checkbox(label=i18n("Normalize folder times"), value=False) overwrite = gr.Checkbox(label=i18n("Overwrite existing files"), value=False) copy_files = gr.Checkbox(label=i18n("Copy non-video files"), value=False) with gr.Row(): disable_srt = gr.Checkbox(label=i18n("Disable SRT generation"), value=False) disable_softsubs = gr.Checkbox(label=i18n("Disable softsubs"), value=False) disable_hardsubs = gr.Checkbox(label=i18n("Disable hardsubs"), value=False) # Run Button, Stop Button and Output with gr.Row(): with gr.Column(): run_btn = gr.Button(i18n("Run LeGen"), variant="primary") stop_btn = gr.Button(i18n("Stop"), variant="stop") output = gr.Textbox(label=i18n("Output"), lines=5) # Connect the run button to the processing function run_btn.click( fn=run_legen, inputs=[ input_path, transcription_engine, transcription_model, compute_type, device, batch_size, input_lang, translate_lang, video_codec, audio_codec, output_softsubs_path, output_hardsubs_path, normalize, overwrite, copy_files, disable_srt, disable_softsubs, disable_hardsubs ], outputs=output ) stop_btn.click( fn=stop_process, inputs=[], outputs=output ) gr.Markdown("""