# import whisper
from faster_whisper import WhisperModel
import datetime
import subprocess
import gradio as gr
from pathlib import Path
import pandas as pd
import re
import time
import os
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
from pytube import YouTube
import yt_dlp
import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.audio import Audio
from pyannote.core import Segment
from gpuinfo import GPUInfo
import wave
import contextlib
from transformers import pipeline
import psutil
# Whisper checkpoint sizes offered in the model dropdown.
whisper_models = ["tiny", "base", "small", "medium", "large-v1", "large-v2"]

# Language code -> display name (Portuguese) for every language offered as a
# transcription source. Insertion order is the order shown to the user.
source_languages = {
    "en": "Inglês",
    "zh": "Chinês",
    "de": "Alemão",
    "es": "Espanhol",
    "ru": "Russo",
    "ko": "Coreano",
    "fr": "Francês",
    "ja": "Japonês",
    "pt": "Português",
    "tr": "Turco",
    "pl": "Polaco",
    "ca": "Catalão",
    "nl": "Holandês",
    "ar": "Árabe",
    "sv": "Sueco",
    "it": "Italiano",
    "id": "Indonésio",
    "hi": "Hindi",
    "fi": "Finlandês",
    "vi": "Vietnamita",
    "he": "Hebraico",
    "uk": "Ucraniano",
    "el": "Grego",
    "ms": "Malaio",
    "cs": "Checo",
    "ro": "Romeno",
    "da": "Dinamarquês",
    "hu": "Húngaro",
    "ta": "Tâmil",
    "no": "Norueguês",
    "th": "Tailandês",
    "ur": "Urdu",
    "hr": "Croata",
    "bg": "Búlgaro",
    "lt": "Lituano",
    "la": "Latim",
    "mi": "Maori",
    "ml": "Malaiala",
    "cy": "Galês",
    "sk": "Eslovaco",
    "te": "Telugu",
    "fa": "Persa",
    "lv": "Letão",
    "bn": "Bengali",
    "sr": "Sérvio",
    "az": "Azerbaijano",
    "sl": "Esloveno",
    "kn": "Canarim",
    "et": "Estoniano",
    "mk": "Macedónio",
    "br": "Bretão",
    "eu": "Basco",
    "is": "Islandês",
    "hy": "Arménio",
    "ne": "Nepalês",
    "mn": "Mongol",
    "bs": "Bósnio",
    "kk": "Cazaque",
    "sq": "Albanês",
    "sw": "Suaíli",
    "gl": "Galego",
    "mr": "Marata",
    "pa": "Punjabi",
    "si": "Cingalês",
    "km": "Khmer",
    "sn": "Shona",
    "yo": "Ioruba",
    "so": "Somali",
    "af": "Africâner",
    "oc": "Occitano",
    "ka": "Georgiano",
    "be": "Bielorrusso",
    "tg": "Tajique",
    "sd": "Sindi",
    "gu": "Guzerate",
    "am": "Amárico",
    "yi": "Iídiche",
    "lo": "Laosiano",
    "uz": "Usbeque",
    "fo": "Feroês",
    "ht": "Crioulo Haitiano",
    "ps": "Pashto",
    "tk": "Turcomano",
    "nn": "Nynorsk",
    "mt": "Maltês",
    "sa": "Sânscrito",
    "lb": "Luxemburguês",
    "my": "Birmanês",
    "bo": "Tibetano",
    "tl": "Tagalog",
    "mg": "Malgaxe",
    "as": "Assamês",
    "tt": "Tártaro",
    "haw": "Havaiano",
    "ln": "Lingala",
    "ha": "Hausa",
    "ba": "Bashkir",
    "jw": "Javanês",
    "su": "Sundanês",
}

# Language codes only, in dictionary order (used as the dropdown choices).
source_language_list = list(source_languages)
# Checkpoint and language used by the transformers ASR pipeline below.
# NOTE(review): the checkpoint name suggests a Japanese fine-tune while the
# forced decoding language is "pt" — confirm this pairing is intended.
MODEL_NAME = "vumichien/whisper-medium-jp"
lang = "pt"

# Device index 0 when CUDA is available, otherwise the string "cpu".
device = 0 if torch.cuda.is_available() else "cpu"

pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    chunk_length_s=30,
    device=device,
)

