import gradio as gr |
from gradio import utils |
import os |
import re |
import requests |
from concurrent.futures import ThreadPoolExecutor |
import time |
from yt_dlp import YoutubeDL |
from yt_dlp.utils import DownloadError |
import subprocess |
import shutil |
from typing import List, Tuple |
import pandas as pd |
import asyncio |
import aiohttp |
import aiofiles |
import json |
from functools import lru_cache |
import tenacity |
from tqdm import tqdm |
from pathlib import Path |
import logging |
# Ask the logging module to configure the root logger at DEBUG level.
logging.basicConfig(level=logging.DEBUG)

# Logger named after this module; the functions in this file log through it.
logger = logging.getLogger(__name__)
def sanitize_title(title):
    r"""Return *title* with the characters ``\ / * ? : " < > |`` removed.

    Every other character is kept as-is; the result is used elsewhere in
    this file as part of output file names.
    """
    # Deletion-only translation table: each listed character maps to nothing.
    removal_table = str.maketrans("", "", '\\/*?:"<>|')
    return title.translate(removal_table)
def format_time(seconds):
    """Format a duration in seconds as ``HH:MM:SS``.

    Unlike a ``time.gmtime``-based conversion, the hour field does not wrap
    at 24 hours (90000 seconds -> ``"25:00:00"``), and negative values are
    rendered with a leading ``-`` instead of wrapping to the previous day.

    Args:
        seconds: Duration as an int or float; any fractional part is
            truncated.

    Returns:
        The formatted string, with each field zero-padded to at least two
        digits.
    """
    whole = int(seconds)
    sign = "-" if whole < 0 else ""
    hours, remainder = divmod(abs(whole), 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{sign}{hours:02d}:{minutes:02d}:{secs:02d}"
# NOTE(review): every exception raised inside the try block below is re-raised
# as a plain ``Exception``, so the retry predicate (requests / aiohttp errors)
# does not match those failures. The decorator is kept unchanged.
@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
    retry=tenacity.retry_if_exception_type((requests.RequestException, aiohttp.ClientError)),
    reraise=True
)
# NOTE(review): cached results contain direct stream URLs; presumably these can
# expire on the provider side - confirm that caching them is acceptable.
@lru_cache(maxsize=100)
def get_video_info(video_url):
    """Look up the title and the best stream URLs for *video_url*.

    Separate video-only and audio-only streams are preferred (highest
    ``vbr`` and ``abr`` respectively). If either kind is missing, the
    combined audio+video stream with the highest ``tbr`` is used instead.

    Args:
        video_url: URL handed to ``YoutubeDL.extract_info``.

    Returns:
        A tuple ``(title, video_stream_url, audio_stream_url)``. The audio
        URL is ``None`` when a combined stream was selected.

    Raises:
        Exception: "Error extracting video info: ..." when yt-dlp raises
            ``DownloadError``; "Unexpected error: ..." for any other failure
            during extraction; "No suitable video formats found" when no
            usable format is listed.
    """
    def bitrate(fmt, key):
        # Missing keys and explicit None both count as 0.
        return fmt.get(key, 0) or 0

    selection = None
    with YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)
            formats = info.get('formats', [])
            video_formats = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') == 'none']
            audio_formats = [f for f in formats if f.get('acodec') != 'none' and f.get('vcodec') == 'none']
            if video_formats and audio_formats:
                video_format = max(video_formats, key=lambda f: bitrate(f, 'vbr'))
                audio_format = max(audio_formats, key=lambda f: bitrate(f, 'abr'))
                selection = (info['title'], video_format['url'], audio_format['url'])
            else:
                combined_formats = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') != 'none']
                if combined_formats:
                    best_format = max(combined_formats, key=lambda f: bitrate(f, 'tbr'))
                    selection = (info['title'], best_format['url'], None)
        except DownloadError as e:
            raise Exception(f"Error extracting video info: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Unexpected error: {str(e)}") from e

    # Raised outside the try block so this expected condition is not caught
    # by the broad handler above and relabelled as an "Unexpected error".
    if selection is None:
        raise Exception("No suitable video formats found")
    return selection
