|
import gradio as gr |
|
from gradio import utils |
|
import os |
|
import re |
|
import requests |
|
from concurrent.futures import ThreadPoolExecutor |
|
import time |
|
from yt_dlp import YoutubeDL |
|
from yt_dlp.utils import DownloadError |
|
import subprocess |
|
import shutil |
|
from typing import List, Tuple |
|
import pandas as pd |
|
import asyncio |
|
import aiohttp |
|
import aiofiles |
|
import json |
|
from functools import lru_cache |
|
import tenacity |
|
from tqdm import tqdm |
|
from pathlib import Path |
|
import logging |
|
|
|
# Configure the root logger once at import time; DEBUG is the threshold
# requested for every record routed through it.
logging.basicConfig(level=logging.DEBUG)

# Logger named after this module, shared by the functions defined in it.
logger = logging.getLogger(__name__)
|
|
|
def sanitize_title(title):
    """Return *title* with the characters ``\\ / * ? : " < > |`` removed.

    The result is used to build output file names for downloaded segments.
    """
    forbidden_chars = r'[\\/*?:"<>|]'
    return re.sub(forbidden_chars, "", title)
|
|
|
def format_time(seconds):
    """Render a number of seconds as an ``HH:MM:SS`` string."""
    # gmtime() breaks the offset into calendar fields; only the time-of-day
    # fields are used by the format string below.
    broken_down = time.gmtime(seconds)
    return time.strftime('%H:%M:%S', broken_down)
|
|
|
@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
    retry=tenacity.retry_if_exception_type((requests.RequestException, aiohttp.ClientError)),
    reraise=True,
)
@lru_cache(maxsize=100)
def get_video_info(video_url):
    """Resolve the title and best stream URLs for *video_url* via yt-dlp.

    Separate video-only and audio-only streams are preferred (highest ``vbr``
    and ``abr`` respectively). When either kind is missing, the combined
    stream with the highest ``tbr`` is used instead.

    Returns:
        ``(title, video_stream_url, audio_stream_url)``. The audio URL is
        ``None`` when a combined (video+audio) stream was selected.

    Raises:
        Exception: extraction failed or no usable format exists. The original
            error is attached as ``__cause__``.
        requests.RequestException, aiohttp.ClientError: propagated unwrapped
            so the retry policy declared on this function can act on them.
    """
    # NOTE(review): results are memoised by lru_cache; extracted stream URLs
    # may be time-limited, so confirm that long-lived caching is acceptable.
    retryable_errors = (requests.RequestException, aiohttp.ClientError)

    def bitrate(fmt, key):
        # yt-dlp may report a bitrate as missing or None; treat both as 0.
        return fmt.get(key, 0) or 0

    with YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)
            formats = info.get('formats') or []

            video_only = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') == 'none']
            audio_only = [f for f in formats if f.get('acodec') != 'none' and f.get('vcodec') == 'none']

            if video_only and audio_only:
                best_video = max(video_only, key=lambda f: bitrate(f, 'vbr'))
                best_audio = max(audio_only, key=lambda f: bitrate(f, 'abr'))
                return info['title'], best_video['url'], best_audio['url']

            # Fall back to a single stream carrying both video and audio.
            muxed = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') != 'none']
            if muxed:
                best_muxed = max(muxed, key=lambda f: bitrate(f, 'tbr'))
                return info['title'], best_muxed['url'], None
        except retryable_errors:
            # Wrapping these in a bare Exception would hide them from the
            # retry predicate, making the decorator a no-op.
            raise
        except DownloadError as e:
            raise Exception(f"Error extracting video info: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Unexpected error: {str(e)}") from e

    # Raised outside the try block so this expected outcome is not relabelled
    # as an "Unexpected error".
    raise Exception("No suitable video formats found")
