import os
import re
from pathlib import Path

import accelerate
import chromadb
import gradio as gr
import torch
import tqdm
import transformers
from dotenv import load_dotenv
from huggingface_hub import login
from langchain.chains import ConversationChain
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFacePipeline
from langchain_community.vectorstores import Chroma
from langchain_huggingface.llms import HuggingFaceEndpoint
from transformers import AutoTokenizer
from unidecode import unidecode

load_dotenv()
huggingface_api_key = os.getenv("HUGGINGFACE_API_KEY")
# Only report whether a token was found; printing the token itself would
# leak the secret into console output and hosted logs.
print('HF TOKEN: ', 'set' if huggingface_api_key else 'not set')

# Full Hugging Face repo ids, and the short names shown in the UI radio.
list_llm = ["mistralai/Mistral-7B-Instruct-v0.2"]
list_llm_simple = [os.path.basename(llm) for llm in list_llm]


def load_doc(list_file_path, chunk_size, chunk_overlap):
    """Load PDF files and split their pages into text chunks.

    Args:
        list_file_path: Iterable of paths to PDF files.
        chunk_size: Chunk size handed to the text splitter.
        chunk_overlap: Chunk overlap handed to the text splitter.

    Returns:
        The list of document chunks produced by the splitter.
    """
    pages = []
    for file_path in list_file_path:
        pages.extend(PyPDFLoader(file_path).load())
    # Use the caller-supplied values (previously hard-coded to 600 / 50,
    # which silently ignored the UI sliders).
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return text_splitter.split_documents(pages)


def create_db(splits, collection_name):
    """Build a Chroma vector store from document chunks.

    Args:
        splits: Document chunks to embed and index.
        collection_name: Name of the Chroma collection to create.

    Returns:
        The Chroma vector store, backed by a chromadb ephemeral client.
    """
    embedding = HuggingFaceEmbeddings()
    new_client = chromadb.EphemeralClient()
    vectordb = Chroma.from_documents(
        documents=splits,
        embedding=embedding,
        client=new_client,
        collection_name=collection_name,
    )
    return vectordb


def load_db():
    """Return a Chroma vector store using the default HuggingFace embeddings."""
    embedding = HuggingFaceEmbeddings()
    vectordb = Chroma(embedding_function=embedding)
    return vectordb


def initialize_llmchain(llm_model, temperature, max_tokens, top_k, vector_db, progress=gr.Progress()):
    """Create the conversational retrieval chain for a vector store.

    Args:
        llm_model: Hugging Face repo id of the model to query.
        temperature: Sampling temperature passed to the endpoint.
        max_tokens: Passed to the endpoint as ``max_new_tokens``.
        top_k: Top-k value passed to the endpoint.
        vector_db: Vector store whose retriever feeds the chain.
        progress: Gradio progress tracker used to report each step.

    Returns:
        A ``ConversationalRetrievalChain`` with buffer memory that also
        returns its source documents.
    """
    progress(0.1, desc="Initializing HF tokenizer...")
    progress(0.5, desc="Initializing HF Hub...")
    endpoint_kwargs = {}
    if huggingface_api_key:
        # The token is loaded from HUGGINGFACE_API_KEY, which the endpoint
        # does not read on its own, so hand it over explicitly.
        endpoint_kwargs["huggingfacehub_api_token"] = huggingface_api_key
    llm = HuggingFaceEndpoint(
        repo_id=llm_model,
        temperature=temperature,
        max_new_tokens=max_tokens,
        top_k=top_k,
        **endpoint_kwargs,
    )

    progress(0.75, desc="Defining buffer memory...")
    memory = ConversationBufferMemory(
        memory_key="chat_history",
        output_key='answer',
        return_messages=True
    )
    retriever = vector_db.as_retriever()

    progress(0.8, desc="Defining retrieval chain...")
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm,
        retriever=retriever,
        chain_type="stuff",
        memory=memory,
        return_source_documents=True,
        verbose=False,
    )
    progress(0.9, desc="Done!")
    return qa_chain


def create_collection_name(filepath):
    """Derive a collection name from a file path.

    The file stem is transliterated to ASCII, every run of non-alphanumeric
    characters becomes a single ``-``, and the result is cut to 50
    characters, padded to at least 3, and forced to start and end with an
    alphanumeric character.

    Args:
        filepath: Path of the file the collection is named after.

    Returns:
        The sanitized collection name.
    """
    collection_name = unidecode(Path(filepath).stem)
    # One regex pass covers spaces too, so no separate space replacement.
    collection_name = re.sub(r'[^A-Za-z0-9]+', '-', collection_name)
    collection_name = collection_name[:50]
    if len(collection_name) < 3:
        collection_name = collection_name + 'xyz'
    if not collection_name[0].isalnum():
        collection_name = 'A' + collection_name[1:]
    if not collection_name[-1].isalnum():
        collection_name = collection_name[:-1] + 'Z'
    print('Filepath: ', filepath)
    print('Collection name: ', collection_name)
    return collection_name


