# RO-Rtechs's picture
# Update app.py
# 3fa5d20 verified
import gradio as gr
from gradio import utils
import os
import re
import requests
from concurrent.futures import ThreadPoolExecutor
import time
from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadError
import subprocess
import shutil
from typing import List, Tuple
import pandas as pd
import asyncio
import aiohttp
import aiofiles
import json
from functools import lru_cache
import tenacity
from tqdm import tqdm
from pathlib import Path
import logging
# Configure logging once at import time. DEBUG is verbose: it includes the
# full ffmpeg command lines logged by download_segment_async.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def sanitize_title(title):
    """Return *title* with backslash, slash, *, ?, :, double quote, <, > and | removed.

    The result is used elsewhere in this file to build output file names.
    """
    forbidden_chars = re.compile(r'[\\/*?:"<>|]')
    return forbidden_chars.sub("", title)
def format_time(seconds):
    """Format a non-negative number of seconds as ``HH:MM:SS``.

    The hour field is not wrapped at 24, so 90000 seconds formats as
    ``25:00:00`` (the previous clock-based formatting returned ``01:00:00``).
    Fractional seconds are truncated.

    Args:
        seconds: Duration or offset in seconds (int or float).

    Returns:
        The zero-padded ``HH:MM:SS`` string; the hour field grows beyond two
        digits when needed.

    Raises:
        ValueError: If *seconds* is negative.
    """
    total = int(seconds)
    if total < 0:
        raise ValueError(f"seconds must be non-negative, got {seconds!r}")
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"
@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
    retry=tenacity.retry_if_exception_type((requests.RequestException, aiohttp.ClientError)),
    reraise=True
)
@lru_cache(maxsize=100)
def get_video_info(video_url):
    """Return the title and the stream URL(s) to download for *video_url*.

    Separate video-only and audio-only formats are preferred; the video format
    with the highest ``vbr`` and the audio format with the highest ``abr`` are
    chosen. If either kind is missing, the combined format with the highest
    ``tbr`` is used instead.

    Args:
        video_url: URL handed to yt-dlp's ``extract_info``.

    Returns:
        ``(title, video_url, audio_url)``; ``audio_url`` is ``None`` when a
        combined format was selected.

    Raises:
        Exception: With an "Error extracting video info" prefix for yt-dlp
            ``DownloadError``, an "Unexpected error" prefix for other
            failures, or "No suitable video formats found" when nothing
            usable is listed.
        requests.RequestException, aiohttp.ClientError: Propagated unchanged
            once the retry attempts are exhausted.
    """
    # NOTE(review): results are memoised by lru_cache, including the stream
    # URLs; if those URLs expire, stale entries would be returned — confirm.

    def get_bitrate(format_dict, key):
        # The key may be absent or explicitly None; both count as 0.
        return format_dict.get(key, 0) or 0

    selection = None
    with YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)
            formats = info.get('formats') or []
            # Prefer adaptive formats (separate video and audio)
            video_formats = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') == 'none']
            audio_formats = [f for f in formats if f.get('acodec') != 'none' and f.get('vcodec') == 'none']
            if video_formats and audio_formats:
                video_format = max(video_formats, key=lambda f: get_bitrate(f, 'vbr'))
                audio_format = max(audio_formats, key=lambda f: get_bitrate(f, 'abr'))
                selection = (info['title'], video_format['url'], audio_format['url'])
            else:
                # Fallback to best combined format
                combined_formats = [f for f in formats if f.get('vcodec') != 'none' and f.get('acodec') != 'none']
                if combined_formats:
                    best_format = max(combined_formats, key=lambda f: get_bitrate(f, 'tbr'))
                    selection = (info['title'], best_format['url'], None)
        except (requests.RequestException, aiohttp.ClientError):
            # Let these through untouched: they are exactly the types the retry
            # decorator watches for, and wrapping them would disable the retry.
            raise
        except DownloadError as e:
            raise Exception(f"Error extracting video info: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Unexpected error: {str(e)}") from e
    if selection is None:
        # Raised outside the try block so it is not re-labelled "Unexpected error".
        raise Exception("No suitable video formats found")
    return selection
