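#########################################
# Audio Processing Library
# Downloads audio from direct URLs, YouTube, and podcast feeds, transcribes it
# with Whisper (optionally diarized), summarizes the transcript via a
# configurable LLM API, and ingests the results into the media database.
#
# Functions:
#   download_audio_file()
#   process_audio()
#   process_single_audio()
#   process_audio_files()
#   download_youtube_audio()
#   process_podcast()
#########################################
# Imports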
import json
import logging
import os
import subprocess
import tempfile
import uuid
from datetime import datetime
from pathlib import Path

import requests
import yt_dlp

from App_Function_Libraries.Audio_Transcription_Lib import speech_to_text
from App_Function_Libraries.Chunk_Lib import improved_chunking_process
from App_Function_Libraries.SQLite_DB import add_media_to_database, add_media_with_keywords, \
    check_media_and_whisper_model
from App_Function_Libraries.Summarization_General_Lib import save_transcription_and_summary, perform_transcription, \
    perform_summarization
from App_Function_Libraries.Utils import create_download_directory, save_segments_to_json, downloaded_files, \
    sanitize_filename
from App_Function_Libraries.Video_DL_Ingestion_Lib import extract_metadata
|
MAX_FILE_SIZE = 500 * 1024 * 1024  # 500 MB cap for downloads and uploads


def download_audio_file(url, current_whisper_model="", use_cookies=False, cookies=None):
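    """
    Download an audio file from a direct URL, skipping media that has already
    been processed with the current whisper model. Enforces the MAX_FILE_SIZE
    cap and returns the saved file path, or None when the download is skipped.
    """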
|
    try:
        # Check whether this URL/model combination has already been processed
        should_download, reason = check_media_and_whisper_model(
            url=url,
            current_whisper_model=current_whisper_model
        )

        if not should_download:
            logging.info(f"Skipping audio download: {reason}")
            return None

        logging.info(f"Proceeding with audio download: {reason}")

        # Build request headers, attaching cookies when provided
        headers = {}
        if use_cookies and cookies:
            try:
                cookie_dict = json.loads(cookies)
                headers['Cookie'] = '; '.join([f'{k}={v}' for k, v in cookie_dict.items()])
            except json.JSONDecodeError:
                logging.warning("Invalid cookie format. Proceeding without cookies.")

        # Stream the download so large files are never held fully in memory
        response = requests.get(url, headers=headers, stream=True)
        response.raise_for_status()

        # Enforce the size cap before writing anything to disk
        file_size = int(response.headers.get('content-length', 0))
        if file_size > MAX_FILE_SIZE:
            raise ValueError(f"File size exceeds the {MAX_FILE_SIZE / (1024 * 1024):.0f}MB limit.")

        # Generate a unique filename and ensure the download directory exists
        file_name = f"audio_{uuid.uuid4().hex[:8]}.mp3"
        save_path = os.path.join('downloads', file_name)
        os.makedirs('downloads', exist_ok=True)

        # Write the file to disk in 8 KB chunks
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)

        logging.info(f"Audio file downloaded successfully: {save_path}")
        return save_path

    except requests.RequestException as e:
        logging.error(f"Error downloading audio file: {str(e)}")
        raise
    except ValueError as e:
        logging.error(str(e))
        raise
    except Exception as e:
        logging.error(f"Unexpected error downloading audio file: {str(e)}")
        raise
|
|
def process_audio(
        audio_file_path,
        num_speakers=2,
        whisper_model="small.en",
        custom_prompt_input=None,
        offset=0,
        api_name=None,
        api_key=None,
        vad_filter=False,
        rolling_summarization=False,
        detail_level=0.01,
        keywords="default,no_keyword_set",
        chunk_text_by_words=False,
        max_words=0,
        chunk_text_by_sentences=False,
        max_sentences=0,
        chunk_text_by_paragraphs=False,
        max_paragraphs=0,
        chunk_text_by_tokens=False,
        max_tokens=0
):
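    """
    Transcribe an audio file and optionally summarize the transcript.

    Returns a 6-tuple of (transcription_text, summary_text, json_file_path,
    summary_file_path, None, None); on failure the first element is an error
    message and the remaining elements are None.
    """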
|
    try:
        # Perform transcription
        audio_file_path, segments = perform_transcription(audio_file_path, offset, whisper_model, vad_filter)

        if audio_file_path is None or segments is None:
            logging.error("Process_Audio: Transcription failed or segments not available.")
            return "Process_Audio: Transcription failed.", None, None, None, None, None

        logging.debug(f"Process_Audio: Transcription audio_file: {audio_file_path}")
        logging.debug(f"Process_Audio: Transcription segments: {segments}")

        transcription_text = {'audio_file': audio_file_path, 'transcription': segments}
        logging.debug(f"Process_Audio: Transcription text: {transcription_text}")

        # Save segments to JSON
        segments_json_path = save_segments_to_json(segments)

        # Perform summarization
        summary_text = None
        if api_name:
            if rolling_summarization:
                # TODO: rolling summarization is not yet implemented; this
                # branch intentionally does nothing for now
                pass
            else:
                summary_text = perform_summarization(api_name, segments_json_path, custom_prompt_input, api_key)

            if summary_text is None:
                logging.error("Summary text is None. Check summarization function.")
                summary_file_path = None
        else:
            summary_text = 'Summary not available'
            summary_file_path = None

        # Save transcription and summary to disk
        download_path = create_download_directory("Audio_Processing")
        json_file_path, summary_file_path = save_transcription_and_summary(transcription_text, summary_text,
                                                                           download_path)

        # Add to database
        add_media_to_database(None, {'title': 'Audio File', 'author': 'Unknown'}, segments, summary_text, keywords,
                              custom_prompt_input, whisper_model)

        return transcription_text, summary_text, json_file_path, summary_file_path, None, None

    except Exception as e:
        logging.error(f"Error in process_audio: {str(e)}")
        return str(e), None, None, None, None, None
|
|
def process_single_audio(audio_file_path, whisper_model, api_name, api_key, keep_original, custom_keywords, source,
                         custom_prompt_input, chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
                         use_multi_level_chunking, chunk_language):
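    """
    Transcribe and summarize a single local audio file, then record it in the
    database. Returns (progress_log, transcription, summary).
    """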
|
    progress = []
    transcription = ""
    summary = ""

    def update_progress(message):
        progress.append(message)
        return "\n".join(progress)

    try:
        # Check file size before processing
        file_size = os.path.getsize(audio_file_path)
        if file_size > MAX_FILE_SIZE:
            update_progress(f"File size ({file_size / (1024 * 1024):.2f} MB) exceeds the maximum limit of "
                            f"{MAX_FILE_SIZE / (1024 * 1024):.2f} MB. Skipping this file.")
            return "\n".join(progress), "", ""

        # Perform transcription
        update_progress("Starting transcription...")
        segments = speech_to_text(audio_file_path, whisper_model=whisper_model)
        transcription = " ".join([segment['Text'] for segment in segments])
        update_progress("Audio transcribed successfully.")

        # Summarize if API credentials are provided
        if api_name and api_key:
            update_progress("Starting summarization...")
            summary = perform_summarization(api_name, transcription, "Summarize the following audio transcript",
                                            api_key)
            update_progress("Audio summarized successfully.")
        else:
            summary = "No summary available"

        # Prepare keywords
        keywords = "audio,transcription"
        if custom_keywords:
            keywords += f",{custom_keywords}"

        # Add to database
        add_media_with_keywords(
            url=source,
            title=os.path.basename(audio_file_path),
            media_type='audio',
            content=transcription,
            keywords=keywords,
            prompt="Summarize the following audio transcript",
            summary=summary,
            transcription_model=whisper_model,
            author="Unknown",
            ingestion_date=None
        )
        update_progress("Audio file added to database successfully.")

        # Remove downloaded files unless the caller asked to keep them
        if not keep_original and source != "Uploaded File":
            os.remove(audio_file_path)
            update_progress(f"Temporary file {audio_file_path} removed.")
        elif keep_original and source != "Uploaded File":
            update_progress(f"Original audio file kept at: {audio_file_path}")

    except Exception as e:
        update_progress(f"Error processing {source}: {str(e)}")
        transcription = f"Error: {str(e)}"
        summary = "No summary due to error"

    return "\n".join(progress), transcription, summary
|
|
def process_audio_files(audio_urls, audio_file, whisper_model, api_name, api_key, use_cookies, cookies, keep_original,
                        custom_keywords, custom_prompt_input, chunk_method, max_chunk_size, chunk_overlap,
                        use_adaptive_chunking, use_multi_level_chunking, chunk_language, diarize):
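    """
    Batch-process audio from newline-separated URLs and/or one uploaded file:
    download, re-encode to MP3, convert to WAV, transcribe, optionally
    summarize, and store each item in the database.
    Returns (progress_log, combined_transcriptions, combined_summaries).
    """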
|
    progress = []
    temp_files = []
    all_transcriptions = []
    all_summaries = []

    def update_progress(message):
        progress.append(message)
        return "\n".join(progress)

    def cleanup_files():
        for file in temp_files:
            try:
                if os.path.exists(file):
                    os.remove(file)
                    update_progress(f"Temporary file {file} removed.")
            except Exception as e:
                update_progress(f"Failed to remove temporary file {file}: {str(e)}")

    def reencode_mp3(mp3_file_path):
        # Re-encode to a clean MP3 so malformed downloads don't break later steps
        try:
            reencoded_mp3_path = mp3_file_path.replace(".mp3", "_reencoded.mp3")
            subprocess.run([ffmpeg_cmd, '-i', mp3_file_path, '-codec:a', 'libmp3lame', reencoded_mp3_path],
                           check=True)
            update_progress(f"Re-encoded {mp3_file_path} to {reencoded_mp3_path}.")
            return reencoded_mp3_path
        except subprocess.CalledProcessError as e:
            update_progress(f"Error re-encoding {mp3_file_path}: {str(e)}")
            raise

    def convert_mp3_to_wav(mp3_file_path):
        # Convert to WAV for transcription
        try:
            wav_file_path = mp3_file_path.replace(".mp3", ".wav")
            subprocess.run([ffmpeg_cmd, '-i', mp3_file_path, wav_file_path], check=True)
            update_progress(f"Converted {mp3_file_path} to {wav_file_path}.")
            return wav_file_path
        except subprocess.CalledProcessError as e:
            update_progress(f"Error converting {mp3_file_path} to WAV: {str(e)}")
            raise

    try:
        # Locate the ffmpeg binary (bundled copy on Windows, PATH elsewhere)
        global ffmpeg_cmd
        if os.name == "nt":
            logging.debug("Running on Windows")
            ffmpeg_cmd = os.path.join(os.getcwd(), "Bin", "ffmpeg.exe")
        else:
            ffmpeg_cmd = 'ffmpeg'

        if os.name == "nt" and not os.path.exists(ffmpeg_cmd):
            raise FileNotFoundError(f"ffmpeg executable not found at path: {ffmpeg_cmd}")

        chunk_options = {
            'method': chunk_method,
            'max_size': max_chunk_size,
            'overlap': chunk_overlap,
            'adaptive': use_adaptive_chunking,
            'multi_level': use_multi_level_chunking,
            'language': chunk_language
        }

        # Process URLs, one per line
        urls = [url.strip() for url in audio_urls.split('\n') if url.strip()]

        for i, url in enumerate(urls):
            update_progress(f"Processing URL {i + 1}/{len(urls)}: {url}")

            # Download the audio file (cookie arguments passed by keyword so
            # they are not misread as the whisper-model parameter)
            audio_file_path = download_audio_file(url, use_cookies=use_cookies, cookies=cookies)
            if not audio_file_path or not os.path.exists(audio_file_path):
                update_progress(f"Downloaded file not found: {audio_file_path}")
                continue

            temp_files.append(audio_file_path)
            update_progress("Audio file downloaded successfully.")

            # Re-encode the MP3, then convert it to WAV for transcription
            reencoded_mp3_path = reencode_mp3(audio_file_path)
            if not os.path.exists(reencoded_mp3_path):
                update_progress(f"Re-encoded file not found: {reencoded_mp3_path}")
                continue

            temp_files.append(reencoded_mp3_path)

            wav_file_path = convert_mp3_to_wav(reencoded_mp3_path)
            if not os.path.exists(wav_file_path):
                update_progress(f"Converted WAV file not found: {wav_file_path}")
                continue

            temp_files.append(wav_file_path)

            # Initialize transcription
            transcription = ""

            # Transcribe, with optional speaker diarization
            if diarize:
                segments = speech_to_text(wav_file_path, whisper_model=whisper_model, diarize=True)
            else:
                segments = speech_to_text(wav_file_path, whisper_model=whisper_model)

            # Handle both dict-wrapped and bare-list segment formats
            if isinstance(segments, dict) and 'segments' in segments:
                segments = segments['segments']

            if isinstance(segments, list):
                transcription = " ".join([segment.get('Text', '') for segment in segments])
                update_progress("Audio transcribed successfully.")
            else:
                update_progress("Unexpected segments format received from speech_to_text.")
                logging.error(f"Unexpected segments format: {segments}")
                continue

            if not transcription.strip():
                update_progress("Transcription is empty.")
            else:
                # Chunk the transcript before summarization
                chunked_text = improved_chunking_process(transcription, chunk_options)

                if api_name:
                    try:
                        summary = perform_summarization(api_name, chunked_text, custom_prompt_input, api_key)
                        update_progress("Audio summarized successfully.")
                    except Exception as e:
                        logging.error(f"Error during summarization: {str(e)}")
                        summary = "Summary generation failed"
                else:
                    summary = "No summary available (API not provided)"

                all_transcriptions.append(transcription)
                all_summaries.append(summary)

                # Add to database
                add_media_with_keywords(
                    url=url,
                    title=os.path.basename(wav_file_path),
                    media_type='audio',
                    content=transcription,
                    keywords=custom_keywords,
                    prompt=custom_prompt_input,
                    summary=summary,
                    transcription_model=whisper_model,
                    author="Unknown",
                    ingestion_date=datetime.now().strftime('%Y-%m-%d')
                )
                update_progress("Audio file processed and added to database.")

        # Process the uploaded file, if one was provided
        if audio_file:
            if os.path.getsize(audio_file.name) > MAX_FILE_SIZE:
                update_progress(f"Uploaded file size exceeds the maximum limit of "
                                f"{MAX_FILE_SIZE / (1024 * 1024):.2f}MB. Skipping this file.")
            else:
                reencoded_mp3_path = reencode_mp3(audio_file.name)
                if not os.path.exists(reencoded_mp3_path):
                    update_progress(f"Re-encoded file not found: {reencoded_mp3_path}")
                    return update_progress("Processing failed: Re-encoded file not found"), "", ""

                temp_files.append(reencoded_mp3_path)

                wav_file_path = convert_mp3_to_wav(reencoded_mp3_path)
                if not os.path.exists(wav_file_path):
                    update_progress(f"Converted WAV file not found: {wav_file_path}")
                    return update_progress("Processing failed: Converted WAV file not found"), "", ""

                temp_files.append(wav_file_path)

                transcription = ""

                if diarize:
                    segments = speech_to_text(wav_file_path, whisper_model=whisper_model, diarize=True)
                else:
                    segments = speech_to_text(wav_file_path, whisper_model=whisper_model)

                if isinstance(segments, dict) and 'segments' in segments:
                    segments = segments['segments']

                if isinstance(segments, list):
                    transcription = " ".join([segment.get('Text', '') for segment in segments])
                else:
                    update_progress("Unexpected segments format received from speech_to_text.")
                    logging.error(f"Unexpected segments format: {segments}")

                chunked_text = improved_chunking_process(transcription, chunk_options)

                if api_name and api_key:
                    try:
                        summary = perform_summarization(api_name, chunked_text, custom_prompt_input, api_key)
                        update_progress("Audio summarized successfully.")
                    except Exception as e:
                        logging.error(f"Error during summarization: {str(e)}")
                        summary = "Summary generation failed"
                else:
                    summary = "No summary available (API not provided)"

                all_transcriptions.append(transcription)
                all_summaries.append(summary)

                add_media_with_keywords(
                    url="Uploaded File",
                    title=os.path.basename(wav_file_path),
                    media_type='audio',
                    content=transcription,
                    keywords=custom_keywords,
                    prompt=custom_prompt_input,
                    summary=summary,
                    transcription_model=whisper_model,
                    author="Unknown",
                    ingestion_date=datetime.now().strftime('%Y-%m-%d')
                )
                update_progress("Uploaded file processed and added to database.")

        # Remove temporary files unless the caller asked to keep them
        if not keep_original:
            cleanup_files()

        final_progress = update_progress("All processing complete.")
        final_transcriptions = "\n\n".join(all_transcriptions)
        final_summaries = "\n\n".join(all_summaries)

        return final_progress, final_transcriptions, final_summaries

    except Exception as e:
        logging.error(f"Error processing audio files: {str(e)}")
        cleanup_files()
        return update_progress(f"Processing failed: {str(e)}"), "", ""
|
|
def download_youtube_audio(url): |
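    """
    Download a YouTube video's audio track, extract it to a 192 kbps MP3 with
    ffmpeg, and move it into the persistent "downloads" directory.
    Returns (file_path, status_message); file_path is None on failure.
    """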
|
    try:
        # Determine ffmpeg path based on the operating system
        ffmpeg_path = './Bin/ffmpeg.exe' if os.name == 'nt' else 'ffmpeg'

        # Create a temporary directory for intermediate files
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract metadata (without downloading) to build a safe filename
            with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
                info_dict = ydl.extract_info(url, download=False)
                sanitized_title = sanitize_filename(info_dict['title'])

            # Set up temporary paths for the video and extracted audio
            temp_video_path = Path(temp_dir) / f"{sanitized_title}_temp.mp4"
            temp_audio_path = Path(temp_dir) / f"{sanitized_title}.mp3"

            # Prefer the best m4a audio stream, falling back to low-res video
            ydl_opts = {
                'format': 'bestaudio[ext=m4a]/best[height<=480]',
                'ffmpeg_location': ffmpeg_path,
                'outtmpl': str(temp_video_path),
                'noplaylist': True,
                'quiet': True
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            if not temp_video_path.exists():
                raise FileNotFoundError(f"Expected file was not found: {temp_video_path}")

            # Strip the video stream and encode the audio as MP3
            ffmpeg_command = [
                ffmpeg_path,
                '-i', str(temp_video_path),
                '-vn',
                '-acodec', 'libmp3lame',
                '-b:a', '192k',
                str(temp_audio_path)
            ]
            subprocess.run(ffmpeg_command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            if not temp_audio_path.exists():
                raise FileNotFoundError(f"Expected audio file was not found: {temp_audio_path}")

            # Move the result out of the temporary directory before it is deleted
            persistent_dir = Path("downloads")
            persistent_dir.mkdir(exist_ok=True)

            persistent_file_path = persistent_dir / f"{sanitized_title}.mp3"
            os.replace(str(temp_audio_path), str(persistent_file_path))

            # Track the file for later cleanup
            downloaded_files.append(str(persistent_file_path))

            return str(persistent_file_path), f"Audio downloaded successfully: {sanitized_title}.mp3"
    except Exception as e:
        return None, f"Error downloading audio: {str(e)}"
|
|
def process_podcast(url, title, author, keywords, custom_prompt, api_name, api_key, whisper_model,
                    keep_original=False, enable_diarization=False, use_cookies=False, cookies=None,
                    chunk_method=None, max_chunk_size=300, chunk_overlap=0, use_adaptive_chunking=False,
                    use_multi_level_chunking=False, chunk_language='english'):
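    """
    Download, transcribe, and summarize a podcast episode, enriching it with
    extracted metadata (series/episode/season keywords) before storing it in
    the database. Returns (progress_log, full_content, summary, title,
    author, keywords, error_message).
    """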
|
    progress = []
    error_message = ""
    temp_files = []

    def update_progress(message):
        progress.append(message)
        return "\n".join(progress)

    def cleanup_files():
        if not keep_original:
            for file in temp_files:
                try:
                    if os.path.exists(file):
                        os.remove(file)
                        update_progress(f"Temporary file {file} removed.")
                except Exception as e:
                    update_progress(f"Failed to remove temporary file {file}: {str(e)}")

    try:
        # Download the podcast audio (cookie arguments passed by keyword so
        # they are not misread as the whisper-model parameter)
        audio_file = download_audio_file(url, use_cookies=use_cookies, cookies=cookies)
        if audio_file is None:
            # download_audio_file returns None when the media was already processed
            return update_progress("Podcast download skipped: media already processed."), "", "", "", "", "", ""
        temp_files.append(audio_file)
        update_progress("Podcast downloaded successfully.")

        # Extract metadata, preferring caller-supplied title/author
        metadata = extract_metadata(url)
        title = title or metadata.get('title', 'Unknown Podcast')
        author = author or metadata.get('uploader', 'Unknown Author')

        # Format metadata for storage alongside the transcription
        metadata_text = f"""
Metadata:
Title: {title}
Author: {author}
Series: {metadata.get('series', 'N/A')}
Episode: {metadata.get('episode', 'N/A')}
Season: {metadata.get('season', 'N/A')}
Upload Date: {metadata.get('upload_date', 'N/A')}
Duration: {metadata.get('duration', 'N/A')} seconds
Description: {metadata.get('description', 'N/A')}
"""

        # Add series/episode/season metadata as searchable keywords
        new_keywords = []
        if metadata.get('series'):
            new_keywords.append(f"series:{metadata['series']}")
        if metadata.get('episode'):
            new_keywords.append(f"episode:{metadata['episode']}")
        if metadata.get('season'):
            new_keywords.append(f"season:{metadata['season']}")

        keywords = f"{keywords},{','.join(new_keywords)}" if keywords else ','.join(new_keywords)

        update_progress(f"Metadata extracted - Title: {title}, Author: {author}, Keywords: {keywords}")

        # Transcribe the podcast, with optional speaker diarization
        try:
            if enable_diarization:
                segments = speech_to_text(audio_file, whisper_model=whisper_model, diarize=True)
            else:
                segments = speech_to_text(audio_file, whisper_model=whisper_model)
            transcription = " ".join([segment['Text'] for segment in segments])
            update_progress("Podcast transcribed successfully.")
        except Exception as e:
            error_message = f"Transcription failed: {str(e)}"
            raise

        # Chunk the transcript before summarization
        chunk_options = {
            'method': chunk_method,
            'max_size': max_chunk_size,
            'overlap': chunk_overlap,
            'adaptive': use_adaptive_chunking,
            'multi_level': use_multi_level_chunking,
            'language': chunk_language
        }
        chunked_text = improved_chunking_process(transcription, chunk_options)

        # Combine metadata and transcription
        full_content = metadata_text + "\n\nTranscription:\n" + transcription

        # Summarize if API credentials are provided
        summary = None
        if api_name and api_key:
            try:
                summary = perform_summarization(api_name, chunked_text, custom_prompt, api_key)
                update_progress("Podcast summarized successfully.")
            except Exception as e:
                error_message = f"Summarization failed: {str(e)}"
                raise

        # Add to database
        try:
            add_media_with_keywords(
                url=url,
                title=title,
                media_type='podcast',
                content=full_content,
                keywords=keywords,
                prompt=custom_prompt,
                summary=summary or "No summary available",
                transcription_model=whisper_model,
                author=author,
                ingestion_date=datetime.now().strftime('%Y-%m-%d')
            )
            update_progress("Podcast added to database successfully.")
        except Exception as e:
            error_message = f"Error adding podcast to database: {str(e)}"
            raise

        cleanup_files()

        return (update_progress("Processing complete."), full_content, summary or "No summary generated.",
                title, author, keywords, error_message)

    except Exception as e:
        logging.error(f"Error processing podcast: {str(e)}")
        cleanup_files()
        return update_progress(f"Processing failed: {str(e)}"), "", "", "", "", "", str(e)
|