# Directory that receives the generated transcript CSV.
os.makedirs("output", exist_ok=True)

# Force the decoder prompt so the pipeline transcribes in `lang`.
pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(
    language=lang,
    task="transcribe",
)

# Speaker-embedding model used for diarization; placed on the same kind of
# device as the ASR pipeline.
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device("cuda" if device == 0 else "cpu"),
)
def transcribe(microphone, file_upload):
    """Transcribe a microphone recording or an uploaded audio file.

    Args:
        microphone: Audio recorded from the microphone, or ``None``.
        file_upload: Uploaded audio file, or ``None``.

    Returns:
        The transcription text from ``pipe``. When both inputs are given the
        microphone input is used and a warning is prepended. When neither is
        given an error message string is returned instead.
    """
    has_microphone = microphone is not None
    has_upload = file_upload is not None

    # Nothing to transcribe: report the problem as text instead of raising.
    if not has_microphone and not has_upload:
        return "ERRO: Você precisa usar o microfone ou fazer upload de um arquivo de áudio"

    notice = ""
    if has_microphone and has_upload:
        notice = (
            "AVISO: Você enviou um arquivo de áudio e usou o microfone. "
            "O arquivo gravado pelo microfone será usado e o áudio enviado será descartado.\n"
        )

    # The microphone recording takes precedence over the upload.
    audio_source = microphone if has_microphone else file_upload
    transcription = pipe(audio_source)["text"]
    return notice + transcription
def _return_yt_html_embed(yt_url):
    """Build a centered HTML ``<iframe>`` snippet embedding a YouTube video.

    The video id is taken from the ``v`` query parameter when present
    (``https://www.youtube.com/watch?v=ID&t=32s``), from the path for
    ``youtu.be`` short links (``https://youtu.be/ID``), and otherwise from
    whatever follows ``?v=`` in the raw string.

    Args:
        yt_url: URL of the YouTube video.

    Returns:
        An HTML string containing the embedded player.
    """
    from html import escape
    from urllib.parse import parse_qs, urlparse

    parsed = urlparse(yt_url)
    ids_from_query = parse_qs(parsed.query).get("v")
    if ids_from_query:
        video_id = ids_from_query[0]
    elif parsed.netloc.lower().endswith("youtu.be"):
        video_id = parsed.path.strip("/")
    else:
        # Fallback for inputs that are not regular watch/short URLs.
        video_id = yt_url.split("?v=")[-1]

    # The id ends up inside an HTML attribute, so escape it.
    video_id = escape(video_id, quote=True)
    HTML_str = (
        f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>'
        " </center>"
    )
    return HTML_str
def yt_transcribe(yt_url):
    """Download a YouTube video's audio and transcribe it.

    The audio is fetched with yt-dlp, converted to MP3 by the
    ``FFmpegExtractAudio`` post-processor and written to ``audio.mp3`` in the
    current working directory, then passed to ``pipe``.

    Args:
        yt_url: URL of the YouTube video.

    Returns:
        A tuple ``(html_embed_str, text)``: the HTML embed snippet for the
        video and the transcription text.
    """
    # Previously this value was returned without ever being assigned, which
    # raised NameError on every call.
    html_embed_str = _return_yt_html_embed(yt_url)

    ydl_opts = {
        'format': 'bestvideo*+bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        # With the mp3 post-processor the final file is "audio.mp3".
        'outtmpl': 'audio.%(ext)s',
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([yt_url])

    text = pipe("audio.mp3")["text"]
    return html_embed_str, text
def convert_time(secs):
    """Return *secs*, rounded with ``round()``, as a ``datetime.timedelta``."""
    whole_seconds = round(secs)
    return datetime.timedelta(seconds=whole_seconds)
def get_youtube(video_url):
    """Download a YouTube video with yt-dlp and return the output filename.

    Args:
        video_url: URL of the video to download.

    Returns:
        The filename yt-dlp prepared for the download.
    """
    download_options = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
    }
    with yt_dlp.YoutubeDL(download_options) as downloader:
        # Resolve the metadata first so the target filename is known, then
        # hand the same metadata to process_info (presumably this performs
        # the actual download — verify against yt-dlp).
        metadata = downloader.extract_info(video_url, download=False)
        video_path = downloader.prepare_filename(metadata)
        downloader.process_info(metadata)

    print("Sucesso ao baixar o vídeo")
    print(video_path)
    return video_path
