import gradio as gr from gradio import utils import os import re import requests from concurrent.futures import ThreadPoolExecutor import time from yt_dlp import YoutubeDL from yt_dlp.utils import DownloadError import subprocess import shutil from typing import List, Tuple import pandas as pd import asyncio import aiohttp import aiofiles import json from functools import lru_cache import tenacity from tqdm import tqdm from pathlib import Path import logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) def sanitize_title(title): return re.sub(r'[\\/*?:"<>|]', "", title) def format_time(seconds): return time.strftime('%H:%M:%S', time.gmtime(seconds)) @tenacity.retry( stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(multiplier=1, min=4, max=10), retry=tenacity.retry_if_exception_type((requests.RequestException, aiohttp.ClientError)), reraise=True ) @lru_cache(maxsize=100) def get_video_info(video_url): with YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl: try: info = ydl.extract_info(video_url, download=False) formats = info.get('formats', []) # Function to safely get bitrate def get_bitrate(format_dict, key): return format_dict.get(key, 0) or 0 # Prefer adaptive formats (separate video and audio) video_formats = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') == 'none'] audio_formats = [f for f in formats if f.get('acodec') != 'none' and f.get('vcodec') == 'none'] if video_formats and audio_formats: video_format = max(video_formats, key=lambda f: get_bitrate(f, 'vbr')) audio_format = max(audio_formats, key=lambda f: get_bitrate(f, 'abr')) return info['title'], video_format['url'], audio_format['url'] else: # Fallback to best combined format combined_formats = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') != 'none'] if combined_formats: best_format = max(combined_formats, key=lambda f: get_bitrate(f, 'tbr')) return info['title'], best_format['url'], None else: raise Exception("No suitable video formats found") except DownloadError as e: raise Exception(f"Error extracting video info: {str(e)}") except Exception as e: raise Exception(f"Unexpected error: {str(e)}") @tenacity.retry( stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(multiplier=1, min=4, max=10), retry=tenacity.retry_if_exception_type((subprocess.CalledProcessError, asyncio.TimeoutError)), reraise=True ) async def download_segment_async(url, start_time, end_time, output_path): output_path = Path(output_path) command = [ 'ffmpeg', '-ss', format_time(start_time), '-i', url, '-t', format_time(end_time - start_time), '-c', 'copy', '-avoid_negative_ts', 'make_zero', '-y', str(output_path) ] logger.debug(f"Executing command: {' '.join(command)}") process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: error_message = f"FFmpeg error: {stderr.decode()}" logger.error(error_message) yield error_message raise Exception(error_message) if not output_path.exists(): error_message = f"Output file not created: {output_path}" logger.error(error_message) yield error_message raise FileNotFoundError(error_message) logger.info(f"Successfully downloaded segment to {output_path}") yield f"Successfully downloaded segment to {output_path}" async def combine_segments_async(video_segments, audio_segments, output_path): if not audio_segments: raise Exception("No audio segments to combine") video_list = None audio_list = None temp_video = None temp_audio = None try: if video_segments: temp_video = 'temp_video.mp4' temp_audio = 'temp_audio.m4a' # Concatenate video segments video_list = 'video_list.txt' async with aiofiles.open(video_list, 'w') as f: await f.write('\n'.join(f"file '{segment}'" for segment in video_segments)) process = await asyncio.create_subprocess_exec( 'ffmpeg', '-f', 'concat', '-safe', '0', '-i', video_list, '-c', 'copy', temp_video, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: raise Exception(f"Error concatenating video segments: {stderr.decode()}") # Concatenate audio segments audio_list = 'audio_list.txt' async with aiofiles.open(audio_list, 'w') as f: await f.write('\n'.join(f"file '{segment}'" for segment in audio_segments)) process = await asyncio.create_subprocess_exec( 'ffmpeg', '-f', 'concat', '-safe', '0', '-i', audio_list, '-c', 'copy', temp_audio, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: raise Exception(f"Error concatenating audio segments: {stderr.decode()}") # Combine video and audio process = await asyncio.create_subprocess_exec( 'ffmpeg', '-i', temp_video, '-i', temp_audio, '-c', 'copy', output_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: raise Exception(f"Error combining video and audio: {stderr.decode()}") else: # Audio only audio_list = 'audio_list.txt' async with aiofiles.open(audio_list, 'w') as f: await f.write('\n'.join(f"file '{segment}'" for segment in audio_segments)) process = await asyncio.create_subprocess_exec( 'ffmpeg', '-f', 'concat', '-safe', '0', '-i', audio_list, '-c', 'copy', output_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: raise Exception(f"Error combining audio segments: {stderr.decode()}") return output_path except Exception as e: raise Exception(f"Error in combine_segments_async: {str(e)}") finally: # Clean up temporary files for file in [f for f in [video_list, audio_list, temp_video, temp_audio] if f]: try: Path(file).unlink(missing_ok=True) except Exception as e: print(f"Error removing temporary file {file}: {str(e)}") def add_segment(start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments): start_time = f"{start_hours:02d}:{start_minutes:02d}:{start_seconds:02d}" end_time = f"{end_hours:02d}:{end_minutes:02d}:{end_seconds:02d}" new_segment = f"{start_time}-{end_time}" new_row = pd.DataFrame([new_segment], columns=["Segment"]) return pd.concat([segments, new_row], ignore_index=True) def remove_segment(segments, index): return segments.drop(index).reset_index(drop=True) def move_segment(segments, old_index, new_index): if 0 <= old_index < len(segments) and 0 <= new_index < len(segments): segment = segments.iloc[old_index] segments = segments.drop(old_index).reset_index(drop=True) segments = pd.concat([segments.iloc[:new_index], pd.DataFrame([segment]), segments.iloc[new_index:]]).reset_index(drop=True) return segments def parse_segments(segments: pd.DataFrame) -> List[Tuple[int, int]]: parsed_segments = [] for segment in segments['Segment']: if not isinstance(segment, str) or '-' not in segment: continue try: start, end = segment.split('-') start_seconds = sum(int(i) * 60 ** j for j, i in enumerate(reversed(start.split(':'))) if i) end_seconds = sum(int(i) * 60 ** j for j, i in enumerate(reversed(end.split(':'))) if i) if start_seconds < end_seconds: parsed_segments.append((start_seconds, end_seconds)) except ValueError: continue # Skip invalid segments return parsed_segments async def process_video(video_url, segments, combine, audio_only, progress=gr.Progress()): if not video_url.strip(): yield 0, "Error: Please provide a valid YouTube URL", None return parsed_segments = parse_segments(segments) if not parsed_segments: yield 0, "Error: No valid segments provided", None return output_dir = Path('output') # Clean up the output directory if output_dir.exists(): try: shutil.rmtree(output_dir) yield 0, "Cleaned up existing output directory", None except Exception as e: yield 0, f"Error cleaning up output directory: {str(e)}", None return output_dir.mkdir(exist_ok=True) try: progress(0, "Extracting video info...") video_title, video_url, audio_url = get_video_info(video_url) except Exception as e: yield 0, f"Error: {str(e)}", None return video_segments = [] audio_segments = [] total_segments = len(parsed_segments) for i, (start_time, end_time) in enumerate(parsed_segments): progress((i / total_segments) * 0.8, f"Downloading segment {i+1}/{total_segments}") try: if not audio_only: video_output = output_dir / f"{sanitize_title(video_title)}_video_segment_{i+1}.mp4" async for output in download_segment_async(video_url, start_time, end_time, str(video_output)): progress((i / total_segments) * 0.8 + (1 / total_segments) * 0.4, f"Downloading video segment {i+1}/{total_segments}: {output}") if video_output.exists(): video_segments.append(str(video_output)) else: raise FileNotFoundError(f"Video segment file not found: {video_output}") audio_output = output_dir / f"{sanitize_title(video_title)}_audio_segment_{i+1}.m4a" async for output in download_segment_async(audio_url or video_url, start_time, end_time, str(audio_output)): progress((i / total_segments) * 0.8 + (1 / total_segments) * 0.8, f"Downloading audio segment {i+1}/{total_segments}: {output}") if audio_output.exists(): audio_segments.append(str(audio_output)) else: raise FileNotFoundError(f"Audio segment file not found: {audio_output}") except Exception as e: yield (i / total_segments) * 100, f"Error downloading segment {i+1}: {str(e)}", None return try: if combine: progress(90, "Combining segments...") if audio_only: output_path = output_dir / f"{sanitize_title(video_title)}_combined.m4a" else: output_path = output_dir / f"{sanitize_title(video_title)}_combined.mp4" output_path = await combine_segments_async(video_segments if not audio_only else [], audio_segments, str(output_path)) yield 100, f"Segments combined and saved as {output_path}", output_path else: output_path = audio_segments[0] if audio_only else video_segments[0] yield 100, "All segments downloaded successfully", output_path except Exception as e: yield 100, f"Error: {str(e)}", None finally: # Clean up individual segment files if they were combined if combine: for segment in video_segments + audio_segments: try: Path(segment).unlink(missing_ok=True) except Exception as e: print(f"Error removing segment file {segment}: {str(e)}") def get_video_qualities(video_url): if not video_url.strip(): return [] with YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl: try: info = ydl.extract_info(video_url, download=False) formats = info.get('formats', []) qualities = set() for f in formats: if f.get('vcodec') != 'none' and f.get('acodec') != 'none': height = f.get('height') if height: qualities.add(f"{height}p") return sorted(list(qualities), key=lambda x: int(x[:-1]), reverse=True) except DownloadError: return [] except Exception as e: print(f"Error in get_video_qualities: {str(e)}") return [] # Disable Gradio analytics utils.colab_check = lambda: True custom_css = """ /* Reset and base styles */ * { box-sizing: border-box; margin: 0; padding: 0; } body, #component-0 { height: 100vh; max-height: 100vh; overflow: hidden; display: flex; flex-direction: column; } /* Gradio container modifications */ .gradio-container { flex-grow: 1; display: flex; flex-direction: column; max-height: 100vh; overflow: hidden; } /* Header */ .header { padding: 1rem; background-color: #f0f0f0; } /* Main content area */ .main-content { flex-grow: 1; display: flex; overflow: hidden; padding: 1rem; gap: 1rem; } /* Columns */ .input-section, .options-section, .output-section { flex: 1; display: flex; flex-direction: column; overflow: hidden; background-color: #ffffff; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); } /* Scrollable areas */ .scroll-area { flex-grow: 1; overflow-y: auto; padding: 1rem; } /* Responsive adjustments */ @media (max-width: 768px) { .main-content { flex-direction: column; } } /* Additional styles */ .segment-input, .time-input, .button-row { display: flex; gap: 0.5rem; margin-bottom: 1rem; } .time-input { width: 60px !important; } .url-input { flex-grow: 1; } .quality-dropdown { width: 100px !important; } """ with gr.Blocks(title="YouTube Segment Downloader", theme=gr.themes.Default(), css=custom_css) as iface: gr.Markdown("# 🎬 YouTube Segment Downloader", elem_classes="header") with gr.Row(elem_classes="main-content"): with gr.Column(elem_classes="input-section"): with gr.Column(elem_classes="scroll-area"): with gr.Row(): video_url = gr.Textbox(label="YouTube URL", placeholder="Paste URL here", elem_classes="url-input") quality = gr.Dropdown(label="Quality", choices=[], interactive=True, elem_classes="quality-dropdown", visible=False) url_status = gr.Markdown(visible=False) gr.Markdown("### Add Segments") with gr.Row(elem_classes="segment-input"): start_hours = gr.Number(label="Start HH", minimum=0, maximum=23, step=1, value=0, elem_classes="time-input") start_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, elem_classes="time-input") start_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, elem_classes="time-input") gr.Markdown("to") end_hours = gr.Number(label="End HH", minimum=0, maximum=23, step=1, value=0, elem_classes="time-input") end_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, elem_classes="time-input") end_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, elem_classes="time-input") add_btn = gr.Button("Add Segment", variant="primary") segments = gr.Dataframe(headers=["Segment"], row_count=3, col_count=1, datatype=["str"], interactive=True, label="Segments") with gr.Column(elem_classes="options-section"): with gr.Column(elem_classes="scroll-area"): combine = gr.Checkbox(label="Combine segments", value=True) audio_only = gr.Checkbox(label="Audio only", value=False) remove_index = gr.Number(label="Remove segment", minimum=0, step=1, value=0) remove_btn = gr.Button("Remove", variant="secondary") old_index = gr.Number(label="Move from", minimum=0, step=1, value=0) new_index = gr.Number(label="to", minimum=0, step=1, value=0) move_btn = gr.Button("Move", variant="secondary") submit_btn = gr.Button("🚀 Download", variant="primary") with gr.Column(elem_classes="output-section"): with gr.Column(elem_classes="scroll-area"): progress = gr.Slider(label="Progress", minimum=0, maximum=100, step=1, interactive=False) status = gr.Textbox(label="Status", lines=1) output_file = gr.File(label="Downloaded File") add_btn.click( add_segment, inputs=[start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments], outputs=[segments] ) submit_btn.click( process_video, inputs=[video_url, segments, combine, audio_only], outputs=[progress, status, output_file] ) remove_btn.click( remove_segment, inputs=[segments, remove_index], outputs=[segments] ) move_btn.click( move_segment, inputs=[segments, old_index, new_index], outputs=[segments] ) def update_qualities(url): qualities = get_video_qualities(url) if qualities: return ( gr.Dropdown(choices=qualities, value=qualities[0], visible=True), gr.Markdown(visible=False) ) elif url.strip(): # Only show error if URL is not empty return ( gr.Dropdown(choices=[], visible=False), gr.Markdown("Unable to fetch video qualities. The URL might be invalid or the video might be unavailable.", visible=True) ) else: return ( gr.Dropdown(choices=[], visible=False), gr.Markdown(visible=False) ) video_url.change( update_qualities, inputs=[video_url], outputs=[quality, url_status] ) iface.launch(server_name="0.0.0.0", server_port=7860)