|
|
|
@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
    retry=tenacity.retry_if_exception_type((subprocess.CalledProcessError, asyncio.TimeoutError)),
    reraise=True,
)
async def _run_ffmpeg_segment(command):
    """Run one ffmpeg command; raise CalledProcessError on a non-zero exit."""
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr)


async def download_segment_async(url, start_time, end_time, output_path):
    """Copy the ``[start_time, end_time)`` range of *url* into *output_path*.

    This is an async generator: it yields one human-readable status string
    (success or error text). After yielding an error string it raises when
    the consumer asks for the next item.

    The retry policy lives on the ``_run_ffmpeg_segment`` coroutine because a
    retry decorator placed on an async generator function only wraps the
    creation of the generator object and therefore never retries anything.

    Args:
        url: Stream URL handed to ffmpeg as its input.
        start_time: Segment start, in seconds.
        end_time: Segment end, in seconds.
        output_path: Destination file (str or path-like).

    Raises:
        Exception: ffmpeg kept failing after the configured attempts.
        FileNotFoundError: ffmpeg exited successfully but wrote no file.
    """
    output_path = Path(output_path)
    command = [
        'ffmpeg',
        '-ss', format_time(start_time),
        '-i', url,
        '-t', format_time(end_time - start_time),
        '-c', 'copy',
        '-avoid_negative_ts', 'make_zero',
        '-y',
        str(output_path),
    ]
    logger.debug(f"Executing command: {' '.join(command)}")

    failure = None
    try:
        await _run_ffmpeg_segment(command)
    except subprocess.CalledProcessError as e:
        failure = e

    if failure is not None:
        # stderr is raw bytes from ffmpeg; decode tolerantly so a stray byte
        # cannot mask the real error with a UnicodeDecodeError.
        stderr_text = (failure.stderr or b"").decode(errors="replace")
        error_message = f"FFmpeg error: {stderr_text}"
        logger.error(error_message)
        yield error_message
        raise Exception(error_message) from failure

    if not output_path.exists():
        error_message = f"Output file not created: {output_path}"
        logger.error(error_message)
        yield error_message
        raise FileNotFoundError(error_message)

    logger.info(f"Successfully downloaded segment to {output_path}")
    yield f"Successfully downloaded segment to {output_path}"
|
|
|
def _concat_list_line(segment):
    """Format one concat-list ``file`` directive, escaping single quotes."""
    # Inside a single-quoted ffmpeg string a quote must be written as '\''.
    escaped = str(segment).replace("'", "'\\''")
    return f"file '{escaped}'"


async def _write_concat_list(list_path, segment_paths):
    """Write an ffmpeg concat list naming every segment on its own line."""
    async with aiofiles.open(list_path, 'w') as f:
        await f.write('\n'.join(_concat_list_line(segment) for segment in segment_paths))


