Spaces:
Sleeping
Sleeping
import os | |
from dotenv import load_dotenv | |
import urllib.request | |
import fitz # PyMuPDF | |
import re | |
import numpy as np | |
import tensorflow_hub as hub | |
import openai | |
import gradio as gr | |
from sklearn.neighbors import NearestNeighbors | |
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

# OpenAI API key taken from the environment. Default to an empty string so
# that later `.strip()` checks report a missing key instead of failing on None.
openAI_key = os.getenv('OPENAI_API_KEY', '')
def download_pdf(url, output_path):
    """Download the file at ``url`` and store it at ``output_path``.

    Args:
        url: HTTP or HTTPS address of the file to fetch. Surrounding
            whitespace is ignored.
        output_path: Local path the downloaded content is written to.

    Raises:
        ValueError: If ``url`` does not use the ``http`` or ``https`` scheme.

    Errors raised by ``urllib.request.urlretrieve`` propagate unchanged.
    """
    from urllib.parse import urlparse

    url = url.strip()
    # urlretrieve also accepts schemes such as file://, which would let a
    # user-supplied "URL" pull arbitrary local files into the corpus.
    scheme = urlparse(url).scheme.lower()
    if scheme not in ('http', 'https'):
        raise ValueError(
            f'Unsupported URL scheme {scheme!r}; only http and https are allowed.'
        )
    urllib.request.urlretrieve(url, output_path)
def preprocess(text):
    """Collapse every run of whitespace in ``text`` into a single space.

    Newlines, tabs and repeated spaces all become one ``' '``. Leading and
    trailing whitespace is collapsed too, but not removed.

    Args:
        text: Raw text, e.g. as extracted from a PDF page.

    Returns:
        The text with whitespace runs normalised.
    """
    # Raw string: '\s' in a plain literal is an invalid escape sequence.
    # '\s+' already matches newlines, so no separate replace is needed.
    return re.sub(r'\s+', ' ', text)
def pdf_to_text(path, start_page=1, end_page=None):
    """Extract the whitespace-normalised text of each page of a PDF.

    Args:
        path: Path of the PDF file to open.
        start_page: 1-based number of the first page to read. Values below 1
            are treated as 1.
        end_page: 1-based number of the last page to read (inclusive).
            ``None`` or a value beyond the end of the document means the
            last page.

    Returns:
        A list with one preprocessed string per page, in page order.
    """
    doc = fitz.open(path)
    try:
        total_pages = doc.page_count
        # Clamp the requested range to pages that actually exist so an
        # out-of-range request does not fail part-way through extraction.
        first_index = max(start_page, 1) - 1
        last_page = total_pages if end_page is None else min(end_page, total_pages)
        return [
            preprocess(doc.load_page(i).get_text("text"))
            for i in range(first_index, last_page)
        ]
    finally:
        # Release the document even when a page fails to load or parse.
        doc.close()
def text_to_chunks(texts, word_length=150, start_page=1):
    """Split page texts into labelled chunks of at most ``word_length`` words.

    Each chunk is formatted as ``[Page no. N] "<words>"``. When the last
    chunk of a page is shorter than ``word_length`` and further pages follow,
    its words are carried over and prepended to the next page, so the chunk
    is labelled with the page on which it ends.

    Args:
        texts: Iterable of page strings, in page order.
        word_length: Maximum number of words per chunk; must be at least 1.
        start_page: Page number assigned to the first element of ``texts``.

    Returns:
        A list of chunk strings. Empty when no page contains any words.

    Raises:
        ValueError: If ``word_length`` is smaller than 1.
    """
    if word_length < 1:
        raise ValueError('word_length must be a positive integer')

    # split() without an argument drops empty tokens produced by leading,
    # trailing or repeated spaces, so they neither count towards the word
    # budget nor leave double spaces inside a chunk.
    pages = [page_text.split() for page_text in texts]
    last_idx = len(pages) - 1

    chunks = []
    carry = []  # short tail of the previous page, prepended to the next one
    for idx, page_words in enumerate(pages):
        words = carry + page_words
        carry = []
        for i in range(0, len(words), word_length):
            chunk = words[i:i + word_length]
            if len(chunk) < word_length and idx != last_idx:
                # Only the final slice can be short; hand it to the next page.
                carry = chunk
                break
            body = ' '.join(chunk)
            chunks.append(f'[Page no. {idx + start_page}] "{body}"')
    return chunks