@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
    retry=tenacity.retry_if_exception_type((subprocess.CalledProcessError, asyncio.TimeoutError)),
    reraise=True
)
async def _run_ffmpeg_with_retry(command):
    """Run *command* to completion; raise ``CalledProcessError`` on a non-zero exit.

    The retry policy lives on this coroutine rather than on
    ``download_segment_async``: that function is an async generator, so
    decorating it only wrapped the creation of the generator object and
    nothing was ever retried.
    """
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr)


async def download_segment_async(url, start_time, end_time, output_path):
    """Cut ``[start_time, end_time)`` out of *url* with ffmpeg and yield status text.

    The streams are copied without re-encoding (``-c copy``). A failing ffmpeg
    run is retried according to the policy on ``_run_ffmpeg_with_retry``.

    Args:
        url: Media URL passed to ffmpeg as its input.
        start_time: Segment start in seconds.
        end_time: Segment end in seconds.
        output_path: Destination file; overwritten if present (``-y``).

    Yields:
        One status string: either a success message or, immediately before the
        exception is raised, the error message.

    Raises:
        Exception: If ffmpeg still exits non-zero after the retries; the
            message carries ffmpeg's stderr.
        FileNotFoundError: If ffmpeg exits cleanly but the output file is missing.
    """
    output_path = Path(output_path)
    command = [
        'ffmpeg',
        '-ss', format_time(start_time),
        '-i', url,
        '-t', format_time(end_time - start_time),
        '-c', 'copy',
        '-avoid_negative_ts', 'make_zero',
        '-y',
        str(output_path)
    ]
    logger.debug(f"Executing command: {' '.join(command)}")
    try:
        await _run_ffmpeg_with_retry(command)
    except subprocess.CalledProcessError as e:
        # errors='replace' so undecodable bytes in ffmpeg's output cannot mask
        # the real failure with a UnicodeDecodeError.
        error_message = f"FFmpeg error: {(e.stderr or b'').decode(errors='replace')}"
        logger.error(error_message)
        yield error_message
        raise Exception(error_message) from e
    if not output_path.exists():
        error_message = f"Output file not created: {output_path}"
        logger.error(error_message)
        yield error_message
        raise FileNotFoundError(error_message)
    logger.info(f"Successfully downloaded segment to {output_path}")
    yield f"Successfully downloaded segment to {output_path}"