async def _run_ffmpeg(args, failure_prefix):
    """Run ``ffmpeg *args``; raise Exception('<prefix>: <stderr>') on failure."""
    process = await asyncio.create_subprocess_exec(
        'ffmpeg', *args,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        raise Exception(f"{failure_prefix}: {stderr.decode(errors='replace')}")


async def combine_segments_async(video_segments, audio_segments, output_path):
    """Concatenate downloaded segments into a single file at *output_path*.

    With video segments: the video and audio segments are concatenated
    separately into temporary files, then merged into *output_path*.
    Without video segments: the audio segments are concatenated straight
    into *output_path*.

    Args:
        video_segments: Paths of video segment files (may be empty).
        audio_segments: Paths of audio segment files (must not be empty).
        output_path: Destination file path.

    Returns:
        *output_path*, unchanged.

    Raises:
        Exception: no audio segments were given, or any ffmpeg step failed.
    """
    if not audio_segments:
        raise Exception("No audio segments to combine")

    video_list = None
    audio_list = None
    temp_video = None
    temp_audio = None

    try:
        if video_segments:
            temp_video = 'temp_video.mp4'
            temp_audio = 'temp_audio.m4a'

            video_list = 'video_list.txt'
            await _write_concat_list(video_list, video_segments)
            # -y: never block on an overwrite prompt if a stale temp file exists.
            await _run_ffmpeg(
                ['-y', '-f', 'concat', '-safe', '0', '-i', video_list, '-c', 'copy', temp_video],
                "Error concatenating video segments",
            )

            audio_list = 'audio_list.txt'
            await _write_concat_list(audio_list, audio_segments)
            await _run_ffmpeg(
                ['-y', '-f', 'concat', '-safe', '0', '-i', audio_list, '-c', 'copy', temp_audio],
                "Error concatenating audio segments",
            )

            await _run_ffmpeg(
                ['-y', '-i', temp_video, '-i', temp_audio, '-c', 'copy', output_path],
                "Error combining video and audio",
            )
        else:
            audio_list = 'audio_list.txt'
            await _write_concat_list(audio_list, audio_segments)
            await _run_ffmpeg(
                ['-y', '-f', 'concat', '-safe', '0', '-i', audio_list, '-c', 'copy', output_path],
                "Error combining audio segments",
            )

        return output_path
    except Exception as e:
        raise Exception(f"Error in combine_segments_async: {str(e)}") from e
    finally:
        # Best-effort removal of the list files and intermediate outputs.
        for file in (video_list, audio_list, temp_video, temp_audio):
            if not file:
                continue
            try:
                Path(file).unlink(missing_ok=True)
            except OSError as e:
                logger.warning(f"Error removing temporary file {file}: {str(e)}")
|
|
|
def add_segment(start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments):
    """Append an ``HH:MM:SS-HH:MM:SS`` row to the *segments* table.

    The time parts may arrive as ints, floats or ``None`` (an emptied numeric
    field). They are coerced to ``int`` because the ``:02d`` format spec
    rejects floats and ``None``.

    Returns:
        A new DataFrame: *segments* followed by the new row.
    """
    def as_int(value):
        # None (cleared input) and 0 both map to 0.
        return int(value or 0)

    start_time = f"{as_int(start_hours):02d}:{as_int(start_minutes):02d}:{as_int(start_seconds):02d}"
    end_time = f"{as_int(end_hours):02d}:{as_int(end_minutes):02d}:{as_int(end_seconds):02d}"
    new_segment = f"{start_time}-{end_time}"
    new_row = pd.DataFrame([new_segment], columns=["Segment"])
    return pd.concat([segments, new_row], ignore_index=True)
|
|
|
def remove_segment(segments, index):
    """Return *segments* without the row at position *index*.

    *index* may be an int or a whole-number float. A missing or out-of-range
    index leaves the table unchanged instead of raising, matching the bounds
    handling in ``move_segment``.
    """
    if index is None:
        return segments
    position = int(index)
    if not 0 <= position < len(segments):
        return segments
    return segments.drop(segments.index[position]).reset_index(drop=True)
|
|
|
def move_segment(segments, old_index, new_index):
    """Move the row at *old_index* so that it ends up at *new_index*.

    Both positions may be ints or whole-number floats. If either is missing
    or out of range, *segments* is returned unchanged.
    """
    if old_index is None or new_index is None:
        return segments
    source = int(old_index)
    target = int(new_index)
    row_count = len(segments)
    if not (0 <= source < row_count and 0 <= target < row_count):
        return segments

    # Reorder row positions rather than rebuilding the frame by concatenation.
    order = list(range(row_count))
    order.insert(target, order.pop(source))
    return segments.iloc[order].reset_index(drop=True)
|
|
|
def parse_segments(segments: pd.DataFrame) -> List[Tuple[int, int]]:
    """Convert the ``Segment`` column into ``(start, end)`` pairs in seconds.

    Each cell is expected to look like ``start-end`` with colon-separated
    clock fields. Cells that are not strings, lack a dash, fail to parse, or
    whose start is not strictly before their end are skipped.
    """
    def to_seconds(clock_text):
        # The right-most field is seconds, the next minutes, then hours;
        # empty fields contribute nothing.
        total = 0
        for power, field in enumerate(reversed(clock_text.split(':'))):
            if field:
                total += int(field) * 60 ** power
        return total

    valid_ranges = []
    for entry in segments['Segment']:
        if isinstance(entry, str) and '-' in entry:
            try:
                begin_text, finish_text = entry.split('-')
                begin = to_seconds(begin_text)
                finish = to_seconds(finish_text)
            except ValueError:
                continue
            if begin < finish:
                valid_ranges.append((begin, finish))
    return valid_ranges
|
|
|
async def process_video(video_url, segments, combine, audio_only, progress=gr.Progress()):
    """Download the requested segments and optionally combine them.

    Async generator yielding ``(progress_value, status_text, file_path)``
    tuples; ``file_path`` is ``None`` until a result file is available.

    Args:
        video_url: Page URL of the video.
        segments: DataFrame with a ``Segment`` column of ``start-end`` rows.
        combine: When true, merge all segments into one output file.
        audio_only: When true, skip the video stream entirely.
        progress: Progress reporter, called with a fraction in ``[0, 1]``.
    """
    if not (video_url or "").strip():
        yield 0, "Error: Please provide a valid YouTube URL", None
        return

    parsed_segments = parse_segments(segments)
    if not parsed_segments:
        yield 0, "Error: No valid segments provided", None
        return

    output_dir = Path('output')

    # Start from an empty output directory on every run.
    if output_dir.exists():
        try:
            shutil.rmtree(output_dir)
            yield 0, "Cleaned up existing output directory", None
        except Exception as e:
            yield 0, f"Error cleaning up output directory: {str(e)}", None
            return

    output_dir.mkdir(exist_ok=True)

    try:
        progress(0, "Extracting video info...")
        video_title, stream_url, audio_url = get_video_info(video_url)
        # Sanitize once; the same prefix is reused for every output file.
        safe_title = sanitize_title(video_title)
    except Exception as e:
        yield 0, f"Error: {str(e)}", None
        return

    video_segments = []
    audio_segments = []
    total_segments = len(parsed_segments)

    for i, (start_time, end_time) in enumerate(parsed_segments):
        # Downloads account for the first 80% of the progress fraction.
        segment_base = (i / total_segments) * 0.8
        progress(segment_base, f"Downloading segment {i+1}/{total_segments}")

        try:
            if not audio_only:
                video_output = output_dir / f"{safe_title}_video_segment_{i+1}.mp4"
                async for output in download_segment_async(stream_url, start_time, end_time, str(video_output)):
                    progress(segment_base + (1 / total_segments) * 0.4, f"Downloading video segment {i+1}/{total_segments}: {output}")
                if video_output.exists():
                    video_segments.append(str(video_output))
                else:
                    raise FileNotFoundError(f"Video segment file not found: {video_output}")

            audio_output = output_dir / f"{safe_title}_audio_segment_{i+1}.m4a"
            # Without a separate audio stream, the combined stream is the audio source.
            async for output in download_segment_async(audio_url or stream_url, start_time, end_time, str(audio_output)):
                progress(segment_base + (1 / total_segments) * 0.8, f"Downloading audio segment {i+1}/{total_segments}: {output}")
            if audio_output.exists():
                audio_segments.append(str(audio_output))
            else:
                raise FileNotFoundError(f"Audio segment file not found: {audio_output}")

        except Exception as e:
            yield (i / total_segments) * 100, f"Error downloading segment {i+1}: {str(e)}", None
            return

    try:
        if combine:
            # The progress reporter takes a 0..1 fraction like the calls
            # above, so 90% is 0.9 (previously passed as 90).
            progress(0.9, "Combining segments...")
            if audio_only:
                output_path = output_dir / f"{safe_title}_combined.m4a"
            else:
                output_path = output_dir / f"{safe_title}_combined.mp4"

            output_path = await combine_segments_async(video_segments if not audio_only else [], audio_segments, str(output_path))
            yield 100, f"Segments combined and saved as {output_path}", output_path
        else:
            # NOTE(review): only the first segment file is returned here even
            # when several were downloaded - confirm this is intended.
            output_path = audio_segments[0] if audio_only else video_segments[0]
            yield 100, "All segments downloaded successfully", output_path

    except Exception as e:
        yield 100, f"Error: {str(e)}", None
    finally:
        # After combining, the per-segment files are no longer needed.
        if combine:
            for segment in video_segments + audio_segments:
                try:
                    Path(segment).unlink(missing_ok=True)
                except Exception as e:
                    logger.warning(f"Error removing segment file {segment}: {str(e)}")
|
|
|
def get_video_qualities(video_url):
    """List the heights (e.g. ``"720p"``) of combined video+audio formats.

    Returns:
        Quality labels sorted from highest to lowest. The list is empty when
        the URL is missing or blank, when extraction fails, or when no
        combined format reports a height.
    """
    if not video_url or not video_url.strip():
        return []

    with YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)
            formats = (info or {}).get('formats') or []

            # Only formats carrying both a video and an audio codec count.
            heights = {
                f.get('height')
                for f in formats
                if f.get('vcodec') != 'none' and f.get('acodec') != 'none' and f.get('height')
            }
            return [f"{height}p" for height in sorted(heights, reverse=True)]
        except DownloadError as e:
            # Expected for bad or unavailable URLs; record it instead of
            # dropping it silently.
            logger.warning(f"Could not fetch video qualities: {str(e)}")
            return []
        except Exception as e:
            logger.exception(f"Error in get_video_qualities: {str(e)}")
            return []
