|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import json |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
import os |
|
import re |
|
import sys |
|
from urllib.parse import urlparse, parse_qs |
|
|
|
import unicodedata |
|
|
|
import yt_dlp |
|
|
|
from App_Function_Libraries.SQLite_DB import check_media_and_whisper_model |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_title(title):
    """Return *title* as an ASCII string with awkward filename characters handled.

    The text is NFKD-decomposed and anything that cannot be encoded as ASCII
    is dropped. ``/``, ``\\`` and ``:`` become underscores, while
    ``" * ? < > |`` are removed entirely.
    """
    ascii_only = (
        unicodedata.normalize('NFKD', title)
        .encode('ascii', 'ignore')
        .decode('ascii')
    )
    # Single pass: the first two arguments map character-for-character,
    # the third lists characters to delete.
    char_table = str.maketrans('/\\:', '___', '"*?<>|')
    return ascii_only.translate(char_table)
|
|
|
def get_video_info(url: str) -> dict:
    """Look up yt-dlp's info dictionary for *url* without downloading media.

    Returns whatever ``extract_info`` produces. If extraction raises, the
    error is logged and ``None`` is returned instead.
    """
    options = dict(quiet=True, no_warnings=True, skip_download=True)
    with yt_dlp.YoutubeDL(options) as downloader:
        try:
            return downloader.extract_info(url, download=False)
        except Exception as e:
            logging.error(f"Error extracting video info: {e}")
    # Only reached when extraction failed and the error was logged above.
    return None
|
|
|
|
|
def get_youtube(video_url):
    """Return yt-dlp's flat info dictionary for *video_url*; nothing is downloaded.

    Exceptions raised by yt-dlp are not caught here and propagate to the caller.
    """
    options = dict(
        format='bestaudio[ext=m4a]',
        noplaylist=False,
        quiet=True,
        extract_flat=True,
    )
    downloader = yt_dlp.YoutubeDL(options)
    with downloader:
        logging.debug("About to extract youtube info")
        extracted = downloader.extract_info(video_url, download=False)
        logging.debug("Youtube info successfully extracted")
    return extracted
|
|
|
|
|
def get_playlist_videos(playlist_url):
    """Return the video URLs and the title of the playlist at *playlist_url*.

    The playlist is extracted flat (no per-video extraction, no download).

    Returns:
        A ``(video_urls, playlist_title)`` tuple. When the extraction result
        has no ``'entries'``, a notice is printed and ``([], None)`` is
        returned. ``playlist_title`` is ``None`` if the result has no title.
    """
    ydl_opts = {
        'extract_flat': True,
        'skip_download': True,
        'quiet': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(playlist_url, download=False)

    # Defensive: a None result (or a None 'entries' value) would otherwise
    # raise TypeError on the membership test / iteration below.
    entries = info.get('entries') if info else None
    if entries is None:
        print("No videos found in the playlist.")
        return [], None

    # Skip placeholder entries and entries without a URL instead of raising,
    # matching the filtering done in get_youtube_playlist_urls.
    video_urls = [entry['url'] for entry in entries if entry and entry.get('url')]
    return video_urls, info.get('title')
|
|
|
|
|
def _run_ytdlp_download(video_url, ydl_opts, expected_path):
    """Run one yt-dlp download; return *expected_path* if it exists afterwards, else None."""
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            logging.debug("yt_dlp: About to download video with youtube-dl")
            ydl.download([video_url])
            logging.debug("yt_dlp: Video successfully downloaded with youtube-dl")
            if os.path.exists(expected_path):
                return expected_path
            logging.error("yt_dlp: Video file not found after download")
            return None
    except Exception as e:
        logging.error(f"yt_dlp: Error downloading video: {e}")
        return None


def download_video(video_url, download_path, info_dict, download_video_flag, current_whisper_model):
    """Download the media for *video_url* into *download_path* and return its path.

    Args:
        video_url: URL handed to yt-dlp.
        download_path: Directory in which the output file is created.
        info_dict: Mapping that must contain ``'title'`` and ``'ext'``.
        download_video_flag: Truthy to fetch video+audio; falsy to fetch the
            audio stream only.
        current_whisper_model: Passed through to
            ``check_media_and_whisper_model``.

    Returns:
        The path of the downloaded (or already existing) file, or ``None``
        when ``info_dict`` is incomplete, the media check says to skip, the
        download raises, or the expected file is missing afterwards.
    """
    # NOTE(review): these were module globals in the original; they are kept
    # so any external reader of the module attributes keeps working — confirm
    # whether anything actually depends on them.
    global video_file_path, ffmpeg_path

    logging.debug("About to normalize downloaded video title")
    if 'title' not in info_dict or 'ext' not in info_dict:
        logging.error("info_dict is missing 'title' or 'ext'")
        return None

    normalized_video_title = normalize_title(info_dict['title'])

    should_download, reason = check_media_and_whisper_model(
        title=normalized_video_title,
        url=video_url,
        current_whisper_model=current_whisper_model
    )

    if not should_download:
        logging.info(f"Skipping download: {reason}")
        return None

    logging.info(f"Proceeding with download: {reason}")

    video_file_path = os.path.join(download_path, f"{normalized_video_title}.{info_dict['ext']}")

    if os.path.exists(video_file_path):
        logging.info(f"Video file already exists: {video_file_path}")
        return video_file_path

    # Windows uses the bundled binary; every other platform relies on
    # `ffmpeg` being on PATH. The plain `else` guarantees ffmpeg_path is
    # always assigned (previously it was left unset on platforms other than
    # win/linux/darwin).
    if sys.platform.startswith('win'):
        ffmpeg_path = os.path.join(os.getcwd(), 'Bin', 'ffmpeg.exe')
    else:
        ffmpeg_path = 'ffmpeg'

    # Both modes write to "<title>.mp4".
    # NOTE(review): the audio-only mode therefore stores an m4a stream under
    # an .mp4 name; kept as-is because callers may rely on the returned path.
    video_file_path = os.path.join(download_path, f"{normalized_video_title}.mp4")

    if download_video_flag:
        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]',
            'outtmpl': video_file_path,
            'ffmpeg_location': ffmpeg_path
        }
    else:
        ydl_opts = {
            'format': 'bestaudio[ext=m4a]',
            'quiet': True,
            'outtmpl': video_file_path
        }

    return _run_ytdlp_download(video_url, ydl_opts, video_file_path)