def initialize_database(list_file_obj, chunk_size, chunk_overlap, progress=gr.Progress()):
    """Build the vector database from the uploaded files.

    Args:
        list_file_obj: Uploaded file objects exposing a ``name`` attribute;
            ``None`` entries are skipped.
        chunk_size: Chunk size forwarded to ``load_doc``.
        chunk_overlap: Chunk overlap forwarded to ``load_doc``.
        progress: Gradio progress tracker used to report each step.

    Returns:
        A tuple ``(vector_db, collection_name, status_message)``.

    Raises:
        gr.Error: If no file has been uploaded.
    """
    list_file_path = [x.name for x in (list_file_obj or []) if x is not None]
    if not list_file_path:
        # Without this guard the first-file lookup below fails with an
        # opaque TypeError / IndexError.
        raise gr.Error("Please upload at least one PDF document first.")

    progress(0.1, desc="Creating collection name...")
    # The collection is named after the first uploaded file.
    collection_name = create_collection_name(list_file_path[0])

    progress(0.25, desc="Loading document...")
    doc_splits = load_doc(list_file_path, chunk_size, chunk_overlap)

    progress(0.5, desc="Generating vector database...")
    vector_db = create_db(doc_splits, collection_name)

    progress(0.9, desc="Done!")
    return vector_db, collection_name, "Complete!"


def initialize_LLM(llm_option, llm_temperature, max_tokens, top_k, vector_db, progress=gr.Progress()):
    """Create the QA chain for the model selected in the UI.

    Args:
        llm_option: Index into ``list_llm`` of the selected model.
        llm_temperature: Sampling temperature.
        max_tokens: Maximum number of new tokens.
        top_k: Top-k value.
        vector_db: Vector store built by ``initialize_database``.
        progress: Gradio progress tracker forwarded to the chain setup.

    Returns:
        A tuple ``(qa_chain, status_message)``.

    Raises:
        gr.Error: If the vector database has not been generated yet.
    """
    if vector_db is None:
        raise gr.Error("Please generate the vector database before initializing the LLM chain.")
    llm_name = list_llm[llm_option]
    print("llm_name: ", llm_name)
    qa_chain = initialize_llmchain(llm_name, llm_temperature, max_tokens, top_k, vector_db, progress)
    return qa_chain, "Complete!"

def format_chat_history(message, chat_history):
    """Flatten chat turns into a list of labelled strings.

    Args:
        message: The current user message (not used here).
        chat_history: Sequence of ``(user_message, bot_message)`` pairs, or
            ``None`` for an empty history.

    Returns:
        A list alternating ``"User: ..."`` and ``"Assistant: ..."`` entries.
    """
    formatted_chat_history = []
    # A missing history is treated the same as an empty one.
    for user_message, bot_message in chat_history or []:
        formatted_chat_history.append(f"User: {user_message}")
        formatted_chat_history.append(f"Assistant: {bot_message}")
    return formatted_chat_history