def _convert_to_wav(video_file_path):
    """Convert the input media to 16 kHz mono 16-bit PCM WAV and return the WAV path."""
    source_path = str(video_file_path)
    root, file_ending = os.path.splitext(source_path)
    print(f'file enging is {file_ending}')
    # Swap only the real extension; str.replace would also rewrite other
    # occurrences of the extension inside the path (or every gap between
    # characters when there is no extension at all).
    audio_file = f"{root}.wav"
    if audio_file == source_path:
        # ffmpeg cannot read from and write to the same file.
        audio_file = f"{root}_16k.wav"
    print("A iniciar a conversão para WAV")
    # Argument list (no shell) so the path cannot be interpreted as shell
    # syntax; -y overwrites a stale WAV instead of waiting for confirmation;
    # check=True surfaces ffmpeg failures.
    subprocess.run(
        [
            "ffmpeg", "-y",
            "-i", source_path,
            "-ar", "16000",
            "-ac", "1",
            "-c:a", "pcm_s16le",
            audio_file,
        ],
        check=True,
    )
    return audio_file


def _wav_duration(audio_file):
    """Return the duration of a WAV file in seconds."""
    with contextlib.closing(wave.open(audio_file, 'r')) as wav_file:
        return wav_file.getnframes() / float(wav_file.getframerate())


def _pick_speaker_labels(embeddings, num_speakers):
    """Cluster the segment embeddings and return one integer label per segment."""
    n_segments = len(embeddings)
    if not num_speakers:
        # Automatic mode: keep the cluster count with the best silhouette.
        # silhouette_score needs 2 <= n_clusters <= n_samples - 1, so the
        # search range is capped by the number of segments.
        labels_by_count = {}
        score_by_count = {}
        for candidate in range(2, min(10, n_segments - 1) + 1):
            candidate_labels = AgglomerativeClustering(candidate).fit(embeddings).labels_
            labels_by_count[candidate] = candidate_labels
            score_by_count[candidate] = silhouette_score(
                embeddings, candidate_labels, metric='euclidean')
        if score_by_count:
            best_num_speaker = max(score_by_count, key=score_by_count.get)
            print(f"O número estimado de participantes: {best_num_speaker} com pontuação de {score_by_count[best_num_speaker]} ")
            return labels_by_count[best_num_speaker]
        # Too few segments to score any split: treat as a single speaker.
        best_num_speaker = 1
    else:
        # Cannot have more clusters than segments.
        best_num_speaker = max(1, min(int(num_speakers), n_segments))

    if best_num_speaker == 1:
        return np.zeros(n_segments, dtype=int)
    return AgglomerativeClustering(best_num_speaker).fit(embeddings).labels_


def _merge_speaker_turns(segments):
    """Merge consecutive segments of the same speaker into Start/End/Speaker/Text columns."""
    objects = {
        'Start': [],
        'End': [],
        'Speaker': [],
        'Text': [],
    }
    text = ''
    for i, segment in enumerate(segments):
        if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
            # A new speaker turn starts here.
            objects['Start'].append(str(convert_time(segment["start"])))
            objects['Speaker'].append(segment["speaker"])
            if i != 0:
                # Close the previous turn at the end of the previous segment.
                objects['End'].append(str(convert_time(segments[i - 1]["end"])))
                objects['Text'].append(text)
                text = ''
        text += segment["text"] + ' '
    if segments:
        # The last turn ends with the LAST segment (not the one before it).
        objects['End'].append(str(convert_time(segments[-1]["end"])))
        objects['Text'].append(text)
    return objects