# The retry policy lives on this coroutine rather than on the async generator
# below: tenacity only awaits (and therefore retries) coroutine functions, and
# calling an async generator function just returns the generator object
# without running any of its body.
@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
    retry=tenacity.retry_if_exception_type((subprocess.CalledProcessError, asyncio.TimeoutError)),
    reraise=True
)
async def _run_ffmpeg_with_retry(command):
    """Run *command* and raise ``CalledProcessError`` on a non-zero exit code."""
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr)


async def download_segment_async(url, start_time, end_time, output_path):
    """Copy one time range of a stream into *output_path* using ffmpeg.

    This is an async generator: it yields human-readable status strings and
    raises after yielding an error string.

    Args:
        url: Input passed to ffmpeg's ``-i``.
        start_time: Segment start in seconds (passed as ``-ss``).
        end_time: Segment end in seconds; ``end_time - start_time`` is
            passed as ``-t``.
        output_path: Destination file; overwritten if present (``-y``).

    Yields:
        A success message, or an error message immediately before raising.

    Raises:
        Exception: ffmpeg still exits non-zero after the retry policy is
            exhausted.
        FileNotFoundError: ffmpeg reported success but the output file is
            missing.
    """
    output_path = Path(output_path)
    command = [
        'ffmpeg',
        '-ss', format_time(start_time),
        '-i', url,
        '-t', format_time(end_time - start_time),
        '-c', 'copy',
        '-avoid_negative_ts', 'make_zero',
        '-y',
        str(output_path)
    ]
    logger.debug(f"Executing command: {' '.join(command)}")

    failure = None
    try:
        await _run_ffmpeg_with_retry(command)
    except subprocess.CalledProcessError as exc:
        failure = exc

    if failure is not None:
        # errors="replace": ffmpeg output is not guaranteed to be valid UTF-8,
        # and a decode error here would hide the real failure.
        stderr_text = (failure.stderr or b"").decode(errors="replace")
        error_message = f"FFmpeg error: {stderr_text}"
        logger.error(error_message)
        yield error_message
        raise Exception(error_message) from failure

    if not output_path.exists():
        error_message = f"Output file not created: {output_path}"
        logger.error(error_message)
        yield error_message
        raise FileNotFoundError(error_message)

    logger.info(f"Successfully downloaded segment to {output_path}")
    yield f"Successfully downloaded segment to {output_path}"
async def _write_concat_list(list_path, segments):
    """Write an ffmpeg concat-demuxer list file naming *segments* in order."""
    lines = []
    for segment in segments:
        # Inside a single-quoted entry a literal quote must be written as '\''
        # (close quote, escaped quote, reopen quote); otherwise a path such as
        # "Don't stop.mp4" terminates the entry early.
        escaped = str(segment).replace("'", "'\\''")
        lines.append(f"file '{escaped}'")
    # Explicit UTF-8 so non-ASCII file names do not depend on the locale encoding.
    async with aiofiles.open(list_path, 'w', encoding='utf-8') as f:
        await f.write('\n'.join(lines))