def _collect_sources(source_documents, count=3):
    """Return ``count`` (text, page) pairs flattened into one list.

    Pages are converted from 0-based to 1-based. When fewer than ``count``
    documents are available the remaining slots are filled with ``""`` and
    ``0`` so the number of outputs stays fixed.
    """
    flat = []
    for doc in source_documents[:count]:
        flat.append(doc.page_content.strip())
        flat.append(doc.metadata["page"] + 1)
    for _ in range(count - len(flat) // 2):
        flat.extend(("", 0))
    return flat


def conversation(qa_chain, message, history):
    """Answer a question with the QA chain and update the chat widgets.

    Args:
        qa_chain: The chain created by ``initialize_LLM``.
        message: The question typed by the user.
        history: List of ``(user_message, bot_message)`` pairs, or ``None``.

    Returns:
        A 9-tuple: the chain, an update clearing the message box, the new
        history, then text and 1-based page number for three sources.

    Raises:
        gr.Error: If the QA chain has not been initialized.
    """
    if qa_chain is None:
        raise gr.Error("Please initialize the QA chain before asking questions.")
    history = history or []
    formatted_chat_history = format_chat_history(message, history)

    response = qa_chain({"question": message, "chat_history": formatted_chat_history})
    response_answer = response["answer"]
    # Keep only the text after the last "Helpful Answer:" marker, if any.
    if "Helpful Answer:" in response_answer:
        response_answer = response_answer.split("Helpful Answer:")[-1]

    # Indexing sources [0], [1], [2] directly raised IndexError whenever
    # fewer than three chunks were retrieved.
    sources = _collect_sources(response["source_documents"])

    new_history = history + [(message, response_answer)]
    return (qa_chain, gr.update(value=""), new_history, *sources)


def upload_file(file_obj):
    """Return the ``name`` of every uploaded file.

    Args:
        file_obj: Iterable of uploaded file objects, or ``None``.

    Returns:
        A list with each file's ``name`` attribute, in upload order.
    """
    # Read the name from each item; the original read it from the list
    # itself, which raised AttributeError.
    return [file.name for file in file_obj or []]


def demo():
    """Build the four-step Gradio interface and return the Blocks app."""
    with gr.Blocks(theme="base") as app:
        vector_db = gr.State()
        qa_chain = gr.State()
        collection_name = gr.State()

        gr.Markdown(
            "\n\nPDF-based chatbot\n\n"
            "Ask any questions about your PDF documents\n\n"
        )
        gr.Markdown(
            "Note: This AI assistant, using Langchain and open-source LLMs, performs retrieval-augmented generation (RAG) from your PDF documents. "
            "The user interface explicitely shows multiple steps to help understand the RAG workflow. This chatbot takes past questions into account when generating answers (via conversational memory), and includes document references for clarity purposes.\n"
            "\n"
            "Warning: This space uses the free CPU Basic hardware from Hugging Face. Some steps and LLM models used below (free inference endpoints) can take some time to generate a reply. "
        )

        with gr.Tab("Step 1 - Upload PDF"):
            with gr.Row():
                # Extensions are given with a leading dot.
                document = gr.Files(height=100, file_count="multiple", file_types=[".pdf"], interactive=True, label="Upload your PDF documents (single or multiple)")

        with gr.Tab("Step 2 - Process document"):
            with gr.Row():
                db_btn = gr.Radio(["ChromaDB"], label="Vector database type", value="ChromaDB", type="index", info="Choose your vector database")
            with gr.Accordion("Advanced options - Document text splitter", open=False):
                with gr.Row():
                    slider_chunk_size = gr.Slider(minimum=100, maximum=1000, value=600, step=20, label="Chunk size", info="Chunk size", interactive=True)
                with gr.Row():
                    slider_chunk_overlap = gr.Slider(minimum=10, maximum=200, value=40, step=10, label="Chunk overlap", info="Chunk overlap", interactive=True)
            with gr.Row():
                db_progress = gr.Textbox(label="Vector database initialization", value="None")
            with gr.Row():
                db_generate_btn = gr.Button("Generate vector database")

        with gr.Tab("Step 3 - Initialize QA chain"):
            with gr.Row():
                llm_btn = gr.Radio(list_llm_simple, label="LLM models", value=list_llm_simple[0], type="index", info="Choose your LLM model")
            with gr.Accordion("Advanced options - LLM model", open=False):
                with gr.Row():
                    slider_temperature = gr.Slider(minimum=0.01, maximum=1.0, value=0.7, step=0.1, label="Temperature", info="Model temperature", interactive=True)
                with gr.Row():
                    slider_maxtokens = gr.Slider(minimum=32, maximum=2048, value=1024, step=16, label="Max tokens", info="Maximum tokens", interactive=True)
                with gr.Row():
                    slider_topk = gr.Slider(minimum=10, maximum=50, value=40, step=2, label="Top K", info="Top K", interactive=True)
            with gr.Row():
                llm_progress = gr.Textbox(label="LLM initialization", value="None")
            with gr.Row():
                llm_generate_btn = gr.Button("Initialize LLM chain")

        with gr.Tab("Step 4 - Ask questions to your chatbot"):
            with gr.Row():
                chatbot = gr.Chatbot(label="Langchain PDF chatbot", height=400)
            with gr.Row():
                msg = gr.Textbox(label="Ask anything about your PDF document", placeholder="Type your message here...", show_label=False)
            with gr.Row():
                response_source1 = gr.Textbox(label="Source document #1", value="", interactive=False)
                response_source1_page = gr.Number(label="Page", value=0, interactive=False)
            with gr.Row():
                response_source2 = gr.Textbox(label="Source document #2", value="", interactive=False)
                response_source2_page = gr.Number(label="Page", value=0, interactive=False)
            with gr.Row():
                response_source3 = gr.Textbox(label="Source document #3", value="", interactive=False)
                response_source3_page = gr.Number(label="Page", value=0, interactive=False)
            with gr.Row():
                clear = gr.Button("Clear")

        # Event wiring: each step's button feeds the state used by the next.
        document.upload(upload_file, [document], [document])
        db_generate_btn.click(
            initialize_database,
            inputs=[document, slider_chunk_size, slider_chunk_overlap],
            outputs=[vector_db, collection_name, db_progress],
        )
        llm_generate_btn.click(
            initialize_LLM,
            inputs=[llm_btn, slider_temperature, slider_maxtokens, slider_topk, vector_db],
            outputs=[qa_chain, llm_progress],
        )
        msg.submit(
            conversation,
            [qa_chain, msg, chatbot],
            [qa_chain, msg, chatbot, response_source1, response_source1_page, response_source2, response_source2_page, response_source3, response_source3_page],
        )
        # Resets the chatbot widget only; the chain state is left untouched.
        clear.click(lambda: None, None, chatbot, queue=False)

    return app


if __name__ == "__main__":
    demo().queue().launch(debug=True, server_port=7861)  # Use a different port, e.g., 7861