def speech_to_text(video_file_path, selected_source_lang, whisper_model, num_speakers):
    """Transcribe a video file with faster-whisper and label each turn with a speaker.

    NOTE: This model was adapted by Pedro Faria as an example for Biometrid
    and must not be used for other purposes.

    Steps:
      1. Convert the input to WAV and split/transcribe it with a Whisper model.
      2. Compute a speaker embedding for every transcribed segment.
      3. Apply agglomerative clustering to the embeddings to assign a speaker
         to each segment, then merge consecutive segments of one speaker.

    Speech recognition is based on OpenAI Whisper models
    (https://github.com/openai/whisper); the speaker embedding tooling comes
    from https://github.com/pyannote/pyannote-audio.

    Args:
        video_file_path: Path of the media file to transcribe.
        selected_source_lang: Language code passed to the Whisper model.
        whisper_model: Name of the Whisper checkpoint to load.
        num_speakers: Number of speakers; ``0`` (or empty) picks the count
            with the best silhouette score.

    Returns:
        A tuple ``(df_results, system_info, save_path)``: the transcript as a
        DataFrame with ``Start``/``End``/``Speaker``/``Text`` columns, a
        Markdown string with memory/time/GPU figures, and the path of the CSV
        the transcript was written to.

    Raises:
        ValueError: If ``video_file_path`` is ``None``.
        RuntimeError: If conversion/transcription fails, if no speech segment
            is found, or if the embedding/clustering/output stage fails.
    """
    # Validate before paying for the model load.
    if video_file_path is None:
        raise ValueError("Error no video input")

    model = WhisperModel(whisper_model, compute_type="int8")
    time_start = time.time()
    print(video_file_path)

    try:
        audio_file = _convert_to_wav(video_file_path)
        duration = _wav_duration(audio_file)
        print(f"Conversão para WAV concluída, duração do arquivo de áudio.: {duration}")

        segments_raw, _ = model.transcribe(
            audio_file,
            task="transcribe",
            language=selected_source_lang,
            beam_size=5,
            best_of=5,
        )
        # Convert back to the original openai-whisper dict format.
        segments = [
            {"start": chunk.start, "end": chunk.end, "text": chunk.text}
            for chunk in segments_raw
        ]
        print("transcrição de audio com fast whisper terminada")
    except Exception as e:
        raise RuntimeError("Erro a converter o filme para audio") from e

    if not segments:
        # Nothing to embed or cluster.
        raise RuntimeError("Nenhum segmento de fala foi detectado no áudio")

    try:
        # One embedding row per segment; 192 is the vector size this code
        # expects from embedding_model.
        audio = Audio()
        embeddings = np.zeros(shape=(len(segments), 192))
        for i, segment in enumerate(segments):
            # Whisper overshoots the end timestamp in the last segment.
            clip = Segment(segment["start"], min(duration, segment["end"]))
            waveform, _ = audio.crop(audio_file, clip)
            embeddings[i] = embedding_model(waveform[None])
        embeddings = np.nan_to_num(embeddings)
        print(f'Embedding shape: {embeddings.shape}')

        # Assign speaker labels (1-based in the displayed name).
        labels = _pick_speaker_labels(embeddings, num_speakers)
        for segment, label in zip(segments, labels):
            segment["speaker"] = 'Participante ' + str(label + 1)

        objects = _merge_speaker_turns(segments)

        time_diff = time.time() - time_start
        memory = psutil.virtual_memory()
        gpu_utilization, gpu_memory = GPUInfo.gpu_usage()
        gpu_utilization = gpu_utilization[0] if len(gpu_utilization) > 0 else 0
        gpu_memory = gpu_memory[0] if len(gpu_memory) > 0 else 0
        system_info = (
            "\n"
            f"*Memoria: {memory.total / (1024 * 1024 * 1024):.2f}GB, utilizado: {memory.percent}%, disponivel: {memory.available / (1024 * 1024 * 1024):.2f}GB.*\n"
            f"*Tempo de processamento: {time_diff:.5} segundos.*\n"
            f"*Utilização de GPU: {gpu_utilization}%, Memoria de GPU: {gpu_memory}MiB.*\n"
        )

        save_path = "output/transcript_result.csv"
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        df_results = pd.DataFrame(objects)
        df_results.to_csv(save_path, index=False, encoding="utf-8")
        return df_results, system_info, save_path
    except Exception as e:
        raise RuntimeError("Erro a correr a inferência com um modelo local", e) from e
# ---- Gradio Layout -----
# Inspiration from https://huggingface.co/spaces/RASMUS/Whisper-youtube-crosslingual-subtitles
# Components are created up front and placed into the page later through
# their ``.render()`` calls in the Blocks layout.

# Inputs: a video file or a YouTube URL.
video_in = gr.Video(
    label="Ficheiro Video",
    mirror_webcam=False,
)
youtube_url_in = gr.Textbox(
    label="Url Youtube",
    lines=1,
    interactive=True,
)

# Empty table shown before the first transcription.
df_init = pd.DataFrame(columns=['Início', 'Fim', 'Participante', 'Texto'])

