|
|
|
|
|
import argparse |
|
import atexit |
|
import json |
|
import logging |
|
import os |
|
import signal |
|
import sys |
|
import time |
|
import webbrowser |
|
|
|
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'App_Function_Libraries'))) |
|
from App_Function_Libraries.Book_Ingestion_Lib import ingest_folder, ingest_text_file |
|
from App_Function_Libraries.Chunk_Lib import semantic_chunk_long_file |
|
from App_Function_Libraries.Gradio_Related import launch_ui |
|
from App_Function_Libraries.Local_LLM_Inference_Engine_Lib import cleanup_process, local_llm_function |
|
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_local_llm |
|
from App_Function_Libraries.Summarization_General_Lib import summarize_with_openai, summarize_with_anthropic, \ |
|
summarize_with_cohere, summarize_with_groq, perform_transcription, perform_summarization |
|
from App_Function_Libraries.Audio_Transcription_Lib import speech_to_text |
|
from App_Function_Libraries.Local_File_Processing_Lib import read_paths_from_file, process_local_file |
|
from App_Function_Libraries.DB.DB_Manager import add_media_to_database |
|
from App_Function_Libraries.Utils.System_Checks_Lib import cuda_check, platform_check, check_ffmpeg |
|
from App_Function_Libraries.Utils.Utils import load_and_log_configs, create_download_directory, extract_text_from_segments, \ |
|
cleanup_downloads |
|
from App_Function_Libraries.Video_DL_Ingestion_Lib import download_video, extract_video_info |
|
|
|
|
|
import requests |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level configuration and lookup tables.
#
# NOTE(review): this region was recovered from a copy in which every line
# carried table-extraction artifacts; statement order is preserved.

# Log level applied to the root logger when this module is loaded.
log_level = "DEBUG"
logging.basicConfig(level=getattr(logging, log_level), format='%(asctime)s - %(levelname)s - %(message)s')

# Set before the UI is launched; presumably switches off Gradio's analytics
# (confirm against the Gradio documentation).
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"

# Whisper model names known to this script.  The list used to be defined twice
# with "medium" repeated; it is now defined once, in the original order,
# without the duplicate entry.
whisper_models = [
    "small", "medium", "small.en", "medium.en", "large", "large-v1", "large-v2", "large-v3",
    "distil-large-v2", "distil-medium.en", "distil-small.en",
]

# Defaults; the command-line entry point assigns these again after parsing.
server_mode = False
share_public = False

# Outline of the sections this script is organised into (not read by the code
# visible here).
abc_xyz = """
Database Setup
Config Loading
System Checks
DataBase Functions
Processing Paths and local file handling
Video Download/Handling
Audio Transcription
Diarization
Chunking-related Techniques & Functions
Tokenization-related Techniques & Functions
Summarizers
Gradio UI
Main
"""

# NOTE(review): looks like the usual workaround for duplicate OpenMP runtime
# errors -- confirm it is still required.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