class SemanticSearch:
    """Nearest-neighbour lookup over text chunks using sentence embeddings.

    Texts are embedded with the Universal Sentence Encoder loaded from
    TensorFlow Hub and indexed with scikit-learn's ``NearestNeighbors``.
    Call :meth:`fit` with the corpus first, then call the instance with a
    query string.
    """

    def __init__(self):
        """Load the sentence encoder; the index starts out unfitted."""
        self.use = hub.load('https://tfhub.dev/google/universal-sentence-encoder/4')
        self.fitted = False

    def fit(self, data, batch=1000, n_neighbors=5):
        """Embed ``data`` and build the nearest-neighbour index.

        Args:
            data: Non-empty sequence of text chunks to index.
            batch: Number of texts embedded per encoder call.
            n_neighbors: Number of neighbours returned per query; capped at
                the number of chunks.

        Raises:
            ValueError: If ``data`` is empty.
        """
        if len(data) == 0:
            raise ValueError('Cannot fit SemanticSearch on an empty corpus.')

        # Build everything first and assign afterwards, so a failure while
        # embedding does not leave the instance half-updated.
        embeddings = self.get_text_embedding(data, batch=batch)
        n_neighbors = min(n_neighbors, len(embeddings))
        index = NearestNeighbors(n_neighbors=n_neighbors)
        index.fit(embeddings)

        self.data = data
        self.embeddings = embeddings
        self.nn = index
        self.fitted = True

    def __call__(self, text, return_data=True):
        """Return the chunks (or their indices) nearest to ``text``.

        Args:
            text: Query string.
            return_data: When true, return the matching chunks themselves;
                otherwise return their indices into the fitted data.

        Raises:
            RuntimeError: If :meth:`fit` has not been called yet.
        """
        if not self.fitted:
            raise RuntimeError('SemanticSearch must be fitted before it is queried.')

        query_embedding = self.use([text])
        neighbors = self.nn.kneighbors(query_embedding, return_distance=False)[0]
        if return_data:
            return [self.data[i] for i in neighbors]
        return neighbors

    def get_text_embedding(self, texts, batch=1000):
        """Embed ``texts`` in batches and stack the results row-wise.

        Args:
            texts: Sequence of strings to embed.
            batch: Number of texts passed to the encoder per call.

        Returns:
            A 2-D array with one row per input text.
        """
        batches = [
            self.use(texts[i:i + batch])
            for i in range(0, len(texts), batch)
        ]
        return np.vstack(batches)
# Module-level search index; load_recommender() refits it for each corpus.
recommender = SemanticSearch()


def load_recommender(path, start_page=1):
    """Index the PDF at ``path`` in the module-level ``recommender``.

    The PDF is converted to per-page text starting at ``start_page``, split
    into page-labelled chunks, and the chunks are fitted into the index.

    Args:
        path: Path of the PDF to index.
        start_page: 1-based page to start reading from; also used as the
            page number of the first chunk label.

    Returns:
        The status string ``'Corpus Loaded.'``.
    """
    page_texts = pdf_to_text(path, start_page=start_page)
    recommender.fit(text_to_chunks(page_texts, start_page=start_page))
    return 'Corpus Loaded.'
def generate_text(prompt, model="gpt-3.5-turbo"):
    """Send ``prompt`` to an OpenAI model and return the generated text.

    ``text-davinci-003`` goes through the completion endpoint; every other
    model name goes through the chat-completion endpoint.

    Args:
        prompt: Text sent to the model (as the user message for chat models).
        model: Name of the OpenAI model to query.

    Returns:
        The text of the first choice in the response.
    """
    openai.api_key = openAI_key
    max_tokens = 256

    if model == "text-davinci-003":
        response = openai.Completion.create(
            engine=model,
            prompt=prompt,
            max_tokens=max_tokens,
            n=1,
            stop=None,
            temperature=0.7,
        )
        return response.choices[0].text

    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "assistant", "content": "Here is some initial assistant message."},
        {"role": "user", "content": prompt},
    ]
    response = openai.ChatCompletion.create(
        model=model,
        messages=conversation,
        temperature=0.3,
        max_tokens=max_tokens,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
    )
    return response.choices[0].message['content']
def _build_prompt(question, chunks):
    """Combine the retrieved passages and the user's question into one prompt."""
    search_results = '\n\n'.join(chunks)
    return (
        f'search results:\n\n{search_results}\n\n'
        'Instructions: Compose a reply to the query using only the search '
        'results given. Cite each reference using the [Page no. N] notation '
        'that starts the passage it came from. If the search results do not '
        "contain the answer, reply 'Text Not Found in PDF'.\n\n"
        f'Query: {question}\nAnswer:'
    )


