"""Article summarization library.

Scrapes articles, summarizes them with a local or remote LLM backend, and ingests
the results into the SQLite media database.
"""
import json
import logging
import os
from datetime import datetime

import gradio as gr
import requests
from tqdm import tqdm

from App_Function_Libraries.Utils import sanitize_filename
from Article_Extractor_Lib import scrape_article
from Local_Summarization_Lib import summarize_with_llama, summarize_with_oobabooga, summarize_with_tabbyapi, \
    summarize_with_vllm, summarize_with_kobold, save_summary_to_file, summarize_with_local_llm
from Summarization_General_Lib import summarize_with_openai, summarize_with_anthropic, summarize_with_cohere, \
    summarize_with_groq, summarize_with_openrouter, summarize_with_deepseek, summarize_with_huggingface, \
    summarize_with_mistral
from SQLite_DB import Database, create_tables, add_media_with_keywords


def ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date, custom_prompt):
    """Insert a scraped article and its summary into the media database."""
    try:
        # Reject articles with no usable content
        if not content.strip():
            raise ValueError("Content is empty.")

        db = Database()
        create_tables()
        keyword_list = keywords.split(',') if keywords else ["default"]
        keyword_str = ', '.join(keyword_list)

        # Fall back to sensible defaults for any missing metadata
        url = url or 'Unknown'
        title = title or 'Unknown'
        author = author or 'Unknown'
        keywords = keywords or 'default'
        summary = summary or 'No summary available'
        ingestion_date = ingestion_date or datetime.now().strftime('%Y-%m-%d')

        # Log every field before insertion for easier debugging
        logging.debug(f"URL: {url}")
        logging.debug(f"Title: {title}")
        logging.debug(f"Author: {author}")
        logging.debug(f"Content: {content[:50]}... (length: {len(content)})")
        logging.debug(f"Keywords: {keywords}")
        logging.debug(f"Summary: {summary}")
        logging.debug(f"Ingestion Date: {ingestion_date}")
        logging.debug(f"Custom Prompt: {custom_prompt}")

        # Validate that every required field is present
        if not url:
            logging.error("URL is missing.")
            raise ValueError("URL is missing.")
        if not title:
            logging.error("Title is missing.")
            raise ValueError("Title is missing.")
        if not content:
            logging.error("Content is missing.")
            raise ValueError("Content is missing.")
        if not keywords:
            logging.error("Keywords are missing.")
            raise ValueError("Keywords are missing.")
        if not summary:
            logging.error("Summary is missing.")
            raise ValueError("Summary is missing.")
        if not ingestion_date:
            logging.error("Ingestion date is missing.")
            raise ValueError("Ingestion date is missing.")
        if not custom_prompt:
            logging.error("Custom prompt is missing.")
            raise ValueError("Custom prompt is missing.")

        # Add the article to the database
        result = add_media_with_keywords(
            url=url,
            title=title,
            media_type='article',
            content=content,
            keywords=keyword_str or "article_default",
            prompt=custom_prompt or None,
            summary=summary or "No summary generated",
            transcription_model=None,
            author=author or 'Unknown',
            ingestion_date=ingestion_date
        )
        return result
    except Exception as e:
        logging.error(f"Failed to ingest article to the database: {e}")
        return str(e)


def scrape_and_summarize_multiple(urls, custom_prompt_arg, api_name, api_key, keywords, custom_article_titles, system_message=None):
    """Scrape, summarize, and ingest a newline-separated list of article URLs."""
    urls = [url.strip() for url in urls.split('\n') if url.strip()]
    custom_titles = custom_article_titles.split('\n') if custom_article_titles else []

    results = []
    errors = []

    # Progress bar for the Gradio UI
    progress = gr.Progress()

    for i, url in tqdm(enumerate(urls), total=len(urls), desc="Processing URLs"):
        custom_title = custom_titles[i] if i < len(custom_titles) else None
        try:
            result = scrape_and_summarize(url, custom_prompt_arg, api_name, api_key, keywords, custom_title, system_message)
            results.append(f"Results for URL {i + 1}:\n{result}")
        except Exception as e:
            error_message = f"Error processing URL {i + 1} ({url}): {str(e)}"
            errors.append(error_message)
            results.append(f"Failed to process URL {i + 1}: {url}")

        # Update the progress bar after each URL
        progress((i + 1) / len(urls), desc=f"Processed {i + 1}/{len(urls)} URLs")

    combined_output = "\n".join(results)
    if errors:
        combined_output += "\n\nErrors encountered:\n" + "\n".join(errors)

    return combined_output


def scrape_and_summarize(url, custom_prompt_arg, api_name, api_key, keywords, custom_article_title, system_message=None):
    """Scrape a single article, summarize it with the selected API, and ingest the result."""
    try:
        # Step 1: Scrape the article
        article_data = scrape_article(url)
        print(f"Scraped Article Data: {article_data}")
        if not article_data:
            return "Failed to scrape the article."

        # Use the custom title if provided, otherwise fall back to the scraped metadata
        title = custom_article_title.strip() if custom_article_title else article_data.get('title', 'Untitled')
        author = article_data.get('author', 'Unknown')
        content = article_data.get('content', '')
        ingestion_date = datetime.now().strftime('%Y-%m-%d')

        print(f"Title: {title}, Author: {author}, Content Length: {len(content)}")

        # Step 2: Prepare the system message and custom prompt
        system_message = system_message or "Act as a professional summarizer and summarize this article."
        article_custom_prompt = custom_prompt_arg or "Act as a professional summarizer and summarize this article."

        # Step 3: Summarize the article if an API was selected
        summary = None
        if api_name:
            logging.debug(f"Article_Summarizer: Summarization being performed by {api_name}")

            # Write the article content to the JSON segments file the summarizers expect
            sanitized_title = sanitize_filename(title)
            os.makedirs("Results", exist_ok=True)  # ensure the output directory exists
            json_file_path = os.path.join("Results", f"{sanitized_title}_segments.json")

            with open(json_file_path, 'w') as json_file:
                json.dump([{'text': content}], json_file, indent=2)

            # Dispatch to the selected summarization backend
            try:
                if api_name.lower() == 'openai':
                    summary = summarize_with_openai(api_key, json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "anthropic":
                    summary = summarize_with_anthropic(api_key, json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "cohere":
                    summary = summarize_with_cohere(api_key, json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "groq":
                    logging.debug("MAIN: Trying to summarize with groq")
                    summary = summarize_with_groq(api_key, json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "openrouter":
                    logging.debug("MAIN: Trying to summarize with OpenRouter")
                    summary = summarize_with_openrouter(api_key, json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "deepseek":
                    logging.debug("MAIN: Trying to summarize with DeepSeek")
                    summary = summarize_with_deepseek(api_key, json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "mistral":
                    summary = summarize_with_mistral(api_key, json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "llama.cpp":
                    logging.debug("MAIN: Trying to summarize with Llama.cpp")
                    summary = summarize_with_llama(json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "kobold":
                    logging.debug("MAIN: Trying to summarize with Kobold.cpp")
                    summary = summarize_with_kobold(json_file_path, api_key, article_custom_prompt, system_message)
                elif api_name.lower() == "ooba":
                    summary = summarize_with_oobabooga(json_file_path, api_key, article_custom_prompt, system_message)
                elif api_name.lower() == "tabbyapi":
                    summary = summarize_with_tabbyapi(json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "vllm":
                    logging.debug("MAIN: Trying to summarize with VLLM")
                    summary = summarize_with_vllm(json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "local-llm":
                    logging.debug("MAIN: Trying to summarize with Local LLM")
                    summary = summarize_with_local_llm(json_file_path, article_custom_prompt, system_message)
                elif api_name.lower() == "huggingface":
                    logging.debug("MAIN: Trying to summarize with huggingface")
                    summary = summarize_with_huggingface(api_key, json_file_path, article_custom_prompt, system_message)
            except requests.exceptions.ConnectionError as e:
                logging.error(f"Connection error while trying to summarize with {api_name}: {str(e)}")

            if summary:
                logging.info(f"Article_Summarizer: Summary generated using {api_name} API")
                save_summary_to_file(summary, json_file_path)
            else:
                summary = "Summary not available"
                logging.warning(f"Failed to generate summary using {api_name} API")

        else:
            summary = "Article Summarization: No API provided for summarization."

        print(f"Summary: {summary}")

        # Step 4: Ingest the article into the database
        ingestion_result = ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date,
                                                article_custom_prompt)

        return f"Title: {title}\nAuthor: {author}\nIngestion Result: {ingestion_result}\n\nSummary: {summary}\n\nArticle Contents: {content}"
    except Exception as e:
        logging.error(f"Error processing URL {url}: {str(e)}")
        return f"Failed to process URL {url}: {str(e)}"


def ingest_unstructured_text(text, custom_prompt, api_name, api_key, keywords, custom_article_title, system_message=None):
    """Summarize a block of unstructured text and ingest it into the database."""
    title = custom_article_title.strip() if custom_article_title else "Unstructured Text"
    author = "Unknown"
    ingestion_date = datetime.now().strftime('%Y-%m-%d')

    # Summarize the text if an API is provided
    if api_name:
        os.makedirs("Results", exist_ok=True)  # ensure the output directory exists
        json_file_path = f"Results/{title.replace(' ', '_')}_segments.json"
        with open(json_file_path, 'w') as json_file:
            json.dump([{'text': text}], json_file, indent=2)

        if api_name.lower() == 'openai':
            summary = summarize_with_openai(api_key, json_file_path, custom_prompt, system_message)
        else:
            summary = "Unsupported API."
    else:
        summary = "No API provided for summarization."

    # Ingest the text into the database ('Unstructured Text' stands in for the URL)
    ingestion_result = ingest_article_to_db('Unstructured Text', title, author, text, keywords, summary, ingestion_date,
                                            custom_prompt)
    return f"Title: {title}\nSummary: {summary}\nIngestion Result: {ingestion_result}"
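

#
# Example usage (illustrative sketch only; the API name, key lookup, and URL below are
# placeholder assumptions, not values defined by this library):
#
# if __name__ == "__main__":
#     logging.basicConfig(level=logging.DEBUG)
#     report = scrape_and_summarize_multiple(
#         urls="https://example.com/some-article",      # one URL per line
#         custom_prompt_arg="Summarize the key points of this article.",
#         api_name="openai",                            # any backend handled in scrape_and_summarize()
#         api_key=os.environ.get("OPENAI_API_KEY"),     # assumes the key is supplied via the environment
#         keywords="example,testing",
#         custom_article_titles="",
#     )
#     print(report)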