async def _run_ffmpeg_checked(args, failure_prefix):
    """Run ``ffmpeg`` with *args*; raise ``Exception(failure_prefix: stderr)`` on failure."""
    process = await asyncio.create_subprocess_exec(
        'ffmpeg', *args,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        raise Exception(f"{failure_prefix}: {stderr.decode(errors='replace')}")


async def _concat_segments(segments, list_path, destination, failure_prefix):
    """Write a concat list for *segments* and stream-copy them into *destination*."""
    entries = []
    for segment in segments:
        # Absolute paths, because the list file lives in a scratch directory.
        absolute = str(Path(segment).resolve())
        # A single quote inside a quoted concat entry must be written as '\''.
        escaped = absolute.replace("'", "'\\''")
        entries.append(f"file '{escaped}'")
    async with aiofiles.open(list_path, 'w') as f:
        await f.write('\n'.join(entries))
    await _run_ffmpeg_checked(
        ['-y', '-f', 'concat', '-safe', '0', '-i', str(list_path), '-c', 'copy', str(destination)],
        failure_prefix,
    )


async def combine_segments_async(video_segments, audio_segments, output_path):
    """Join the downloaded segments into a single file at *output_path*.

    With video segments, the video parts and the audio parts are each
    concatenated and the two results are then muxed together. Without video
    segments, the audio parts are concatenated straight into *output_path*.
    All ffmpeg steps copy streams (``-c copy``).

    Intermediate files are kept in a private scratch directory next to the
    output, so concurrent or aborted runs cannot clobber each other's files.

    Args:
        video_segments: Paths of video segment files; may be empty.
        audio_segments: Paths of audio segment files; must not be empty.
        output_path: Destination file path; overwritten if present.

    Returns:
        *output_path*, unchanged.

    Raises:
        Exception: If *audio_segments* is empty, or, with an
            "Error in combine_segments_async" prefix, if any step fails.
    """
    import tempfile  # local import: only this function needs it

    if not audio_segments:
        raise Exception("No audio segments to combine")
    try:
        scratch_parent = Path(output_path).resolve().parent
        with tempfile.TemporaryDirectory(dir=scratch_parent, prefix='combine_') as scratch:
            workdir = Path(scratch)
            if video_segments:
                temp_video = workdir / 'temp_video.mp4'
                temp_audio = workdir / 'temp_audio.m4a'
                # Concatenate video segments
                await _concat_segments(
                    video_segments, workdir / 'video_list.txt', temp_video,
                    "Error concatenating video segments",
                )
                # Concatenate audio segments
                await _concat_segments(
                    audio_segments, workdir / 'audio_list.txt', temp_audio,
                    "Error concatenating audio segments",
                )
                # Combine video and audio
                await _run_ffmpeg_checked(
                    ['-y', '-i', str(temp_video), '-i', str(temp_audio), '-c', 'copy', str(output_path)],
                    "Error combining video and audio",
                )
            else:
                # Audio only
                await _concat_segments(
                    audio_segments, workdir / 'audio_list.txt', output_path,
                    "Error combining audio segments",
                )
        return output_path
    except Exception as e:
        raise Exception(f"Error in combine_segments_async: {str(e)}") from e
def add_segment(start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments):
    """Append a ``HH:MM:SS-HH:MM:SS`` row to the *segments* table.

    Args:
        start_hours, start_minutes, start_seconds: Start time components.
        end_hours, end_minutes, end_seconds: End time components.
        segments: DataFrame with a ``Segment`` column.

    Returns:
        A new DataFrame with the extra row and a fresh 0..n-1 index.
    """
    def as_int(value):
        # Number inputs can arrive as floats (5.0) or as None when a field is
        # cleared; the ':02d' format spec only accepts ints.
        return int(value or 0)

    start_time = ":".join(f"{as_int(part):02d}" for part in (start_hours, start_minutes, start_seconds))
    end_time = ":".join(f"{as_int(part):02d}" for part in (end_hours, end_minutes, end_seconds))
    new_row = pd.DataFrame([f"{start_time}-{end_time}"], columns=["Segment"])
    return pd.concat([segments, new_row], ignore_index=True)
def remove_segment(segments, index):
    """Return *segments* without the row at position *index*.

    Args:
        segments: DataFrame of segments.
        index: Zero-based row position; floats such as ``2.0`` are accepted.

    Returns:
        A re-indexed copy with the row removed, or *segments* unchanged when
        *index* is missing, not a number, or out of range (previously this
        raised ``KeyError``).
    """
    try:
        position = int(index)
    except (TypeError, ValueError):
        return segments
    if not 0 <= position < len(segments):
        return segments
    # Drop by position so the result does not depend on the index labels.
    return segments.drop(segments.index[position]).reset_index(drop=True)
def move_segment(segments, old_index, new_index):
    """Move the row at *old_index* so that it ends up at *new_index*.

    Args:
        segments: DataFrame of segments.
        old_index: Current zero-based position of the row.
        new_index: Target zero-based position after the move.

    Returns:
        A re-indexed DataFrame in the new order, or *segments* unchanged when
        either position is missing, not a number, or out of range.
    """
    try:
        # Number inputs may deliver floats; positional indexing needs ints.
        source = int(old_index)
        target = int(new_index)
    except (TypeError, ValueError):
        return segments
    count = len(segments)
    if not (0 <= source < count and 0 <= target < count):
        return segments
    order = list(range(count))
    order.insert(target, order.pop(source))
    return segments.iloc[order].reset_index(drop=True)
def parse_segments(segments: pd.DataFrame) -> List[Tuple[int, int]]:
    """Convert the ``Segment`` column into ``(start_seconds, end_seconds)`` pairs.

    Each usable entry is a ``start-end`` string whose halves are
    colon-separated numbers, read right to left as seconds, minutes, hours.
    Entries that are not strings, contain no ``-``, fail to parse, or whose
    start is not strictly before their end are skipped silently.
    """
    def to_seconds(stamp):
        # The right-most field is seconds; each field to the left is worth 60x more.
        fields = reversed(stamp.split(':'))
        return sum(int(field) * 60 ** power for power, field in enumerate(fields) if field)

    result = []
    for entry in segments['Segment']:
        if isinstance(entry, str) and '-' in entry:
            try:
                begin_text, finish_text = entry.split('-')
                begin = to_seconds(begin_text)
                finish = to_seconds(finish_text)
            except ValueError:
                # Skip invalid segments
                continue
            if begin < finish:
                result.append((begin, finish))
    return result
async def process_video(video_url, segments, combine, audio_only, progress=gr.Progress()):
    """Download the requested segments and optionally combine them.

    Args:
        video_url: URL typed by the user.
        segments: DataFrame with a ``Segment`` column of ``start-end`` strings.
        combine: When true, merge all segments into one file and delete the
            per-segment files afterwards.
        audio_only: When true, skip the video streams.
        progress: Gradio progress tracker; called with fractions in 0..1.

    Yields:
        ``(progress_value, status_message, file_path_or_None)`` tuples for the
        progress slider, status box and file output. ``progress_value`` is on
        the slider's 0..100 scale.
    """
    if not video_url or not video_url.strip():
        yield 0, "Error: Please provide a valid YouTube URL", None
        return
    parsed_segments = parse_segments(segments)
    if not parsed_segments:
        yield 0, "Error: No valid segments provided", None
        return
    output_dir = Path('output')
    # Clean up the output directory
    if output_dir.exists():
        try:
            shutil.rmtree(output_dir)
            yield 0, "Cleaned up existing output directory", None
        except Exception as e:
            yield 0, f"Error cleaning up output directory: {str(e)}", None
            return
    output_dir.mkdir(exist_ok=True)
    try:
        progress(0, "Extracting video info...")
        video_title, video_url, audio_url = get_video_info(video_url)
    except Exception as e:
        yield 0, f"Error: {str(e)}", None
        return
    # Sanitise once; the same stem is used for every file name below.
    safe_title = sanitize_title(video_title)
    video_segments = []
    audio_segments = []
    total_segments = len(parsed_segments)
    for i, (start_time, end_time) in enumerate(parsed_segments):
        progress((i / total_segments) * 0.8, f"Downloading segment {i+1}/{total_segments}")
        try:
            if not audio_only:
                video_output = output_dir / f"{safe_title}_video_segment_{i+1}.mp4"
                async for output in download_segment_async(video_url, start_time, end_time, str(video_output)):
                    progress((i / total_segments) * 0.8 + (1 / total_segments) * 0.4, f"Downloading video segment {i+1}/{total_segments}: {output}")
                if video_output.exists():
                    video_segments.append(str(video_output))
                else:
                    raise FileNotFoundError(f"Video segment file not found: {video_output}")
            audio_output = output_dir / f"{safe_title}_audio_segment_{i+1}.m4a"
            # When no separate audio stream was selected, audio_url is None and
            # the combined stream URL is used as the audio source instead.
            async for output in download_segment_async(audio_url or video_url, start_time, end_time, str(audio_output)):
                progress((i / total_segments) * 0.8 + (1 / total_segments) * 0.8, f"Downloading audio segment {i+1}/{total_segments}: {output}")
            if audio_output.exists():
                audio_segments.append(str(audio_output))
            else:
                raise FileNotFoundError(f"Audio segment file not found: {audio_output}")
        except Exception as e:
            yield (i / total_segments) * 100, f"Error downloading segment {i+1}: {str(e)}", None
            return
    try:
        if combine:
            # A 0..1 fraction, like every other progress() call in this function
            # (this call previously passed 90).
            progress(0.9, "Combining segments...")
            if audio_only:
                output_path = output_dir / f"{safe_title}_combined.m4a"
            else:
                output_path = output_dir / f"{safe_title}_combined.mp4"
            output_path = await combine_segments_async(video_segments if not audio_only else [], audio_segments, str(output_path))
            yield 100, f"Segments combined and saved as {output_path}", output_path
        else:
            # Only the first segment is handed to the file output here; the
            # other segment files remain in the output directory.
            output_path = audio_segments[0] if audio_only else video_segments[0]
            yield 100, "All segments downloaded successfully", output_path
    except Exception as e:
        yield 100, f"Error: {str(e)}", None
    finally:
        # Clean up individual segment files if they were combined
        if combine:
            for segment in video_segments + audio_segments:
                try:
                    Path(segment).unlink(missing_ok=True)
                except Exception as e:
                    logger.warning(f"Error removing segment file {segment}: {str(e)}")
def get_video_qualities(video_url):
    """List the heights of the combined (video+audio) formats for *video_url*.

    Returns labels such as ``"720p"``, largest first. An empty list is
    returned for a blank URL, when yt-dlp raises ``DownloadError``, or on any
    other error (which is also printed).
    """
    if not video_url.strip():
        return []
    ydl_options = {'quiet': True, 'no_warnings': True}
    with YoutubeDL(ydl_options) as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)
            # A set, so each height is listed once however many formats share it.
            labels = {
                f"{fmt.get('height')}p"
                for fmt in info.get('formats', [])
                if fmt.get('vcodec') != 'none' and fmt.get('acodec') != 'none' and fmt.get('height')
            }
            return sorted(labels, key=lambda label: int(label[:-1]), reverse=True)
        except DownloadError:
            return []
        except Exception as e:
            print(f"Error in get_video_qualities: {str(e)}")
            return []
