import torch
import gradio as gr
import yt_dlp as youtube_dl
import numpy as np
from datasets import Dataset, Audio
from scipy.io import wavfile
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
import tempfile
import os
import time
import demucs.api
# Checkpoints: Whisper for speech recognition, Demucs for source separation.
MODEL_NAME = "openai/whisper-large-v3"
DEMUCS_MODEL_NAME = "htdemucs_ft"

# Batch size handed to the ASR pipeline on every call.
BATCH_SIZE = 8
# NOTE(review): not referenced in the visible part of the file; presumably an
# upload size cap in megabytes -- confirm against the UI code.
FILE_LIMIT_MB = 1000
YT_LENGTH_LIMIT_S = 3600  # limit to 1 hour YouTube files

# Use CUDA device 0 when a GPU is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 0
else:
    device = "cpu"

# Shared speech-recognition pipeline; audio is processed in 30-second chunks.
pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    chunk_length_s=30,
    device=device,
)

# Shared Demucs separator, used to isolate the vocal stem of an audio clip.
separator = demucs.api.Separator(model=DEMUCS_MODEL_NAME)
def separate_vocal(path):
    """Overwrite the audio file at ``path`` with its isolated vocal stem.

    The module-level Demucs ``separator`` splits the file into stems; the
    ``"vocals"`` stem is then saved back to the same location using the
    separator's sample rate.

    Args:
        path: Location of the audio file to process. The file is replaced
            in place.

    Returns:
        The same ``path`` that was passed in.
    """
    # The first element (the original mix) is not needed here.
    _, stems = separator.separate_audio_file(path)
    vocals = stems["vocals"]
    demucs.api.save_audio(vocals, path, samplerate=separator.samplerate)
    return path
# def separate_vocal(path, track_name, output_folder, demucs_model_name = "htdemucs_ft"):
#
# os.system(f"python3 -m demucs.separate --two-stems=vocals -n {demucs_model_name} {path} -o {output_folder}")
#
# return os.path.join(output_folder, demucs_model_name, track_name, "vocals.wav")
def transcribe(inputs_path, task, use_demucs, dataset_name, oauth_token: gr.OAuthToken):
    """Transcribe an audio file and push the resulting segments to the Hub.

    The file is transcribed with the shared Whisper pipeline, the timestamped
    chunks are merged by ``naive_postprocess_whisper_chunks``, each merged
    segment is written to a temporary WAV file (optionally replaced by its
    Demucs vocal stem) and the (audio, transcript) pairs are uploaded as a
    dataset.

    Args:
        inputs_path: Path to the audio file. It is read with
            ``scipy.io.wavfile.read``, so it must be a WAV file.
        task: Value forwarded to Whisper as ``generate_kwargs["task"]``.
        use_demucs: When equal to ``"separate-audio"``, every segment is run
            through ``separate_vocal`` before being added to the dataset.
        dataset_name: Repository name passed to ``Dataset.push_to_hub``.
        oauth_token: Gradio OAuth token of the logged-in user. ``None`` or a
            plain token string are passed through unchanged.

    Returns:
        The full transcription text.

    Raises:
        gr.Error: If no audio file was submitted.
    """
    if inputs_path is None:
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

    sampling_rate, inputs = wavfile.read(inputs_path)

    out = pipe(inputs_path, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)
    text = out["text"]

    chunks = naive_postprocess_whisper_chunks(out["chunks"], inputs, sampling_rate)

    transcripts = []
    audios = []
    # The dataset only stores file paths, so it has to be built and pushed
    # while the temporary directory still exists.
    with tempfile.TemporaryDirectory() as tmpdirname:
        for i, chunk in enumerate(chunks):
            # TODO: make sure 1D or 2D?
            arr = chunk["audio"]
            path = os.path.join(tmpdirname, f"{i}.wav")
            wavfile.write(path, sampling_rate, arr)

            if use_demucs == "separate-audio":
                # use demucs to separate vocals
                print(f"Separating vocals #{i}")
                path = separate_vocal(path)

            audios.append(path)
            transcripts.append(chunk["text"])

        dataset = Dataset.from_dict({"audio": audios, "transcript": transcripts}).cast_column("audio", Audio())

        # push_to_hub expects the raw token string, not the gr.OAuthToken
        # wrapper; getattr keeps None and plain strings working as before.
        hub_token = getattr(oauth_token, "token", oauth_token)
        dataset.push_to_hub(dataset_name, token=hub_token)

    return text