|
|
|
|
|
def extract_video_info(url):
    """Return yt-dlp's info dictionary for *url*, or ``None`` on any error.

    A short summary (title, duration, upload date) is logged at debug level
    on success; failures are logged with a traceback.
    """
    try:
        with yt_dlp.YoutubeDL({'quiet': True}) as downloader:
            details = downloader.extract_info(url, download=False)

            # Log only a few fields rather than the whole info dictionary.
            summary = {
                field: details.get(field)
                for field in ('title', 'duration', 'upload_date')
            }
            logging.debug(f"Extracted info for {url}: {summary}")

            return details
    except Exception as e:
        logging.error(f"Error extracting video info for {url}: {str(e)}", exc_info=True)
        return None
|
|
|
|
|
def get_youtube_playlist_urls(playlist_id):
    """Return the URLs of the entries in the YouTube playlist *playlist_id*.

    The playlist is extracted flat; entries whose ``'url'`` is missing or
    empty are left out. Extraction errors propagate to the caller.
    """
    playlist_url = f'https://www.youtube.com/playlist?list={playlist_id}'
    options = dict(extract_flat=True, quiet=True)

    with yt_dlp.YoutubeDL(options) as downloader:
        playlist = downloader.extract_info(playlist_url, download=False)
        collected = []
        for item in playlist['entries']:
            if item.get('url'):
                collected.append(item['url'])
        return collected
|
|
|
|
|
def parse_and_expand_urls(urls):
    """Normalise a list of URLs, expanding YouTube playlists into their videos.

    Handling per URL:
      * youtube.com URL with a non-empty ``list`` query parameter: replaced
        by the playlist's video URLs (via ``get_youtube_playlist_urls``).
      * youtu.be short link: rewritten to ``https://www.youtube.com/watch?v=<id>``.
      * vimeo.com URL: rewritten to ``https://vimeo.com/<path>``.
      * anything else: kept unchanged.

    A URL whose processing raises is logged and omitted from the result.

    Returns:
        The list of expanded URLs, in input order.
    """
    logging.info(f"Starting parse_and_expand_urls with input: {urls}")
    expanded_urls = []

    for url in urls:
        try:
            logging.info(f"Processing URL: {url}")
            parsed_url = urlparse(url)
            logging.debug(f"Parsed URL components: {parsed_url}")

            # Look at the parsed parameters, not the raw query text: a
            # substring test also matched e.g. "playlist=1" or an empty
            # "list=", which then raised KeyError and dropped the URL.
            playlist_ids = parse_qs(parsed_url.query).get('list')

            if 'youtube.com' in parsed_url.netloc and playlist_ids:
                playlist_id = playlist_ids[0]
                logging.info(f"Detected YouTube playlist with ID: {playlist_id}")
                playlist_urls = get_youtube_playlist_urls(playlist_id)
                logging.info(f"Expanded playlist URLs: {playlist_urls}")
                expanded_urls.extend(playlist_urls)

            elif 'youtu.be' in parsed_url.netloc:
                video_id = parsed_url.path.lstrip('/')
                full_url = f'https://www.youtube.com/watch?v={video_id}'
                logging.info(f"Expanded YouTube short URL to: {full_url}")
                expanded_urls.append(full_url)

            elif 'vimeo.com' in parsed_url.netloc:
                video_id = parsed_url.path.lstrip('/')
                full_url = f'https://vimeo.com/{video_id}'
                logging.info(f"Processed Vimeo URL: {full_url}")
                expanded_urls.append(full_url)

            else:
                logging.info(f"URL not recognized as special case, adding as-is: {url}")
                expanded_urls.append(url)

        except Exception as e:
            # Best effort: one bad URL must not abort the whole batch.
            logging.error(f"Error processing URL {url}: {str(e)}", exc_info=True)

    logging.info(f"Final expanded URLs: {expanded_urls}")
    return expanded_urls
