|
|
|
import argparse |
|
import asyncio |
|
import atexit |
|
import configparser |
|
import hashlib |
|
import json |
|
import logging |
|
import os |
|
import platform |
|
import re |
|
import shutil |
|
import signal |
|
import sqlite3 |
|
import subprocess |
|
import sys |
|
import time |
|
|
from typing import List, Tuple, Optional, Dict, Callable |
|
import zipfile |
|
from datetime import datetime |
|
|
import webbrowser |
|
|
|
from bs4 import BeautifulSoup |
|
import gradio as gr |
|
from huggingface_hub import InferenceClient |
|
from playwright.async_api import async_playwright |
|
import requests |
|
from requests.exceptions import RequestException |
|
from SQLite_DB import * |
|
import tiktoken |
|
import trafilatura |
|
import unicodedata |
|
import yt_dlp |
|
|
|
from openai import OpenAI |
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
log_level = "DEBUG" |
|
logging.basicConfig(level=getattr(logging, log_level), format='%(asctime)s - %(levelname)s - %(message)s') |
|
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level state shared across functions; assigned at runtime by
# platform_check(), cuda_check()/decide_cpugpu(), and the processing pipeline.
local_llm_model = None
userOS = ""
processing_choice = None
segments = []
detail_level_number = None
summary = None
audio_file = None
detail_level = None

# Subprocess handle, populated at runtime.
process = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
config = configparser.ConfigParser() |
|
config.read('config.txt') |
|
|
|
|
|
anthropic_api_key = config.get('API', 'anthropic_api_key', fallback=None) |
|
logging.debug(f"Loaded Anthropic API Key: {anthropic_api_key}") |
|
|
|
cohere_api_key = config.get('API', 'cohere_api_key', fallback=None) |
|
logging.debug(f"Loaded cohere API Key: {cohere_api_key}") |
|
|
|
groq_api_key = config.get('API', 'groq_api_key', fallback=None) |
|
logging.debug(f"Loaded groq API Key: {groq_api_key}") |
|
|
|
openai_api_key = config.get('API', 'openai_api_key', fallback=None) |
|
logging.debug(f"Loaded openAI Face API Key: {openai_api_key}") |
|
|
|
huggingface_api_key = config.get('API', 'huggingface_api_key', fallback=None) |
|
logging.debug(f"Loaded HuggingFace Face API Key: {huggingface_api_key}") |
|
|
|
|
|
anthropic_model = config.get('API', 'anthropic_model', fallback='claude-3-sonnet-20240229') |
|
cohere_model = config.get('API', 'cohere_model', fallback='command-r-plus') |
|
groq_model = config.get('API', 'groq_model', fallback='llama3-70b-8192') |
|
openai_model = config.get('API', 'openai_model', fallback='gpt-4-turbo') |
|
huggingface_model = config.get('API', 'huggingface_model', fallback='CohereForAI/c4ai-command-r-plus') |
|
|
|
|
|
kobold_api_IP = config.get('Local-API', 'kobold_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate') |
|
kobold_api_key = config.get('Local-API', 'kobold_api_key', fallback='') |
|
llama_api_IP = config.get('Local-API', 'llama_api_IP', fallback='http://127.0.0.1:8080/v1/chat/completions') |
|
llama_api_key = config.get('Local-API', 'llama_api_key', fallback='') |
|
ooba_api_IP = config.get('Local-API', 'ooba_api_IP', fallback='http://127.0.0.1:5000/v1/chat/completions') |
|
ooba_api_key = config.get('Local-API', 'ooba_api_key', fallback='') |
|
tabby_api_IP = config.get('Local-API', 'tabby_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate') |
|
tabby_api_key = config.get('Local-API', 'tabby_api_key', fallback=None) |
|
vllm_api_url = config.get('Local-API', 'vllm_api_IP', fallback='http://127.0.0.1:8000/api/v1/chat/completions')
|
vllm_api_key = config.get('Local-API', 'vllm_api_key', fallback=None) |
|
|
|
|
|
DEFAULT_CHUNK_DURATION = config.getint('Settings', 'chunk_duration', fallback=30)

WORDS_PER_SECOND = config.getint('Settings', 'words_per_second', fallback=3)
|
|
|
|
|
output_path = config.get('Paths', 'output_path', fallback='results') |
|
|
|
|
|
processing_choice = config.get('Processing', 'processing_choice', fallback='cpu') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Workaround for "OMP: Error #15" when multiple OpenMP runtimes get loaded
# (common with faster-whisper/CTranslate2 alongside other native libraries).
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
|
|
|
whisper_models = ["small", "medium", "small.en", "medium.en"] |
|
source_languages = { |
|
"en": "English", |
|
"zh": "Chinese", |
|
"de": "German", |
|
"es": "Spanish", |
|
"ru": "Russian", |
|
"ko": "Korean", |
|
"fr": "French" |
|
} |
|
source_language_list = list(source_languages.keys())
|
|
|
print(r"""_____ _ ________ _ _ |
|
|_ _|| | / /| _ \| | | | _ |
|
| | | | / / | | | || | | |(_) |
|
| | | | / / | | | || |/\| | |
|
| | | |____ / / | |/ / \ /\ / _ |
|
\_/ \_____//_/ |___/ \/ \/ (_) |
|
|
|
|
|
_ _ |
|
| | | | |
|
| |_ ___ ___ | | ___ _ __ __ _ |
|
| __| / _ \ / _ \ | | / _ \ | '_ \ / _` | |
|
| |_ | (_) || (_) | | || (_) || | | || (_| | _ |
|
\__| \___/ \___/ |_| \___/ |_| |_| \__, |( ) |
|
__/ ||/ |
|
|___/ |
|
_ _ _ _ _ _ _ |
|
| |(_) | | ( )| | | | | | |
|
__| | _ __| | _ __ |/ | |_ __ __ __ _ | |_ ___ | |__ |
|
/ _` || | / _` || '_ \ | __| \ \ /\ / / / _` || __| / __|| '_ \ |
|
| (_| || || (_| || | | | | |_ \ V V / | (_| || |_ | (__ | | | | |
|
\__,_||_| \__,_||_| |_| \__| \_/\_/ \__,_| \__| \___||_| |_| |
|
""") |
|
time.sleep(1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
userOS = "" |
|
|
|
|
|
def platform_check(): |
|
global userOS |
|
if platform.system() == "Linux": |
|
print("Linux OS detected \n Running Linux appropriate commands") |
|
userOS = "Linux" |
|
elif platform.system() == "Windows": |
|
print("Windows OS detected \n Running Windows appropriate commands") |
|
userOS = "Windows" |
|
else: |
|
print("Other OS detected \n Maybe try running things manually?") |
|
        sys.exit(1)
|
|
|
|
|
|
|
def cuda_check(): |
|
global processing_choice |
|
try: |
|
nvidia_smi = subprocess.check_output("nvidia-smi", shell=True).decode() |
|
if "NVIDIA-SMI" in nvidia_smi: |
|
print("NVIDIA GPU with CUDA is available.") |
|
processing_choice = "cuda" |
|
else: |
|
print("NVIDIA GPU with CUDA is not available.\nYou either have an AMD GPU, or you're stuck with CPU only.") |
|
processing_choice = "cpu" |
|
except subprocess.CalledProcessError: |
|
print("NVIDIA GPU with CUDA is not available.\nYou either have an AMD GPU, or you're stuck with CPU only.") |
|
processing_choice = "cpu" |
|
|
|
|
|
|
|
def decide_cpugpu(): |
|
global processing_choice |
|
    processing_input = input("Would you like to use your GPU or CPU for transcription? (1/cuda for GPU, 2/cpu for CPU): ")
|
if processing_choice == "cuda" and (processing_input.lower() == "cuda" or processing_input == "1"): |
|
print("You've chosen to use the GPU.") |
|
logging.debug("GPU is being used for processing") |
|
processing_choice = "cuda" |
|
elif processing_input.lower() == "cpu" or processing_input == "2": |
|
print("You've chosen to use the CPU.") |
|
logging.debug("CPU is being used for processing") |
|
processing_choice = "cpu" |
|
else: |
|
print("Invalid choice. Please select either GPU or CPU.") |
|
|
|
|
|
|
|
def check_ffmpeg(): |
|
if shutil.which("ffmpeg") or (os.path.exists("Bin") and os.path.isfile(".\\Bin\\ffmpeg.exe")): |
|
logging.debug("ffmpeg found installed on the local system, in the local PATH, or in the './Bin' folder") |
|
pass |
|
else: |
|
logging.debug("ffmpeg not installed on the local system/in local PATH") |
|
print( |
|
"ffmpeg is not installed.\n\n You can either install it manually, or through your package manager of " |
|
"choice.\n Windows users, builds are here: https://www.gyan.dev/ffmpeg/builds/") |
|
if userOS == "Windows": |
|
download_ffmpeg() |
|
elif userOS == "Linux": |
|
            print(
                "You should install ffmpeg using your platform's package manager, e.g. "
                "'apt install ffmpeg', 'dnf install ffmpeg', or 'pacman -S ffmpeg'.")
|
else: |
|
logging.debug("running an unsupported OS") |
|
print("You're running an unspported/Un-tested OS") |
|
exit_script = input("Let's exit the script, unless you're feeling lucky? (y/n)") |
|
if exit_script == "y" or "yes" or "1": |
|
exit() |
|
|
|
|
|
|
|
def download_ffmpeg(): |
|
user_choice = input("Do you want to download ffmpeg? (y)Yes/(n)No: ") |
|
    if user_choice.lower() in ('yes', 'y', '1'):
|
print("Downloading ffmpeg") |
|
url = "https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-essentials.zip" |
|
response = requests.get(url) |
|
|
|
if response.status_code == 200: |
|
print("Saving ffmpeg zip file") |
|
logging.debug("Saving ffmpeg zip file") |
|
zip_path = "ffmpeg-release-essentials.zip" |
|
with open(zip_path, 'wb') as file: |
|
file.write(response.content) |
|
|
|
logging.debug("Extracting the 'ffmpeg.exe' file from the zip") |
|
print("Extracting ffmpeg.exe from zip file to '/Bin' folder") |
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref: |
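                # NOTE: this inner path is specific to the 7.0 essentials build and changes with each release.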
|
ffmpeg_path = "ffmpeg-7.0-essentials_build/bin/ffmpeg.exe" |
|
|
|
logging.debug("checking if the './Bin' folder exists, creating if not") |
|
bin_folder = "Bin" |
|
if not os.path.exists(bin_folder): |
|
logging.debug("Creating a folder for './Bin', it didn't previously exist") |
|
os.makedirs(bin_folder) |
|
|
|
logging.debug("Extracting 'ffmpeg.exe' to the './Bin' folder") |
|
zip_ref.extract(ffmpeg_path, path=bin_folder) |
|
|
|
logging.debug("Moving 'ffmpeg.exe' to the './Bin' folder") |
|
src_path = os.path.join(bin_folder, ffmpeg_path) |
|
dst_path = os.path.join(bin_folder, "ffmpeg.exe") |
|
shutil.move(src_path, dst_path) |
|
|
|
logging.debug("Removing ffmpeg zip file") |
|
print("Deleting zip file (we've already extracted ffmpeg.exe, no worries)") |
|
os.remove(zip_path) |
|
|
|
logging.debug("ffmpeg.exe has been downloaded and extracted to the './Bin' folder.") |
|
print("ffmpeg.exe has been successfully downloaded and extracted to the './Bin' folder.") |
|
else: |
|
logging.error("Failed to download the zip file.") |
|
print("Failed to download the zip file.") |
|
else: |
|
logging.debug("User chose to not download ffmpeg") |
|
print("ffmpeg will not be downloaded.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_path(path): |
|
""" Decides whether the path is a URL or a local file and processes accordingly. """ |
|
if path.startswith('http'): |
|
logging.debug("file is a URL") |
|
|
|
return get_youtube(path) |
|
elif os.path.exists(path): |
|
logging.debug("File is a path") |
|
|
|
return process_local_file(path) |
|
else: |
|
logging.error(f"Path does not exist: {path}") |
|
return None |
|
|
|
|
|
|
|
def process_local_file(file_path): |
|
logging.info(f"Processing local file: {file_path}") |
|
title = normalize_title(os.path.splitext(os.path.basename(file_path))[0]) |
|
info_dict = {'title': title} |
|
logging.debug(f"Creating {title} directory...") |
|
download_path = create_download_directory(title) |
|
logging.debug(f"Converting '{title}' to an audio file (wav).") |
|
audio_file = convert_to_wav(file_path) |
|
logging.debug(f"'{title}' successfully converted to an audio file (wav).") |
|
return download_path, info_dict, audio_file |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_page_title(url: str) -> str: |
|
try: |
|
response = requests.get(url) |
|
response.raise_for_status() |
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
title_tag = soup.find('title') |
|
        return title_tag.string.strip() if title_tag and title_tag.string else "Untitled"
|
except requests.RequestException as e: |
|
logging.error(f"Error fetching page title: {e}") |
|
return "Untitled" |
|
|
|
|
|
def get_article_text(url: str) -> str:
    """Stub: not yet implemented."""
    pass
|
|
|
|
|
def get_artice_title(article_url_arg: str) -> str:
    # NB: name is misspelled ("artice") but kept so existing callers don't break.
    article_title = get_page_title(article_url_arg)
    return article_title
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sanitize_filename(filename): |
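    # Note: this also replaces path separators, so apply it to basenames rather than full paths.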
|
return re.sub(r'[<>:"/\\|?*]', '_', filename) |
|
|
|
|
|
def get_video_info(url: str) -> Optional[dict]:
|
ydl_opts = { |
|
'quiet': True, |
|
'no_warnings': True, |
|
'skip_download': True, |
|
} |
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
try: |
|
info_dict = ydl.extract_info(url, download=False) |
|
return info_dict |
|
except Exception as e: |
|
logging.error(f"Error extracting video info: {e}") |
|
return None |
|
|
|
|
|
def process_url(url, |
|
num_speakers, |
|
whisper_model, |
|
custom_prompt, |
|
offset, |
|
api_name, |
|
api_key, |
|
vad_filter, |
|
download_video, |
|
download_audio, |
|
rolling_summarization, |
|
detail_level, |
|
question_box, |
|
keywords, |
|
chunk_summarization, |
|
chunk_duration_input, |
|
words_per_second_input, |
|
): |
|
|
|
    if not url:
        return "No URL provided.", "No URL provided.", None, None, None, None
|
|
|
    if not is_valid_url(url):
        return "Invalid URL format.", "Invalid URL format.", None, None, None, None
|
|
|
print("API Name received:", api_name) |
|
|
|
logging.info(f"Processing URL: {url}") |
|
video_file_path = None |
|
|
|
try: |
|
|
|
db = Database() |
|
media_url = url |
|
|
|
info_dict = get_youtube(url) |
|
        media_title = info_dict.get('title', 'Untitled')
|
|
|
results = main(url, api_name=api_name, api_key=api_key, |
|
num_speakers=num_speakers, |
|
whisper_model=whisper_model, |
|
offset=offset, |
|
vad_filter=vad_filter, |
|
download_video_flag=download_video, |
|
custom_prompt=custom_prompt, |
|
overwrite=args.overwrite, |
|
rolling_summarization=rolling_summarization, |
|
detail=detail_level, |
|
keywords=keywords, |
|
chunk_summarization=chunk_summarization, |
|
chunk_duration=chunk_duration_input, |
|
words_per_second=words_per_second_input, |
|
) |
|
|
|
        if not results:
            return "Processing failed: no results were returned.", "Processing failed.", None, None, None, None
|
|
|
transcription_result = results[0] |
|
transcription_text = json.dumps(transcription_result['transcription'], indent=2) |
|
summary_text = transcription_result.get('summary', 'Summary not available') |
|
|
|
|
|
|
|
        # sanitize only the basename; sanitize_filename would mangle the separators in a full path
        audio_dir = os.path.dirname(transcription_result['audio_file'])
        audio_base = sanitize_filename(os.path.basename(transcription_result['audio_file']))
        audio_file_sanitized = os.path.join(audio_dir, audio_base)
        json_file_path = audio_file_sanitized.replace('.wav', '.segments_pretty.json')
        summary_file_path = audio_file_sanitized.replace('.wav', '_summary.txt')
|
|
|
logging.debug(f"Transcription result: {transcription_result}") |
|
logging.debug(f"Audio file path: {transcription_result['audio_file']}") |
|
|
|
|
|
try: |
|
with open(json_file_path, 'w') as json_file: |
|
json.dump(transcription_result['transcription'], json_file, indent=2) |
|
except IOError as e: |
|
logging.error(f"Error writing transcription to JSON file: {e}") |
|
|
|
|
|
with open(summary_file_path, 'w') as summary_file: |
|
summary_file.write(summary_text) |
|
|
|
        if download_video:
            video_file_path = transcription_result.get('video_path')
|
|
|
|
|
if not os.path.exists(json_file_path): |
|
raise FileNotFoundError(f"File not found: {json_file_path}") |
|
if not os.path.exists(summary_file_path): |
|
raise FileNotFoundError(f"File not found: {summary_file_path}") |
|
|
|
formatted_transcription = format_transcription(transcription_result) |
|
|
|
|
|
if chunk_summarization: |
|
chunk_duration = chunk_duration_input if chunk_duration_input else DEFAULT_CHUNK_DURATION |
|
words_per_second = words_per_second_input if words_per_second_input else WORDS_PER_SECOND |
|
summary_text = summarize_chunks(api_name, api_key, transcription_result['transcription'], chunk_duration, |
|
words_per_second) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
            custom_prompt = args.custom_prompt if args.custom_prompt else (
                "\n\nabove is the transcript of a video. Please read through the transcript carefully. Identify "
                "the main topics that are discussed over the course of the transcript. Then, summarize the key "
                "points about each main topic in a concise bullet point. The bullet points should cover the key "
                "information conveyed about each topic in the video, but should be much shorter than the full "
                "transcript. Please output your bullet point summary inside <bulletpoints> tags.")
|
|
|
db = Database() |
|
create_tables() |
|
media_url = url |
|
|
|
video_info = get_video_info(media_url) |
|
media_title = get_page_title(media_url) |
|
media_type = "video" |
|
media_content = transcription_text |
|
keyword_list = keywords.split(',') if keywords else ["default"] |
|
media_keywords = ', '.join(keyword_list) |
|
media_author = "auto_generated" |
|
media_ingestion_date = datetime.now().strftime('%Y-%m-%d') |
|
transcription_model = whisper_model |
|
|
|
|
|
logging.info(f"Media URL: {media_url}") |
|
logging.info(f"Media Title: {media_title}") |
|
logging.info(f"Media Type: {media_type}") |
|
logging.info(f"Media Content: {media_content}") |
|
logging.info(f"Media Keywords: {media_keywords}") |
|
logging.info(f"Media Author: {media_author}") |
|
logging.info(f"Ingestion Date: {media_ingestion_date}") |
|
logging.info(f"Custom Prompt: {custom_prompt}") |
|
logging.info(f"Summary Text: {summary_text}") |
|
logging.info(f"Transcription Model: {transcription_model}") |
|
|
|
|
|
if not media_url or not media_title or not media_type or not media_content or not media_keywords or not custom_prompt or not summary_text: |
|
raise InputError("Please provide all required fields.") |
|
|
|
add_media_with_keywords( |
|
url=media_url, |
|
title=media_title, |
|
media_type=media_type, |
|
content=media_content, |
|
keywords=media_keywords, |
|
prompt=custom_prompt, |
|
summary=summary_text, |
|
transcription_model=transcription_model, |
|
author=media_author, |
|
ingestion_date=media_ingestion_date |
|
) |
|
except Exception as e: |
|
logging.error(f"Failed to add media to the database: {e}") |
|
|
|
if summary_file_path and os.path.exists(summary_file_path): |
|
return transcription_text, summary_text, json_file_path, summary_file_path, video_file_path, None |
|
else: |
|
return transcription_text, summary_text, json_file_path, None, video_file_path, None |
|
except Exception as e: |
|
logging.error(f"Error processing URL: {e}") |
|
return str(e), 'Error processing the request.', None, None, None, None |
|
|
|
|
|
def create_download_directory(title): |
|
base_dir = "Results" |
|
|
|
safe_title = normalize_title(title) |
|
logging.debug(f"{title} successfully normalized") |
|
session_path = os.path.join(base_dir, safe_title) |
|
if not os.path.exists(session_path): |
|
os.makedirs(session_path, exist_ok=True) |
|
logging.debug(f"Created directory for downloaded video: {session_path}") |
|
else: |
|
logging.debug(f"Directory already exists for downloaded video: {session_path}") |
|
return session_path |
|
|
|
|
|
def normalize_title(title): |
|
|
|
title = unicodedata.normalize('NFKD', title).encode('ascii', 'ignore').decode('ascii') |
|
title = title.replace('/', '_').replace('\\', '_').replace(':', '_').replace('"', '').replace('*', '').replace('?', |
|
'').replace( |
|
'<', '').replace('>', '').replace('|', '') |
|
return title |
|
|
|
|
|
def get_youtube(video_url): |
|
ydl_opts = { |
|
'format': 'bestaudio[ext=m4a]', |
|
'noplaylist': False, |
|
'quiet': True, |
|
'extract_flat': True |
|
} |
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
logging.debug("About to extract youtube info") |
|
info_dict = ydl.extract_info(video_url, download=False) |
|
logging.debug("Youtube info successfully extracted") |
|
return info_dict |
|
|
|
|
|
def get_playlist_videos(playlist_url): |
|
ydl_opts = { |
|
'extract_flat': True, |
|
'skip_download': True, |
|
'quiet': True |
|
} |
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
info = ydl.extract_info(playlist_url, download=False) |
|
|
|
if 'entries' in info: |
|
video_urls = [entry['url'] for entry in info['entries']] |
|
playlist_title = info['title'] |
|
return video_urls, playlist_title |
|
else: |
|
print("No videos found in the playlist.") |
|
return [], None |
|
|
|
|
|
def save_to_file(video_urls, filename): |
|
with open(filename, 'w') as file: |
|
file.write('\n'.join(video_urls)) |
|
print(f"Video URLs saved to {filename}") |
|
|
|
|
|
def download_video(video_url, download_path, info_dict, download_video_flag): |
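    # NB: shares its name with the boolean `download_video` parameter of process_url(); they are unrelated.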
|
logging.debug("About to normalize downloaded video title") |
|
title = normalize_title(info_dict['title']) |
|
|
|
if not download_video_flag: |
|
file_path = os.path.join(download_path, f"{title}.m4a") |
|
ydl_opts = { |
|
'format': 'bestaudio[ext=m4a]', |
|
'outtmpl': file_path, |
|
} |
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
logging.debug("yt_dlp: About to download audio with youtube-dl") |
|
ydl.download([video_url]) |
|
logging.debug("yt_dlp: Audio successfully downloaded with youtube-dl") |
|
return file_path |
|
else: |
|
video_file_path = os.path.join(download_path, f"{title}_video.mp4") |
|
audio_file_path = os.path.join(download_path, f"{title}_audio.m4a") |
|
ydl_opts_video = { |
|
'format': 'bestvideo[ext=mp4]', |
|
'outtmpl': video_file_path, |
|
} |
|
ydl_opts_audio = { |
|
'format': 'bestaudio[ext=m4a]', |
|
'outtmpl': audio_file_path, |
|
} |
|
|
|
with yt_dlp.YoutubeDL(ydl_opts_video) as ydl: |
|
logging.debug("yt_dlp: About to download video with youtube-dl") |
|
ydl.download([video_url]) |
|
logging.debug("yt_dlp: Video successfully downloaded with youtube-dl") |
|
|
|
with yt_dlp.YoutubeDL(ydl_opts_audio) as ydl: |
|
logging.debug("yt_dlp: About to download audio with youtube-dl") |
|
ydl.download([video_url]) |
|
logging.debug("yt_dlp: Audio successfully downloaded with youtube-dl") |
|
|
|
output_file_path = os.path.join(download_path, f"{title}.mp4") |
|
|
|
        if sys.platform.startswith('win'):
            logging.debug("Running ffmpeg on Windows...")
            ffmpeg_bin = '.\\Bin\\ffmpeg.exe'
        elif userOS == "Linux":
            logging.debug("Running ffmpeg on Linux...")
            ffmpeg_bin = 'ffmpeg'
        else:
            logging.error("ffmpeg: Unsupported operating system for video download and merging.")
            raise RuntimeError("ffmpeg: Unsupported operating system for video download and merging.")

        # Mux the separately downloaded video and audio streams without re-encoding.
        ffmpeg_command = [
            ffmpeg_bin,
            '-i', video_file_path,
            '-i', audio_file_path,
            '-c:v', 'copy',
            '-c:a', 'copy',
            output_file_path
        ]
        subprocess.run(ffmpeg_command, check=True)
|
os.remove(video_file_path) |
|
os.remove(audio_file_path) |
|
|
|
return output_file_path |
|
|
|
|
|
def read_paths_from_file(file_path: str) -> List[str]: |
|
"""Read paths from a text file.""" |
|
with open(file_path, 'r') as file: |
|
paths = file.readlines() |
|
return [path.strip() for path in paths] |
|
|
|
|
|
def save_summary_to_file(summary: str, file_path: str): |
|
"""Save summary to a JSON file.""" |
|
summary_data = {'summary': summary, 'generated_at': datetime.now().isoformat()} |
|
with open(file_path, 'w') as file: |
|
json.dump(summary_data, file, indent=4) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_wav(video_file_path, offset=0, overwrite=False): |
|
out_path = os.path.splitext(video_file_path)[0] + ".wav" |
|
|
|
if os.path.exists(out_path) and not overwrite: |
|
print(f"File '{out_path}' already exists. Skipping conversion.") |
|
logging.info(f"Skipping conversion as file already exists: {out_path}") |
|
return out_path |
|
print("Starting conversion process of .m4a to .WAV") |
|
out_path = os.path.splitext(video_file_path)[0] + ".wav" |
|
|
|
    try:
        # Use the bundled ffmpeg.exe on Windows, the system ffmpeg elsewhere.
        if sys.platform.startswith('win'):
            ffmpeg_cmd = ".\\Bin\\ffmpeg.exe"
        else:
            ffmpeg_cmd = 'ffmpeg'
        logging.debug(f"ffmpeg_cmd: {ffmpeg_cmd}")

        command = [
            ffmpeg_cmd,
            "-ss", "00:00:00",      # start at the beginning of the input
            "-i", video_file_path,
            "-ar", "16000",         # 16 kHz sample rate (what Whisper expects)
            "-ac", "1",             # mono
            "-c:a", "pcm_s16le",    # 16-bit PCM WAV
            out_path
        ]
        with open(os.devnull, 'rb') as null_file:
            result = subprocess.run(command, stdin=null_file, text=True, capture_output=True)
        if result.returncode == 0:
            logging.info("FFmpeg executed successfully")
            logging.debug("FFmpeg output: %s", result.stdout)
        else:
            logging.error("Error in running FFmpeg")
            logging.error("FFmpeg stderr: %s", result.stderr)
            raise RuntimeError(f"FFmpeg error: {result.stderr}")
        logging.info("Conversion to WAV completed: %s", out_path)
|
except subprocess.CalledProcessError as e: |
|
logging.error("Error executing FFmpeg command: %s", str(e)) |
|
raise RuntimeError("Error converting video file to WAV") |
|
except Exception as e: |
|
logging.error("Unexpected error occurred: %s", str(e)) |
|
raise RuntimeError("Error converting video file to WAV") |
|
return out_path |
|
|
|
|
|
|
|
def speech_to_text(audio_file_path, selected_source_lang='en', whisper_model='small.en', vad_filter=False): |
|
logging.info('speech-to-text: Loading faster_whisper model: %s', whisper_model) |
|
    from faster_whisper import WhisperModel  # imported lazily so the module loads without faster-whisper
    model = WhisperModel(whisper_model, device=processing_choice)
|
time_start = time.time() |
|
if audio_file_path is None: |
|
raise ValueError("speech-to-text: No audio file provided") |
|
logging.info("speech-to-text: Audio file path: %s", audio_file_path) |
|
|
|
try: |
|
_, file_ending = os.path.splitext(audio_file_path) |
|
out_file = audio_file_path.replace(file_ending, ".segments.json") |
|
prettified_out_file = audio_file_path.replace(file_ending, ".segments_pretty.json") |
|
if os.path.exists(out_file): |
|
logging.info("speech-to-text: Segments file already exists: %s", out_file) |
|
with open(out_file) as f: |
|
global segments |
|
segments = json.load(f) |
|
return segments |
|
|
|
logging.info('speech-to-text: Starting transcription...') |
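        # beam_size/best_of widen the decoder search; vad_filter uses Silero VAD to skip non-speech.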
|
options = dict(language=selected_source_lang, beam_size=5, best_of=5, vad_filter=vad_filter) |
|
transcribe_options = dict(task="transcribe", **options) |
|
segments_raw, info = model.transcribe(audio_file_path, **transcribe_options) |
|
|
|
segments = [] |
|
for segment_chunk in segments_raw: |
|
chunk = { |
|
"start": segment_chunk.start, |
|
"end": segment_chunk.end, |
|
"text": segment_chunk.text |
|
} |
|
logging.debug("Segment: %s", chunk) |
|
segments.append(chunk) |
|
logging.info("speech-to-text: Transcription completed with faster_whisper") |
|
|
|
|
|
with open(prettified_out_file, 'w') as f: |
|
json.dump(segments, f, indent=2) |
|
|
|
|
|
with open(out_file, 'w') as f: |
|
json.dump(segments, f) |
|
|
|
except Exception as e: |
|
logging.error("speech-to-text: Error transcribing audio: %s", str(e)) |
|
raise RuntimeError("speech-to-text: Error transcribing audio") |
|
return segments |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_chat_completion(messages, model='gpt-4-turbo'): |
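    # NOTE: relies on a module-level OpenAI client (e.g. `client = OpenAI(api_key=openai_api_key)`)
    # defined elsewhere in this file; `client` is not created in this section.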
|
response = client.chat.completions.create( |
|
model=model, |
|
messages=messages, |
|
temperature=0, |
|
) |
|
return response.choices[0].message.content |
|
|
|
|
|
|
|
def chunk_on_delimiter(input_string: str, |
|
max_tokens: int, |
|
delimiter: str) -> List[str]: |
|
chunks = input_string.split(delimiter) |
|
combined_chunks, _, dropped_chunk_count = combine_chunks_with_no_minimum( |
|
chunks, max_tokens, chunk_delimiter=delimiter, add_ellipsis_for_overflow=True) |
|
if dropped_chunk_count > 0: |
|
print(f"Warning: {dropped_chunk_count} chunks were dropped due to exceeding the token limit.") |
|
combined_chunks = [f"{chunk}{delimiter}" for chunk in combined_chunks] |
|
return combined_chunks |
|
|
|
|
|
|
|
|
|
def combine_chunks_with_no_minimum( |
|
chunks: List[str], |
|
max_tokens: int, |
|
chunk_delimiter="\n\n", |
|
header: Optional[str] = None, |
|
add_ellipsis_for_overflow=False, |
|
) -> Tuple[List[str], List[List[int]], int]:
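    # Greedily packs chunks into strings of at most max_tokens (measured with openai_tokenize);
    # a chunk that alone exceeds the limit is dropped (optionally marked "...") and counted.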
|
dropped_chunk_count = 0 |
|
output = [] |
|
output_indices = [] |
|
candidate = ( |
|
[] if header is None else [header] |
|
) |
|
candidate_indices = [] |
|
for chunk_i, chunk in enumerate(chunks): |
|
chunk_with_header = [chunk] if header is None else [header, chunk] |
|
|
|
if len(openai_tokenize(chunk_delimiter.join(chunk_with_header))) > max_tokens: |
|
print(f"warning: chunk overflow") |
|
if ( |
|
add_ellipsis_for_overflow |
|
|
|
and len(openai_tokenize(chunk_delimiter.join(candidate + ["..."]))) <= max_tokens |
|
): |
|
candidate.append("...") |
|
dropped_chunk_count += 1 |
|
continue |
|
|
|
|
|
extended_candidate_token_count = len(openai_tokenize(chunk_delimiter.join(candidate + [chunk]))) |
|
|
|
if extended_candidate_token_count > max_tokens: |
|
output.append(chunk_delimiter.join(candidate)) |
|
output_indices.append(candidate_indices) |
|
candidate = chunk_with_header |
|
candidate_indices = [chunk_i] |
|
|
|
else: |
|
candidate.append(chunk) |
|
candidate_indices.append(chunk_i) |
|
|
|
if (header is not None and len(candidate) > 1) or (header is None and len(candidate) > 0): |
|
output.append(chunk_delimiter.join(candidate)) |
|
output_indices.append(candidate_indices) |
|
return output, output_indices, dropped_chunk_count |
|
|
|
|
|
def rolling_summarize(text: str, |
|
detail: float = 0, |
|
model: str = 'gpt-4-turbo', |
|
additional_instructions: Optional[str] = None, |
|
                      minimum_chunk_size: int = 500,
|
chunk_delimiter: str = ".", |
|
summarize_recursively=False, |
|
verbose=False): |
|
""" |
|
Summarizes a given text by splitting it into chunks, each of which is summarized individually. |
|
The level of detail in the summary can be adjusted, and the process can optionally be made recursive. |
|
|
|
Parameters: - text (str): The text to be summarized. - detail (float, optional): A value between 0 and 1 |
|
indicating the desired level of detail in the summary. 0 leads to a higher level summary, and 1 results in a more |
|
detailed summary. Defaults to 0. - model (str, optional): The model to use for generating summaries. Defaults to |
|
'gpt-3.5-turbo'. - additional_instructions (Optional[str], optional): Additional instructions to provide to the |
|
model for customizing summaries. - minimum_chunk_size (Optional[int], optional): The minimum size for text |
|
chunks. Defaults to 500. - chunk_delimiter (str, optional): The delimiter used to split the text into chunks. |
|
Defaults to ".". - summarize_recursively (bool, optional): If True, summaries are generated recursively, |
|
using previous summaries for context. - verbose (bool, optional): If True, prints detailed information about the |
|
chunking process. |
|
|
|
Returns: |
|
- str: The final compiled summary of the text. |
|
|
|
The function first determines the number of chunks by interpolating between a minimum and a maximum chunk count |
|
based on the `detail` parameter. It then splits the text into chunks and summarizes each chunk. If |
|
`summarize_recursively` is True, each summary is based on the previous summaries, adding more context to the |
|
summarization process. The function returns a compiled summary of all chunks. |
|
""" |
|
|
|
|
|
assert 0 <= detail <= 1 |
|
|
|
|
|
max_chunks = len(chunk_on_delimiter(text, minimum_chunk_size, chunk_delimiter)) |
|
min_chunks = 1 |
|
num_chunks = int(min_chunks + detail * (max_chunks - min_chunks)) |
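    # e.g. detail=0.5 with max_chunks=9: num_chunks = int(1 + 0.5 * (9 - 1)) = 5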
|
|
|
|
|
|
|
document_length = len(openai_tokenize(text)) |
|
chunk_size = max(minimum_chunk_size, document_length // num_chunks) |
|
text_chunks = chunk_on_delimiter(text, chunk_size, chunk_delimiter) |
|
if verbose: |
|
print(f"Splitting the text into {len(text_chunks)} chunks to be summarized.") |
|
|
|
print(f"Chunk lengths are {[len(openai_tokenize(x)) for x in text_chunks]}") |
|
|
|
|
|
system_message_content = "Rewrite this text in summarized form." |
|
if additional_instructions is not None: |
|
system_message_content += f"\n\n{additional_instructions}" |
|
|
|
accumulated_summaries = [] |
|
for chunk in tqdm(text_chunks): |
|
if summarize_recursively and accumulated_summaries: |
|
|
|
accumulated_summaries_string = '\n\n'.join(accumulated_summaries) |
|
user_message_content = f"Previous summaries:\n\n{accumulated_summaries_string}\n\nText to summarize next:\n\n{chunk}" |
|
else: |
|
|
|
user_message_content = chunk |
|
|
|
|
|
messages = [ |
|
{"role": "system", "content": system_message_content}, |
|
{"role": "user", "content": user_message_content} |
|
] |
|
|
|
|
|
response = get_chat_completion(messages, model=model) |
|
accumulated_summaries.append(response) |
|
|
|
|
|
global final_summary |
|
final_summary = '\n\n'.join(accumulated_summaries) |
|
|
|
return final_summary |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chunk_transcript(transcript: str, chunk_duration: int, words_per_second) -> List[str]: |
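    # seconds-to-words approximation: chunk_duration=30 at words_per_second=3 gives ~90-word chunks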
|
words = transcript.split() |
|
words_per_chunk = chunk_duration * words_per_second |
|
chunks = [' '.join(words[i:i + words_per_chunk]) for i in range(0, len(words), words_per_chunk)] |
|
return chunks |
|
|
|
|
|
def summarize_chunks(api_name: str, api_key: str, transcript: List[dict], chunk_duration: int, |
|
words_per_second: int) -> str: |
|
if api_name not in summarizers: |
|
return f"Unsupported API: {api_name}" |
|
|
|
if not transcript: |
|
logging.error("Empty or None transcript provided to summarize_chunks") |
|
return "Error: Empty or None transcript provided" |
|
|
|
text = extract_text_from_segments(transcript) |
|
chunks = chunk_transcript(text, chunk_duration, words_per_second) |
|
|
|
custom_prompt = args.custom_prompt |
|
|
|
summaries = [] |
|
for chunk in chunks: |
|
if api_name == 'openai': |
|
|
|
summaries.append(summarize_with_openai(api_key, chunk, custom_prompt)) |
|
        elif api_name == 'anthropic':
            summaries.append(summarize_with_claude(api_key, chunk, anthropic_model, custom_prompt))
        elif api_name == 'cohere':
            summaries.append(summarize_with_cohere(api_key, chunk, cohere_model, custom_prompt))
|
elif api_name == 'groq': |
|
summaries.append(summarize_with_groq(api_key, chunk, groq_model, custom_prompt)) |
|
elif api_name == 'llama': |
|
summaries.append(summarize_with_llama(llama_api_IP, chunk, api_key, custom_prompt)) |
|
elif api_name == 'kobold': |
|
summaries.append(summarize_with_kobold(kobold_api_IP, chunk, api_key, custom_prompt)) |
|
elif api_name == 'ooba': |
|
summaries.append(summarize_with_oobabooga(ooba_api_IP, chunk, api_key, custom_prompt)) |
|
        elif api_name == 'tabbyapi':
            summaries.append(summarize_with_tabbyapi(api_key, tabby_api_IP, chunk, llm_model, custom_prompt))
|
elif api_name == 'local-llm': |
|
summaries.append(summarize_with_local_llm(chunk, custom_prompt)) |
|
else: |
|
return f"Unsupported API: {api_name}" |
|
|
|
return "\n\n".join(summaries) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def openai_tokenize(text: str) -> List[int]:
|
encoding = tiktoken.encoding_for_model('gpt-4-turbo') |
|
return encoding.encode(text) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def scrape_article(url): |
|
async def fetch_html(url: str) -> str: |
|
async with async_playwright() as p: |
|
browser = await p.chromium.launch(headless=True) |
|
context = await browser.new_context( |
|
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3") |
|
page = await context.new_page() |
|
await page.goto(url) |
|
await page.wait_for_load_state("networkidle") |
|
content = await page.content() |
|
await browser.close() |
|
return content |
|
|
|
    def extract_article_data(html: str) -> Optional[dict]:
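        # trafilatura.extract() returns the main text (or None); extract_metadata() parses title/author/date.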
|
downloaded = trafilatura.extract(html, include_comments=False, include_tables=False, include_images=False) |
|
if downloaded: |
|
metadata = trafilatura.extract_metadata(html) |
|
if metadata: |
|
return { |
|
'title': metadata.title if metadata.title else 'N/A', |
|
'author': metadata.author if metadata.author else 'N/A', |
|
'content': downloaded, |
|
'date': metadata.date if metadata.date else 'N/A', |
|
} |
|
else: |
|
print("Metadata extraction failed.") |
|
return None |
|
else: |
|
print("Content extraction failed.") |
|
return None |
|
|
|
def convert_html_to_markdown(html: str) -> str: |
|
soup = BeautifulSoup(html, 'html.parser') |
|
|
|
for para in soup.find_all('p'): |
|
para.append('\n') |
|
|
|
|
|
text = soup.get_text(separator='\n\n') |
|
|
|
return text |
|
|
|
async def fetch_and_extract_article(url: str): |
|
html = await fetch_html(url) |
|
print("HTML Content:", html[:500]) |
|
article_data = extract_article_data(html) |
|
if article_data: |
|
article_data['content'] = convert_html_to_markdown(article_data['content']) |
|
return article_data |
|
else: |
|
return None |
|
|
|
|
|
article_data = asyncio.run(fetch_and_extract_article(url)) |
|
return article_data |
|
|
|
|
|
def ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date, custom_prompt): |
|
try: |
|
|
|
if not content.strip(): |
|
raise ValueError("Content is empty.") |
|
|
|
db = Database() |
|
create_tables() |
|
keyword_list = keywords.split(',') if keywords else ["default"] |
|
keyword_str = ', '.join(keyword_list) |
|
|
|
|
|
url = url or 'Unknown' |
|
title = title or 'Unknown' |
|
author = author or 'Unknown' |
|
keywords = keywords or 'default' |
|
summary = summary or 'No summary available' |
|
ingestion_date = ingestion_date or datetime.now().strftime('%Y-%m-%d') |
|
|
|
|
|
logging.debug(f"URL: {url}") |
|
logging.debug(f"Title: {title}") |
|
logging.debug(f"Author: {author}") |
|
logging.debug(f"Content: {content[:50]}... (length: {len(content)})") |
|
logging.debug(f"Keywords: {keywords}") |
|
logging.debug(f"Summary: {summary}") |
|
logging.debug(f"Ingestion Date: {ingestion_date}") |
|
logging.debug(f"Custom Prompt: {custom_prompt}") |
|
|
|
|
|
if not url: |
|
logging.error("URL is missing.") |
|
raise ValueError("URL is missing.") |
|
if not title: |
|
logging.error("Title is missing.") |
|
raise ValueError("Title is missing.") |
|
if not content: |
|
logging.error("Content is missing.") |
|
raise ValueError("Content is missing.") |
|
if not keywords: |
|
logging.error("Keywords are missing.") |
|
raise ValueError("Keywords are missing.") |
|
if not summary: |
|
logging.error("Summary is missing.") |
|
raise ValueError("Summary is missing.") |
|
if not ingestion_date: |
|
logging.error("Ingestion date is missing.") |
|
raise ValueError("Ingestion date is missing.") |
|
if not custom_prompt: |
|
logging.error("Custom prompt is missing.") |
|
raise ValueError("Custom prompt is missing.") |
|
|
|
|
|
result = add_media_with_keywords( |
|
url=url, |
|
title=title, |
|
media_type='article', |
|
content=content, |
|
keywords=keyword_str or "article_default", |
|
prompt=custom_prompt or None, |
|
summary=summary or "No summary generated", |
|
transcription_model=None, |
|
author=author or 'Unknown', |
|
ingestion_date=ingestion_date |
|
) |
|
return result |
|
except Exception as e: |
|
logging.error(f"Failed to ingest article to the database: {e}") |
|
return str(e) |
|
|
|
|
|
def scrape_and_summarize(url, custom_prompt_arg, api_name, api_key, keywords, custom_article_title): |
|
|
|
article_data = scrape_article(url) |
|
print(f"Scraped Article Data: {article_data}") |
|
if not article_data: |
|
return "Failed to scrape the article." |
|
|
|
|
|
title = custom_article_title.strip() if custom_article_title else article_data.get('title', 'Untitled') |
|
author = article_data.get('author', 'Unknown') |
|
content = article_data.get('content', '') |
|
ingestion_date = datetime.now().strftime('%Y-%m-%d') |
|
|
|
print(f"Title: {title}, Author: {author}, Content Length: {len(content)}") |
|
|
|
|
|
article_custom_prompt = custom_prompt_arg or "Summarize this article." |
|
|
|
|
|
summary = None |
|
if api_name: |
|
logging.debug(f"Article_Summarizer: Summarization being performed by {api_name}") |
|
|
|
|
|
sanitized_title = sanitize_filename(title) |
|
json_file_path = os.path.join("Results", f"{sanitized_title}_segments.json") |
|
|
|
with open(json_file_path, 'w') as json_file: |
|
json.dump([{'text': content}], json_file, indent=2) |
|
|
|
try: |
|
if api_name.lower() == 'openai': |
|
openai_api_key = api_key if api_key else config.get('API', 'openai_api_key', fallback=None) |
|
logging.debug(f"Article_Summarizer: trying to summarize with openAI") |
|
summary = summarize_with_openai(openai_api_key, json_file_path, article_custom_prompt) |
|
elif api_name.lower() == "anthropic": |
|
anthropic_api_key = api_key if api_key else config.get('API', 'anthropic_api_key', fallback=None) |
|
logging.debug(f"Article_Summarizer: Trying to summarize with anthropic") |
|
summary = summarize_with_claude(anthropic_api_key, json_file_path, anthropic_model, |
|
custom_prompt_arg=article_custom_prompt) |
|
elif api_name.lower() == "cohere": |
|
cohere_api_key = api_key if api_key else config.get('API', 'cohere_api_key', fallback=None) |
|
logging.debug(f"Article_Summarizer: Trying to summarize with cohere") |
|
summary = summarize_with_cohere(cohere_api_key, json_file_path, cohere_model, |
|
custom_prompt_arg=article_custom_prompt) |
|
elif api_name.lower() == "groq": |
|
groq_api_key = api_key if api_key else config.get('API', 'groq_api_key', fallback=None) |
|
logging.debug(f"Article_Summarizer: Trying to summarize with Groq") |
|
summary = summarize_with_groq(groq_api_key, json_file_path, groq_model, |
|
custom_prompt_arg=article_custom_prompt) |
|
elif api_name.lower() == "llama": |
|
llama_token = api_key if api_key else config.get('API', 'llama_api_key', fallback=None) |
|
llama_ip = llama_api_IP |
|
logging.debug(f"Article_Summarizer: Trying to summarize with Llama.cpp") |
|
summary = summarize_with_llama(llama_ip, json_file_path, llama_token, article_custom_prompt) |
|
elif api_name.lower() == "kobold": |
|
kobold_token = api_key if api_key else config.get('API', 'kobold_api_key', fallback=None) |
|
kobold_ip = kobold_api_IP |
|
logging.debug(f"Article_Summarizer: Trying to summarize with kobold.cpp") |
|
summary = summarize_with_kobold(kobold_ip, json_file_path, kobold_token, article_custom_prompt) |
|
elif api_name.lower() == "ooba": |
|
ooba_token = api_key if api_key else config.get('API', 'ooba_api_key', fallback=None) |
|
ooba_ip = ooba_api_IP |
|
logging.debug(f"Article_Summarizer: Trying to summarize with oobabooga") |
|
summary = summarize_with_oobabooga(ooba_ip, json_file_path, ooba_token, article_custom_prompt) |
|
elif api_name.lower() == "tabbyapi": |
|
tabbyapi_key = api_key if api_key else config.get('API', 'tabby_api_key', fallback=None) |
|
tabbyapi_ip = tabby_api_IP |
|
logging.debug(f"Article_Summarizer: Trying to summarize with tabbyapi") |
|
tabby_model = llm_model |
|
summary = summarize_with_tabbyapi(tabbyapi_key, tabbyapi_ip, json_file_path, tabby_model, |
|
article_custom_prompt) |
|
elif api_name.lower() == "vllm": |
|
logging.debug(f"Article_Summarizer: Trying to summarize with VLLM") |
|
summary = summarize_with_vllm(vllm_api_url, vllm_api_key, llm_model, json_file_path, |
|
article_custom_prompt) |
|
elif api_name.lower() == "huggingface": |
|
huggingface_api_key = api_key if api_key else config.get('API', 'huggingface_api_key', fallback=None) |
|
logging.debug(f"Article_Summarizer: Trying to summarize with huggingface") |
|
summary = summarize_with_huggingface(huggingface_api_key, json_file_path, article_custom_prompt) |
|
except requests.exceptions.ConnectionError as e: |
|
logging.error(f"Connection error while trying to summarize with {api_name}: {str(e)}") |
|
|
|
if summary: |
|
logging.info(f"Article_Summarizer: Summary generated using {api_name} API") |
|
save_summary_to_file(summary, json_file_path) |
|
else: |
|
summary = "Summary not available" |
|
logging.warning(f"Failed to generate summary using {api_name} API") |
|
|
|
else: |
|
summary = "Article Summarization: No API provided for summarization." |
|
|
|
print(f"Summary: {summary}") |
|
|
|
|
|
ingestion_result = ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date, |
|
article_custom_prompt) |
|
|
|
return f"Title: {title}\nAuthor: {author}\nSummary: {summary}\nIngestion Result: {ingestion_result}" |
|
|
|
|
|
def ingest_unstructured_text(text, custom_prompt, api_name, api_key, keywords, custom_article_title): |
|
title = custom_article_title.strip() if custom_article_title else "Unstructured Text" |
|
author = "Unknown" |
|
ingestion_date = datetime.now().strftime('%Y-%m-%d') |
|
|
|
|
|
if api_name: |
|
json_file_path = f"Results/{title.replace(' ', '_')}_segments.json" |
|
with open(json_file_path, 'w') as json_file: |
|
json.dump([{'text': text}], json_file, indent=2) |
|
|
|
if api_name.lower() == 'openai': |
|
summary = summarize_with_openai(api_key, json_file_path, custom_prompt) |
|
|
|
else: |
|
summary = "Unsupported API." |
|
else: |
|
summary = "No API provided for summarization." |
|
|
|
|
|
ingestion_result = ingest_article_to_db('Unstructured Text', title, author, text, keywords, summary, ingestion_date, |
|
custom_prompt) |
|
return f"Title: {title}\nSummary: {summary}\nIngestion Result: {ingestion_result}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_text_from_segments(segments):
    logging.debug(f"Main: extracting text from {len(segments)} segments")
    text = ' '.join([segment['text'] for segment in segments])
    logging.debug("Main: successfully extracted text from segments")
    return text
|
|
|
|
|
def summarize_with_openai(api_key, file_path, custom_prompt_arg): |
|
try: |
|
logging.debug("openai: Loading json data for summarization") |
|
with open(file_path, 'r') as file: |
|
segments = json.load(file) |
|
|
|
open_ai_model = openai_model or 'gpt-4-turbo' |
|
|
|
logging.debug("openai: Extracting text from the segments") |
|
text = extract_text_from_segments(segments) |
|
|
|
headers = { |
|
'Authorization': f'Bearer {api_key}', |
|
'Content-Type': 'application/json' |
|
} |
|
|
|
logging.debug(f"openai: API Key is: {api_key}") |
|
logging.debug("openai: Preparing data + prompt for submittal") |
|
openai_prompt = f"{text} \n\n\n\n{custom_prompt_arg}" |
|
data = { |
|
"model": open_ai_model, |
|
"messages": [ |
|
{ |
|
"role": "system", |
|
"content": "You are a professional summarizer." |
|
}, |
|
{ |
|
"role": "user", |
|
"content": openai_prompt |
|
} |
|
], |
|
"max_tokens": 8192, |
|
"temperature": 0.1 |
|
} |
|
logging.debug("openai: Posting request") |
|
response = requests.post('https://api.openai.com/v1/chat/completions', headers=headers, json=data) |
|
|
|
if response.status_code == 200: |
|
response_data = response.json() |
|
if 'choices' in response_data and len(response_data['choices']) > 0: |
|
summary = response_data['choices'][0]['message']['content'].strip() |
|
logging.debug("openai: Summarization successful") |
|
print("openai: Summarization successful.") |
|
return summary |
|
else: |
|
logging.warning("openai: Summary not found in the response data") |
|
return "openai: Summary not available" |
|
else: |
|
logging.debug("openai: Summarization failed") |
|
print("openai: Failed to process summary:", response.text) |
|
return "openai: Failed to process summary" |
|
except Exception as e: |
|
logging.debug("openai: Error in processing: %s", str(e)) |
|
print("openai: Error occurred while processing summary with openai:", str(e)) |
|
return "openai: Error occurred while processing summary" |
|
|
|
|
|
def summarize_with_claude(api_key, file_path, model, custom_prompt_arg, max_retries=3, retry_delay=5): |
|
try: |
|
logging.debug("anthropic: Loading JSON data") |
|
with open(file_path, 'r') as file: |
|
segments = json.load(file) |
|
|
|
logging.debug("anthropic: Extracting text from the segments file") |
|
text = extract_text_from_segments(segments) |
|
|
|
headers = { |
|
'x-api-key': api_key, |
|
'anthropic-version': '2023-06-01', |
|
'Content-Type': 'application/json' |
|
} |
|
|
|
anthropic_prompt = custom_prompt_arg |
|
logging.debug(f"anthropic: Prompt is {anthropic_prompt}") |
|
user_message = { |
|
"role": "user", |
|
"content": f"{text} \n\n\n\n{anthropic_prompt}" |
|
} |
|
|
|
data = { |
|
"model": model, |
|
"max_tokens": 4096, |
|
"messages": [user_message], |
|
"stop_sequences": ["\n\nHuman:"], |
|
"temperature": 0.1, |
|
"top_k": 0, |
|
"top_p": 1.0, |
|
"metadata": { |
|
"user_id": "example_user_id", |
|
}, |
|
"stream": False, |
|
"system": "You are a professional summarizer." |
|
} |
|
|
|
for attempt in range(max_retries): |
|
try: |
|
logging.debug("anthropic: Posting request to API") |
|
response = requests.post('https://api.anthropic.com/v1/messages', headers=headers, json=data) |
|
|
|
|
|
if response.status_code == 200: |
|
logging.debug("anthropic: Post submittal successful") |
|
response_data = response.json() |
|
try: |
|
summary = response_data['content'][0]['text'].strip() |
|
logging.debug("anthropic: Summarization successful") |
|
print("Summary processed successfully.") |
|
return summary |
|
except (IndexError, KeyError) as e: |
|
logging.debug("anthropic: Unexpected data in response") |
|
print("Unexpected response format from Claude API:", response.text) |
|
return None |
|
elif response.status_code == 500: |
|
logging.debug("anthropic: Internal server error") |
|
print("Internal server error from API. Retrying may be necessary.") |
|
time.sleep(retry_delay) |
|
else: |
|
logging.debug( |
|
f"anthropic: Failed to summarize, status code {response.status_code}: {response.text}") |
|
print(f"Failed to process summary, status code {response.status_code}: {response.text}") |
|
return None |
|
|
|
except RequestException as e: |
|
logging.error(f"anthropic: Network error during attempt {attempt + 1}/{max_retries}: {str(e)}") |
|
if attempt < max_retries - 1: |
|
time.sleep(retry_delay) |
|
else: |
|
return f"anthropic: Network error: {str(e)}" |
|
|
|
except FileNotFoundError as e: |
|
logging.error(f"anthropic: File not found: {file_path}") |
|
return f"anthropic: File not found: {file_path}" |
|
except json.JSONDecodeError as e: |
|
logging.error(f"anthropic: Invalid JSON format in file: {file_path}") |
|
return f"anthropic: Invalid JSON format in file: {file_path}" |
|
except Exception as e: |
|
logging.error(f"anthropic: Error in processing: {str(e)}") |
|
return f"anthropic: Error occurred while processing summary with Anthropic: {str(e)}" |
|
|
|
|
|
|
|
def summarize_with_cohere(api_key, file_path, model, custom_prompt_arg): |
|
try: |
|
logging.debug("cohere: Loading JSON data") |
|
with open(file_path, 'r') as file: |
|
segments = json.load(file) |
|
|
|
logging.debug(f"cohere: Extracting text from segments file") |
|
text = extract_text_from_segments(segments) |
|
        api_key = api_key or os.environ.get("COHERE_TOKEN")
|
headers = { |
|
'accept': 'application/json', |
|
'content-type': 'application/json', |
|
'Authorization': f'Bearer {api_key}' |
|
} |
|
|
|
cohere_prompt = f"{text} \n\n\n\n{custom_prompt_arg}" |
|
logging.debug("cohere: Prompt being sent is {cohere_prompt}") |
|
|
|
data = { |
|
"chat_history": [ |
|
{"role": "USER", "message": cohere_prompt} |
|
], |
|
"message": "Please provide a summary.", |
|
"model": model, |
|
"connectors": [{"id": "web-search"}] |
|
} |
|
|
|
logging.debug("cohere: Submitting request to API endpoint") |
|
print("cohere: Submitting request to API endpoint") |
|
response = requests.post('https://api.cohere.ai/v1/chat', headers=headers, json=data) |
|
response_data = response.json() |
|
logging.debug("API Response Data: %s", response_data) |
|
|
|
if response.status_code == 200: |
|
if 'text' in response_data: |
|
summary = response_data['text'].strip() |
|
logging.debug("cohere: Summarization successful") |
|
print("Summary processed successfully.") |
|
return summary |
|
else: |
|
logging.error("Expected data not found in API response.") |
|
return "Expected data not found in API response." |
|
else: |
|
logging.error(f"cohere: API request failed with status code {response.status_code}: {response.text}") |
|
print(f"Failed to process summary, status code {response.status_code}: {response.text}") |
|
return f"cohere: API request failed: {response.text}" |
|
|
|
except Exception as e: |
|
logging.error("cohere: Error in processing: %s", str(e)) |
|
return f"cohere: Error occurred while processing summary with Cohere: {str(e)}" |
|
|
|
|
|
|
|
def summarize_with_groq(api_key, file_path, model, custom_prompt_arg): |
|
try: |
|
logging.debug("groq: Loading JSON data") |
|
with open(file_path, 'r') as file: |
|
segments = json.load(file) |
|
|
|
logging.debug(f"groq: Extracting text from segments file") |
|
text = extract_text_from_segments(segments) |
|
|
|
headers = { |
|
'Authorization': f'Bearer {api_key}', |
|
'Content-Type': 'application/json' |
|
} |
|
|
|
groq_prompt = f"{text} \n\n\n\n{custom_prompt_arg}" |
|
logging.debug("groq: Prompt being sent is {groq_prompt}") |
|
|
|
data = { |
|
"messages": [ |
|
{ |
|
"role": "user", |
|
"content": groq_prompt |
|
} |
|
], |
|
"model": model |
|
} |
|
|
|
logging.debug("groq: Submitting request to API endpoint") |
|
print("groq: Submitting request to API endpoint") |
|
response = requests.post('https://api.groq.com/openai/v1/chat/completions', headers=headers, json=data) |
|
|
|
response_data = response.json() |
|
logging.debug("API Response Data: %s", response_data) |
|
|
|
if response.status_code == 200: |
|
if 'choices' in response_data and len(response_data['choices']) > 0: |
|
summary = response_data['choices'][0]['message']['content'].strip() |
|
logging.debug("groq: Summarization successful") |
|
print("Summarization successful.") |
|
return summary |
|
else: |
|
logging.error("Expected data not found in API response.") |
|
return "Expected data not found in API response." |
|
else: |
|
logging.error(f"groq: API request failed with status code {response.status_code}: {response.text}") |
|
return f"groq: API request failed: {response.text}" |
|
|
|
except Exception as e: |
|
logging.error("groq: Error in processing: %s", str(e)) |
|
return f"groq: Error occurred while processing summary with groq: {str(e)}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
def summarize_with_local_llm(file_path, custom_prompt_arg): |
|
try: |
|
logging.debug("Local LLM: Loading json data for summarization") |
|
with open(file_path, 'r') as file: |
|
segments = json.load(file) |
|
|
|
logging.debug("Local LLM: Extracting text from the segments") |
|
text = extract_text_from_segments(segments) |
|
|
|
headers = { |
|
'Content-Type': 'application/json' |
|
} |
|
|
|
logging.debug("Local LLM: Preparing data + prompt for submittal") |
|
local_llm_prompt = f"{text} \n\n\n\n{custom_prompt_arg}" |
|
data = { |
|
"messages": [ |
|
{ |
|
"role": "system", |
|
"content": "You are a professional summarizer." |
|
}, |
|
{ |
|
"role": "user", |
|
"content": local_llm_prompt |
|
} |
|
], |
|
"max_tokens": 28000, |
|
} |
|
logging.debug("Local LLM: Posting request") |
|
response = requests.post('http://127.0.0.1:8080/v1/chat/completions', headers=headers, json=data) |
|
|
|
if response.status_code == 200: |
|
response_data = response.json() |
|
if 'choices' in response_data and len(response_data['choices']) > 0: |
|
summary = response_data['choices'][0]['message']['content'].strip() |
|
logging.debug("Local LLM: Summarization successful") |
|
print("Local LLM: Summarization successful.") |
|
return summary |
|
else: |
|
logging.warning("Local LLM: Summary not found in the response data") |
|
return "Local LLM: Summary not available" |
|
else: |
|
logging.debug("Local LLM: Summarization failed") |
|
print("Local LLM: Failed to process summary:", response.text) |
|
return "Local LLM: Failed to process summary" |
|
except Exception as e: |
|
logging.debug("Local LLM: Error in processing: %s", str(e)) |
|
print("Error occurred while processing summary with Local LLM:", str(e)) |
|
return "Local LLM: Error occurred while processing summary" |
|
|
|
def summarize_with_llama(api_url, file_path, token, custom_prompt): |
|
try: |
|
logging.debug("llama: Loading JSON data") |
|
with open(file_path, 'r') as file: |
|
segments = json.load(file) |
|
|
|
logging.debug(f"llama: Extracting text from segments file") |
|
text = extract_text_from_segments(segments) |
|
|
|
headers = { |
|
'accept': 'application/json', |
|
'content-type': 'application/json', |
|
} |
|
if len(token) > 5: |
|
headers['Authorization'] = f'Bearer {token}' |
|
|
|
llama_prompt = f"{text} \n\n\n\n{custom_prompt}" |
|
logging.debug("llama: Prompt being sent is {llama_prompt}") |
|
|
|
data = { |
|
"prompt": llama_prompt |
|
} |
|
|
|
logging.debug("llama: Submitting request to API endpoint") |
|
print("llama: Submitting request to API endpoint") |
|
response = requests.post(api_url, headers=headers, json=data) |
|
response_data = response.json() |
|
logging.debug("API Response Data: %s", response_data) |
|
|
|
if response.status_code == 200: |
|
|
|
logging.debug(response_data) |
|
summary = response_data['content'].strip() |
|
logging.debug("llama: Summarization successful") |
|
print("Summarization successful.") |
|
return summary |
|
else: |
|
logging.error(f"llama: API request failed with status code {response.status_code}: {response.text}") |
|
return f"llama: API request failed: {response.text}" |
|
|
|
except Exception as e: |
|
logging.error("llama: Error in processing: %s", str(e)) |
|
return f"llama: Error occurred while processing summary with llama: {str(e)}" |
|
|
|
|
|
|
|
def summarize_with_kobold(api_url, file_path, kobold_api_token, custom_prompt): |
|
try: |
|
logging.debug("kobold: Loading JSON data") |
|
with open(file_path, 'r') as file: |
|
segments = json.load(file) |
|
|
|
logging.debug(f"kobold: Extracting text from segments file") |
|
text = extract_text_from_segments(segments) |
|
|
|
headers = { |
|
'accept': 'application/json', |
|
'content-type': 'application/json', |
|
} |
|
|
|
kobold_prompt = f"{text} \n\n\n\n{custom_prompt}" |
|
logging.debug("kobold: Prompt being sent is {kobold_prompt}") |
|
|
|
|
|
|
|
data = { |
|
"max_context_length": 8096, |
|
"max_length": 4096, |
|
"prompt": kobold_prompt, |
|
} |
|
|
|
logging.debug("kobold: Submitting request to API endpoint") |
|
print("kobold: Submitting request to API endpoint") |
|
response = requests.post(api_url, headers=headers, json=data) |
|
response_data = response.json() |
|
logging.debug("kobold: API Response Data: %s", response_data) |
|
|
|
if response.status_code == 200: |
|
if 'results' in response_data and len(response_data['results']) > 0: |
|
summary = response_data['results'][0]['text'].strip() |
|
logging.debug("kobold: Summarization successful") |
|
print("Summarization successful.") |
|
return summary |
|
else: |
|
logging.error("Expected data not found in API response.") |
|
return "Expected data not found in API response." |
|
else: |
|
logging.error(f"kobold: API request failed with status code {response.status_code}: {response.text}") |
|
return f"kobold: API request failed: {response.text}" |
|
|
|
except Exception as e: |
|
logging.error("kobold: Error in processing: %s", str(e)) |
|
return f"kobold: Error occurred while processing summary with kobold: {str(e)}" |
|
|
|
|
|
|
|
def summarize_with_oobabooga(api_url, file_path, ooba_api_token, custom_prompt): |
|
try: |
|
logging.debug("ooba: Loading JSON data") |
|
with open(file_path, 'r') as file: |
|
segments = json.load(file) |
|
|
|
logging.debug(f"ooba: Extracting text from segments file\n\n\n") |
|
text = extract_text_from_segments(segments) |
|
logging.debug(f"ooba: Finished extracting text from segments file") |
|
|
|
headers = { |
|
'accept': 'application/json', |
|
'content-type': 'application/json', |
|
} |
|
|
|
|
|
|
|
|
|
ooba_prompt = "{text}\n\n\n\n{custom_prompt}" |
|
logging.debug("ooba: Prompt being sent is {ooba_prompt}") |
|
|
|
data = { |
|
"mode": "chat", |
|
"character": "Example", |
|
"messages": [{"role": "user", "content": ooba_prompt}] |
|
} |
|
|
|
logging.debug("ooba: Submitting request to API endpoint") |
|
print("ooba: Submitting request to API endpoint") |
|
response = requests.post(api_url, headers=headers, json=data, verify=False) |
|
logging.debug("ooba: API Response Data: %s", response) |
|
|
|
if response.status_code == 200: |
|
            response_data = response.json()
            summary = response_data['choices'][0]['message']['content']
|
logging.debug("ooba: Summarization successful") |
|
print("Summarization successful.") |
|
return summary |
|
else: |
|
logging.error(f"oobabooga: API request failed with status code {response.status_code}: {response.text}") |
|
return f"ooba: API request failed with status code {response.status_code}: {response.text}" |
|
|
|
except Exception as e: |
|
logging.error("ooba: Error in processing: %s", str(e)) |
|
return f"ooba: Error occurred while processing summary with oobabooga: {str(e)}" |
|
|
|
|
|
|
|
def summarize_with_vllm(vllm_api_url, vllm_api_key_function_arg, llm_model, text, vllm_custom_prompt_function_arg): |
|
vllm_client = OpenAI( |
|
base_url=vllm_api_url, |
|
api_key=vllm_api_key_function_arg |
|
) |
|
|
|
custom_prompt = vllm_custom_prompt_function_arg |
|
|
|
    completion = vllm_client.chat.completions.create(
|
model=llm_model, |
|
messages=[ |
|
{"role": "system", "content": "You are a professional summarizer."}, |
|
{"role": "user", "content": f"{text} \n\n\n\n{custom_prompt}"} |
|
] |
|
) |
|
vllm_summary = completion.choices[0].message.content |
|
return vllm_summary |
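
# Usage sketch (illustrative; the base URL is a hypothetical local vLLM endpoint,
# and unlike the file-based summarizers above this function takes the transcript
# text directly rather than a .segments.json path):
#     summary = summarize_with_vllm("http://127.0.0.1:8000/v1", vllm_api_key,
#                                   llm_model, transcript_text, custom_prompt)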
|
|
|
|
|
|
|
def summarize_with_tabbyapi(tabby_api_key, tabby_api_IP, text, tabby_model, custom_prompt): |
|
model = tabby_model |
|
headers = { |
|
'Authorization': f'Bearer {tabby_api_key}', |
|
'Content-Type': 'application/json' |
|
} |
|
    data = {
        'text': text,
        'model': model
    }
    try:
        response = requests.post(tabby_api_IP, headers=headers, json=data)
|
response.raise_for_status() |
|
summary = response.json().get('summary', '') |
|
return summary |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"Error summarizing with TabbyAPI: {e}") |
|
return "Error summarizing with TabbyAPI." |
|
|
|
|
|
def save_summary_to_file(summary, file_path): |
|
logging.debug("Now saving summary to file...") |
|
summary_file_path = file_path.replace('.segments.json', '_summary.txt') |
|
logging.debug("Opening summary file for writing, *segments.json with *_summary.txt") |
|
with open(summary_file_path, 'w') as file: |
|
file.write(summary) |
|
logging.info(f"Summary saved to file: {summary_file_path}") |
|
|
|
|
|
summarizers: Dict[str, Callable[[str, str], str]] = { |
|
'tabbyapi': summarize_with_tabbyapi, |
|
'openai': summarize_with_openai, |
|
'anthropic': summarize_with_claude, |
|
'cohere': summarize_with_cohere, |
|
'groq': summarize_with_groq, |
|
'llama': summarize_with_llama, |
|
'kobold': summarize_with_kobold, |
|
'oobabooga': summarize_with_oobabooga |
|
|
|
} |
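
# Dispatch sketch (illustrative): look up a backend by the lowercase API name.
# Note that the callables take different argument shapes (some want an API key
# and a file path, others an endpoint URL first), so main() below calls each
# backend explicitly rather than through this table.
#     summarize = summarizers.get(api_name.lower())
#     if summarize is None:
#         logging.warning(f"Unsupported API: {api_name}")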
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def summarize_with_detail_openai(text, detail, verbose=False): |
|
    summary_with_detail_variable = rolling_summarize(text, detail=detail, verbose=verbose)
|
print(len(openai_tokenize(summary_with_detail_variable))) |
|
return summary_with_detail_variable |
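
# `detail` is the 0.01-1.0 knob forwarded to rolling_summarize(): values near
# 0.01 split the transcript into many small chunks, values near 1.0 into few.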
|
|
|
|
|
def summarize_with_detail_recursive_openai(text, detail, verbose=False): |
|
summary_with_recursive_summarization = rolling_summarize(text, detail=detail, summarize_recursively=True) |
|
    print(summary_with_recursive_summarization)
    return summary_with_recursive_summarization
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def summarize_with_huggingface(huggingface_api_key, json_file_path, custom_prompt):
    logging.debug("huggingface: Summarization process starting...")

    # Fall back to the HF_TOKEN environment variable when no key is supplied
    if not huggingface_api_key:
        huggingface_api_key = os.getenv("HF_TOKEN", "")
    logging.debug("HUGGINGFACE API KEY CHECK: %s", huggingface_api_key)

    model = "CohereForAI/c4ai-command-r-plus"
    client = InferenceClient(model=model, token=huggingface_api_key)
|
try: |
|
logging.debug("huggingface: Loading json data for summarization") |
|
with open(json_file_path, 'r') as file: |
|
segments = json.load(file) |
|
|
|
logging.debug("huggingface: Extracting text from the segments") |
|
text = ' '.join([segment['text'] for segment in segments]) |
|
|
|
|
|
logging.debug("HUGGINGFACE API KEY CHECK #2: " + huggingface_api_key) |
|
|
|
logging.debug("huggingface: Submitting request...") |
|
response = client.text_generation(prompt=hf_prompt, max_new_tokens=4096) |
|
if response is not None: |
|
return response |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
        else:
            logging.error("huggingface: Summarization failed, empty response from text_generation")
            return "Failed to process summary, huggingface library returned an empty response"
|
except Exception as e: |
|
logging.error("huggingface: Error in processing: %s", str(e)) |
|
print(f"Error occurred while processing summary with huggingface: {str(e)}") |
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_transcription(transcription_result): |
|
if transcription_result: |
|
json_data = transcription_result['transcription'] |
|
return json.dumps(json_data, indent=2) |
|
else: |
|
return "" |
|
|
|
|
|
def format_file_path(file_path, fallback_path=None): |
|
if file_path and os.path.exists(file_path): |
|
logging.debug(f"File exists: {file_path}") |
|
return file_path |
|
elif fallback_path and os.path.exists(fallback_path): |
|
logging.debug(f"File does not exist: {file_path}. Returning fallback path: {fallback_path}") |
|
return fallback_path |
|
else: |
|
logging.debug(f"File does not exist: {file_path}. No fallback path available.") |
|
return None |
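
# e.g. format_file_path("audio_summary.txt", fallback_path="audio.segments.json")
# returns whichever of the two paths actually exists, or None.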
|
|
|
|
|
def search_media(query, fields, keyword, page): |
|
try: |
|
results = search_and_display(query, fields, keyword, page) |
|
return results |
|
except Exception as e: |
|
logger.error(f"Error searching media: {e}") |
|
return str(e) |
|
|
|
|
|
|
|
def ask_question(transcription, question, api_name, api_key): |
|
if not question.strip(): |
|
return "Please enter a question." |
|
|
|
prompt = f"""Transcription:\n{transcription} |
|
|
|
Given the above transcription, please answer the following:\n\n{question}""" |
|
|
|
|
|
|
|
|
|
if api_name.lower() == "openai": |
|
openai_api_key = api_key if api_key else config.get('API', 'openai_api_key', fallback=None) |
|
headers = { |
|
'Authorization': f'Bearer {openai_api_key}', |
|
'Content-Type': 'application/json' |
|
} |
|
        # Assigning to the module-level openai_model inside this function would
        # make it local and the truthiness check would raise UnboundLocalError,
        # so pick the model through a local name instead.
        model = openai_model if openai_model else 'gpt-4-turbo'
        data = {
            "model": model,
|
"messages": [ |
|
{ |
|
"role": "system", |
|
"content": "You are a helpful assistant that answers questions based on the given " |
|
"transcription and summary." |
|
}, |
|
{ |
|
"role": "user", |
|
"content": prompt |
|
} |
|
], |
|
"max_tokens": 150000, |
|
"temperature": 0.1 |
|
} |
|
response = requests.post('https://api.openai.com/v1/chat/completions', headers=headers, json=data) |
|
|
|
if response.status_code == 200: |
|
answer = response.json()['choices'][0]['message']['content'].strip() |
|
return answer |
|
else: |
|
return "Failed to process the question." |
|
else: |
|
return "Question answering is currently only supported with the OpenAI API." |
|
|
|
|
|
|
|
|
|
|
def launch_ui(demo_mode=False): |
|
whisper_models = ["small.en", "medium.en", "large"] |
|
|
|
with gr.Blocks() as iface: |
|
|
|
with gr.Tab("Audio Transcription + Summarization"): |
|
|
|
with gr.Row(): |
|
|
|
theme_toggle = gr.Radio(choices=["Light", "Dark"], value="Light", |
|
label="Light/Dark Mode Toggle (Toggle to change UI color scheme)") |
|
|
|
|
|
ui_mode_toggle = gr.Radio(choices=["Simple", "Advanced"], value="Simple", |
|
label="UI Mode (Toggle to show all options)") |
|
|
|
|
|
url_input = gr.Textbox(label="URL (Mandatory)", placeholder="Enter the video URL here") |
|
|
|
|
|
num_speakers_input = gr.Number(value=2, label="Number of Speakers(Optional - Currently has no effect)", |
|
visible=False) |
|
whisper_model_input = gr.Dropdown(choices=whisper_models, value="small.en", |
|
label="Whisper Model(This is the ML model used for transcription.)", |
|
visible=False) |
|
custom_prompt_input = gr.Textbox( |
|
label="Custom Prompt (Customize your summarization, or ask a question about the video and have it " |
|
"answered)", |
|
placeholder="Above is the transcript of a video. Please read " |
|
"through the transcript carefully. Identify the main topics that are discussed over the " |
|
"course of the transcript. Then, summarize the key points about each main topic in a " |
|
"concise bullet point. The bullet points should cover the key information conveyed about " |
|
"each topic in the video, but should be much shorter than the full transcript. Please " |
|
"output your bullet point summary inside <bulletpoints> tags.", |
|
lines=3, visible=True) |
|
offset_input = gr.Number(value=0, label="Offset (Seconds into the video to start transcribing at)", |
|
visible=False) |
|
api_name_input = gr.Dropdown( |
|
choices=[None, "Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "Llama.cpp", "Kobold", "Ooba", "HuggingFace"], |
|
value=None, |
|
label="(Optional) The LLM endpoint to have summarize your request. If you're running a local model, select 'Local-LLM'", |
|
visible=True) |
|
api_key_input = gr.Textbox(label="API Key (Mandatory unless you're running a local model/server/no API selected)", |
|
placeholder="Enter your API key here; Ignore if using Local API or Built-in API('Local-LLM')", |
|
visible=True) |
|
vad_filter_input = gr.Checkbox(label="VAD Filter (WIP)", value=False, |
|
visible=False) |
|
rolling_summarization_input = gr.Checkbox(label="Enable Rolling Summarization", value=False, |
|
visible=False) |
|
download_video_input = gr.components.Checkbox(label="Download Video(Select to allow for file download of " |
|
"selected video)", value=False, visible=False) |
|
download_audio_input = gr.components.Checkbox(label="Download Audio(Select to allow for file download of " |
|
"selected Video's Audio)", value=False, visible=False) |
|
detail_level_input = gr.Slider(minimum=0.01, maximum=1.0, value=0.01, step=0.01, interactive=True, |
|
label="Summary Detail Level (Slide me) (Only OpenAI currently supported)", |
|
visible=False) |
|
keywords_input = gr.Textbox(label="Keywords", placeholder="Enter keywords here (comma-separated Example: " |
|
"tag_one,tag_two,tag_three)", |
|
value="default,no_keyword_set", |
|
visible=True) |
|
question_box_input = gr.Textbox(label="Question", |
|
placeholder="Enter a question to ask about the transcription", |
|
visible=False) |
|
chunk_summarization_input = gr.Checkbox(label="Time-based Chunk Summarization", |
|
value=False, |
|
visible=False) |
|
chunk_duration_input = gr.Number(label="Chunk Duration (seconds)", value=DEFAULT_CHUNK_DURATION, |
|
visible=False) |
|
words_per_second_input = gr.Number(label="Words per Second", value=WORDS_PER_SECOND, |
|
visible=False) |
|
|
|
|
|
|
|
|
|
|
|
|
|
inputs = [ |
|
num_speakers_input, whisper_model_input, custom_prompt_input, offset_input, api_name_input, |
|
api_key_input, vad_filter_input, download_video_input, download_audio_input, |
|
rolling_summarization_input, detail_level_input, question_box_input, keywords_input, |
|
chunk_summarization_input, chunk_duration_input, words_per_second_input |
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
outputs = [ |
|
gr.Textbox(label="Transcription (Resulting Transcription from your input URL)"), |
|
gr.Textbox(label="Summary or Status Message (Current status of Summary or Summary itself)"), |
|
gr.File(label="Download Transcription as JSON (Download the Transcription as a file)"), |
|
gr.File(label="Download Summary as Text (Download the Summary as a file)"), |
|
gr.File(label="Download Video (Download the Video as a file)", visible=False), |
|
gr.File(label="Download Audio (Download the Audio as a file)", visible=False), |
|
] |
|
|
|
def toggle_light(mode): |
|
if mode == "Dark": |
|
return """ |
|
<style> |
|
body { |
|
background-color: #1c1c1c; |
|
color: #ffffff; |
|
} |
|
.gradio-container { |
|
background-color: #1c1c1c; |
|
color: #ffffff; |
|
} |
|
.gradio-button { |
|
background-color: #4c4c4c; |
|
color: #ffffff; |
|
} |
|
.gradio-input { |
|
background-color: #4c4c4c; |
|
color: #ffffff; |
|
} |
|
.gradio-dropdown { |
|
background-color: #4c4c4c; |
|
color: #ffffff; |
|
} |
|
.gradio-slider { |
|
background-color: #4c4c4c; |
|
} |
|
.gradio-checkbox { |
|
background-color: #4c4c4c; |
|
} |
|
.gradio-radio { |
|
background-color: #4c4c4c; |
|
} |
|
.gradio-textbox { |
|
background-color: #4c4c4c; |
|
color: #ffffff; |
|
} |
|
.gradio-label { |
|
color: #ffffff; |
|
} |
|
</style> |
|
""" |
|
else: |
|
return """ |
|
<style> |
|
body { |
|
background-color: #ffffff; |
|
color: #000000; |
|
} |
|
.gradio-container { |
|
background-color: #ffffff; |
|
color: #000000; |
|
} |
|
.gradio-button { |
|
background-color: #f0f0f0; |
|
color: #000000; |
|
} |
|
.gradio-input { |
|
background-color: #f0f0f0; |
|
color: #000000; |
|
} |
|
.gradio-dropdown { |
|
background-color: #f0f0f0; |
|
color: #000000; |
|
} |
|
.gradio-slider { |
|
background-color: #f0f0f0; |
|
} |
|
.gradio-checkbox { |
|
background-color: #f0f0f0; |
|
} |
|
.gradio-radio { |
|
background-color: #f0f0f0; |
|
} |
|
.gradio-textbox { |
|
background-color: #f0f0f0; |
|
color: #000000; |
|
} |
|
.gradio-label { |
|
color: #000000; |
|
} |
|
</style> |
|
""" |
|
|
|
|
|
theme_toggle.change(fn=toggle_light, inputs=theme_toggle, outputs=gr.HTML()) |
|
|
|
|
|
def toggle_ui(mode): |
|
visible = (mode == "Advanced") |
|
return [ |
|
gr.update(visible=True) if i in [0, 3, 5, 6, 13] else gr.update(visible=visible) |
|
for i in range(len(inputs)) |
|
] |
|
|
|
|
|
ui_mode_toggle.change(fn=toggle_ui, inputs=ui_mode_toggle, outputs=inputs) |
|
|
|
|
|
all_inputs = [url_input] + inputs |
|
|
|
gr.Interface( |
|
fn=process_url, |
|
inputs=all_inputs, |
|
outputs=outputs, |
|
title="Video Transcription and Summarization", |
|
description="Submit a video URL for transcription and summarization. Ensure you input all necessary " |
|
"information including API keys." |
|
) |
|
|
|
|
|
with gr.Tab("Scrape & Summarize Articles/Websites"): |
|
url_input = gr.Textbox(label="Article URL", placeholder="Enter the article URL here") |
|
custom_article_title_input = gr.Textbox(label="Custom Article Title (Optional)", |
|
placeholder="Enter a custom title for the article") |
|
custom_prompt_input = gr.Textbox( |
|
label="Custom Prompt (Optional)", |
|
placeholder="Provide a custom prompt for summarization", |
|
lines=3 |
|
) |
|
api_name_input = gr.Dropdown( |
|
choices=[None, "huggingface", "openai", "anthropic", "cohere", "groq", "llama", "kobold", "ooba"], |
|
value=None, |
|
label="API Name (Mandatory for Summarization)" |
|
) |
|
api_key_input = gr.Textbox(label="API Key (Mandatory if API Name is specified)", |
|
placeholder="Enter your API key here; Ignore if using Local API or Built-in API") |
|
keywords_input = gr.Textbox(label="Keywords", placeholder="Enter keywords here (comma-separated)", |
|
value="default,no_keyword_set", visible=True) |
|
|
|
scrape_button = gr.Button("Scrape and Summarize") |
|
result_output = gr.Textbox(label="Result") |
|
|
|
scrape_button.click(scrape_and_summarize, inputs=[url_input, custom_prompt_input, api_name_input, |
|
api_key_input, keywords_input, |
|
custom_article_title_input], outputs=result_output) |
|
|
|
gr.Markdown("### Or Paste Unstructured Text Below (Will use settings from above)") |
|
text_input = gr.Textbox(label="Unstructured Text", placeholder="Paste unstructured text here", lines=10) |
|
text_ingest_button = gr.Button("Ingest Unstructured Text") |
|
text_ingest_result = gr.Textbox(label="Result") |
|
|
|
text_ingest_button.click(ingest_unstructured_text, |
|
inputs=[text_input, custom_prompt_input, api_name_input, api_key_input, |
|
keywords_input, custom_article_title_input], outputs=text_ingest_result) |
|
|
|
with gr.Tab("Ingest & Summarize Documents"): |
|
gr.Markdown("Plan to put ingestion form for documents here") |
|
gr.Markdown("Will ingest documents and store into SQLite DB") |
|
gr.Markdown("RAG here we come....:/") |
|
|
|
with gr.Tab("Sample Prompts/Questions"): |
|
gr.Markdown("Plan to put Sample prompts/questions here") |
|
gr.Markdown("Fabric prompts/live UI?") |
|
|
|
with gr.Row(): |
|
search_box = gr.Textbox(label="Search prompts", placeholder="Type to filter prompts") |
|
search_result = gr.Textbox(label="Matching prompts", interactive=False) |
|
search_box.change(search_prompts, inputs=search_box, outputs=search_result) |
|
|
|
|
|
with gr.Row(): |
|
prompt_selector = gr.Radio(choices=all_prompts, label="Select a prompt") |
|
selected_output = gr.Textbox(label="Selected prompt") |
|
prompt_selector.change(handle_prompt_selection, inputs=prompt_selector, outputs=selected_output) |
|
|
|
|
|
with gr.Accordion("Category 1"): |
|
gr.Markdown("\n".join(prompts_category_1)) |
|
with gr.Accordion("Category 2"): |
|
gr.Markdown("\n".join(prompts_category_2)) |
|
|
|
|
|
search_tab = gr.Interface( |
|
fn=search_and_display, |
|
inputs=[ |
|
gr.Textbox(label="Search Query", placeholder="Enter your search query here..."), |
|
gr.CheckboxGroup(label="Search Fields", choices=["Title", "Content", "URL", "Type", "Author"], |
|
value=["Title"]), |
|
gr.Textbox(label="Keyword", placeholder="Enter keywords here..."), |
|
gr.Number(label="Page", value=1, precision=0), |
|
gr.Checkbox(visible=False) |
|
], |
|
outputs=[ |
|
gr.Dataframe(label="Search Results"), |
|
gr.Textbox(label="Message", visible=False) |
|
], |
|
title="Search Media Summaries", |
|
description="Search for media (documents, videos, articles) and their summaries in the database. Use keywords for better filtering.", |
|
allow_flagging="never" |
|
) |
|
|
|
export_tab = gr.Interface( |
|
fn=export_to_csv, |
|
inputs=[ |
|
gr.Textbox(label="Search Query", placeholder="Enter your search query here..."), |
|
gr.CheckboxGroup(label="Search Fields", choices=["Title", "Content"], value=["Title"]), |
|
gr.Textbox(label="Keyword (Match ALL, can use multiple keywords, separated by ',' (comma) )", |
|
placeholder="Enter keywords here..."), |
|
gr.Number(label="Page", value=1, precision=0), |
|
gr.Number(label="Results per File", value=1000, precision=0) |
|
], |
|
outputs="text", |
|
title="Export Search Results to CSV", |
|
description="Export the search results to a CSV file." |
|
) |
|
|
|
keyword_add_interface = gr.Interface( |
|
fn=add_keyword, |
|
inputs=gr.Textbox(label="Add Keywords (comma-separated)", placeholder="Enter keywords here..."), |
|
outputs="text", |
|
title="Add Keywords", |
|
description="Add one, or multiple keywords to the database.", |
|
allow_flagging="never" |
|
) |
|
|
|
keyword_delete_interface = gr.Interface( |
|
fn=delete_keyword, |
|
inputs=gr.Textbox(label="Delete Keyword", placeholder="Enter keyword to delete here..."), |
|
outputs="text", |
|
title="Delete Keyword", |
|
description="Delete a keyword from the database.", |
|
allow_flagging="never" |
|
) |
|
|
|
keyword_tab = gr.TabbedInterface( |
|
[keyword_add_interface, keyword_delete_interface], |
|
["Add Keywords", "Delete Keywords"] |
|
) |
|
|
|
|
|
tabbed_interface = gr.TabbedInterface([iface, search_tab, export_tab, keyword_tab], |
|
["Transcription + Summarization", "Search", "Export", "Keywords"]) |
|
|
|
|
|
    server_port_variable = 7860
    if server_mode:
        # server_name expects a bare host, not a URL; "0.0.0.0" binds to all interfaces
        tabbed_interface.launch(share=False, server_port=server_port_variable, server_name="0.0.0.0")
    elif share_public:
        tabbed_interface.launch(share=True, server_port=share_public)
    else:
        tabbed_interface.launch(share=False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
prompts_category_1 = [ |
|
"What are the key points discussed in the video?", |
|
"Summarize the main arguments made by the speaker.", |
|
"Describe the conclusions of the study presented." |
|
] |
|
|
|
prompts_category_2 = [ |
|
"How does the proposed solution address the problem?", |
|
"What are the implications of the findings?", |
|
"Can you explain the theory behind the observed phenomenon?" |
|
] |
|
|
|
all_prompts = prompts_category_1 + prompts_category_2 |
|
|
|
|
|
|
|
def search_prompts(query): |
|
filtered_prompts = [prompt for prompt in all_prompts if query.lower() in prompt.lower()] |
|
return "\n".join(filtered_prompts) |
|
|
|
|
|
|
|
def handle_prompt_selection(prompt): |
|
return f"You selected: {prompt}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_latest_llamafile(repo, asset_name_prefix, output_filename): |
|
|
|
global local_llm_model, llamafile |
|
|
|
print("Checking for and downloading Llamafile it it doesn't already exist...") |
|
if os.path.exists(output_filename): |
|
time.sleep(1) |
|
print("Llamafile already exists. Skipping download.") |
|
logging.debug(f"{output_filename} already exists. Skipping download.") |
|
time.sleep(1) |
|
llamafile = output_filename |
|
llamafile_exists = True |
|
else: |
|
llamafile_exists = False |
|
|
|
    if llamafile_exists:
|
pass |
|
else: |
|
|
|
latest_release_url = f"https://api.github.com/repos/{repo}/releases/latest" |
|
response = requests.get(latest_release_url) |
|
if response.status_code != 200: |
|
raise Exception(f"Failed to fetch latest release info: {response.status_code}") |
|
|
|
latest_release_data = response.json() |
|
tag_name = latest_release_data['tag_name'] |
|
|
|
|
|
release_details_url = f"https://api.github.com/repos/{repo}/releases/tags/{tag_name}" |
|
response = requests.get(release_details_url) |
|
if response.status_code != 200: |
|
raise Exception(f"Failed to fetch release details for tag {tag_name}: {response.status_code}") |
|
|
|
release_data = response.json() |
|
assets = release_data.get('assets', []) |
|
|
|
|
|
asset_url = None |
|
for asset in assets: |
|
if re.match(f"{asset_name_prefix}.*", asset['name']): |
|
asset_url = asset['browser_download_url'] |
|
break |
|
|
|
if not asset_url: |
|
raise Exception(f"No asset found with prefix {asset_name_prefix}") |
|
|
|
|
|
response = requests.get(asset_url) |
|
if response.status_code != 200: |
|
raise Exception(f"Failed to download asset: {response.status_code}") |
|
|
|
print("Llamafile downloaded successfully.") |
|
logging.debug("Main: Llamafile downloaded successfully.") |
|
|
|
|
|
with open(output_filename, 'wb') as file: |
|
file.write(response.content) |
|
|
|
logging.debug(f"Downloaded {output_filename} from {asset_url}") |
|
print(f"Downloaded {output_filename} from {asset_url}") |
|
|
|
|
|
print("Checking for and downloading LLM from Huggingface if needed...") |
|
logging.debug("Main: Checking and downloading LLM from Huggingface if needed...") |
|
mistral_7b_instruct_v0_2_q8_0_llamafile = "mistral-7b-instruct-v0.2.Q8_0.llamafile" |
|
Samantha_Mistral_Instruct_7B_Bulleted_Notes_Q8 = "samantha-mistral-instruct-7b-bulleted-notes.Q8_0.gguf" |
|
Phi_3_mini_4k_instruct_Q8_0_llamafile = "Phi-3-mini-4k-instruct.Q8_0.llamafile" |
|
meta_Llama_3_8B_Instruct_Q8_0_llamafile = 'Meta-Llama-3-8B-Instruct.Q8_0.llamafile' |
|
|
|
available_models = [] |
|
|
|
|
|
if os.path.exists(mistral_7b_instruct_v0_2_q8_0_llamafile): |
|
available_models.append(mistral_7b_instruct_v0_2_q8_0_llamafile) |
|
print("Mistral-7B-Instruct-v0.2.Q8_0.llamafile already exists. Skipping download.") |
|
if os.path.exists(Samantha_Mistral_Instruct_7B_Bulleted_Notes_Q8): |
|
available_models.append(Samantha_Mistral_Instruct_7B_Bulleted_Notes_Q8) |
|
print("Samantha-Mistral-Instruct-7B-Bulleted-Notes-Q8_0.gguf already exists. Skipping download.") |
|
if os.path.exists(Phi_3_mini_4k_instruct_Q8_0_llamafile): |
|
available_models.append(Phi_3_mini_4k_instruct_Q8_0_llamafile) |
|
print("Phi-3-mini-4k-instruct-Q8_0.llamafile already exists. Skipping download.") |
|
if os.path.exists(meta_Llama_3_8B_Instruct_Q8_0_llamafile): |
|
available_models.append(meta_Llama_3_8B_Instruct_Q8_0_llamafile) |
|
print("Meta-Llama-3-8B-Instruct.Q8_0.llamafile already exists. Skipping download.") |
|
|
|
|
|
if not available_models: |
|
user_choice_main = input("Would you like to download an LLM model? (Y/N): ") |
|
elif available_models: |
|
user_choice_main = input("\nSeems you already have a model available, would you like to download another LLM model? (Y/N): ") |
|
|
|
|
|
if user_choice_main.lower() == "y": |
|
logging.debug("Main: Checking and downloading LLM from Huggingface if needed...") |
|
time.sleep(1) |
|
dl_check = input("Final chance to back out, hit 'N'/'n' to cancel, or 'Y'/'y' to continue: ") |
|
if dl_check.lower == "n" or "2": |
|
exit() |
|
else: |
|
llm_choice = input("\nWhich LLM model would you like to download?\n\n1. Mistral-7B-Instruct-v0.2-GGUF \n2. Samantha-Mistral-Instruct-7B-Bulleted-Notes) \n3. Microsoft Phi3-Mini-128k 3.8B): \n\nPress '1', '2', or '3' to specify:\n\n ") |
|
while llm_choice != "1" and llm_choice != "2" and llm_choice != "3": |
|
print("Invalid choice. Please try again.") |
|
|
|
if llm_choice == "1": |
|
print("Downloading the Mistral-7B-Instruct-v0.2 LLM from Huggingface...") |
|
print("Gonna be a bit...") |
|
print("Like seriously, an 8GB file...(don't say I didn't warn you...)") |
|
time.sleep(2) |
|
mistral_7b_instruct_v0_2_q8_0_llamafile_sha256 = "1ee6114517d2f770425c880e5abc443da36b193c82abec8e2885dd7ce3b9bfa6" |
|
llm_download_model_hash = mistral_7b_instruct_v0_2_q8_0_llamafile_sha256 |
|
llamafile_llm_url = "https://huggingface.co/Mozilla/Mistral-7B-Instruct-v0.2-llamafile/resolve/main/mistral-7b-instruct-v0.2.Q8_0.llamafile?download=true" |
|
llamafile_llm_output_filename = "mistral-7b-instruct-v0.2.Q8_0.llamafile" |
|
download_file(llamafile_llm_url, llamafile_llm_output_filename, llm_download_model_hash) |
|
local_llm_model = "mistral-7b-instruct-v0.2.Q8_0.llamafile" |
|
|
|
elif llm_choice == "2": |
|
print("Downloading the samantha-mistra-instruct-7b-bulleted-notes LLM from Huggingface...") |
|
print("Gonna be a bit...") |
|
print("Like seriously, an 8GB file...(don't say I didn't warn you...)") |
|
time.sleep(2) |
|
samantha_mistral_instruct_7b_bulleted_notes_q8_0_gguf_sha256 = "6334c1ab56c565afd86535271fab52b03e67a5e31376946bce7bf5c144e847e4" |
|
llm_download_model_hash = samantha_mistral_instruct_7b_bulleted_notes_q8_0_gguf_sha256 |
|
llamafile_llm_output_filename = "samantha-mistral-instruct-7b-bulleted-notes.Q8_0.gguf" |
|
llamafile_llm_url = "https://huggingface.co/cognitivetech/samantha-mistral-instruct-7b-bulleted-notes-GGUF/resolve/main/samantha-mistral-instruct-7b-bulleted-notes.Q8_0.gguf?download=true" |
|
download_file(llamafile_llm_url, llamafile_llm_output_filename, llm_download_model_hash) |
|
local_llm_model = "samantha-mistral-instruct-7b-bulleted-notes.Q8_0.gguf" |
|
|
|
elif llm_choice == "3": |
|
print("Downloading MS Phi-3-4k-3.8B LLM from Huggingface...") |
|
print("Gonna be a bit...") |
|
print("Like seriously, a 4GB file...(don't say I didn't warn you...)") |
|
time.sleep(2) |
|
Phi_3_mini_4k_instruct_Q8_0_gguf_sha256 = "1b51fc72fda221dd7b4d3e84603db37fbb1ce53c17f2e7583b7026d181b8d20f" |
|
llm_download_model_hash = Phi_3_mini_4k_instruct_Q8_0_gguf_sha256 |
|
llamafile_llm_output_filename = "Phi-3-mini-4k-instruct.Q8_0.llamafile" |
|
llamafile_llm_url = "https://huggingface.co/Mozilla/Phi-3-mini-4k-instruct-llamafile/resolve/main/Phi-3-mini-4k-instruct.Q8_0.llamafile?download=true" |
|
download_file(llamafile_llm_url, llamafile_llm_output_filename, llm_download_model_hash) |
|
local_llm_model = "Phi-3-mini-4k-instruct-Q8_0.llamafile" |
|
|
|
elif llm_choice == "4": |
|
print("Downloading the Llama-3-8B LLM from Huggingface...") |
|
print("Gonna be a bit...") |
|
print("Like seriously, a 8GB file...(don't say I didn't warn you...)") |
|
time.sleep(2) |
|
meta_Llama_3_8B_Instruct_Q8_0_lamafile_sha256 = "406868a97f02f57183716c7e4441d427f223fdbc7fa42964ef10c4d60dd8ed37" |
|
llm_download_model_hash = meta_Llama_3_8B_Instruct_Q8_0_lamafile_sha256 |
|
llamafile_llm_output_filename = "Meta-Llama-3-8B-Instruct.Q8_0.llamafile" |
|
llamafile_llm_url = "https://huggingface.co/Mozilla/Meta-Llama-3-8B-Instruct-llamafile/resolve/main/Meta-Llama-3-8B-Instruct.Q8_0.llamafile?download=true" |
|
download_file(llamafile_llm_url, llamafile_llm_output_filename, llm_download_model_hash) |
|
local_llm_model = "Meta-Llama-3-8B-Instruct.Q8_0.llamafile" |
|
|
|
else: |
|
print("Invalid choice. Please try again.") |
|
    else:
        pass

    # Re-scan so a model that was just downloaded above shows up in the menu
    for model_file in (mistral_7b_instruct_v0_2_q8_0_llamafile,
                       Samantha_Mistral_Instruct_7B_Bulleted_Notes_Q8,
                       Phi_3_mini_4k_instruct_Q8_0_llamafile,
                       meta_Llama_3_8B_Instruct_Q8_0_llamafile):
        if os.path.exists(model_file) and model_file not in available_models:
            available_models.append(model_file)
|
if available_models: |
|
print("\n\nAvailable models:") |
|
for idx, model in enumerate(available_models, start=1): |
|
print(f"{idx}. {model}") |
|
user_choice = input("\nWhich model would you like to use? Please enter the corresponding number: ") |
|
while not user_choice.isdigit() or int(user_choice) not in range(1, len(available_models) + 1): |
|
print("Invalid choice. Please try again.") |
|
user_choice = input("Which model would you like to use? Please enter the corresponding number: ") |
|
user_answer = available_models[int(user_choice) - 1] |
|
local_llm_model = user_answer |
|
print(f"You have chosen to use: {user_answer}") |
|
else: |
|
print("No models available/Found.") |
|
print("Please run the script again and select a model, or download one. Exiting...") |
|
exit() |
|
|
|
return llamafile, user_answer |
|
|
|
|
|
def download_file(url, dest_path, expected_checksum=None, max_retries=3, delay=5): |
|
temp_path = dest_path + '.tmp' |
|
|
|
for attempt in range(max_retries): |
|
try: |
|
|
|
resume_header = {} |
|
if os.path.exists(temp_path): |
|
resume_header = {'Range': f'bytes={os.path.getsize(temp_path)}-'} |
|
|
|
response = requests.get(url, stream=True, headers=resume_header) |
|
response.raise_for_status() |
|
|
|
|
|
            total_size = int(response.headers.get('content-length', 0))

            # The server signals a resumed download with a Content-Range response
            # header; checking for the request's 'Range' key here would never match.
            resumed = 'Content-Range' in response.headers
            initial_pos = os.path.getsize(temp_path) if resumed and os.path.exists(temp_path) else 0

            mode = 'ab' if resumed else 'wb'
            with open(temp_path, mode) as temp_file, tqdm(
                total=initial_pos + total_size, unit='B', unit_scale=True, desc=dest_path, initial=initial_pos, ascii=True
            ) as pbar:
|
for chunk in response.iter_content(chunk_size=8192): |
|
if chunk: |
|
temp_file.write(chunk) |
|
pbar.update(len(chunk)) |
|
|
|
|
|
if expected_checksum: |
|
if not verify_checksum(temp_path, expected_checksum): |
|
os.remove(temp_path) |
|
raise ValueError("Downloaded file's checksum does not match the expected checksum") |
|
|
|
|
|
os.rename(temp_path, dest_path) |
|
print("Download complete and verified!") |
|
return dest_path |
|
|
|
except Exception as e: |
|
print(f"Attempt {attempt + 1} failed: {e}") |
|
if attempt < max_retries - 1: |
|
print(f"Retrying in {delay} seconds...") |
|
time.sleep(delay) |
|
else: |
|
print("Max retries reached. Download failed.") |
|
raise |
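
# Usage sketch (illustrative; URL, filename and SHA-256 digest are the Mistral
# llamafile values used in download_latest_llamafile above):
#     download_file("https://huggingface.co/Mozilla/Mistral-7B-Instruct-v0.2-llamafile/"
#                   "resolve/main/mistral-7b-instruct-v0.2.Q8_0.llamafile?download=true",
#                   "mistral-7b-instruct-v0.2.Q8_0.llamafile",
#                   expected_checksum="1ee6114517d2f770425c880e5abc443da36b193c82abec8e2885dd7ce3b9bfa6")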
|
|
|
|
|
def verify_checksum(file_path, expected_checksum): |
|
sha256_hash = hashlib.sha256() |
|
with open(file_path, 'rb') as f: |
|
for byte_block in iter(lambda: f.read(4096), b''): |
|
sha256_hash.update(byte_block) |
|
return sha256_hash.hexdigest() == expected_checksum |
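
# Streams the file in 4 KiB blocks so multi-gigabyte llamafiles are hashed
# without being loaded into memory all at once.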
|
|
|
|
|
|
|
|
|
def cleanup_process(): |
|
global process |
|
if process is not None: |
|
process.terminate() |
|
process = None |
|
print("Terminated the external process") |
|
|
|
def signal_handler(sig, frame): |
|
logging.info('Signal handler called with signal: %s', sig) |
|
cleanup_process() |
|
sys.exit(0) |
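
# Registered against SIGINT/SIGTERM in the __main__ block below so the
# llamafile child process is terminated when the script is interrupted.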
|
|
|
|
|
|
|
|
|
def local_llm_function(): |
|
repo = "Mozilla-Ocho/llamafile" |
|
asset_name_prefix = "llamafile-" |
|
    # os.name is "posix" on both Linux and macOS, so use platform.system()
    # ("Windows" / "Linux" / "Darwin") to tell the three targets apart.
    useros = platform.system()
    if useros == "Windows":
        output_filename = "llamafile.exe"
    else:
        output_filename = "llamafile"
|
print( |
|
"WARNING - Checking for existence of llamafile and HuggingFace model, downloading if needed...This could be a while") |
|
print("WARNING - and I mean a while. We're talking an 8 Gigabyte model here...") |
|
print("WARNING - Hope you're comfy. Or it's already downloaded.") |
|
time.sleep(6) |
|
logging.debug("Main: Checking and downloading Llamafile from Github if needed...") |
|
llamafile, user_answer = download_latest_llamafile(repo, asset_name_prefix, output_filename) |
|
logging.debug("Main: Llamafile downloaded successfully.") |
|
|
|
|
|
arguments = ["-m", user_answer] |
|
try: |
|
logging.info("Main: Launching the LLM (llamafile) in an external terminal window...") |
|
if useros == "nt": |
|
launch_in_new_terminal_windows(llamafile, arguments) |
|
elif useros == "posix": |
|
launch_in_new_terminal_linux(llamafile, arguments) |
|
else: |
|
launch_in_new_terminal_mac(llamafile, arguments) |
|
|
|
|
|
atexit.register(cleanup_process) |
|
except Exception as e: |
|
logging.error(f"Failed to launch the process: {e}") |
|
print(f"Failed to launch the process: {e}") |
|
|
|
|
|
def launch_in_new_terminal_windows(executable, args):
    # Use Popen and the module-level `process` so cleanup_process() can
    # actually terminate the launched terminal.
    global process
    command = f'start cmd /k "{executable} {" ".join(args)}"'
    process = subprocess.Popen(command, shell=True)


def launch_in_new_terminal_linux(executable, args):
    global process
    command = f'gnome-terminal -- {executable} {" ".join(args)}'
    process = subprocess.Popen(command, shell=True)


def launch_in_new_terminal_mac(executable, args):
    global process
    # Note: `open -a Terminal.app` does not forward extra arguments to the
    # executable, so the model argument may be ignored on macOS.
    command = f'open -a Terminal.app {executable} {" ".join(args)}'
    process = subprocess.Popen(command, shell=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(input_path, api_name=None, api_key=None, |
|
num_speakers=2, |
|
whisper_model="small.en", |
|
offset=0, |
|
vad_filter=False, |
|
download_video_flag=False, |
|
custom_prompt=None, |
|
overwrite=False, |
|
rolling_summarization=False, |
|
detail=0.01, |
|
keywords=None, |
|
chunk_summarization=False, |
|
chunk_duration=None, |
|
words_per_second=None, |
|
llm_model=None, |
|
time_based=False): |
|
|
|
    global detail_level_number, summary, audio_file, detail_level
|
|
|
detail_level = detail |
|
|
|
print(f"Keywords: {keywords}") |
|
|
|
if input_path is None and args.user_interface: |
|
return [] |
|
start_time = time.monotonic() |
|
paths = [] |
|
if os.path.isfile(input_path) and input_path.endswith('.txt'): |
|
logging.debug("MAIN: User passed in a text file, processing text file...") |
|
paths = read_paths_from_file(input_path) |
|
elif os.path.exists(input_path): |
|
logging.debug("MAIN: Local file path detected") |
|
paths = [input_path] |
|
elif (info_dict := get_youtube(input_path)) and 'entries' in info_dict: |
|
logging.debug("MAIN: YouTube playlist detected") |
|
print( |
|
"\n\nSorry, but playlists aren't currently supported. You can run the following command to generate a " |
|
"text file that you can then pass into this script though! (It may not work... playlist support seems " |
|
"spotty)" + """\n\n\tpython Get_Playlist_URLs.py <Youtube Playlist URL>\n\n\tThen,\n\n\tpython |
|
diarizer.py <playlist text file name>\n\n""") |
|
return |
|
else: |
|
paths = [input_path] |
|
results = [] |
|
|
|
for path in paths: |
|
try: |
|
if path.startswith('http'): |
|
logging.debug("MAIN: URL Detected") |
|
info_dict = get_youtube(path) |
|
json_file_path = None |
|
if info_dict: |
|
logging.debug("MAIN: Creating path for video file...") |
|
download_path = create_download_directory(info_dict['title']) |
|
logging.debug("MAIN: Path created successfully\n MAIN: Now Downloading video from yt_dlp...") |
|
try: |
|
video_path = download_video(path, download_path, info_dict, download_video_flag) |
|
except RuntimeError as e: |
|
logging.error(f"Error downloading video: {str(e)}") |
|
|
|
continue |
|
logging.debug("MAIN: Video downloaded successfully") |
|
logging.debug("MAIN: Converting video file to WAV...") |
|
audio_file = convert_to_wav(video_path, offset) |
|
logging.debug("MAIN: Audio file converted successfully") |
|
else: |
|
if os.path.exists(path): |
|
logging.debug("MAIN: Local file path detected") |
|
download_path, info_dict, audio_file = process_local_file(path) |
|
else: |
|
logging.error(f"File does not exist: {path}") |
|
continue |
|
|
|
if info_dict: |
|
logging.debug("MAIN: Creating transcription file from WAV") |
|
segments = speech_to_text(audio_file, whisper_model=whisper_model, vad_filter=vad_filter) |
|
transcription_result = { |
|
'video_path': path, |
|
'audio_file': audio_file, |
|
'transcription': segments |
|
} |
|
results.append(transcription_result) |
|
logging.info(f"MAIN: Transcription complete: {audio_file}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
if rolling_summarization: |
|
logging.info("MAIN: Rolling Summarization") |
|
api_key = openai_api_key |
|
                    global client
                    client = OpenAI(api_key=api_key)
|
|
|
text = extract_text_from_segments(segments) |
|
|
|
|
|
json_file_path = audio_file.replace('.wav', '.segments.json') |
|
|
|
|
|
summary = summarize_with_detail_openai(text, detail=detail_level, verbose=False) |
|
|
|
|
|
if summary: |
|
transcription_result['summary'] = summary |
|
logging.info("MAIN: Rolling Summarization successful.") |
|
save_summary_to_file(summary, json_file_path) |
|
else: |
|
logging.warning("MAIN: Rolling Summarization failed.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
elif chunk_summarization: |
|
logging.info("MAIN: Chunk Summarization") |
|
|
|
|
|
json_file_path = audio_file.replace('.wav', '.segments.json') |
|
|
|
|
|
summary = summarize_chunks(api_name, api_key, segments, chunk_duration, words_per_second) |
|
|
|
|
|
if summary: |
|
transcription_result['summary'] = summary |
|
logging.info("MAIN: Chunk Summarization successful.") |
|
save_summary_to_file(summary, json_file_path) |
|
else: |
|
logging.warning("MAIN: Chunk Summarization failed.") |
|
|
|
elif api_name: |
|
logging.debug(f"MAIN: Summarization being performed by {api_name}") |
|
json_file_path = audio_file.replace('.wav', '.segments.json') |
|
                    if api_name.lower() == 'openai':
                        openai_api_key = api_key if api_key else config.get('API', 'openai_api_key', fallback=None)
                        try:
                            logging.debug("MAIN: trying to summarize with openAI")
                            summary = summarize_with_openai(openai_api_key, json_file_path, custom_prompt)
                        except requests.exceptions.ConnectionError as e:
                            logging.error(f"MAIN: OpenAI summarization failed, connection error: {e}")
                    elif api_name.lower() == "anthropic":
                        anthropic_api_key = api_key if api_key else config.get('API', 'anthropic_api_key', fallback=None)
                        try:
                            logging.debug("MAIN: Trying to summarize with anthropic")
                            summary = summarize_with_claude(anthropic_api_key, json_file_path, anthropic_model, custom_prompt)
                        except requests.exceptions.ConnectionError as e:
                            logging.error(f"MAIN: Anthropic summarization failed, connection error: {e}")
                    elif api_name.lower() == "cohere":
                        cohere_api_key = api_key if api_key else config.get('API', 'cohere_api_key', fallback=None)
                        try:
                            logging.debug("MAIN: Trying to summarize with cohere")
                            summary = summarize_with_cohere(cohere_api_key, json_file_path, cohere_model, custom_prompt)
                        except requests.exceptions.ConnectionError as e:
                            logging.error(f"MAIN: Cohere summarization failed, connection error: {e}")
                    elif api_name.lower() == "groq":
                        groq_api_key = api_key if api_key else config.get('API', 'groq_api_key', fallback=None)
                        try:
                            logging.debug("MAIN: Trying to summarize with Groq")
                            summary = summarize_with_groq(groq_api_key, json_file_path, groq_model, custom_prompt)
                        except requests.exceptions.ConnectionError as e:
                            logging.error(f"MAIN: Groq summarization failed, connection error: {e}")
                    elif api_name.lower() == "llama":
                        llama_token = api_key if api_key else config.get('API', 'llama_api_key', fallback=None)
                        llama_ip = llama_api_IP
                        try:
                            logging.debug("MAIN: Trying to summarize with Llama.cpp")
                            summary = summarize_with_llama(llama_ip, json_file_path, llama_token, custom_prompt)
                        except requests.exceptions.ConnectionError as e:
                            logging.error(f"MAIN: Llama.cpp summarization failed, connection error: {e}")
                    elif api_name.lower() == "kobold":
                        kobold_token = api_key if api_key else config.get('API', 'kobold_api_key', fallback=None)
                        kobold_ip = kobold_api_IP
                        try:
                            logging.debug("MAIN: Trying to summarize with kobold.cpp")
                            summary = summarize_with_kobold(kobold_ip, json_file_path, kobold_token, custom_prompt)
                        except requests.exceptions.ConnectionError as e:
                            logging.error(f"MAIN: Kobold.cpp summarization failed, connection error: {e}")
                    elif api_name.lower() == "ooba":
                        ooba_token = api_key if api_key else config.get('API', 'ooba_api_key', fallback=None)
                        ooba_ip = ooba_api_IP
                        try:
                            logging.debug("MAIN: Trying to summarize with oobabooga")
                            summary = summarize_with_oobabooga(ooba_ip, json_file_path, ooba_token, custom_prompt)
                        except requests.exceptions.ConnectionError as e:
                            logging.error(f"MAIN: Oobabooga summarization failed, connection error: {e}")
                    elif api_name.lower() == "tabbyapi":
                        tabbyapi_key = api_key if api_key else config.get('API', 'tabby_api_key', fallback=None)
                        tabbyapi_ip = tabby_api_IP
                        try:
                            logging.debug("MAIN: Trying to summarize with tabbyapi")
                            tabby_model = llm_model
                            summary = summarize_with_tabbyapi(tabbyapi_key, tabbyapi_ip, json_file_path, tabby_model, custom_prompt)
                        except requests.exceptions.ConnectionError as e:
                            logging.error(f"MAIN: TabbyAPI summarization failed, connection error: {e}")
                    elif api_name.lower() == "vllm":
                        logging.debug("MAIN: Trying to summarize with VLLM")
                        summary = summarize_with_vllm(vllm_api_url, vllm_api_key, llm_model, json_file_path, custom_prompt)
                    elif api_name.lower() == "local-llm":
                        logging.debug("MAIN: Trying to summarize with the local LLM, Mistral Instruct v0.2")
                        local_llm_url = "http://127.0.0.1:8080"
                        summary = summarize_with_local_llm(json_file_path, custom_prompt)
                    elif api_name.lower() == "huggingface":
                        huggingface_api_key = api_key if api_key else config.get('API', 'huggingface_api_key', fallback=None)
                        try:
                            logging.debug("MAIN: Trying to summarize with huggingface")
                            summary = summarize_with_huggingface(huggingface_api_key, json_file_path, custom_prompt)
                        except requests.exceptions.ConnectionError as e:
                            logging.error(f"MAIN: HuggingFace summarization failed, connection error: {e}")
|
|
|
else: |
|
logging.warning(f"Unsupported API: {api_name}") |
|
summary = None |
|
|
|
                    if summary:
                        transcription_result['summary'] = summary
                        logging.info(f"Summary generated using {api_name} API")
                        save_summary_to_file(summary, json_file_path)
                    else:
                        logging.warning(f"Failed to generate summary using {api_name} API")
|
else: |
|
logging.info("MAIN: #2 - No API specified. Summarization will not be performed") |
|
|
|
|
|
add_media_with_keywords( |
|
url=path, |
|
title=info_dict.get('title', 'Untitled'), |
|
media_type='video', |
|
content=' '.join([segment['text'] for segment in segments]), |
|
                    keywords=','.join(keywords) if keywords else 'default,no_keyword_set',
|
prompt=custom_prompt or 'No prompt provided', |
|
summary=summary or 'No summary provided', |
|
transcription_model=whisper_model, |
|
author=info_dict.get('uploader', 'Unknown'), |
|
ingestion_date=datetime.now().strftime('%Y-%m-%d') |
|
) |
|
|
|
except Exception as e: |
|
logging.error(f"Error processing {path}: {str(e)}") |
|
continue |
|
except Exception as e: |
|
logging.error(f"Error processing path: {path}") |
|
logging.error(str(e)) |
|
continue |
|
|
|
|
|
|
|
return results |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
signal.signal(signal.SIGINT, signal_handler) |
|
signal.signal(signal.SIGTERM, signal_handler) |
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
parser = argparse.ArgumentParser( |
|
description='Transcribe and summarize videos.', |
|
epilog=''' |
|
Sample commands: |
|
1. Simple Sample command structure: |
|
summarize.py <path_to_video> -api openai -k tag_one tag_two tag_three |
|
|
|
2. Rolling Summary Sample command structure: |
|
summarize.py <path_to_video> -api openai -prompt "custom_prompt_goes_here-is-appended-after-transcription" -roll -detail 0.01 -k tag_one tag_two tag_three |
|
|
|
3. FULL Sample command structure: |
|
summarize.py <path_to_video> -api openai -ns 2 -wm small.en -off 0 -vad -log INFO -prompt "custom_prompt" -overwrite -roll -detail 0.01 -k tag_one tag_two tag_three |
|
|
|
4. Sample command structure for UI: |
|
summarize.py -gui -log DEBUG |
|
''', |
|
formatter_class=argparse.RawTextHelpFormatter |
|
) |
|
parser.add_argument('input_path', type=str, help='Path or URL of the video', nargs='?') |
|
parser.add_argument('-v', '--video', action='store_true', help='Download the video instead of just the audio') |
|
parser.add_argument('-api', '--api_name', type=str, help='API name for summarization (optional)') |
|
parser.add_argument('-key', '--api_key', type=str, help='API key for summarization (optional)') |
|
parser.add_argument('-ns', '--num_speakers', type=int, default=2, help='Number of speakers (default: 2)') |
|
parser.add_argument('-wm', '--whisper_model', type=str, default='small.en', |
|
help='Whisper model (default: small.en)') |
|
parser.add_argument('-off', '--offset', type=int, default=0, help='Offset in seconds (default: 0)') |
|
parser.add_argument('-vad', '--vad_filter', action='store_true', help='Enable VAD filter') |
|
parser.add_argument('-log', '--log_level', type=str, default='INFO', |
|
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], help='Log level (default: INFO)') |
|
parser.add_argument('-gui', '--user_interface', action='store_true', help="Launch the Gradio user interface") |
|
parser.add_argument('-demo', '--demo_mode', action='store_true', help='Enable demo mode') |
|
parser.add_argument('-prompt', '--custom_prompt', type=str, |
|
help='Pass in a custom prompt to be used in place of the existing one.\n (Probably should just ' |
|
'modify the script itself...)') |
|
parser.add_argument('-overwrite', '--overwrite', action='store_true', help='Overwrite existing files') |
|
parser.add_argument('-roll', '--rolling_summarization', action='store_true', help='Enable rolling summarization') |
|
parser.add_argument('-detail', '--detail_level', type=float, help='Mandatory if rolling summarization is enabled, ' |
|
'defines the chunk size.\n Default is 0.01(lots ' |
|
'of chunks) -> 1.00 (few chunks)\n Currently ' |
|
'only OpenAI works. ', |
|
default=0.01, ) |
|
|
|
parser.add_argument('--chunk_duration', type=int, default=DEFAULT_CHUNK_DURATION, |
|
help='Duration of each chunk in seconds') |
|
|
|
parser.add_argument('-time', '--time_based', type=int, |
|
help='Enable time-based summarization and specify the chunk duration in seconds (minimum 60 seconds, increments of 30 seconds)') |
|
parser.add_argument('-model', '--llm_model', type=str, default='', |
|
help='Model to use for LLM summarization (only used for vLLM/TabbyAPI)') |
|
parser.add_argument('-k', '--keywords', nargs='+', default=['cli_ingest_no_tag'], |
|
help='Keywords for tagging the media, can use multiple separated by spaces (default: cli_ingest_no_tag)') |
|
parser.add_argument('--log_file', type=str, help='Where to save logfile (non-default)') |
|
parser.add_argument('--local_llm', action='store_true', help="Use a local LLM from the script(Downloads llamafile from github and 'mistral-7b-instruct-v0.2.Q8' - 8GB model from Huggingface)") |
|
parser.add_argument('--server_mode', action='store_true', help='Run in server mode (This exposes the GUI/Server to the network)') |
|
    parser.add_argument('--share_public', type=int, default=None,
                        help="This will use Gradio's built-in ngrok tunneling to share the server publicly on the internet. Specify the port to use (e.g. 7860)")
|
parser.add_argument('--port', type=int, default=7860, help='Port to run the server on') |
|
|
|
|
|
args = parser.parse_args() |
|
share_public = args.share_public |
|
server_mode = args.server_mode |
|
server_port = args.port |
|
|
|
|
|
    logger = logging.getLogger()
    logger.setLevel(getattr(logging, args.log_level))

    # basicConfig() above (and at module import) already attached a console
    # handler; adjust its level rather than adding a second handler, which
    # would emit every message twice.
    for handler in logger.handlers:
        handler.setLevel(getattr(logging, args.log_level))
|
|
|
if args.log_file: |
|
|
|
file_handler = logging.FileHandler(args.log_file) |
|
file_handler.setLevel(getattr(logging, args.log_level)) |
|
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') |
|
file_handler.setFormatter(file_formatter) |
|
logger.addHandler(file_handler) |
|
logger.info(f"Log file created at: {args.log_file}") |
|
|
|
|
|
custom_prompt = args.custom_prompt |
|
|
|
if custom_prompt is None or custom_prompt == "": |
|
logging.debug("No custom prompt defined, will use default") |
|
args.custom_prompt = ("\n\nabove is the transcript of a video " |
|
"Please read through the transcript carefully. Identify the main topics that are " |
|
"discussed over the course of the transcript. Then, summarize the key points about each " |
|
"main topic in a concise bullet point. The bullet points should cover the key " |
|
"information conveyed about each topic in the video, but should be much shorter than " |
|
"the full transcript. Please output your bullet point summary inside <bulletpoints> " |
|
"tags.") |
|
custom_prompt = args.custom_prompt |
|
print("No custom prompt defined, will use default") |
|
else: |
|
logging.debug(f"Custom prompt defined, will use \n\nf{custom_prompt} \n\nas the prompt") |
|
print(f"Custom Prompt has been defined. Custom prompt: \n\n {args.custom_prompt}") |
|
|
|
|
|
local_llm = args.local_llm |
|
logging.info(f'Local LLM flag: {local_llm}') |
|
|
|
    if args.user_interface:
|
|
|
|
|
|
|
|
|
launch_ui(demo_mode=False) |
|
else: |
|
if not args.input_path: |
|
parser.print_help() |
|
sys.exit(1) |
|
|
|
logging.info('Starting the transcription and summarization process.') |
|
logging.info(f'Input path: {args.input_path}') |
|
logging.info(f'API Name: {args.api_name}') |
|
logging.info(f'Number of speakers: {args.num_speakers}') |
|
logging.info(f'Whisper model: {args.whisper_model}') |
|
logging.info(f'Offset: {args.offset}') |
|
logging.info(f'VAD filter: {args.vad_filter}') |
|
logging.info(f'Log Level: {args.log_level}') |
|
logging.info(f'Demo Mode: {args.demo_mode}') |
|
logging.info(f'Custom Prompt: {args.custom_prompt}') |
|
logging.info(f'Overwrite: {args.overwrite}') |
|
logging.info(f'Rolling Summarization: {args.rolling_summarization}') |
|
logging.info(f'User Interface: {args.user_interface}') |
|
logging.info(f'Video Download: {args.video}') |
|
|
|
|
|
|
|
|
|
api_keys = {key: value for key, value in config.items('API') if key.endswith('_api_key')} |
|
|
|
api_name = args.api_name |
|
|
|
|
|
|
|
|
|
summary = None |
|
    if args.detail_level is None:
|
args.detail_level = 0.01 |
|
if args.api_name and args.rolling_summarization and any( |
|
key.startswith(args.api_name) and value is not None for key, value in api_keys.items()): |
|
logging.info(f'MAIN: API used: {args.api_name}') |
|
logging.info('MAIN: Rolling Summarization will be performed.') |
|
|
|
elif args.api_name: |
|
logging.info(f'MAIN: API used: {args.api_name}') |
|
logging.info('MAIN: Summarization (not rolling) will be performed.') |
|
|
|
else: |
|
logging.info('No API specified. Summarization will not be performed.') |
|
|
|
logging.debug("Platform check being performed...") |
|
platform_check() |
|
logging.debug("CUDA check being performed...") |
|
cuda_check() |
|
logging.debug("ffmpeg check being performed...") |
|
check_ffmpeg() |
|
|
|
llm_model = args.llm_model or None |
|
|
|
|
|
try: |
|
results = main(args.input_path, api_name=args.api_name, |
|
api_key=args.api_key, |
|
num_speakers=args.num_speakers, |
|
whisper_model=args.whisper_model, |
|
offset=args.offset, |
|
vad_filter=args.vad_filter, |
|
download_video_flag=args.video, |
|
custom_prompt=args.custom_prompt, |
|
overwrite=args.overwrite, |
|
rolling_summarization=args.rolling_summarization, |
|
detail=args.detail_level, |
|
keywords=args.keywords, |
|
chunk_summarization=False, |
|
chunk_duration=None, |
|
words_per_second=None, |
|
llm_model=args.llm_model, |
|
time_based=args.time_based) |
|
|
|
logging.info('Transcription process completed.') |
|
atexit.register(cleanup_process) |
|
except Exception as e: |
|
logging.error('An error occurred during the transcription process.') |
|
logging.error(str(e)) |
|
sys.exit(1) |
|
|
|
finally: |
|
cleanup_process() |
|
|