# Bajiyo's picture
# Create app.py
# a1b1ec8 verified
from pytube import YouTube
from pydub import AudioSegment
import whisper
import webrtcvad
import gradio as gr
import os
def download_audio(youtube_url, download_path='downloads', audio_filename='audio.mp3'):
    """Download the first audio-only stream of a YouTube video.

    Args:
        youtube_url: URL handed to ``pytube.YouTube``.
        download_path: Directory the file is written to; created if missing.
        audio_filename: File name used for the downloaded stream.

    Returns:
        The path returned by the stream's ``download`` call.

    Raises:
        ValueError: If the video exposes no audio-only stream.
    """
    yt = YouTube(youtube_url)
    audio_stream = yt.streams.filter(only_audio=True).first()
    if audio_stream is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValueError(f"No audio-only stream available for {youtube_url!r}")

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(download_path, exist_ok=True)

    # NOTE(review): the stream is saved under audio_filename as-is; no
    # transcoding happens here, so the ".mp3" default is only a name.
    return audio_stream.download(output_path=download_path, filename=audio_filename)
def convert_to_wav(mp3_path, wav_path='downloads/audio.wav'):
    """Convert an audio file to WAV.

    Args:
        mp3_path: Path of the source audio file (loaded with
            ``AudioSegment.from_file``).
        wav_path: Destination path of the WAV file.

    Returns:
        ``wav_path``, unchanged.
    """
    audio = AudioSegment.from_file(mp3_path)

    # Make sure the destination directory exists before writing into it.
    out_dir = os.path.dirname(wav_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # export() returns the output file object; close it explicitly so the
    # handle is not left open until garbage collection.
    exported = audio.export(wav_path, format='wav')
    exported.close()
    return wav_path
# Loaded Whisper models keyed by model name, so the weights are read once per
# process instead of on every transcription request.
_WHISPER_MODEL_CACHE = {}


def transcribe_audio(audio_path):
    """Transcribe an audio file with the Whisper "base" model.

    Args:
        audio_path: Path of the audio file passed to ``model.transcribe``.

    Returns:
        The ``"segments"`` entry of the transcription result.
    """
    model = _WHISPER_MODEL_CACHE.get("base")
    if model is None:
        model = whisper.load_model("base")
        _WHISPER_MODEL_CACHE["base"] = model

    result = model.transcribe(audio_path)
    return result["segments"]
def vad_audio(audio_path, aggressiveness=3):
    """Find speech regions in a WAV file using WebRTC voice activity detection.

    The audio is converted to 16 kHz, mono, 16-bit samples and classified in
    10 ms frames; runs of consecutive speech frames are merged into one span.

    Args:
        audio_path: Path of the WAV file to analyse.
        aggressiveness: Mode passed to ``webrtcvad.Vad``.

    Returns:
        A list of ``(start, end)`` tuples in seconds, one per speech span.
    """
    sample_rate = 16000
    sample_width = 2  # bytes per sample; the frame size below depends on it
    frame_duration_ms = 10

    audio = AudioSegment.from_wav(audio_path)
    # Force 16-bit samples as well: the frame-size arithmetic assumes two
    # bytes per sample, which is not guaranteed by the source file.
    audio = (
        audio.set_frame_rate(sample_rate)
        .set_channels(1)
        .set_sample_width(sample_width)
    )
    vad = webrtcvad.Vad(aggressiveness)

    def frame_generator(audio_segment, frame_duration_ms=frame_duration_ms):
        """Yield consecutive fixed-size frames of raw PCM bytes."""
        samples_per_frame = int(audio_segment.frame_rate * frame_duration_ms / 1000.0)
        n = samples_per_frame * sample_width
        raw = audio_segment.raw_data
        offset = 0
        # "<=" so a final frame that ends exactly at the buffer end is kept.
        while offset + n <= len(raw):
            yield raw[offset:offset + n]
            offset += n

    segments = []
    chunk_start = None
    frame_count = 0
    for index, frame in enumerate(frame_generator(audio)):
        # Derive the time from the frame index rather than repeatedly adding
        # 0.01, which accumulates floating-point error over long audio.
        timestamp = index * frame_duration_ms / 1000.0
        if vad.is_speech(frame, sample_rate=sample_rate):
            if chunk_start is None:
                chunk_start = timestamp
        elif chunk_start is not None:
            segments.append((chunk_start, timestamp))
            chunk_start = None
        frame_count = index + 1

    # Close a span that is still open when the audio ends.
    if chunk_start is not None:
        segments.append((chunk_start, frame_count * frame_duration_ms / 1000.0))
    return segments
def semantic_chunking(transcription_segments, vad_segments, max_duration=15.0):
    """Combine transcription segments with speech spans into chunks.

    For every ``(start, end)`` span in ``vad_segments`` whose length does not
    exceed ``max_duration``, the text of all transcription segments lying
    entirely inside the span is collected into one chunk. Spans longer than
    ``max_duration`` are skipped and produce no chunk.

    Args:
        transcription_segments: Iterable of dicts with ``'text'``, ``'start'``
            and ``'end'`` keys.
        vad_segments: Iterable of ``(start, end)`` pairs.
        max_duration: Maximum length of a span that is turned into a chunk.

    Returns:
        A list of dicts with the keys ``chunk_id``, ``chunk_length``, ``text``,
        ``start_time`` and ``end_time``; ``chunk_id`` counts emitted chunks
        from 0.
    """
    chunks = []
    for start, end in vad_segments:
        duration = end - start
        if duration <= max_duration:
            # Strip each piece before joining: segment texts may carry their
            # own leading/trailing whitespace, which would otherwise produce
            # a leading space and doubled spaces in the joined text.
            pieces = (
                seg['text'].strip()
                for seg in transcription_segments
                if seg['start'] >= start and seg['end'] <= end
            )
            chunks.append({
                "chunk_id": len(chunks),
                "chunk_length": duration,
                "text": ' '.join(piece for piece in pieces if piece),
                "start_time": start,
                "end_time": end,
            })
    return chunks
def process_video(youtube_url):
    """Run the full pipeline for one YouTube URL and return its chunks.

    Steps: download the audio, convert it to WAV, transcribe it, detect
    speech spans, then combine both results with ``semantic_chunking``.

    Args:
        youtube_url: URL of the video to process.

    Returns:
        The list of chunk dicts produced by ``semantic_chunking``.
    """
    import tempfile

    # Work in a per-call temporary directory so concurrent requests do not
    # overwrite each other's audio files and nothing is left on disk.
    with tempfile.TemporaryDirectory() as workdir:
        mp3_path = download_audio(youtube_url, download_path=workdir)
        audio_path = convert_to_wav(
            mp3_path, wav_path=os.path.join(workdir, 'audio.wav')
        )
        transcription_segments = transcribe_audio(audio_path)
        vad_segments = vad_audio(audio_path)

    return semantic_chunking(transcription_segments, vad_segments)
# Gradio front end: a single text box for the URL, JSON output of the chunks.
iface = gr.Interface(
    fn=process_video,
    inputs="text",
    outputs="json",
    title="Semantic Chunking of YouTube Video",
    description="Enter a YouTube URL to get semantic chunks of the video.",
)
iface.launch()