import logging import warnings import gradio as gr import pytube as pt import psutil import torch import whisper from huggingface_hub import hf_hub_download, model_info from transformers.utils.logging import disable_progress_bar warnings.filterwarnings("ignore") disable_progress_bar() DEFAULT_MODEL_NAME = "bofenghuang/whisper-large-v2-cv11-french" CHECKPOINT_FILENAME = "checkpoint_openai.pt" GEN_KWARGS = { "task": "transcribe", "language": "fr", # "without_timestamps": True, # decode options # "beam_size": 5, # "patience": 2, # disable fallback # "compression_ratio_threshold": None, # "logprob_threshold": None, # vad threshold # "no_speech_threshold": None, } logging.basicConfig( format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s", datefmt="%Y-%m-%dT%H:%M:%SZ", ) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # device = 0 if torch.cuda.is_available() else "cpu" device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") logger.info(f"Model will be loaded on device `{device}`") cached_models = {} def _print_memory_info(): memory = psutil.virtual_memory() logger.info( f"Memory: {memory.total / (1024 ** 3):.2f}GB, used: {memory.percent}%, available: {memory.available / (1024 ** 3):.2f}GB" ) def print_cuda_memory_info(): used_mem, tot_mem = torch.cuda.mem_get_info() logger.info( f"CUDA memory info - Free: {used_mem / 1024 ** 3:.2f} Gb, used: {(tot_mem - used_mem) / 1024 ** 3:.2f} Gb, total: {tot_mem / 1024 ** 3:.2f} Gb" ) def print_memory_info(): _print_memory_info() print_cuda_memory_info() def maybe_load_cached_pipeline(model_name): model = cached_models.get(model_name) if model is None: downloaded_model_path = hf_hub_download(repo_id=model_name, filename=CHECKPOINT_FILENAME) # model = whisper.load_model(downloaded_model_path, device=device) model = whisper.load_model(downloaded_model_path, device="cpu") model = model.to(device) logger.info(f"`{model_name}` has been loaded on device `{device}`") print_memory_info() cached_models[model_name] = model return model def infer(model, filename, with_timestamps): if with_timestamps: model_outputs = model.transcribe(filename, **GEN_KWARGS) return "\n\n".join( [ f'Segment {segment["id"]+1} from {segment["start"]:.2f}s to {segment["end"]:.2f}s:\n{segment["text"].strip()}' for segment in model_outputs["segments"] ] ) else: return model.transcribe(filename, without_timestamps=True, **GEN_KWARGS)["text"] def transcribe(microphone, file_upload, with_timestamps, model_name=DEFAULT_MODEL_NAME): warn_output = "" if (microphone is not None) and (file_upload is not None): warn_output = ( "WARNING: You've uploaded an audio file and used the microphone. " "The recorded file from the microphone will be used and the uploaded audio will be discarded.\n" ) elif (microphone is None) and (file_upload is None): return "ERROR: You have to either use the microphone or upload an audio file" file = microphone if microphone is not None else file_upload try: model = maybe_load_cached_pipeline(model_name) # text = model.transcribe(file, **GEN_KWARGS)["text"] text = infer(model, file, with_timestamps) logger.info(f"Transcription by `{model_name}`:\n{text}\n") except Exception as e: logger.info(str(e)) return warn_output + text def _return_yt_html_embed(yt_url): video_id = yt_url.split("?v=")[-1] HTML_str = ( f'
' "
" ) return HTML_str def yt_transcribe(yt_url, with_timestamps, model_name=DEFAULT_MODEL_NAME): yt = pt.YouTube(yt_url) html_embed_str = _return_yt_html_embed(yt_url) stream = yt.streams.filter(only_audio=True)[0] stream.download(filename="audio.mp3") model = maybe_load_cached_pipeline(model_name) # text = model.transcribe("audio.mp3", **GEN_KWARGS)["text"] text = infer(model, "audio.mp3", with_timestamps) logger.info(f'Transcription by `{model_name}` of "{yt_url}":\n{text}\n') return html_embed_str, text # load default model maybe_load_cached_pipeline(DEFAULT_MODEL_NAME) demo = gr.Blocks() mf_transcribe = gr.Interface( fn=transcribe, inputs=[ gr.inputs.Audio(source="microphone", type="filepath", optional=True, label="Record"), gr.inputs.Audio(source="upload", type="filepath", optional=True, label="Upload File"), gr.Checkbox(label="With timestamps?", value=True), ], # outputs="text", outputs=gr.outputs.Textbox(label="Transcription"), layout="horizontal", theme="huggingface", title="Whisper French Demo 🇫🇷 : Transcribe Audio", description=( "Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the the fine-tuned" f" checkpoint [{DEFAULT_MODEL_NAME}](https://huggingface.co/{DEFAULT_MODEL_NAME}) and 🤗 Transformers to transcribe audio files" " of arbitrary length." ), allow_flagging="never", ) yt_transcribe = gr.Interface( fn=yt_transcribe, inputs=[ gr.inputs.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"), gr.Checkbox(label="With timestamps?", value=True), ], # outputs=["html", "text"], outputs=[ gr.outputs.HTML(label="YouTube Page"), gr.outputs.Textbox(label="Transcription"), ], layout="horizontal", theme="huggingface", title="Whisper French Demo 🇫🇷 : Transcribe YouTube", description=( "Transcribe long-form YouTube videos with the click of a button! Demo uses the the fine-tuned checkpoint:" f" [{DEFAULT_MODEL_NAME}](https://huggingface.co/{DEFAULT_MODEL_NAME}) and 🤗 Transformers to transcribe audio files of" " arbitrary length." ), allow_flagging="never", ) with demo: gr.TabbedInterface([mf_transcribe, yt_transcribe], ["Transcribe Audio", "Transcribe YouTube"]) # demo.launch(server_name="0.0.0.0", debug=True, share=True) demo.launch(enable_queue=True)