|
|
|
|
|
def extract_metadata(url, use_cookies=False, cookies=None):
    """Return selected metadata fields for *url* using a flat yt-dlp extraction.

    Args:
        url: URL handed to yt-dlp.
        use_cookies: When truthy (and *cookies* is non-empty), cookies are
            sent with the requests.
        cookies: JSON text (or an already-parsed dict) describing the cookies.
            NOTE(review): assumed to be a flat ``{name: value}`` mapping —
            confirm against the caller.

    Returns:
        A dict with ``title``, ``uploader``, ``upload_date``, ``view_count``,
        ``like_count``, ``duration``, ``tags`` and ``description`` (each may
        be ``None``), or ``None`` if extraction fails.
    """
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': True,
        'skip_download': True,
    }

    if use_cookies and cookies:
        try:
            cookie_dict = cookies if isinstance(cookies, dict) else json.loads(cookies)
        except (json.JSONDecodeError, TypeError):
            cookie_dict = None

        if isinstance(cookie_dict, dict) and cookie_dict:
            # yt-dlp's 'cookiefile' option expects a cookie file (path or
            # stream), not a parsed mapping, so the cookies are supplied as a
            # request header instead.
            ydl_opts['http_headers'] = {
                'Cookie': '; '.join(f"{name}={value}" for name, value in cookie_dict.items())
            }
        else:
            logging.warning("Invalid cookie format. Proceeding without cookies.")

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(url, download=False)
            metadata = {
                'title': info.get('title'),
                'uploader': info.get('uploader'),
                'upload_date': info.get('upload_date'),
                'view_count': info.get('view_count'),
                'like_count': info.get('like_count'),
                'duration': info.get('duration'),
                'tags': info.get('tags'),
                'description': info.get('description')
            }

            # Every key above is always present (possibly None), so the
            # placeholders must be applied on None rather than via a
            # dict.get default, which would never trigger.
            placeholders = {
                'title': 'No title',
                'duration': 'Unknown duration',
                'upload_date': 'Unknown upload date',
                'uploader': 'Unknown uploader',
            }
            safe_metadata = {
                key: metadata[key] if metadata[key] is not None else placeholder
                for key, placeholder in placeholders.items()
            }

            logging.info(f"Successfully extracted metadata for {url}: {safe_metadata}")
            return metadata
        except Exception as e:
            logging.error(f"Error extracting metadata for {url}: {str(e)}", exc_info=True)
            return None
|
|
|
|
|
def generate_timestamped_url(url, hours, minutes, seconds):
    """Build a YouTube watch URL that starts playback at the given time.

    Args:
        url: A YouTube URL (watch, youtu.be, embed, shorts, live, /v/) or any
            text containing an 11-character video ID.
        hours, minutes, seconds: Values accepted by ``int()``; ``None`` or an
            empty string counts as 0.

    Returns:
        ``https://www.youtube.com/watch?v=<id>&t=<total>s``, or the string
        ``"Invalid YouTube URL"`` when no video ID can be found.
    """
    # Prefer an ID that follows an explicit marker. The previous single
    # pattern took the first 11-character run anywhere in the text, so a host
    # such as "youtube-nocookie.com" or an earlier long query value was
    # mistaken for the video ID.
    video_id_match = re.search(
        r'(?:[?&]v=|youtu\.be/|/(?:embed|shorts|live|v)/)([0-9A-Za-z_-]{11})',
        url,
    )
    if not video_id_match:
        # Fallback keeps the old permissive rule (also accepts a bare ID).
        video_id_match = re.search(r'([0-9A-Za-z_-]{11})', url)
    if not video_id_match:
        return "Invalid YouTube URL"

    video_id = video_id_match.group(1)

    # `or 0` lets blank/None time fields default to zero instead of raising.
    total_seconds = int(hours or 0) * 3600 + int(minutes or 0) * 60 + int(seconds or 0)

    return f"https://www.youtube.com/watch?v={video_id}&t={total_seconds}s"
|
|
|
|
|
|
|
|
|
|
|
|
|
|