innofinderai / core-langchain-rag.py
sabazo's picture
added rag project core files
4b9ea4c unverified
raw
history blame
11.1 kB
# Importing necessary libraries
import sys
import os
import time
# # Importing RecursiveUrlLoader for web scraping and BeautifulSoup for HTML parsing
# from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader
# from bs4 import BeautifulSoup as Soup
# import mimetypes
# # List of URLs to scrape
# urls = ["https://langchain-doc.readthedocs.io/en/latest"]
# # Initialize an empty list to store the documents
# docs = []
# # Looping through each URL in the list - this could take some time!
# stf = time.time() # Start time for performance measurement
# for url in urls:
# try:
# st = time.time() # Start time for performance measurement
# # Create a RecursiveUrlLoader instance with a specified URL and depth
# # The extractor function uses BeautifulSoup to parse the HTML content and extract text
# loader = RecursiveUrlLoader(url=url, max_depth=5, extractor=lambda x: Soup(x, "html.parser").text)
# # Load the documents from the URL and extend the docs list
# docs.extend(loader.load())
# et = time.time() - st # Calculate time taken for splitting
# print(f'Time taken for downloading documents from {url}: {et} seconds.')
# except Exception as e:
# # Print an error message if there is an issue with loading or parsing the URL
# print(f"Failed to load or parse the URL {url}. Error: {e}", file=sys.stderr)
# etf = time.time() - stf # Calculate time taken for splitting
# print(f'Total time taken for downloading {len(docs)} documents: {etf} seconds.')
# # Import necessary modules for text splitting and vectorization
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# import time
# from langchain_community.vectorstores import FAISS
# from langchain.vectorstores.utils import filter_complex_metadata
# from langchain_community.embeddings import HuggingFaceEmbeddings
# # Configure the text splitter
# text_splitter = RecursiveCharacterTextSplitter(
# separators=["\n\n", "\n", "(?<=\. )", " ", ""], # Define the separators for splitting text
# chunk_size=500, # The size of each text chunk
# chunk_overlap=50, # Overlap between chunks to ensure continuity
# length_function=len, # Function to determine the length of each chunk
# )
# try:
# # Stage one: Splitting the documents into chunks for vectorization
# st = time.time() # Start time for performance measurement
# print('Loading documents and creating chunks ...')
# # Split each document into chunks using the configured text splitter
# chunks = text_splitter.create_documents([doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs])
# et = time.time() - st # Calculate time taken for splitting
# print(f"created "+chunks+" chunks")
# print(f'Time taken for document chunking: {et} seconds.')
# except Exception as e:
# print(f"Error during document chunking: {e}", file=sys.stderr)
# # Path for saving the FAISS index
# FAISS_INDEX_PATH = "./vectorstore/lc-faiss-multi-mpnet-500"
# try:
# # Stage two: Vectorization of the document chunks
# model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1" # Model used for embedding
# # Initialize HuggingFace embeddings with the specified model
# embeddings = HuggingFaceEmbeddings(model_name=model_name)
# print(f'Loading chunks into vector store ...')
# st = time.time() # Start time for performance measurement
# # Create a FAISS vector store from the document chunks and save it locally
# db = FAISS.from_documents(filter_complex_metadata(chunks), embeddings)
# db.save_local(FAISS_INDEX_PATH)
# et = time.time() - st # Calculate time taken for vectorization
# print(f'Time taken for vectorization and saving: {et} seconds.')
# except Exception as e:
# print(f"Error during vectorization or FAISS index saving: {e}", file=sys.stderr)
# alternatively download a preparaed vectorized index from S3 and load the index into vectorstore
# Import necessary libraries for AWS S3 interaction, file handling, and FAISS vector stores
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import zipfile
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from dotenv import load_dotenv
# Load environment variables from a .env file
config = load_dotenv(".env")
# Retrieve the Hugging Face API token from environment variables
HUGGINGFACEHUB_API_TOKEN = os.getenv('HUGGINGFACEHUB_API_TOKEN')
S3_LOCATION = os.getenv("S3_LOCATION")
try:
# Initialize an S3 client with unsigned configuration for public access
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
# Define the FAISS index path and the destination for the downloaded file
FAISS_INDEX_PATH = './vectorstore/lc-faiss-multi-mpnet-500-markdown'
VS_DESTINATION = FAISS_INDEX_PATH + ".zip"
# Download the pre-prepared vectorized index from the S3 bucket
print("Downloading the pre-prepared vectorized index from S3...")
s3.download_file(S3_LOCATION, 'vectorstores/lc-faiss-multi-mpnet-500-markdown.zip', VS_DESTINATION)
# Extract the downloaded zip file
with zipfile.ZipFile(VS_DESTINATION, 'r') as zip_ref:
zip_ref.extractall('./vectorstore/')
print("Download and extraction completed.")
except Exception as e:
print(f"Error during downloading or extracting from S3: {e}", file=sys.stderr)
# Define the model name for embeddings
model_name = "sentence-transformers/multi-qa-mpnet-base-dot-v1"
try:
# Initialize HuggingFace embeddings with the specified model
embeddings = HuggingFaceEmbeddings(model_name=model_name)
# Load the local FAISS index with the specified embeddings
db = FAISS.load_local(FAISS_INDEX_PATH, embeddings)
print("FAISS index loaded successfully.")
except Exception as e:
print(f"Error during FAISS index loading: {e}", file=sys.stderr)
# Import necessary modules for environment variable management and HuggingFace integration
from langchain_community.llms import HuggingFaceHub
# Initialize the vector store as a retriever for the RAG pipeline
retriever = db.as_retriever()
try:
# Load the model from the Hugging Face Hub
model_id = HuggingFaceHub(repo_id="mistralai/Mixtral-8x7B-Instruct-v0.1", model_kwargs={
"temperature": 0.1, # Controls randomness in response generation (lower value means less random)
"max_new_tokens": 1024, # Maximum number of new tokens to generate in responses
"repetition_penalty": 1.2, # Penalty for repeating the same words (higher value increases penalty)
"return_full_text": False # If False, only the newly generated text is returned; if True, the input is included as well
})
print("Model loaded successfully from Hugging Face Hub.")
except Exception as e:
print(f"Error loading model from Hugging Face Hub: {e}", file=sys.stderr)
# Importing necessary modules for retrieval-based question answering and prompt handling
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
# Declare a global variable 'qa' for the retrieval-based question answering system
global qa
# Define a prompt template for guiding the model's responses
template = """
You are the friendly documentation buddy Arti, if you don't know the answer say 'I don't know' and don't make things up.\
Use the following context (delimited by <ctx></ctx>) and the chat history (delimited by <hs></hs>) to answer the question :
------
<ctx>
{context}
</ctx>
------
<hs>
{history}
</hs>
------
{question}
Answer:
"""
# Create a PromptTemplate object with specified input variables and the defined template
prompt = PromptTemplate.from_template(
#input_variables=["history", "context", "question"], # Variables to be included in the prompt
template=template, # The prompt template as defined above
)
prompt.format(context="context", history="history", question="question")
# Create a memory buffer to manage conversation history
memory = ConversationBufferMemory(
memory_key="history", # Key for storing the conversation history
input_key="question" # Key for the input question
)
# Initialize the RetrievalQA object with the specified model, retriever, and additional configurations
qa = RetrievalQA.from_chain_type(
llm=model_id, # Language model loaded from Hugging Face Hub
retriever=retriever, # The vector store retriever initialized earlier
return_source_documents=True, # Option to return source documents along with responses
chain_type_kwargs={
"verbose": True, # Enables verbose output for debugging and analysis
"memory": memory, # Memory buffer for managing conversation history
"prompt": prompt # Prompt template for guiding the model's responses
}
)
# Import Gradio for UI, along with other necessary libraries
import gradio as gr
import random
import time
# Function to add a new input to the chat history
def add_text(history, text):
# Append the new text to the history with a placeholder for the response
history = history + [(text, None)]
return history, ""
# Function representing the bot's response mechanism
def bot(history):
# Obtain the response from the 'infer' function using the latest input
response = infer(history[-1][0], history)
# Update the history with the bot's response
history[-1][1] = response['result']
return history
# Function to infer the response using the RAG model
def infer(question, history):
# Use the question and history to query the RAG model
result = qa({"query": question, "history": history, "question": question})
return result
# CSS styling for the Gradio interface
css = """
#col-container {max-width: 700px; margin-left: auto; margin-right: auto;}
"""
# HTML content for the Gradio interface title
title = """
<div style="text-align: center;max-width: 700px;">
<h1>Chat with your Documentation</h1>
<p style="text-align: center;">Chat with LangChain Documentation, <br />
You can ask questions about the LangChain docu ;)</p>
</div>
"""
# Building the Gradio interface
with gr.Blocks(css=css) as demo:
with gr.Column(elem_id="col-container"):
gr.HTML(title) # Add the HTML title to the interface
chatbot = gr.Chatbot([], elem_id="chatbot") # Initialize the chatbot component
clear = gr.Button("Clear") # Add a button to clear the chat
# Create a row for the question input
with gr.Row():
question = gr.Textbox(label="Question", placeholder="Type your question and hit Enter ")
# Define the action when the question is submitted
question.submit(add_text, [chatbot, question], [chatbot, question], queue=False).then(
bot, chatbot, chatbot
)
# Define the action for the clear button
clear.click(lambda: None, None, chatbot, queue=False)
# Launch the Gradio demo interface
demo.launch(share=False)