async def _run_ffmpeg_checked(arguments, failure_prefix):
    """Run ``ffmpeg -y <arguments>``; raise ``Exception`` on a non-zero exit."""
    process = await asyncio.create_subprocess_exec(
        # -y: overwrite a leftover output instead of waiting for confirmation,
        # matching the download step in this file.
        'ffmpeg', '-y', *arguments,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        raise Exception(f"{failure_prefix}: {stderr.decode(errors='replace')}")


async def combine_segments_async(video_segments, audio_segments, output_path):
    """Concatenate downloaded segments into a single file at *output_path*.

    With video segments: the video parts and the audio parts are each
    concatenated into a temporary file, then the two are combined into
    *output_path* with stream copy. Without video segments: the audio parts
    are concatenated straight into *output_path*.

    Temporary list/media files use fixed names in the current working
    directory and are removed in all cases before returning.

    Args:
        video_segments: Paths of video segment files, in order (may be empty).
        audio_segments: Paths of audio segment files, in order (required).
        output_path: Destination file path.

    Returns:
        *output_path*, unchanged.

    Raises:
        Exception: "No audio segments to combine" when *audio_segments* is
            empty; otherwise "Error in combine_segments_async: ..." wrapping
            any failure of the steps above.
    """
    if not audio_segments:
        raise Exception("No audio segments to combine")

    video_list = None
    audio_list = None
    temp_video = None
    temp_audio = None
    try:
        if video_segments:
            temp_video = 'temp_video.mp4'
            temp_audio = 'temp_audio.m4a'

            video_list = 'video_list.txt'
            await _write_concat_list(video_list, video_segments)
            await _run_ffmpeg_checked(
                ['-f', 'concat', '-safe', '0', '-i', video_list, '-c', 'copy', temp_video],
                "Error concatenating video segments",
            )

            audio_list = 'audio_list.txt'
            await _write_concat_list(audio_list, audio_segments)
            await _run_ffmpeg_checked(
                ['-f', 'concat', '-safe', '0', '-i', audio_list, '-c', 'copy', temp_audio],
                "Error concatenating audio segments",
            )

            await _run_ffmpeg_checked(
                ['-i', temp_video, '-i', temp_audio, '-c', 'copy', output_path],
                "Error combining video and audio",
            )
        else:
            audio_list = 'audio_list.txt'
            await _write_concat_list(audio_list, audio_segments)
            await _run_ffmpeg_checked(
                ['-f', 'concat', '-safe', '0', '-i', audio_list, '-c', 'copy', output_path],
                "Error combining audio segments",
            )
        return output_path
    except Exception as e:
        raise Exception(f"Error in combine_segments_async: {str(e)}") from e
    finally:
        # Best-effort cleanup: a failure to delete one temp file is reported
        # but must not mask the result or the original error.
        for file in (video_list, audio_list, temp_video, temp_audio):
            if not file:
                continue
            try:
                Path(file).unlink(missing_ok=True)
            except Exception as e:
                print(f"Error removing temporary file {file}: {str(e)}")
def add_segment(start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments):
    """Append a ``HH:MM:SS-HH:MM:SS`` row to the segments table.

    The six time fields may arrive as ints, floats (numeric UI inputs can
    deliver floats) or ``None`` (treated as 0); each is truncated to an int
    before formatting, because the ``02d`` format code rejects floats.

    Args:
        start_hours, start_minutes, start_seconds: Start of the segment.
        end_hours, end_minutes, end_seconds: End of the segment.
        segments: Existing DataFrame with a ``Segment`` column.

    Returns:
        A new DataFrame with the formatted segment appended as the last row;
        *segments* itself is not modified.
    """
    def as_int(value):
        return 0 if value is None else int(value)

    def clock(hours, minutes, seconds):
        return f"{as_int(hours):02d}:{as_int(minutes):02d}:{as_int(seconds):02d}"

    start_time = clock(start_hours, start_minutes, start_seconds)
    end_time = clock(end_hours, end_minutes, end_seconds)
    new_row = pd.DataFrame([f"{start_time}-{end_time}"], columns=["Segment"])
    return pd.concat([segments, new_row], ignore_index=True)
def remove_segment(segments, index):
    """Remove the row at 0-based position *index* from the segments table.

    *index* may be an int or a float (numeric UI inputs can deliver floats);
    it is truncated to an int. A ``None`` or out-of-range index leaves the
    table unchanged instead of raising ``KeyError``, matching the bounds
    check done by ``move_segment``.

    Args:
        segments: DataFrame of segments.
        index: Row position to remove.

    Returns:
        A new DataFrame without that row and with a fresh 0..n-1 index, or
        *segments* itself when nothing was removed.
    """
    if index is None:
        return segments
    position = int(index)
    if not 0 <= position < len(segments):
        return segments
    return segments.drop(segments.index[position]).reset_index(drop=True)
def move_segment(segments, old_index, new_index):
    """Move the row at position *old_index* to position *new_index*.

    Both positions are 0-based and may be ints or floats (numeric UI inputs
    can deliver floats); they are truncated to ints because positional
    indexing rejects floats. The row is first removed and then inserted at
    *new_index* within the shortened table.

    Args:
        segments: DataFrame of segments.
        old_index: Current row position.
        new_index: Target row position.

    Returns:
        A reordered DataFrame with a fresh 0..n-1 index, or *segments*
        itself when either position is ``None`` or out of range.
    """
    if old_index is None or new_index is None:
        return segments
    source, target = int(old_index), int(new_index)
    row_count = len(segments)
    if not (0 <= source < row_count and 0 <= target < row_count):
        return segments
    # Reorder row positions, then materialise the rows in that order.
    order = list(range(row_count))
    order.insert(target, order.pop(source))
    return segments.iloc[order].reset_index(drop=True)
def parse_segments(segments: pd.DataFrame) -> List[Tuple[int, int]]:
    """Convert the ``Segment`` column into ``(start, end)`` second pairs.

    Each usable cell is a string of the form ``start-end`` where both sides
    are colon-separated integers read right to left as seconds, minutes,
    hours (empty fields are ignored). Cells that are not strings, contain no
    ``-``, do not split into exactly two sides, or contain non-integer
    fields are skipped, as are segments whose start is not before their end.
    """
    def to_seconds(stamp):
        total = 0
        for power, field in enumerate(reversed(stamp.split(':'))):
            if field:
                total += int(field) * 60 ** power
        return total

    result = []
    for cell in segments['Segment']:
        if isinstance(cell, str) and '-' in cell:
            sides = cell.split('-')
            if len(sides) != 2:
                continue
            try:
                begin = to_seconds(sides[0])
                finish = to_seconds(sides[1])
            except ValueError:
                continue
            if begin < finish:
                result.append((begin, finish))
    return result
async def process_video(video_url, segments, combine, audio_only, progress=gr.Progress()):
    """Download the requested segments and optionally combine them.

    Async generator used as the download button handler. Each yielded item
    is ``(progress_value, status_message, output_path_or_None)`` where
    ``progress_value`` is on a 0-100 scale; calls to *progress* use the
    0-1 scale.

    Steps: validate inputs, recreate the ``output`` directory, resolve the
    stream URLs, download every segment (video unless *audio_only*, and
    always audio), then either combine everything into one file or report
    the first downloaded segment.

    Args:
        video_url: Page URL entered by the user.
        segments: DataFrame with a ``Segment`` column.
        combine: When true, concatenate all segments into one output file
            and delete the individual segment files afterwards.
        audio_only: When true, skip video streams entirely.
        progress: Gradio progress tracker.
    """
    # Guard against None as well as blank input.
    if not video_url or not video_url.strip():
        yield 0, "Error: Please provide a valid YouTube URL", None
        return

    parsed_segments = parse_segments(segments)
    if not parsed_segments:
        yield 0, "Error: No valid segments provided", None
        return

    # Start every run from an empty output directory.
    output_dir = Path('output')
    if output_dir.exists():
        try:
            shutil.rmtree(output_dir)
            yield 0, "Cleaned up existing output directory", None
        except Exception as e:
            yield 0, f"Error cleaning up output directory: {str(e)}", None
            return
    output_dir.mkdir(exist_ok=True)

    try:
        progress(0, "Extracting video info...")
        video_title, stream_url, audio_url = get_video_info(video_url)
    except Exception as e:
        yield 0, f"Error: {str(e)}", None
        return

    safe_title = sanitize_title(video_title)
    video_segments = []
    audio_segments = []
    total_segments = len(parsed_segments)

    for i, (start_time, end_time) in enumerate(parsed_segments):
        # Downloads account for the first 80% of the progress bar.
        segment_base = (i / total_segments) * 0.8
        progress(segment_base, f"Downloading segment {i+1}/{total_segments}")
        try:
            if not audio_only:
                video_output = output_dir / f"{safe_title}_video_segment_{i+1}.mp4"
                async for output in download_segment_async(stream_url, start_time, end_time, str(video_output)):
                    progress(segment_base + (1 / total_segments) * 0.4, f"Downloading video segment {i+1}/{total_segments}: {output}")
                if video_output.exists():
                    video_segments.append(str(video_output))
                else:
                    raise FileNotFoundError(f"Video segment file not found: {video_output}")

            audio_output = output_dir / f"{safe_title}_audio_segment_{i+1}.m4a"
            # Without a separate audio stream, the combined stream is used.
            async for output in download_segment_async(audio_url or stream_url, start_time, end_time, str(audio_output)):
                progress(segment_base + (1 / total_segments) * 0.8, f"Downloading audio segment {i+1}/{total_segments}: {output}")
            if audio_output.exists():
                audio_segments.append(str(audio_output))
            else:
                raise FileNotFoundError(f"Audio segment file not found: {audio_output}")
        except Exception as e:
            yield (i / total_segments) * 100, f"Error downloading segment {i+1}: {str(e)}", None
            return

    try:
        if combine:
            # 0.9, not 90: this tracker is driven on a 0-1 scale everywhere
            # else in this function.
            progress(0.9, "Combining segments...")
            if audio_only:
                output_path = output_dir / f"{safe_title}_combined.m4a"
            else:
                output_path = output_dir / f"{safe_title}_combined.mp4"
            output_path = await combine_segments_async(video_segments if not audio_only else [], audio_segments, str(output_path))
            yield 100, f"Segments combined and saved as {output_path}", output_path
        else:
            # Only the first segment is offered as the downloadable file.
            output_path = audio_segments[0] if audio_only else video_segments[0]
            yield 100, "All segments downloaded successfully", output_path
    except Exception as e:
        yield 100, f"Error: {str(e)}", None
    finally:
        if combine:
            # The per-segment files are only intermediates when combining.
            for segment in video_segments + audio_segments:
                try:
                    Path(segment).unlink(missing_ok=True)
                except Exception as e:
                    print(f"Error removing segment file {segment}: {str(e)}")
def get_video_qualities(video_url):
    """List the heights of combined audio+video formats as ``"<height>p"``.

    Returns an empty list for a blank URL, when yt-dlp raises
    ``DownloadError``, or on any other error (which is printed). Otherwise
    returns the distinct labels sorted from highest to lowest height.
    """
    if not video_url.strip():
        return []

    ydl_options = {'quiet': True, 'no_warnings': True}
    with YoutubeDL(ydl_options) as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)
            labels = set()
            for fmt in info.get('formats', []):
                # Only formats carrying both a video and an audio codec count.
                if fmt.get('vcodec') == 'none' or fmt.get('acodec') == 'none':
                    continue
                height = fmt.get('height')
                if height:
                    labels.add(f"{height}p")
            # Strip the trailing "p" to sort numerically, tallest first.
            return sorted(labels, key=lambda label: int(label[:-1]), reverse=True)
        except DownloadError:
            return []
        except Exception as e:
            print(f"Error in get_video_qualities: {str(e)}")
            return []
