import configparser
import hashlib
import json
import logging
import os
import re
import time
import unicodedata
from datetime import timedelta
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

import requests
from tqdm import tqdm

from App_Function_Libraries.Video_DL_Ingestion_Lib import get_youtube


def extract_text_from_segments(segments):
    logging.debug(f"Segments received: {segments}")
    logging.debug(f"Type of segments: {type(segments)}")

    def extract_text_recursive(data):
        if isinstance(data, dict):
            for key, value in data.items():
                if key == 'Text':
                    return value
                elif isinstance(value, (dict, list)):
                    result = extract_text_recursive(value)
                    if result:
                        return result
        elif isinstance(data, list):
            return ' '.join(filter(None, [extract_text_recursive(item) for item in data]))
        return None

    text = extract_text_recursive(segments)

    if text:
        return text.strip()
    else:
        logging.error(f"Unable to extract text from segments: {segments}")
        return "Error: Unable to extract transcription"
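# Illustrative usage only; the segment shape below is an assumption, since the function
# accepts any nesting of dicts/lists that contains a 'Text' key:
#   extract_text_from_segments([{'Text': 'Hello'}, {'Text': 'world'}])  # -> 'Hello world'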


def download_file(url, dest_path, expected_checksum=None, max_retries=3, delay=5):
    temp_path = dest_path + '.tmp'

    for attempt in range(max_retries):
        try:
            # Resume a partial download if a temp file is already present
            resume_header = {}
            if os.path.exists(temp_path):
                resume_header = {'Range': f'bytes={os.path.getsize(temp_path)}-'}

            response = requests.get(url, stream=True, headers=resume_header)
            response.raise_for_status()

            # A 206 response means the server honored the Range request, so append to the
            # temp file; otherwise start over from the beginning.
            resuming = response.status_code == 206
            initial_pos = os.path.getsize(temp_path) if resuming else 0
            total_size = initial_pos + int(response.headers.get('content-length', 0))

            mode = 'ab' if resuming else 'wb'
            with open(temp_path, mode) as temp_file, tqdm(
                total=total_size, unit='B', unit_scale=True, desc=dest_path, initial=initial_pos, ascii=True
            ) as pbar:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        temp_file.write(chunk)
                        pbar.update(len(chunk))

            # Verify the finished download before moving it into place
            if expected_checksum:
                if not verify_checksum(temp_path, expected_checksum):
                    os.remove(temp_path)
                    raise ValueError("Downloaded file's checksum does not match the expected checksum")

            os.rename(temp_path, dest_path)
            print("Download complete and verified!")
            return dest_path

        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                print("Max retries reached. Download failed.")
                raise
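# Illustrative usage; the URL, destination path, and digest are placeholders, not real values:
#   download_file("https://example.com/files/model.bin", "Results/model.bin",
#                 expected_checksum="<sha256 hex digest>")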


def verify_checksum(file_path, expected_checksum):
    sha256_hash = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for byte_block in iter(lambda: f.read(4096), b''):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest() == expected_checksum
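# Illustrative usage; the path and digest are placeholders:
#   verify_checksum("Results/model.bin", "<expected 64-character sha256 hex digest>")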


def create_download_directory(title):
    base_dir = "Results"
    safe_title = normalize_title(title)
    logging.debug(f"{title} successfully normalized")
    session_path = os.path.join(base_dir, safe_title)
    if not os.path.exists(session_path):
        os.makedirs(session_path, exist_ok=True)
        logging.debug(f"Created directory for downloaded video: {session_path}")
    else:
        logging.debug(f"Directory already exists for downloaded video: {session_path}")
    return session_path


def sanitize_filename(filename):
    # Remove characters that are invalid in file names
    sanitized = re.sub(r'[<>:"/\\|?*]', '', filename)
    # Collapse runs of whitespace into single spaces and trim the ends
    sanitized = re.sub(r'\s+', ' ', sanitized).strip()
    return sanitized


def normalize_title(title):
    title = unicodedata.normalize('NFKD', title).encode('ascii', 'ignore').decode('ascii')
    title = (title.replace('/', '_').replace('\\', '_').replace(':', '_')
             .replace('"', '').replace('*', '').replace('?', '')
             .replace('<', '').replace('>', '').replace('|', ''))
    return title
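# Example: normalize_title('Video: "Part 1/2" <Draft>') returns 'Video_ Part 1_2 Draft'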


def clean_youtube_url(url):
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    if 'list' in query_params:
        query_params.pop('list')
    cleaned_query = urlencode(query_params, doseq=True)
    cleaned_url = urlunparse(parsed_url._replace(query=cleaned_query))
    return cleaned_url
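# Illustrative example (the playlist id is a placeholder):
#   clean_youtube_url("https://www.youtube.com/watch?v=dQw4w9WgXcQ&list=PL123")
#   -> "https://www.youtube.com/watch?v=dQw4w9WgXcQ"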


def extract_video_info(url):
    info_dict = get_youtube(url)
    title = info_dict.get('title', 'Untitled')
    return info_dict, title


def import_data(file):
    # Placeholder: file import logic has not been implemented yet
    pass


def safe_read_file(file_path):
    encodings = ['utf-8', 'utf-16', 'ascii', 'latin-1', 'iso-8859-1', 'cp1252']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as file:
                return file.read()
        except UnicodeDecodeError:
            continue
        except FileNotFoundError:
            return f"File not found: {file_path}"
        except Exception as e:
            return f"An error occurred: {e}"
    return f"Unable to decode the file {file_path} with any of the attempted encodings: {encodings}"
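# Illustrative usage (the path is a placeholder): safe_read_file("Results/transcript.txt")
# returns the file's text, or an error-message string if the file is missing or undecodable.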


# Registry of files to remove when the server shuts down
downloaded_files = []


def cleanup_downloads():
    """Function to clean up downloaded files when the server exits."""
    for file_path in downloaded_files:
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                print(f"Cleaned up file: {file_path}")
        except Exception as e:
            print(f"Error cleaning up file {file_path}: {e}")
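# Sketch of the intended wiring (an assumption, not shown in this module): download code
# appends each saved path to downloaded_files, and the server registers this hook to run
# at shutdown, for instance with atexit.register(cleanup_downloads).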


def load_comprehensive_config():
    # Resolve config.txt relative to the project root (one level above this file)
    current_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.dirname(current_dir)
    config_path = os.path.join(project_root, 'config.txt')

    config = configparser.ConfigParser()
    files_read = config.read(config_path)
    if not files_read:
        raise FileNotFoundError(f"Config file not found at {config_path}")
    return config
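# Illustrative config.txt skeleton; section and key names mirror what load_and_log_configs
# reads below, and the values are placeholders:
#   [API]
#   anthropic_api_key = <key>
#   anthropic_model = claude-3-sonnet-20240229
#   [Local-API]
#   kobold_api_IP = http://127.0.0.1:5000/api/v1/generate
#   [Paths]
#   output_path = results
#   [Processing]
#   processing_choice = cpu
#   [Prompts]
#   prompt_path = prompts.db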


def load_and_log_configs():
    try:
        config = load_comprehensive_config()
        if config is None:
            logging.error("Config is None, cannot proceed")
            return None

        # API keys (only the first and last few characters are logged)
        anthropic_api_key = config.get('API', 'anthropic_api_key', fallback=None)
        logging.debug(f"Loaded Anthropic API Key: "
                      f"{anthropic_api_key[:5] + '...' + anthropic_api_key[-5:] if anthropic_api_key else None}")

        cohere_api_key = config.get('API', 'cohere_api_key', fallback=None)
        logging.debug(f"Loaded Cohere API Key: "
                      f"{cohere_api_key[:5] + '...' + cohere_api_key[-5:] if cohere_api_key else None}")

        groq_api_key = config.get('API', 'groq_api_key', fallback=None)
        logging.debug(f"Loaded Groq API Key: "
                      f"{groq_api_key[:5] + '...' + groq_api_key[-5:] if groq_api_key else None}")

        openai_api_key = config.get('API', 'openai_api_key', fallback=None)
        logging.debug(f"Loaded OpenAI API Key: "
                      f"{openai_api_key[:5] + '...' + openai_api_key[-5:] if openai_api_key else None}")

        huggingface_api_key = config.get('API', 'huggingface_api_key', fallback=None)
        logging.debug(f"Loaded HuggingFace API Key: "
                      f"{huggingface_api_key[:5] + '...' + huggingface_api_key[-5:] if huggingface_api_key else None}")

        openrouter_api_key = config.get('API', 'openrouter_api_key', fallback=None)
        logging.debug(f"Loaded OpenRouter API Key: "
                      f"{openrouter_api_key[:5] + '...' + openrouter_api_key[-5:] if openrouter_api_key else None}")

        deepseek_api_key = config.get('API', 'deepseek_api_key', fallback=None)
        logging.debug(f"Loaded DeepSeek API Key: "
                      f"{deepseek_api_key[:5] + '...' + deepseek_api_key[-5:] if deepseek_api_key else None}")

        # Models
        anthropic_model = config.get('API', 'anthropic_model', fallback='claude-3-sonnet-20240229')
        cohere_model = config.get('API', 'cohere_model', fallback='command-r-plus')
        groq_model = config.get('API', 'groq_model', fallback='llama3-70b-8192')
        openai_model = config.get('API', 'openai_model', fallback='gpt-4-turbo')
        huggingface_model = config.get('API', 'huggingface_model', fallback='CohereForAI/c4ai-command-r-plus')
        openrouter_model = config.get('API', 'openrouter_model', fallback='microsoft/wizardlm-2-8x22b')
        deepseek_model = config.get('API', 'deepseek_model', fallback='deepseek-chat')

        logging.debug(f"Loaded Anthropic Model: {anthropic_model}")
        logging.debug(f"Loaded Cohere Model: {cohere_model}")
        logging.debug(f"Loaded Groq Model: {groq_model}")
        logging.debug(f"Loaded OpenAI Model: {openai_model}")
        logging.debug(f"Loaded HuggingFace Model: {huggingface_model}")
        logging.debug(f"Loaded OpenRouter Model: {openrouter_model}")
        logging.debug(f"Loaded DeepSeek Model: {deepseek_model}")

        # Local API endpoints
        kobold_api_ip = config.get('Local-API', 'kobold_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        kobold_api_key = config.get('Local-API', 'kobold_api_key', fallback='')

        llama_api_IP = config.get('Local-API', 'llama_api_IP', fallback='http://127.0.0.1:8080/v1/chat/completions')
        llama_api_key = config.get('Local-API', 'llama_api_key', fallback='')

        ooba_api_IP = config.get('Local-API', 'ooba_api_IP', fallback='http://127.0.0.1:5000/v1/chat/completions')
        ooba_api_key = config.get('Local-API', 'ooba_api_key', fallback='')

        tabby_api_IP = config.get('Local-API', 'tabby_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        tabby_api_key = config.get('Local-API', 'tabby_api_key', fallback=None)
        tabby_model = config.get('models', 'tabby_model', fallback=None)

        vllm_api_url = config.get('Local-API', 'vllm_api_IP', fallback='http://127.0.0.1:500/api/v1/chat/completions')
        vllm_api_key = config.get('Local-API', 'vllm_api_key', fallback=None)
        vllm_model = config.get('Local-API', 'vllm_model', fallback=None)

        logging.debug(f"Loaded Kobold API IP: {kobold_api_ip}")
        logging.debug(f"Loaded Llama API IP: {llama_api_IP}")
        logging.debug(f"Loaded Ooba API IP: {ooba_api_IP}")
        logging.debug(f"Loaded Tabby API IP: {tabby_api_IP}")
        logging.debug(f"Loaded VLLM API URL: {vllm_api_url}")

        # Output and processing settings
        output_path = config.get('Paths', 'output_path', fallback='results')
        logging.debug(f"Output path set to: {output_path}")

        processing_choice = config.get('Processing', 'processing_choice', fallback='cpu')
        logging.debug(f"Processing choice set to: {processing_choice}")

        # Prompts
        prompt_path = config.get('Prompts', 'prompt_path', fallback='prompts.db')

        return {
            'api_keys': {
                'anthropic': anthropic_api_key,
                'cohere': cohere_api_key,
                'groq': groq_api_key,
                'openai': openai_api_key,
                'huggingface': huggingface_api_key,
                'openrouter': openrouter_api_key,
                'deepseek': deepseek_api_key,
                'kobold': kobold_api_key,
                'llama': llama_api_key,
                'ooba': ooba_api_key,
                'tabby': tabby_api_key,
                'vllm': vllm_api_key
            },
            'models': {
                'anthropic': anthropic_model,
                'cohere': cohere_model,
                'groq': groq_model,
                'openai': openai_model,
                'huggingface': huggingface_model,
                'openrouter': openrouter_model,
                'deepseek': deepseek_model,
                'vllm': vllm_model,
                'tabby': tabby_model
            },
            'local_api_ip': {
                'kobold': kobold_api_ip,
                'llama': llama_api_IP,
                'ooba': ooba_api_IP,
                'tabby': tabby_api_IP,
                'vllm': vllm_api_url,
            },
            'output_path': output_path,
            'processing_choice': processing_choice
        }

    except Exception as e:
        logging.error(f"Error loading config: {str(e)}")
        return None
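# Sketch of typical consumption of the returned dict (keys mirror the structure above):
#   loaded = load_and_log_configs()
#   if loaded:
#       openai_key = loaded['api_keys']['openai']
#       openai_model = loaded['models']['openai']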


def format_metadata_as_text(metadata):
    if not metadata:
        return "No metadata available"

    formatted_text = "Video Metadata:\n"
    for key, value in metadata.items():
        if value is not None:
            if isinstance(value, list):
                # Join list values (e.g. tags) with commas
                formatted_value = ", ".join(str(item) for item in value)
            elif key == 'upload_date' and len(str(value)) == 8:
                # Format YYYYMMDD upload dates as YYYY-MM-DD
                formatted_value = f"{value[:4]}-{value[4:6]}-{value[6:]}"
            elif key in ['view_count', 'like_count']:
                # Format counts with thousands separators
                formatted_value = f"{value:,}"
            elif key == 'duration':
                # Format a duration in seconds as HH:MM:SS
                hours, remainder = divmod(value, 3600)
                minutes, seconds = divmod(remainder, 60)
                formatted_value = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
            else:
                formatted_value = str(value)

            formatted_text += f"{key.capitalize()}: {formatted_value}\n"
    return formatted_text.strip()
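# Illustrative example (the metadata values are made up):
#   format_metadata_as_text({'title': 'Demo', 'upload_date': '20240115',
#                            'view_count': 1234, 'duration': 3725})
#   -> "Video Metadata:\nTitle: Demo\nUpload_date: 2024-01-15\nView_count: 1,234\nDuration: 01:02:05"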


def convert_to_seconds(time_str):
    if not time_str:
        return 0

    # If the string is already a plain number of seconds, return it directly
    if time_str.isdigit():
        return int(time_str)

    # Otherwise parse HH:MM:SS, MM:SS, or SS
    time_parts = time_str.split(':')
    if len(time_parts) == 3:
        return int(timedelta(hours=int(time_parts[0]),
                             minutes=int(time_parts[1]),
                             seconds=int(time_parts[2])).total_seconds())
    elif len(time_parts) == 2:
        return int(timedelta(minutes=int(time_parts[0]),
                             seconds=int(time_parts[1])).total_seconds())
    elif len(time_parts) == 1:
        return int(time_parts[0])
    else:
        raise ValueError(f"Invalid time format: {time_str}")
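# Examples: convert_to_seconds("1:02:05") -> 3725, convert_to_seconds("2:30") -> 150,
#           convert_to_seconds("90") -> 90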


def save_to_file(video_urls, filename):
    with open(filename, 'w') as file:
        file.write('\n'.join(video_urls))
    print(f"Video URLs saved to {filename}")


def save_segments_to_json(segments, file_name="transcription_segments.json"):
    """
    Save transcription segments to a JSON file.

    Parameters:
    segments (list): List of transcription segments
    file_name (str): Name of the JSON file to save (default: "transcription_segments.json")

    Returns:
    str: Path to the saved JSON file
    """
    os.makedirs("Results", exist_ok=True)

    json_file_path = os.path.join("Results", file_name)

    with open(json_file_path, 'w', encoding='utf-8') as json_file:
        json.dump(segments, json_file, ensure_ascii=False, indent=4)

    return json_file_path
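# Illustrative usage (the segment content is made up):
#   save_segments_to_json([{'Text': 'Hello world'}], "example_segments.json")
#   writes the file under Results/ and returns its path.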