Spaces:
Sleeping
Sleeping
import torch | |
import gradio as gr | |
import yt_dlp as youtube_dl | |
from transformers import pipeline | |
from transformers.pipelines.audio_utils import ffmpeg_read | |
import tempfile | |
import os | |
import numpy as np | |
from gensim.models import Word2Vec | |
from sklearn.cluster import KMeans | |
from sklearn.metrics.pairwise import cosine_similarity | |
from collections import defaultdict | |
import spacy | |
from transformers import pipeline | |
from sklearn.metrics import davies_bouldin_score | |
MODEL_NAME = "openai/whisper-large-v3" | |
BATCH_SIZE = 8 | |
FILE_LIMIT_MB = 1000 | |
YT_LENGTH_LIMIT_S = 3600 # limit to 1 hour YouTube files | |
device = 0 if torch.cuda.is_available() else "cpu" | |
pipe = pipeline( | |
task="automatic-speech-recognition", | |
model=MODEL_NAME, | |
chunk_length_s=30, | |
device=device, | |
) | |
summarizer = pipeline("summarization", model="facebook/bart-large-cnn") | |
# Download the 'en_core_web_sm' model | |
spacy.cli.download("en_core_web_sm") | |
# Load the model | |
nlp = spacy.load("en_core_web_sm") | |
def summarize(text, max_length=1000): | |
return summarizer(text, max_length=min(max_length, len(text)), min_length=1, do_sample=False)[0]["summary_text"] | |
def segment_sentences(text): | |
# Process the text using spaCy | |
doc = nlp(text) | |
# Extract sentences from the processed document | |
return [sent.text for sent in doc.sents] | |
def preprocess_sentences(sentences): | |
preprocessed_sentences = [] | |
for sentence in sentences: | |
# Tokenize and lemmatize the sentence using spaCy | |
doc = nlp(sentence.lower()) | |
tokens = [token.lemma_ for token in doc if not token.is_stop and token.is_alpha] | |
preprocessed_sentences.append(tokens) | |
return preprocessed_sentences | |
def embedding(preprocessed_sentences): | |
model = Word2Vec(preprocessed_sentences, vector_size=100, window=5, min_count=1, sg=1) | |
sentence_embeddings = [] | |
for sentence in preprocessed_sentences: | |
word_embeddings = [model.wv[word] for word in sentence if word in model.wv] | |
if word_embeddings: | |
sentence_embeddings.append(np.mean(word_embeddings, axis=0)) | |
else: | |
# Handle the case when none of the words in the sentence exist in the Word2Vec vocabulary | |
sentence_embeddings.append(np.zeros(model.vector_size)) # Use zero vector as placeholder | |
return sentence_embeddings | |
def optimal_n_clusters(sentence_embeddings): | |
cosine_sim_matrix = cosine_similarity(sentence_embeddings) | |
db_scores = [] | |
k_values = range(2, len(sentence_embeddings)) | |
for k in k_values: | |
kmeans = KMeans(n_clusters=k, n_init=10, random_state=42) | |
cluster_labels = kmeans.fit_predict(cosine_sim_matrix) | |
db_scores.append(davies_bouldin_score(cosine_sim_matrix, cluster_labels)) | |
# Choose the optimal number of clusters based on Davies-Bouldin index | |
return (cosine_sim_matrix, np.argmin(db_scores) + 2) # Add 2 to account for skipping k=1 | |
def cluster_assignments(cosine_sim_matrix, optimal_n_clusters): | |
# Cluster sentence embeddings using KMeans with the optimal number of clusters | |
kmeans = KMeans(n_clusters=optimal_n_clusters, n_init=10, random_state=42) | |
return kmeans.fit_predict(cosine_sim_matrix) | |
def clusters(sentences, cluster_assignments): | |
# Group sentences into clusters | |
clusters = defaultdict(list) | |
for i, sentence in enumerate(sentences): | |
clusters[cluster_assignments[i]].append(sentence) | |
result = defaultdict(list) | |
for i in range(len(clusters)): | |
cluster = ' '.join(clusters[i]) | |
title = summarize(cluster, 10) | |
result[title].extend(clusters[i]) | |
return result | |
def format_as_bullet_points(dictionary): | |
bullet_points = "" | |
for key, values in dictionary.items(): | |
bullet_points += f"- {key}:\n" | |
for value in values: | |
bullet_points += f" - {value}\n" | |
return bullet_points.strip() | |
def final_result(input): | |
text = summarize(input) | |
sentences = segment_sentences(text) | |
preprocessed_sentences = preprocess_sentences(sentences) | |
sentence_embeddings = embedding(preprocessed_sentences) | |
cosine_sim_matrix, optimal_number_of_clusters = optimal_n_clusters(sentence_embeddings) | |
clusters_assignments = cluster_assignments(cosine_sim_matrix, optimal_number_of_clusters) | |
all_clusters = clusters(sentences, clusters_assignments) | |
return format_as_bullet_points(all_clusters) | |
def transcribe(inputs, task): | |
if inputs is None: | |
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") | |
text = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)["text"] | |
return final_result(text) | |
def _return_yt_html_embed(yt_url): | |
video_id = yt_url.split("?v=")[-1] | |
HTML_str = ( | |
f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>' | |
" </center>" | |
) | |
return HTML_str | |
def download_yt_audio(yt_url, filename): | |
info_loader = youtube_dl.YoutubeDL() | |
try: | |
info = info_loader.extract_info(yt_url, download=False) | |
except youtube_dl.utils.DownloadError as err: | |
raise gr.Error(str(err)) | |
file_length = info["duration_string"] | |
file_h_m_s = file_length.split(":") | |
file_h_m_s = [int(sub_length) for sub_length in file_h_m_s] | |
if len(file_h_m_s) == 1: | |
file_h_m_s.insert(0, 0) | |
if len(file_h_m_s) == 2: | |
file_h_m_s.insert(0, 0) | |
file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2] | |
if file_length_s > YT_LENGTH_LIMIT_S: | |
yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S)) | |
file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s)) | |
raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.") | |
ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"} | |
with youtube_dl.YoutubeDL(ydl_opts) as ydl: | |
try: | |
ydl.download([yt_url]) | |
except youtube_dl.utils.ExtractorError as err: | |
raise gr.Error(str(err)) | |
def yt_transcribe(yt_url, task, max_filesize=75.0): | |
html_embed_str = _return_yt_html_embed(yt_url) | |
with tempfile.TemporaryDirectory() as tmpdirname: | |
filepath = os.path.join(tmpdirname, "video.mp4") | |
download_yt_audio(yt_url, filepath) | |
with open(filepath, "rb") as f: | |
inputs = f.read() | |
inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate) | |
inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate} | |
text = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)["text"] | |
return html_embed_str, final_result(text) | |
demo = gr.Blocks() | |
mf_transcribe = gr.Interface( | |
fn=transcribe, | |
inputs=[ | |
gr.inputs.Audio(source="microphone", type="filepath", optional=True), | |
gr.inputs.Radio(["transcribe", "translate"], label="Task", default="transcribe"), | |
], | |
outputs="text", | |
layout="horizontal", | |
theme="huggingface", | |
title="Whisper Large V3: Transcribe Audio", | |
description=( | |
"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the OpenAI Whisper" | |
f" checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files" | |
" of arbitrary length." | |
), | |
allow_flagging="never", | |
) | |
file_transcribe = gr.Interface( | |
fn=transcribe, | |
inputs=[ | |
gr.inputs.Audio(source="upload", type="filepath", optional=True, label="Audio file"), | |
gr.inputs.Radio(["transcribe", "translate"], label="Task", default="transcribe"), | |
], | |
outputs="text", | |
layout="horizontal", | |
theme="huggingface", | |
title="Whisper Large V3: Transcribe Audio", | |
description=( | |
"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the OpenAI Whisper" | |
f" checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files" | |
" of arbitrary length." | |
), | |
allow_flagging="never", | |
) | |
yt_transcribe = gr.Interface( | |
fn=yt_transcribe, | |
inputs=[ | |
gr.inputs.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"), | |
gr.inputs.Radio(["transcribe", "translate"], label="Task", default="transcribe") | |
], | |
outputs=["html", "text"], | |
layout="horizontal", | |
theme="huggingface", | |
title="Whisper Large V3: Transcribe YouTube", | |
description=( | |
"Transcribe long-form YouTube videos with the click of a button! Demo uses the OpenAI Whisper checkpoint" | |
f" [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe video files of" | |
" arbitrary length." | |
), | |
allow_flagging="never", | |
) | |
with demo: | |
gr.TabbedInterface([mf_transcribe, file_transcribe, yt_transcribe], ["Microphone", "Audio file", "YouTube"]) | |
demo.launch(enable_queue=True) | |