# NOTE(review): this was labelled "Disable Gradio analytics", but the statement
# only replaces gradio.utils.colab_check with a stub that always returns True.
# Confirm the intended effect against the installed Gradio version.
utils.colab_check = lambda: True
# Stylesheet passed to gr.Blocks(css=...). The class names used here
# (header, main-content, input-section, options-section, output-section,
# scroll-area, segment-input, time-input, url-input, quality-dropdown) are
# attached to components through their elem_classes arguments.
custom_css = """
/* Reset and base styles */
* { box-sizing: border-box; margin: 0; padding: 0; }
body, #component-0 {
height: 100vh;
max-height: 100vh;
overflow: hidden;
display: flex;
flex-direction: column;
}
/* Gradio container modifications */
.gradio-container {
flex-grow: 1;
display: flex;
flex-direction: column;
max-height: 100vh;
overflow: hidden;
}
/* Header */
.header {
padding: 1rem;
background-color: #f0f0f0;
}
/* Main content area */
.main-content {
flex-grow: 1;
display: flex;
overflow: hidden;
padding: 1rem;
gap: 1rem;
}
/* Columns */
.input-section, .options-section, .output-section {
flex: 1;
display: flex;
flex-direction: column;
overflow: hidden;
background-color: #ffffff;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
/* Scrollable areas */
.scroll-area {
flex-grow: 1;
overflow-y: auto;
padding: 1rem;
}
/* Responsive adjustments */
@media (max-width: 768px) {
.main-content {
flex-direction: column;
}
}
/* Additional styles */
.segment-input, .time-input, .button-row {
display: flex;
gap: 0.5rem;
margin-bottom: 1rem;
}
.time-input { width: 60px !important; }
.url-input { flex-grow: 1; }
.quality-dropdown { width: 100px !important; }
"""
# Build the UI: three columns (inputs, options, output), then wire the events
# and start the server.
# NOTE(review): the nesting below was reconstructed from the CSS class names
# because the original indentation was lost — confirm against the intended layout.
with gr.Blocks(title="YouTube Segment Downloader", theme=gr.themes.Default(), css=custom_css) as iface:
    gr.Markdown("# 🎬 YouTube Segment Downloader", elem_classes="header")
    with gr.Row(elem_classes="main-content"):
        with gr.Column(elem_classes="input-section"):
            with gr.Column(elem_classes="scroll-area"):
                with gr.Row():
                    video_url = gr.Textbox(label="YouTube URL", placeholder="Paste URL here", elem_classes="url-input")
                    quality = gr.Dropdown(label="Quality", choices=[], interactive=True, elem_classes="quality-dropdown", visible=False)
                url_status = gr.Markdown(visible=False)
                gr.Markdown("### Add Segments")
                # precision=0 makes the number inputs deliver ints, which is what
                # the segment handlers format and index with.
                with gr.Row(elem_classes="segment-input"):
                    start_hours = gr.Number(label="Start HH", minimum=0, maximum=23, step=1, value=0, precision=0, elem_classes="time-input")
                    start_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                    start_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                    gr.Markdown("to")
                    end_hours = gr.Number(label="End HH", minimum=0, maximum=23, step=1, value=0, precision=0, elem_classes="time-input")
                    end_minutes = gr.Number(label="MM", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                    end_seconds = gr.Number(label="SS", minimum=0, maximum=59, step=1, value=0, precision=0, elem_classes="time-input")
                add_btn = gr.Button("Add Segment", variant="primary")
                segments = gr.Dataframe(headers=["Segment"], row_count=3, col_count=1, datatype=["str"], interactive=True, label="Segments")
        with gr.Column(elem_classes="options-section"):
            with gr.Column(elem_classes="scroll-area"):
                combine = gr.Checkbox(label="Combine segments", value=True)
                audio_only = gr.Checkbox(label="Audio only", value=False)
                remove_index = gr.Number(label="Remove segment", minimum=0, step=1, value=0, precision=0)
                remove_btn = gr.Button("Remove", variant="secondary")
                old_index = gr.Number(label="Move from", minimum=0, step=1, value=0, precision=0)
                new_index = gr.Number(label="to", minimum=0, step=1, value=0, precision=0)
                move_btn = gr.Button("Move", variant="secondary")
                submit_btn = gr.Button("🚀 Download", variant="primary")
        with gr.Column(elem_classes="output-section"):
            with gr.Column(elem_classes="scroll-area"):
                progress = gr.Slider(label="Progress", minimum=0, maximum=100, step=1, interactive=False)
                status = gr.Textbox(label="Status", lines=1)
                output_file = gr.File(label="Downloaded File")

    add_btn.click(
        add_segment,
        inputs=[start_hours, start_minutes, start_seconds, end_hours, end_minutes, end_seconds, segments],
        outputs=[segments]
    )
    submit_btn.click(
        process_video,
        inputs=[video_url, segments, combine, audio_only],
        outputs=[progress, status, output_file]
    )
    remove_btn.click(
        remove_segment,
        inputs=[segments, remove_index],
        outputs=[segments]
    )
    move_btn.click(
        move_segment,
        inputs=[segments, old_index, new_index],
        outputs=[segments]
    )

    def update_qualities(url):
        """Refresh the quality dropdown and the URL status message for *url*."""
        qualities = get_video_qualities(url)
        if qualities:
            return (
                gr.Dropdown(choices=qualities, value=qualities[0], visible=True),
                gr.Markdown(visible=False)
            )
        elif url.strip():  # Only show error if URL is not empty
            return (
                gr.Dropdown(choices=[], visible=False),
                gr.Markdown("Unable to fetch video qualities. The URL might be invalid or the video might be unavailable.", visible=True)
            )
        else:
            return (
                gr.Dropdown(choices=[], visible=False),
                gr.Markdown(visible=False)
            )

    # NOTE(review): the selected quality is shown to the user but is not among
    # the inputs passed to process_video.
    video_url.change(
        update_qualities,
        inputs=[video_url],
        outputs=[quality, url_status]
    )

iface.launch(server_name="0.0.0.0", server_port=7860)