# Memory snapshot taken once at start-up for the initial info line.
memory = psutil.virtual_memory()

# Transcription settings.
selected_source_lang = gr.Dropdown(
    choices=source_language_list,
    type="value",
    value="pt",
    label="Linguagem detectada no vídeo",
    interactive=True,
)
selected_whisper_model = gr.Dropdown(
    choices=whisper_models,
    type="value",
    value="large-v2",
    label="Modelo Whisper selecionado",
    interactive=True,
)
number_speakers = gr.Number(
    precision=0,
    value=2,
    label=(
        "Insira o número de participantes para obter melhores resultados. "
        "Se o valor for 0, o modelo encontrará automaticamente a melhor quantidade."
    ),
    interactive=True,
)

# Outputs: system information, downloadable CSV and the transcript table.
system_info = gr.Markdown(
    f"*Memoria: {memory.total / (1024 * 1024 * 1024):.2f}GB, "
    f"utilizado: {memory.percent}%, "
    f"disponível: {memory.available / (1024 * 1024 * 1024):.2f}GB*"
)
download_transcript = gr.File(label="Download transcript")
transcription_df = gr.DataFrame(
    value=df_init,
    label="Dataframe da transcrição",
    row_count=(0, "dynamic"),
    max_rows=10,
    wrap=True,
    overflow_row_behaviour='paginate',
)

title = "Whisper speaker diarization"
# Build the page: the components created above are placed with .render()
# inside the Blocks context, and the two buttons are wired to their callbacks.
# NOTE(review): the indentation of the nested `with` blocks is missing in this
# copy of the file; the Tab/Row/Column nesting must be restored before the
# script can run — confirm the intended nesting against the deployed app.
demo = gr.Blocks(title=title)
demo.encrypt = False
with demo:
with gr.Tab("Whisper speaker diarization"):
gr.Markdown('''
Whisper diarização com participantes
Este espaço usa os modelos whisper OpenAI with CTranslate2 which is a fast inference engine for Transformer models to recognize the speech (4 times faster than original openai model with same accuracy)
e o modelo ECAPA-TDNN de SpeechBrain para codificar e identificar participantes
''')
with gr.Row():
gr.Markdown('''
### Transcreva o link do youtube usando OpenAI Whisper
##### 1. Usando o modelo Whisper da Open AI para separar o áudio em segmentos e gerar transcrições.
##### 2. Gerando embeddings para cada segmento.
##### 3. Aplicando clustering aglomerativo nos embeddings para identificar o participante de cada segmento.
''')
with gr.Row():
gr.Markdown('''
### Pode testar com os seguintes exemplos:
''')
# Example URLs that fill the YouTube URL textbox when selected.
examples = gr.Examples(examples=
[ "https://youtu.be/mYT33lWKJyw",
"https://youtu.be/ctirgguI7RM"],
label="Examples", inputs=[youtube_url_in])
with gr.Row():
with gr.Column():
youtube_url_in.render()
download_youtube_btn = gr.Button("Descarregar video do Youtube")
# Clicking calls get_youtube with the URL and sends its return value to the
# video component.
download_youtube_btn.click(get_youtube, [youtube_url_in], [
video_in])
# Prints the component object once while the layout is built (not per click).
print(video_in)
with gr.Row():
with gr.Column():
video_in.render()
with gr.Column():
gr.Markdown('''
##### Aqui você pode iniciar o processo de transcrição.
##### Por favor, selecione o idioma de origem para a transcrição.
##### Você pode selecionar uma faixa de números estimados de participantes.
''')
selected_source_lang.render()
selected_whisper_model.render()
number_speakers.render()
transcribe_btn = gr.Button("Transcrever audio com diarização")
# Clicking calls speech_to_text with the video, language, model and speaker
# count; its three return values go to the transcript table, the system info
# text and the downloadable file, in that order.
transcribe_btn.click(speech_to_text,
[video_in, selected_source_lang, selected_whisper_model, number_speakers],
[transcription_df, system_info, download_transcript]
)
with gr.Row():
gr.Markdown('''
##### Aqui vai obter a transcrição
##### ''')
with gr.Row():
with gr.Column():
download_transcript.render()
transcription_df.render()
system_info.render()
# Start the Gradio app with debug=True.
demo.launch(debug=True)