|
|
|
|
|
def _always_in_colab():
    """Stand-in for gradio's ``colab_check`` that unconditionally returns True."""
    return True


# Replace gradio's environment probe with the stub above.
# NOTE(review): presumably done to force Colab-style launch behaviour - confirm.
utils.colab_check = _always_in_colab
|
|
|
# Stylesheet passed to gr.Blocks(css=...): a full-viewport flex layout with
# three card-like columns whose inner areas scroll independently.
custom_css = """
/* Reset and base styles */
* { box-sizing: border-box; margin: 0; padding: 0; }

body, #component-0 {
    height: 100vh;
    max-height: 100vh;
    overflow: hidden;
    display: flex;
    flex-direction: column;
}

/* Gradio container modifications */
.gradio-container {
    flex-grow: 1;
    display: flex;
    flex-direction: column;
    max-height: 100vh;
    overflow: hidden;
}

/* Header */
.header {
    padding: 1rem;
    background-color: #f0f0f0;
}

/* Main content area */
.main-content {
    flex-grow: 1;
    display: flex;
    overflow: hidden;
    padding: 1rem;
    gap: 1rem;
}

/* Columns */
.input-section, .options-section, .output-section {
    flex: 1;
    display: flex;
    flex-direction: column;
    overflow: hidden;
    background-color: #ffffff;
    border-radius: 8px;
    box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}

/* Scrollable areas */
.scroll-area {
    flex-grow: 1;
    overflow-y: auto;
    padding: 1rem;
}

/* Responsive adjustments */
@media (max-width: 768px) {
    .main-content {
        flex-direction: column;
    }
}

/* Additional styles */
.segment-input, .time-input, .button-row {
    display: flex;
    gap: 0.5rem;
    margin-bottom: 1rem;
}
.time-input { width: 60px !important; }
.url-input { flex-grow: 1; }
.quality-dropdown { width: 100px !important; }
"""
|
|
|
# Build the three-column UI (inputs / options / output) and wire its events.
with gr.Blocks(title="YouTube Segment Downloader", theme=gr.themes.Default(), css=custom_css) as iface:
    gr.Markdown("# 🎬 YouTube Segment Downloader", elem_classes="header")

    with gr.Row(elem_classes="main-content"):
        with gr.Column(elem_classes="input-section"):
            with gr.Column(elem_classes="scroll-area"):
                with gr.Row():
                    video_url = gr.Textbox(label="YouTube URL", placeholder="Paste URL here", elem_classes="url-input")
                    quality = gr.Dropdown(label="Quality", choices=[], interactive=True, elem_classes="quality-dropdown", visible=False)
                url_status = gr.Markdown(visible=False)

                gr.Markdown("### Add Segments")
                # precision=0 makes these inputs deliver ints; the handlers
                # format them with ":02d" and use them as row positions.
                with gr.Row(elem_classes="segment-input"):
                    start_hours = gr.Number(label="Start HH", minimum=0, maximum=23, step=1, value=0, precision=0, elem_classes="time-input")
                    start_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                    start_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                    gr.Markdown("to")
                    end_hours = gr.Number(label="End HH", minimum=0, maximum=23, step=1, value=0, precision=0, elem_classes="time-input")
                    end_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                    end_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")

                add_btn = gr.Button("Add Segment", variant="primary")
                segments = gr.Dataframe(headers=["Segment"], row_count=3, col_count=1, datatype=["str"], interactive=True, label="Segments")

        with gr.Column(elem_classes="options-section"):
            with gr.Column(elem_classes="scroll-area"):
                combine = gr.Checkbox(label="Combine segments", value=True)
                audio_only = gr.Checkbox(label="Audio only", value=False)
                remove_index = gr.Number(label="Remove segment", minimum=0, step=1, value=0, precision=0)
                remove_btn = gr.Button("Remove", variant="secondary")
                old_index = gr.Number(label="Move from", minimum=0, step=1, value=0, precision=0)
                new_index = gr.Number(label="to", minimum=0, step=1, value=0, precision=0)
                move_btn = gr.Button("Move", variant="secondary")
                submit_btn = gr.Button("🚀 Download", variant="primary")

        with gr.Column(elem_classes="output-section"):
            with gr.Column(elem_classes="scroll-area"):
                progress = gr.Slider(label="Progress", minimum=0, maximum=100, step=1, interactive=False)
                status = gr.Textbox(label="Status", lines=1)
                output_file = gr.File(label="Downloaded File")

    add_btn.click(
        add_segment,
        inputs=[start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments],
        outputs=[segments]
    )

    # process_video yields (progress, status, file) tuples matching these outputs.
    submit_btn.click(
        process_video,
        inputs=[video_url, segments, combine, audio_only],
        outputs=[progress, status, output_file]
    )

    remove_btn.click(
        remove_segment,
        inputs=[segments, remove_index],
        outputs=[segments]
    )

    move_btn.click(
        move_segment,
        inputs=[segments, old_index, new_index],
        outputs=[segments]
    )

    def update_qualities(url):
        """Refresh the quality dropdown and the URL status note for *url*."""
        qualities = get_video_qualities(url)
        if qualities:
            return (
                gr.Dropdown(choices=qualities, value=qualities[0], visible=True),
                gr.Markdown(visible=False)
            )
        elif url.strip():
            # A non-empty URL that produced no qualities: show a warning.
            return (
                gr.Dropdown(choices=[], visible=False),
                gr.Markdown("Unable to fetch video qualities. The URL might be invalid or the video might be unavailable.", visible=True)
            )
        else:
            return (
                gr.Dropdown(choices=[], visible=False),
                gr.Markdown(visible=False)
            )

    # NOTE(review): this handler performs a yt-dlp lookup on every change of
    # the URL box, and the selected quality is not passed to process_video -
    # confirm both are intended.
    video_url.change(
        update_qualities,
        inputs=[video_url],
        outputs=[quality, url_status]
    )

# Serve on all interfaces, port 7860.
iface.launch(server_name="0.0.0.0", server_port=7860)