from pytubefix import YouTube from moviepy.editor import VideoFileClip, AudioFileClip from pydub import AudioSegment import whisper import pandas as pd import nltk from nltk.tokenize import sent_tokenize nltk.download('punkt') import gradio as gr import ast from IPython.display import Audio, display import requests nltk.download('punkt_tab') from pytubefix.cli import on_progress model = whisper.load_model("base") def extract_yt_audio(it, video_url, video_file): """ Takes youtube url (youtobe_url) and path where audio clip will be stored (audio_path) in string format as input arguments. Returns the extracted video clip (video) and the path to audio clip (audio_path). """ if it == 'URL' and ("youtube.com" in video_url or "youtu.be" in video_url): yt = YouTube(video_url, use_oauth=True, allow_oauth_cache=True, on_progress_callback = on_progress) a = yt.streams.filter(only_audio=True).first() audio_file = a.download() sample = AudioSegment.from_file(audio_file, format="mp4") # elif it == 'URL' and ("https://www" in video_url or "https://" in video_url or "www." in video_url): # response = requests.get(video_url) # with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file: # temp_file.write(response.content) # temp_file_path = temp_file.name # wav_file_path = temp_file_path.replace(".mp4", ".wav") # subprocess.run(["ffmpeg", "-i", temp_file_path, "-vn", "-acodec", "pcm_s16le", "-ar", "44100", "-ac", "2", wav_file_path], check=True) # sample = AudioSegment.from_wav(wav_file_path) # os.remove(temp_file_path) # os.remove(wav_file_path) elif it == 'URL': sample = AudioSegment.from_file(video_url) else: sample = AudioSegment.from_file(video_file) audio_path = 'audio.wav' # display(Audio(audio_path)) sample.export(audio_path, format="wav") print("Transcription started \nTranscript:\n") result = model.transcribe(audio_path) print(result['text'], '\n') return gr.update(visible=True, value=result['text']), gr.update(visible=True), result['segments'], gr.update(visible=True, value=audio_path) def semantic_chunks(segs, max_chunk_length=15.0): print("Trying to get symantically chunked segments:") """ Takes segments of transcribed audio and 15secs as maximum check duration and returns chunks of the audio as a list. """ segs = ast.literal_eval(segs) chunks = [] current_chunk = [] chunk_start_time = None chunk_end_time = None chunk_duration = 0 # iterate over segments and create chunks out of each segment for segment in segs: start = segment['start'] end = segment['end'] text = segment['text'] # sentence tokenize each segment to capture more semantic context sentences = sent_tokenize(text) # iterate over the sentences and group them into chunks subject to the max_chunk_length is 15 secs for sentence in sentences: sentence_duration = (end - start) / len(sentences) # Check if adding the sentence exceeds the max_chunk_length of 15 secs if chunk_duration + sentence_duration <= max_chunk_length: if not current_chunk: chunk_start_time = start current_chunk.append(sentence) chunk_duration += sentence_duration chunk_end_time = end else: # If the chunk would be too long, finalize the current chunk with required parameters chunks.append({ 'chunk_id': len(chunks) + 1, 'chunk_length (secs)': chunk_duration, 'semantic_chunk': ' '.join(current_chunk), 'start_time (secs)': chunk_start_time, 'end_time (secs)': chunk_end_time }) # Start a new chunk with the current sentence current_chunk = [sentence] chunk_start_time = start chunk_end_time = end chunk_duration = sentence_duration # Finalize the last chunk if it exists if current_chunk: chunks.append({ 'chunk_id': len(chunks) + 1, 'chunk_length (secs)': chunk_duration, 'semantic_chunk': ' '.join(current_chunk), 'start_time (secs)': chunk_start_time, 'end_time (secs)': chunk_end_time }) print(pd.DataFrame(chunks)) return gr.update(visible=True, value=pd.DataFrame(chunks)) def toggle_input_fields(input_type): if input_type == "URL": return gr.update(visible=True, value='sample.mp4'), gr.update(visible=False), gr.update(visible=True), gr.update(visible=True) else: return gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True) def clear_all(): return (gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)) with gr.Blocks() as demo: gr.Markdown( """ # Extract audio from video, get the transcript and then get the semantic chunk information. ### Currently YouTube videos need authorization mechanism in HuggingFace Spaces to work. If you have file location (ex:- .mp4, .wav) as URL, you can try that. """) # Radio button to choose between URL or upload input_type = gr.Radio(choices=["URL", "Upload"], label="Select Video Input Type") # input_url = gr.Textbox(label="Type-in the URL or File Location of the Video", value='https://www.youtube.com/watch?v=ug5e4JfC3oo') input_url = gr.Textbox(label="Enter Video URL", visible=False) video_file = gr.File(label="Upload Video", visible=False) # input_url = gr.Textbox(label="Type-in the URL or File Location of the Video", value='sample.mp4') segments = gr.Textbox(visible=False) submit_btn_1 = gr.Button("Get the Transcript", visible=True) audio = gr.Audio(visible=False, type="filepath", label='Play Audio') transcript = gr.Textbox(visible=False, label='Transcript') submit_btn_2 = gr.Button("Get the Semantically Chuncked Segments", visible=False) chunks = gr.Dataframe(visible=False, label = 'Semantic Chunks') clear_btn = gr.Button("Clear") input_type.change(fn=toggle_input_fields, inputs=input_type, outputs=[input_url, video_file, audio, transcript]) submit_btn_1.click(fn=extract_yt_audio, inputs=[input_type, input_url, video_file], outputs=[transcript, submit_btn_2, segments, audio]) # submit_btn_1.click(fn=extract_yt_audio, inputs=[input_url], outputs=[transcript, submit_btn_2, segments, audio]) submit_btn_2.click(fn=semantic_chunks, inputs=[segments], outputs=[chunks]) clear_btn.click(fn=clear_all, outputs=[input_url, video_file, transcript, submit_btn_2, chunks, audio]) demo.launch(debug=True)