def _return_yt_html_embed(yt_url):
video_id = yt_url.split("?v=")[-1]
HTML_str = (
f'
'
" "
)
return HTML_str
def download_yt_audio(yt_url, filename):
    """Download a YouTube video to ``filename`` after checking its length.

    The video metadata is fetched first (without downloading) and the
    download is refused when the duration exceeds ``YT_LENGTH_LIMIT_S``.

    Args:
        yt_url: URL of the YouTube video.
        filename: Output path, used as the yt-dlp ``outtmpl`` option.

    Raises:
        gr.Error: If the metadata lookup or the download fails, or if the
            video is longer than ``YT_LENGTH_LIMIT_S`` seconds.
    """
    try:
        # Context manager so the metadata-only downloader is closed again.
        with youtube_dl.YoutubeDL() as info_loader:
            info = info_loader.extract_info(yt_url, download=False)
    except youtube_dl.utils.DownloadError as err:
        raise gr.Error(str(err)) from err

    # "duration_string" is colon-separated ("S", "M:SS" or "H:MM:SS");
    # left-pad with zeros so hours, minutes and seconds are always present.
    file_h_m_s = [int(sub_length) for sub_length in info["duration_string"].split(":")]
    file_h_m_s = [0] * (3 - len(file_h_m_s)) + file_h_m_s
    file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2]

    if file_length_s > YT_LENGTH_LIMIT_S:
        yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S))
        file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s))
        raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.")

    ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"}

    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([yt_url])
        # yt-dlp reports download failures as DownloadError (wrapping the
        # underlying ExtractorError), so both have to be caught here.
        except (youtube_dl.utils.DownloadError, youtube_dl.utils.ExtractorError) as err:
            raise gr.Error(str(err)) from err
def yt_transcribe(yt_url, task, use_demucs, dataset_name, oauth_token: gr.OAuthToken, max_filesize=75.0, dataset_sampling_rate = 24000):
    """Transcribe a YouTube video and push the resulting segments to the Hub.

    The video is downloaded to a temporary directory, decoded with ffmpeg at
    the Whisper feature extractor's sampling rate for transcription, then
    decoded again at ``dataset_sampling_rate`` to cut the audio segments that
    are stored in the dataset.

    Args:
        yt_url: URL of the YouTube video.
        task: Value forwarded to Whisper as ``generate_kwargs["task"]``.
        use_demucs: When equal to ``"separate-audio"``, every segment is run
            through ``separate_vocal`` before being added to the dataset.
        dataset_name: Repository name passed to ``Dataset.push_to_hub``.
        oauth_token: Gradio OAuth token of the logged-in user. ``None`` or a
            plain token string are passed through unchanged.
        max_filesize: Currently unused; kept for interface compatibility.
        dataset_sampling_rate: Sampling rate (Hz) of the audio segments
            written to the dataset.

    Returns:
        A tuple ``(html_embed_str, text)``: the HTML embed for the video and
        the full transcription text.
    """
    html_embed_str = _return_yt_html_embed(yt_url)

    # Only the raw bytes are needed afterwards, so the downloaded file can be
    # discarded as soon as it has been read.
    with tempfile.TemporaryDirectory() as tmpdirname:
        filepath = os.path.join(tmpdirname, "video.mp4")
        download_yt_audio(yt_url, filepath)
        with open(filepath, "rb") as f:
            audio_bytes = f.read()

    asr_sampling_rate = pipe.feature_extractor.sampling_rate
    inputs = ffmpeg_read(audio_bytes, asr_sampling_rate)
    inputs = {"array": inputs, "sampling_rate": asr_sampling_rate}

    out = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)
    text = out["text"]

    # Decode a second time at the dataset rate for the stored audio segments.
    inputs = ffmpeg_read(audio_bytes, dataset_sampling_rate)

    chunks = naive_postprocess_whisper_chunks(out["chunks"], inputs, dataset_sampling_rate)

    transcripts = []
    audios = []
    # The dataset only stores file paths, so it has to be built and pushed
    # while the temporary directory still exists.
    with tempfile.TemporaryDirectory() as tmpdirname:
        for i, chunk in enumerate(chunks):
            # TODO: make sure 1D or 2D?
            arr = chunk["audio"]
            path = os.path.join(tmpdirname, f"{i}.wav")
            wavfile.write(path, dataset_sampling_rate, arr)

            if use_demucs == "separate-audio":
                # use demucs to separate vocals
                print(f"Separating vocals #{i}")
                path = separate_vocal(path)

            audios.append(path)
            transcripts.append(chunk["text"])

        dataset = Dataset.from_dict({"audio": audios, "transcript": transcripts}).cast_column("audio", Audio())

        # push_to_hub expects the raw token string, not the gr.OAuthToken
        # wrapper; getattr keeps None and plain strings working as before.
        hub_token = getattr(oauth_token, "token", oauth_token)
        dataset.push_to_hub(dataset_name, token=hub_token)

    return html_embed_str, text
def naive_postprocess_whisper_chunks(chunks, audio_array, sampling_rate, stop_chars = ".!:;?", min_duration = 5):
# merge chunks as long as merged audio duration is lower than min_duration and that a stop character is not met
# return list of dictionnaries (text, audio)
# min duration is in seconds
min_duration = int(min_duration * sampling_rate)
new_chunks = []
while chunks:
current_chunk = chunks.pop(0)
begin, end = current_chunk["timestamp"]
begin, end = int(begin*sampling_rate), int(end*sampling_rate)
current_dur = end-begin
text = current_chunk["text"]
chunk_to_concat = [audio_array[begin:end]]
while chunks and (text[-1] not in stop_chars or (current_dur