def _always_colab():
    """Replacement for ``gradio.utils.colab_check`` that always returns True."""
    return True


# Override Gradio's Colab detection with the stub above.
# NOTE(review): presumably this makes Gradio behave as if it were running in
# Colab at launch time - confirm the intended effect.
utils.colab_check = _always_colab
# Stylesheet passed to gr.Blocks(css=...). The class names used here
# (header, main-content, input-section, ...) are attached to components via
# elem_classes in the layout code.
custom_css = """
/* Reset and base styles */
* { box-sizing: border-box; margin: 0; padding: 0; }
body, #component-0 {
    height: 100vh;
    max-height: 100vh;
    overflow: hidden;
    display: flex;
    flex-direction: column;
}
/* Gradio container modifications */
.gradio-container {
    flex-grow: 1;
    display: flex;
    flex-direction: column;
    max-height: 100vh;
    overflow: hidden;
}
/* Header */
.header {
    padding: 1rem;
    background-color: #f0f0f0;
}
/* Main content area */
.main-content {
    flex-grow: 1;
    display: flex;
    overflow: hidden;
    padding: 1rem;
    gap: 1rem;
}
/* Columns */
.input-section, .options-section, .output-section {
    flex: 1;
    display: flex;
    flex-direction: column;
    overflow: hidden;
    background-color: #ffffff;
    border-radius: 8px;
    box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
/* Scrollable areas */
.scroll-area {
    flex-grow: 1;
    overflow-y: auto;
    padding: 1rem;
}
/* Responsive adjustments */
@media (max-width: 768px) {
    .main-content {
        flex-direction: column;
    }
}
/* Additional styles */
.segment-input, .time-input, .button-row {
    display: flex;
    gap: 0.5rem;
    margin-bottom: 1rem;
}
.time-input { width: 60px !important; }
.url-input { flex-grow: 1; }
.quality-dropdown { width: 100px !important; }
"""
# Three-column UI: inputs (URL + segment editor), options/actions, and output.
# NOTE(review): the nesting of the layout containers below was reconstructed
# from a whitespace-stripped source - confirm it against the intended design.
with gr.Blocks(title="YouTube Segment Downloader", theme=gr.themes.Default(), css=custom_css) as iface:
    gr.Markdown("# 🎬 YouTube Segment Downloader", elem_classes="header")
    with gr.Row(elem_classes="main-content"):
        # --- Column 1: URL and segment entry ---
        with gr.Column(elem_classes="input-section"):
            with gr.Column(elem_classes="scroll-area"):
                with gr.Row():
                    video_url = gr.Textbox(label="YouTube URL", placeholder="Paste URL here", elem_classes="url-input")
                    # Hidden until qualities have been fetched for the URL.
                    quality = gr.Dropdown(label="Quality", choices=[], interactive=True, elem_classes="quality-dropdown", visible=False)
                url_status = gr.Markdown(visible=False)
                gr.Markdown("### Add Segments")
                # precision=0 makes these inputs deliver ints: the handlers
                # format them with "02d", which rejects floats.
                with gr.Row(elem_classes="segment-input"):
                    start_hours = gr.Number(label="Start HH", minimum=0, maximum=23, step=1, value=0, precision=0, elem_classes="time-input")
                    start_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                    start_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                    gr.Markdown("to")
                    end_hours = gr.Number(label="End HH", minimum=0, maximum=23, step=1, value=0, precision=0, elem_classes="time-input")
                    end_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                    end_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                add_btn = gr.Button("Add Segment", variant="primary")
                segments = gr.Dataframe(headers=["Segment"], row_count=3, col_count=1, datatype=["str"], interactive=True, label="Segments")

        # --- Column 2: options and list-editing actions ---
        with gr.Column(elem_classes="options-section"):
            with gr.Column(elem_classes="scroll-area"):
                combine = gr.Checkbox(label="Combine segments", value=True)
                audio_only = gr.Checkbox(label="Audio only", value=False)
                # Row positions are used for positional indexing, so they
                # must be ints as well.
                remove_index = gr.Number(label="Remove segment", minimum=0, step=1, value=0, precision=0)
                remove_btn = gr.Button("Remove", variant="secondary")
                old_index = gr.Number(label="Move from", minimum=0, step=1, value=0, precision=0)
                new_index = gr.Number(label="to", minimum=0, step=1, value=0, precision=0)
                move_btn = gr.Button("Move", variant="secondary")
                submit_btn = gr.Button("🚀 Download", variant="primary")

        # --- Column 3: progress and result ---
        with gr.Column(elem_classes="output-section"):
            with gr.Column(elem_classes="scroll-area"):
                progress = gr.Slider(label="Progress", minimum=0, maximum=100, step=1, interactive=False)
                status = gr.Textbox(label="Status", lines=1)
                output_file = gr.File(label="Downloaded File")

    # --- Event wiring ---
    add_btn.click(
        add_segment,
        inputs=[start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments],
        outputs=[segments]
    )
    submit_btn.click(
        process_video,
        inputs=[video_url, segments, combine, audio_only],
        outputs=[progress, status, output_file]
    )
    remove_btn.click(
        remove_segment,
        inputs=[segments, remove_index],
        outputs=[segments]
    )
    move_btn.click(
        move_segment,
        inputs=[segments, old_index, new_index],
        outputs=[segments]
    )

    def update_qualities(url):
        """Refresh the quality dropdown and the URL status note for *url*.

        Returns a ``(dropdown, markdown)`` pair: the dropdown is shown with
        the fetched qualities when any were found; otherwise it is hidden,
        and an error note is shown only if the URL is non-blank.
        """
        qualities = get_video_qualities(url)
        if qualities:
            return (
                gr.Dropdown(choices=qualities, value=qualities[0], visible=True),
                gr.Markdown(visible=False)
            )
        elif url.strip():
            return (
                gr.Dropdown(choices=[], visible=False),
                gr.Markdown("Unable to fetch video qualities. The URL might be invalid or the video might be unavailable.", visible=True)
            )
        else:
            return (
                gr.Dropdown(choices=[], visible=False),
                gr.Markdown(visible=False)
            )

    video_url.change(
        update_qualities,
        inputs=[video_url],
        outputs=[quality, url_status]
    )

# NOTE(review): server_name is an empty string - confirm the intended bind
# address.
iface.launch(server_name="", server_port=7860)