# Language code -> display name.
source_languages = {
    "en": "English",
    "zh": "Chinese",
    "de": "German",
    "es": "Spanish",
    "ru": "Russian",
    "ko": "Korean",
    "fr": "French",
}
# Language codes only, in the insertion order of ``source_languages``.
source_language_list = list(source_languages)
|
|
|
|
|
def print_hello():
    """Print the ASCII-art start-up banner, then pause for one second.

    NOTE(review): the copy this was recovered from had the banner's runs of
    spaces collapsed and extraction artifacts appended to every line.  The
    artifacts are removed here, but the original column alignment could not be
    recovered -- restore the art from the upstream file if it matters.
    """
    print(r"""_____ _ ________ _ _
|_ _|| | / /| _ \| | | | _
| | | | / / | | | || | | |(_)
| | | | / / | | | || |/\| |
| | | |____ / / | |/ / \ /\ / _
\_/ \_____//_/ |___/ \/ \/ (_)


_ _
| | | |
| |_ ___ ___ | | ___ _ __ __ _
| __| / _ \ / _ \ | | / _ \ | '_ \ / _` |
| |_ | (_) || (_) | | || (_) || | | || (_| | _
\__| \___/ \___/ |_| \___/ |_| |_| \__, |( )
__/ ||/
|___/
_ _ _ _ _ _ _
| |(_) | | ( )| | | | | |
__| | _ __| | _ __ |/ | |_ __ __ __ _ | |_ ___ | |__
/ _` || | / _` || '_ \ | __| \ \ /\ / / / _` || __| / __|| '_ \
| (_| || || (_| || | | | | |_ \ V V / | (_| || |_ | (__ | | | |
\__,_||_| \__,_||_| |_| \__| \_/\_/ \__,_| \__| \___||_| |_|
""")
    # Leave the banner on screen briefly before further output follows.
    time.sleep(1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _transcribe_video(video_path, offset, whisper_model, vad_filter, diarize):
    """Transcribe *video_path* via ``perform_transcription``.

    ``diarize=True`` is forwarded only when diarization was requested, so the
    callee's own default applies otherwise.

    Returns:
        The ``(audio_file, segments)`` pair returned by ``perform_transcription``.
    """
    if diarize:
        return perform_transcription(video_path, offset, whisper_model, vad_filter, diarize=True)
    return perform_transcription(video_path, offset, whisper_model, vad_filter)


def _summarize_transcription(transcription, api_name, api_key, custom_prompt, rolling_summarization):
    """Return a summary for *transcription*, or ``None`` when none is produced.

    Rolling summarization has no implementation in this file, so it yields
    ``None`` (with a warning) rather than leaving a stale summary behind.
    """
    if rolling_summarization:
        logging.warning("Rolling summarization is not implemented here; no summary was generated.")
        return None
    if api_name:
        return perform_summarization(api_name, transcription, custom_prompt, api_key)
    return None


def _save_summary(download_path, source_name, summary):
    """Write *summary* to ``<stem of source_name>_summary.txt`` in *download_path*.

    The file name is derived from the media file's base name; the previous
    code interpolated the whole transcription dict into the file name.

    Returns:
        The path of the file that was written.
    """
    stem = os.path.splitext(os.path.basename(str(source_name)))[0] or "summary"
    summary_file_path = os.path.join(download_path, f"{stem}_summary.txt")
    with open(summary_file_path, 'w', encoding='utf-8') as file:
        file.write(summary)
    return summary_file_path


def _process_remote_video(url, *, api_name, api_key, offset, whisper_model, vad_filter,
                          download_video_flag, custom_prompt, rolling_summarization,
                          keywords, diarize):
    """Download, transcribe, optionally summarize and store one remote video.

    Returns:
        ``None`` when the download fails; otherwise a dict with the keys
        ``info_dict``, ``audio_file``, ``transcription`` and ``summary``.
    """
    info_dict, title = extract_video_info(url)
    download_path = create_download_directory(title)
    video_path = download_video(url, download_path, info_dict, download_video_flag)
    if not video_path:
        logging.error(f"Failed to download video: {url}")
        return None

    audio_file, segments = _transcribe_video(video_path, offset, whisper_model, vad_filter, diarize)
    transcription = {'audio_file': audio_file, 'transcription': segments}

    summary = _summarize_transcription(transcription, api_name, api_key, custom_prompt,
                                       rolling_summarization)
    if summary:
        _save_summary(download_path, audio_file, summary)

    # Stored whether or not a summary exists (``summary`` may be None).
    add_media_to_database(url, info_dict, segments, summary, keywords, custom_prompt, whisper_model)
    return {
        'info_dict': info_dict,
        'audio_file': audio_file,
        'transcription': transcription,
        'summary': summary,
    }


def _process_local_media(path, media_path, download_path, info_dict, *, api_name, api_key,
                         offset, whisper_model, vad_filter, custom_prompt,
                         rolling_summarization, keywords, diarize):
    """Ingest or transcribe a single local file, chosen by its extension.

    Returns:
        ``None`` when the file was ingested as text or skipped; otherwise a
        dict with the keys ``transcription`` and ``summary``.
    """
    lowered = media_path.lower()

    if lowered.endswith(('.txt', '.md')):
        if lowered.endswith('.txt'):
            # Resolved at module scope: inside ``main`` the name
            # ``ingest_text_file`` is a boolean parameter, not the function.
            result = ingest_text_file(media_path)
            logging.info(result)
        else:
            logging.warning(f"Markdown ingestion is not implemented; skipping {media_path}")
        # Text files have no segments, so there is nothing to transcribe.
        return None

    if lowered.endswith(('.mp4', '.avi', '.mov')):
        _, segments = _transcribe_video(media_path, offset, whisper_model, vad_filter, diarize)
    elif lowered.endswith(('.wav', '.mp3', '.m4a')):
        if diarize:
            segments = speech_to_text(media_path, whisper_model=whisper_model, vad_filter=vad_filter,
                                      diarize=True)
        else:
            segments = speech_to_text(media_path, whisper_model=whisper_model, vad_filter=vad_filter)
    else:
        logging.error(f"Unsupported media file format: {media_path}")
        return None

    transcription = {'media_path': path, 'audio_file': media_path, 'transcription': segments}
    summary = _summarize_transcription(transcription, api_name, api_key, custom_prompt,
                                       rolling_summarization)
    if summary:
        _save_summary(download_path, media_path, summary)

    add_media_to_database(path, info_dict, segments, summary, keywords, custom_prompt, whisper_model)
    return {'transcription': transcription, 'summary': summary}


def _summarize_chunk(method, api_key, chunk_text, custom_prompt):
    """Summarize one text chunk with the backend named by *method*.

    Returns ``None`` for an unrecognised method.
    """
    if method == 'local-llm':
        return summarize_with_local_llm(chunk_text, custom_prompt)
    api_backends = {
        'openai': summarize_with_openai,
        'anthropic': summarize_with_anthropic,
        'cohere': summarize_with_cohere,
        'groq': summarize_with_groq,
    }
    backend = api_backends.get(method)
    if backend is None:
        return None
    return backend(api_key, chunk_text, custom_prompt)


def _chunk_text_file(path, max_chunk_size, chunk_overlap, chunk_unit, summarize_chunks,
                     api_key, custom_prompt):
    """Chunk a ``.txt`` file and write ``<path>_chunks.json``.

    When *summarize_chunks* names a backend, each chunk is also summarized and
    the results are written to ``<path>_summaries.json``.
    """
    chunks = semantic_chunk_long_file(path, max_chunk_size, chunk_overlap)
    if not chunks:
        logging.error(f"Failed to chunk file {path}")
        return

    chunks_data = {
        "file_path": path,
        "chunk_unit": chunk_unit,
        "max_chunk_size": max_chunk_size,
        "chunk_overlap": chunk_overlap,
        "chunks": [
            {"chunk_id": chunk_id, "text": chunk_text}
            for chunk_id, chunk_text in enumerate(chunks, start=1)
        ],
    }
    summaries_data = {
        "file_path": path,
        "summarization_method": summarize_chunks,
        "summaries": [],
    }

    if summarize_chunks:
        for chunk_id, chunk_text in enumerate(chunks, start=1):
            chunk_summary = _summarize_chunk(summarize_chunks, api_key, chunk_text, custom_prompt)
            if chunk_summary:
                summaries_data["summaries"].append({"chunk_id": chunk_id, "summary": chunk_summary})
            else:
                logging.warning(f"Failed to generate summary for chunk {chunk_id}")

    chunks_file_path = f"{path}_chunks.json"
    with open(chunks_file_path, 'w', encoding='utf-8') as f:
        json.dump(chunks_data, f, ensure_ascii=False, indent=2)
    logging.info(f"All chunks saved to {chunks_file_path}")

    if summarize_chunks:
        summaries_file_path = f"{path}_summaries.json"
        with open(summaries_file_path, 'w', encoding='utf-8') as f:
            json.dump(summaries_data, f, ensure_ascii=False, indent=2)
        logging.info(f"All summaries saved to {summaries_file_path}")

    logging.info(f"File {path} chunked into {len(chunks)} parts using {chunk_unit} as the unit.")


def main(input_path, api_name=None, api_key=None,
         num_speakers=2,
         whisper_model="small.en",
         offset=0,
         vad_filter=False,
         download_video_flag=False,
         custom_prompt=None,
         overwrite=False,
         rolling_summarization=False,
         detail=0.01,
         keywords=None,
         llm_model=None,
         time_based=False,
         set_chunk_txt_by_words=False,
         set_max_txt_chunk_words=0,
         set_chunk_txt_by_sentences=False,
         set_max_txt_chunk_sentences=0,
         set_chunk_txt_by_paragraphs=False,
         set_max_txt_chunk_paragraphs=0,
         set_chunk_txt_by_tokens=False,
         set_max_txt_chunk_tokens=0,
         ingest_text_file=False,
         chunk=False,
         max_chunk_size=2000,
         chunk_overlap=100,
         chunk_unit='tokens',
         summarize_chunks=None,
         diarize=False
         ):
    """Process one input: a URL, a local media/text file, or a file listing them.

    Each resolved path is handled independently; an exception while handling
    one path is logged and the loop moves on to the next.

    Args:
        input_path: URL or filesystem path.  An existing file is first handed
            to ``read_paths_from_file``; anything else is treated as a single
            path.
        api_name: Summarization backend passed to ``perform_summarization``;
            no summary is produced when falsy.
        api_key: Key forwarded to the summarization backend.
        whisper_model, offset, vad_filter, diarize: Transcription options.
        download_video_flag: Forwarded to ``download_video``.
        custom_prompt: Prompt forwarded to the summarizer and stored with the
            media record.
        rolling_summarization: When true no summary is generated (there is no
            rolling implementation in this file).
        keywords: Keywords stored with the media record.
        chunk, max_chunk_size, chunk_overlap, chunk_unit, summarize_chunks:
            Options for the ``.txt`` chunking path.
        num_speakers, overwrite, detail, llm_model, time_based,
        set_chunk_txt_by_*, set_max_txt_chunk_*, ingest_text_file: Accepted
            for interface compatibility; not used by this function.

    Returns:
        ``[]`` when *input_path* is empty; otherwise the transcription dict of
        the last successfully transcribed item, or ``None`` if nothing was
        transcribed.
    """
    # Still published as module globals, as before, in case other code reads
    # them after a run.
    global summary, audio_file, transcription_text, info_dict

    print(f"Keywords: {keywords}")

    if not input_path:
        return []

    # Defined up front so the final ``return`` can never hit an unset name.
    transcription_text = None
    summary = None

    paths = [input_path] if not os.path.isfile(input_path) else read_paths_from_file(input_path)

    shared_options = dict(
        api_name=api_name,
        api_key=api_key,
        offset=offset,
        whisper_model=whisper_model,
        vad_filter=vad_filter,
        # The explicit parameter is used rather than the ``custom_prompt_input``
        # global, which only exists when the file is run as a script.
        custom_prompt=custom_prompt,
        rolling_summarization=rolling_summarization,
        keywords=keywords,
        diarize=diarize,
    )

    for path in paths:
        try:
            if path.startswith('http'):
                outcome = _process_remote_video(path, download_video_flag=download_video_flag,
                                                **shared_options)
                if outcome:
                    info_dict = outcome['info_dict']
                    audio_file = outcome['audio_file']
                    transcription_text = outcome['transcription']
                    summary = outcome['summary']

            elif chunk and path.lower().endswith('.txt'):
                _chunk_text_file(path, max_chunk_size, chunk_overlap, chunk_unit,
                                 summarize_chunks, api_key, custom_prompt)

            else:
                download_path, info_dict, urls_or_media_file = process_local_file(path)
                if isinstance(urls_or_media_file, list):
                    # Each URL is handled exactly once (the old nested loop over
                    # the same list reprocessed every URL once per entry).
                    for url in urls_or_media_file:
                        if not url.startswith(('http://', 'https://')):
                            logging.warning(f"Skipping non-URL entry in {path}: {url}")
                            continue
                        outcome = _process_remote_video(url, download_video_flag=download_video_flag,
                                                        **shared_options)
                        if outcome:
                            info_dict = outcome['info_dict']
                            audio_file = outcome['audio_file']
                            transcription_text = outcome['transcription']
                            summary = outcome['summary']
                else:
                    outcome = _process_local_media(path, urls_or_media_file, download_path,
                                                   info_dict, **shared_options)
                    if outcome:
                        transcription_text = outcome['transcription']
                        summary = outcome['summary']

        except Exception as e:
            # Best effort: one bad input must not stop the remaining ones.
            logging.error(f"Error processing {path}: {str(e)}")
            continue

    return transcription_text
|
|
|
|
|
def signal_handler(sig, frame):
    """Handle a termination signal: log it, run cleanup, exit with status 0.

    Args:
        sig: Number of the signal that was delivered.
        frame: Stack frame supplied by the ``signal`` machinery (unused).
    """
    logging.info('Signal handler called with signal: %s', sig)
    # Run cleanup_process() before the interpreter is asked to exit.
    cleanup_process()
    raise SystemExit(0)
|
|
|
|
|
|
|
|
|
|
|
|
|
# Prompt used when the caller does not supply one with -prompt.
_DEFAULT_SUMMARY_PROMPT = (
    "\n\nabove is the transcript of a video. "
    "Please read through the transcript carefully. Identify the main topics that are "
    "discussed over the course of the transcript. Then, summarize the key points about each "
    "main topic in a concise bullet point. The bullet points should cover the key "
    "information conveyed about each topic in the video, but should be much shorter than "
    "the full transcript. Please output your bullet point summary inside <bulletpoints> "
    "tags."
)

_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def _build_arg_parser():
    """Create the command-line parser for this script.

    ``-gui`` has no implicit ``True`` default any more: ``user_interface`` is
    ``None`` unless the flag is given, so the command-line branch is reachable.
    """
    parser = argparse.ArgumentParser(
        description='Transcribe and summarize videos.',
        epilog='''
Sample commands:
    1. Simple Sample command structure:
        summarize.py <path_to_video> -api openai -k tag_one tag_two tag_three

    2. Rolling Summary Sample command structure:
        summarize.py <path_to_video> -api openai -prompt "custom_prompt_goes_here-is-appended-after-transcription" -roll -detail 0.01 -k tag_one tag_two tag_three

    3. FULL Sample command structure:
        summarize.py <path_to_video> -api openai -ns 2 -wm small.en -off 0 -vad -log INFO -prompt "custom_prompt" -overwrite -roll -detail 0.01 -k tag_one tag_two tag_three

    4. Sample command structure for UI:
        summarize.py -gui -log DEBUG
''',
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('input_path', type=str, help='Path or URL of the video', nargs='?')
    parser.add_argument('-v', '--video', action='store_true', help='Download the video instead of just the audio')
    parser.add_argument('-api', '--api_name', type=str, help='API name for summarization (optional)')
    parser.add_argument('-key', '--api_key', type=str, help='API key for summarization (optional)')
    parser.add_argument('-ns', '--num_speakers', type=int, default=2, help='Number of speakers (default: 2)')
    parser.add_argument('-wm', '--whisper_model', type=str, default='small',
                        help='Whisper model (default: small)| Options: tiny.en, tiny, base.en, base, small.en, small, medium.en, '
                             'medium, large-v1, large-v2, large-v3, large, distil-large-v2, distil-medium.en, '
                             'distil-small.en')
    parser.add_argument('-off', '--offset', type=int, default=0, help='Offset in seconds (default: 0)')
    parser.add_argument('-vad', '--vad_filter', action='store_true', help='Enable VAD filter')
    parser.add_argument('-log', '--log_level', type=str, default='INFO',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], help='Log level (default: INFO)')
    parser.add_argument('-gui', '--user_interface', action='store_true', default=None,
                        help="Launch the Gradio user interface")
    parser.add_argument('-demo', '--demo_mode', action='store_true', help='Enable demo mode')
    parser.add_argument('-prompt', '--custom_prompt', type=str,
                        help='Pass in a custom prompt to be used in place of the existing one.\n (Probably should just '
                             'modify the script itself...)')
    parser.add_argument('-overwrite', '--overwrite', action='store_true', help='Overwrite existing files')
    parser.add_argument('-roll', '--rolling_summarization', action='store_true', help='Enable rolling summarization')
    parser.add_argument('-detail', '--detail_level', type=float, help='Mandatory if rolling summarization is enabled, '
                                                                      'defines the chunk size.\n Default is 0.01(lots '
                                                                      'of chunks) -> 1.00 (few chunks)\n Currently '
                                                                      'only OpenAI works. ',
                        default=0.01, )
    parser.add_argument('-model', '--llm_model', type=str, default='',
                        help='Model to use for LLM summarization (only used for vLLM/TabbyAPI)')
    parser.add_argument('-k', '--keywords', nargs='+', default=['cli_ingest_no_tag'],
                        help='Keywords for tagging the media, can use multiple separated by spaces (default: cli_ingest_no_tag)')
    parser.add_argument('--log_file', type=str, help='Where to save logfile (non-default)')
    parser.add_argument('--local_llm', action='store_true',
                        help="Use a local LLM from the script(Downloads llamafile from github and 'mistral-7b-instruct-v0.2.Q8' - 8GB model from Huggingface)")
    parser.add_argument('--server_mode', action='store_true',
                        help='Run in server mode (This exposes the GUI/Server to the network)')
    parser.add_argument('--share_public', type=int, default=7860,
                        help="This will use Gradio's built-in ngrok tunneling to share the server publicly on the internet. Specify the port to use (default: 7860)")
    parser.add_argument('--port', type=int, default=7860, help='Port to run the server on')
    parser.add_argument('--ingest_text_file', action='store_true',
                        help='Ingest .txt files as content instead of treating them as URL lists')
    parser.add_argument('--text_title', type=str, help='Title for the text file being ingested')
    parser.add_argument('--text_author', type=str, help='Author of the text file being ingested')
    parser.add_argument('--diarize', action='store_true', help='Enable speaker diarization')
    return parser


def _resolve_custom_prompt(custom_prompt):
    """Return *custom_prompt* when it is non-empty, else the default prompt."""
    return custom_prompt if custom_prompt else _DEFAULT_SUMMARY_PROMPT


def _attach_log_file(logger, log_file, level_name):
    """Add a file handler for *log_file* to *logger* at the named level."""
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(getattr(logging, level_name))
    file_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
    logger.addHandler(file_handler)
    logger.info(f"Log file created at: {log_file}")


if __name__ == "__main__":
    # Run cleanup on Ctrl-C and on termination requests.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Has no effect if the root logger was already configured at import time.
    logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

    loaded_config_data = load_and_log_configs()
    if loaded_config_data:
        logging.info("Main: Configuration loaded successfully")
    else:
        print("Failed to load configuration")

    print_hello()

    transcription_text = None

    parser = _build_arg_parser()
    args = parser.parse_args()

    # Text-chunking switches; nothing on the command line sets them, so they
    # are passed to main() with these fixed values.
    set_chunk_txt_by_words = False
    set_max_txt_chunk_words = 0
    set_chunk_txt_by_sentences = False
    set_max_txt_chunk_sentences = 0
    set_chunk_txt_by_paragraphs = False
    set_max_txt_chunk_paragraphs = 0
    set_chunk_txt_by_tokens = False
    set_max_txt_chunk_tokens = 0

    # Same resulting values as before: the parsed value when truthy, else None.
    share_public = args.share_public or None
    server_mode = True if args.server_mode else None
    server_port = args.port or None

    logger = logging.getLogger()
    logger.setLevel(getattr(logging, args.log_level))
    # No extra console handler is attached; the previous code built one but
    # never added it to the logger.
    if args.log_file:
        _attach_log_file(logger, args.log_file, args.log_level)

    local_llm = args.local_llm
    logging.info(f'Local LLM flag: {local_llm}')

    # Direct text ingestion (single .txt file or a folder) bypasses the rest.
    # NOTE(review): the original indentation of ``sys.exit(0)`` was lost; this
    # exits only after an ingestion actually ran -- confirm against upstream.
    if args.input_path is not None and args.ingest_text_file:
        if os.path.isdir(args.input_path):
            results = ingest_folder(args.input_path, keywords=args.keywords)
            for result in results:
                print(result)
            sys.exit(0)
        if args.input_path.lower().endswith('.txt'):
            result = ingest_text_file(args.input_path, title=args.text_title, author=args.text_author,
                                      keywords=args.keywords)
            print(result)
            sys.exit(0)

    # The UI is launched when -gui is given or when there is nothing to
    # process; an input path without -gui runs the command-line pipeline.
    launch_gui = bool(args.user_interface) or not args.input_path

    if launch_gui:
        if local_llm:
            local_llm_function()
            # NOTE(review): whether the pause and browser launch belong inside
            # this ``if`` could not be recovered from the source -- confirm.
            time.sleep(2)
            webbrowser.open_new_tab('http://127.0.0.1:7860')
        launch_ui()

    else:
        logging.info('Starting the transcription and summarization process.')
        logging.info(f'Input path: {args.input_path}')
        logging.info(f'API Name: {args.api_name}')
        logging.info(f'Number of speakers: {args.num_speakers}')
        logging.info(f'Whisper model: {args.whisper_model}')
        logging.info(f'Offset: {args.offset}')
        logging.info(f'VAD filter: {args.vad_filter}')
        logging.info(f'Log Level: {args.log_level}')
        logging.info(f'Demo Mode: {args.demo_mode}')
        logging.info(f'Custom Prompt: {args.custom_prompt}')
        logging.info(f'Overwrite: {args.overwrite}')
        logging.info(f'Rolling Summarization: {args.rolling_summarization}')
        logging.info(f'User Interface: {bool(args.user_interface)}')
        logging.info(f'Video Download: {args.video}')

        api_name = args.api_name

        # Always ends up holding the prompt that is actually used; previously
        # it was reset to None right after the default had been chosen.
        custom_prompt_input = _resolve_custom_prompt(args.custom_prompt)
        if not args.custom_prompt:
            logging.debug("No custom prompt defined, will use default")
            print("No custom prompt defined, will use default")
        else:
            logging.debug(f"Custom prompt defined, will use \n\n{custom_prompt_input} \n\nas the prompt")
            print(f"Custom Prompt has been defined. Custom prompt: \n\n {args.custom_prompt}")

        summary = None
        if args.detail_level is None:
            args.detail_level = 0.01

        if args.api_name:
            logging.info(f'MAIN: API used: {args.api_name}')
            logging.info('MAIN: Summarization (not rolling) will be performed.')
        else:
            logging.info('No API specified. Summarization will not be performed.')

        logging.debug("Platform check being performed...")
        platform_check()
        logging.debug("CUDA check being performed...")
        cuda_check()
        logging.debug("ffmpeg check being performed...")
        check_ffmpeg()

        # The parser's empty-string default is normalised to None.
        llm_model = args.llm_model or None
        time_based = False

        try:
            results = main(args.input_path, api_name=args.api_name, api_key=args.api_key,
                           num_speakers=args.num_speakers, whisper_model=args.whisper_model, offset=args.offset,
                           vad_filter=args.vad_filter, download_video_flag=args.video,
                           custom_prompt=custom_prompt_input,
                           overwrite=args.overwrite, rolling_summarization=args.rolling_summarization,
                           detail=args.detail_level, keywords=args.keywords, llm_model=llm_model,
                           time_based=time_based, set_chunk_txt_by_words=set_chunk_txt_by_words,
                           set_max_txt_chunk_words=set_max_txt_chunk_words,
                           set_chunk_txt_by_sentences=set_chunk_txt_by_sentences,
                           set_max_txt_chunk_sentences=set_max_txt_chunk_sentences,
                           set_chunk_txt_by_paragraphs=set_chunk_txt_by_paragraphs,
                           set_max_txt_chunk_paragraphs=set_max_txt_chunk_paragraphs,
                           set_chunk_txt_by_tokens=set_chunk_txt_by_tokens,
                           set_max_txt_chunk_tokens=set_max_txt_chunk_tokens,
                           diarize=args.diarize)

            logging.info('Transcription process completed.')
        except Exception as e:
            logging.error('An error occurred during the transcription process.')
            logging.error(str(e))
            sys.exit(1)
        finally:
            # Runs on success, on failure and on sys.exit(); the extra atexit
            # registration was dropped so cleanup is not run twice.
            cleanup_process()
|
|