def question_answer(chat_history, url, file, question, model):
    """Answer ``question`` about a PDF and append the exchange to the chat.

    The PDF comes either from ``url`` or from the uploaded ``file`` (exactly
    one must be given). It is indexed, the passages closest to the question
    are retrieved, and the selected model answers from those passages.

    Args:
        chat_history: Current list of ``[user, bot]`` message pairs, or
            ``None`` when the chat is empty.
        url: Address of a PDF to download, or an empty string.
        file: Uploaded file object exposing a ``name`` path, or ``None``.
        question: The user's question.
        model: Name of the OpenAI model to use.

    Returns:
        The chat history with one new ``[question, reply]`` pair. Validation
        and OpenAI request errors are reported as the reply text, so the
        value always has the list-of-pairs shape the chat output expects.
    """
    history = list(chat_history or [])
    url = (url or '').strip()
    question = question or ''

    def reply(message):
        history.append([question, message])
        return history

    # Cheap validation first, before any download or embedding work.
    if not (openAI_key or '').strip():
        return reply('[ERROR]: Please enter your Open AI Key. Get your key here : https://platform.openai.com/account/api-keys')
    if url and file is not None:
        return reply('[ERROR]: Both URL and PDF is provided. Please provide only one (either URL or PDF).')
    if not url and file is None:
        return reply('[ERROR]: Both URL and PDF is empty. Provide at least one.')
    if not model:
        return reply('[ERROR]: You have not selected any model. Please choose an LLM model.')
    if not question.strip():
        return reply('[ERROR]: Question field is empty')

    try:
        if url:
            download_pdf(url, 'corpus.pdf')
            load_recommender('corpus.pdf')
        else:
            # Read the upload from the path it already has; renaming it by
            # slicing fixed offsets off the name assumed one particular
            # temp-file naming scheme and moved a file this code does not own.
            load_recommender(file.name)

        # Ground the answer in the document: retrieve the passages closest
        # to the question and hand them to the model with the question.
        prompt = _build_prompt(question, recommender(question))
        return reply(generate_text(prompt, model))
    except openai.error.InvalidRequestError:
        return reply('[ERROR]: Either you do not have access to GPT4 or you have exhausted your quota!')
# ---------------------------------------------------------------------------
# Gradio user interface: PDF source + question + model on the left,
# chat history on the right.
# ---------------------------------------------------------------------------
title = 'PDF GPT Turbo'
description = """ PDF GPT Turbo allows you to chat with your PDF files. It uses Google's Universal Sentence Encoder with Deep averaging network (DAN) to give hallucination free response by improving the embedding quality of OpenAI. It cites the page number in square brackets([Page No.]) and shows where the information is located, adding credibility to the responses."""

# NOTE(review): `min-height: 1200` has no CSS unit, so browsers will likely
# ignore it — confirm whether `1200px` was intended before changing it.
with gr.Blocks(css="""#chatbot { font-size: 14px; min-height: 1200; }""") as demo:
    gr.Markdown(f'<center><h3>{title}</h3></center>')
    gr.Markdown(description)

    with gr.Row():
        with gr.Group():
            url = gr.Textbox(label='Enter PDF URL here (Example: https://arxiv.org/pdf/1706.03762.pdf )')
            # Closing tag fixed: the heading was previously left unclosed.
            gr.Markdown("<center><h4>OR</h4></center>")
            file = gr.File(label='Upload your PDF/ Research Paper / Book here', file_types=['.pdf'])
            question = gr.Textbox(label='Enter your question here')
            # `value` is the parameter that sets the initially selected
            # choice; with `default` no model was preselected.
            model = gr.Radio([
                'gpt-3.5-turbo',
                'gpt-3.5-turbo-16k',
                'gpt-3.5-turbo-0613',
                'gpt-3.5-turbo-16k-0613',
                'text-davinci-003',
                'gpt-4',
                'gpt-4-32k'
            ], label='Select Model', value='gpt-3.5-turbo')
            btn = gr.Button(value='Submit')
            btn.style(full_width=True)

        with gr.Group():
            # NOTE(review): `placeholder` and `lines` may not be accepted by
            # gr.Chatbot in the installed Gradio version — verify.
            chatbot = gr.Chatbot(placeholder="Chat History", label="Chat History", lines=50, elem_id="chatbot")

    # Submitting runs question_answer and shows its result in the chat panel.
    btn.click(
        question_answer,
        inputs=[chatbot, url, file, question, model],
        outputs=[chatbot],
    )

demo.launch()