import logging
from operator import itemgetter
from pathlib import Path

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.base import RunnableSequence
from langchain_core.vectorstores import VectorStore
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import Qdrant
from langchain_experimental.text_splitter import SemanticChunker
from qdrant_client import QdrantClient

from globals import (
    embeddings,
    gpt35_model,
    gpt4_model,
    META_10K_FILE_PATH,
    META_SEMANTIC_COLLECTION,
    VECTOR_STORE_PATH
)

_logger = logging.getLogger("semantic")

# Toggle between an ephemeral in-memory Qdrant instance and one persisted
# on local disk at VECTOR_STORE_PATH.
USE_MEMORY = True

qclient: QdrantClient
if USE_MEMORY:
    qclient = QdrantClient(":memory:")
else:
    qclient = QdrantClient(path=VECTOR_STORE_PATH)
RAG_PROMPT = """
Reply to the user's query thoughtfully and clearly.
You should only respond to the user's query if the context is related to the query.
If you are not sure how to answer, reply "I don't know".
Structure your response in markdown.

CONTEXT:
{context}

QUERY:
{question}

YOUR REPLY: """

rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)
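
# Illustrative example (not part of the original module): the template exposes
# "context" and "question" variables, so it can be rendered on its own, e.g.
#
#   rag_prompt.format_messages(
#       context="<retrieved chunks>",
#       question="What was Meta's total revenue?"
#   )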
class SemanticStoreFactory:
    _semantic_vectorstore: VectorStore = None

    @classmethod
    def __load_semantic_store(
        cls
    ) -> VectorStore:
        path = Path(VECTOR_STORE_PATH)
        store = None
        # Check that the path exists and is a non-empty directory before loading.
        if path.exists() and path.is_dir() and any(path.iterdir()):
            _logger.info("\tQdrant loading ...")
            store = Qdrant(
                client=qclient,
                embeddings=embeddings,
                collection_name=META_SEMANTIC_COLLECTION,
            )
        else:
            _logger.info("\tQdrant creating ...")
            store = cls.__create_semantic_store()
        return store
    @classmethod
    def __create_semantic_store(
        cls
    ) -> VectorStore:
        if USE_MEMORY:
            _logger.info(f"creating semantic vector store in memory: {USE_MEMORY}")
        else:
            _logger.info(f"creating semantic vector store: {VECTOR_STORE_PATH}")
            # The target directory is only needed for the on-disk store.
            path = Path(VECTOR_STORE_PATH)
            if not path.exists():
                path.mkdir(parents=True, exist_ok=True)
                _logger.info(f"Directory '{path}' created.")

        # Load the 10-K PDF and split it into semantically coherent chunks,
        # breaking wherever embedding distance crosses the percentile threshold.
        documents = PyMuPDFLoader(META_10K_FILE_PATH).load()
        semantic_chunker = SemanticChunker(
            embeddings=embeddings,
            breakpoint_threshold_type="percentile"
        )
        semantic_chunks = semantic_chunker.create_documents(
            [d.page_content for d in documents]
        )
        _logger.info(f"created semantic_chunks: {len(semantic_chunks)}")

        if USE_MEMORY:
            _logger.info("\t==> creating memory vectorstore ...")
            semantic_chunk_vectorstore = Qdrant.from_documents(
                semantic_chunks,
                embeddings,
                location=":memory:",
                collection_name=META_SEMANTIC_COLLECTION,
                force_recreate=True
            )
            _logger.info("\t==> finished constructing vectorstore")
        else:
            semantic_chunk_vectorstore = Qdrant.from_documents(
                semantic_chunks,
                embeddings,
                path=VECTOR_STORE_PATH,
                collection_name=META_SEMANTIC_COLLECTION,
                force_recreate=True
            )
        _logger.info(f"\t==> return vectorstore {META_SEMANTIC_COLLECTION}")
        return semantic_chunk_vectorstore
    @classmethod
    def get_semantic_store(
        cls
    ) -> VectorStore:
        _logger.info("get_semantic_store")
        if cls._semantic_vectorstore is None:
            if USE_MEMORY:
                cls._semantic_vectorstore = cls.__create_semantic_store()
                _logger.info("created in-memory semantic vectorstore")
            else:
                _logger.info(f"Loading semantic vectorstore {META_SEMANTIC_COLLECTION} from: {VECTOR_STORE_PATH}")
                try:
                    # First try to load the persisted store; rebuild it if loading fails.
                    cls._semantic_vectorstore = cls.__load_semantic_store()
                except Exception as e:
                    _logger.warning(f"cannot load: {e}")
                    cls._semantic_vectorstore = cls.__create_semantic_store()
        _logger.info("RETURNING get_semantic_store")
        return cls._semantic_vectorstore
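
# Illustrative standalone usage (not in the original module): the factory can
# be exercised directly to sanity-check retrieval before wiring up the chain.
# `similarity_search` is the generic VectorStore API; the query text is made up.
#
#   store = SemanticStoreFactory.get_semantic_store()
#   docs = store.similarity_search("What was Meta's total revenue?", k=4)
#   for d in docs:
#       print(d.page_content[:200])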
class SemanticRAGChainFactory:
    _chain: RunnableSequence = None

    @classmethod
    def get_semantic_rag_chain(
        cls
    ) -> RunnableSequence:
        if cls._chain is None:
            _logger.info("creating SemanticRAGChainFactory")
            semantic_store = SemanticStoreFactory.get_semantic_store()
            if semantic_store is not None:
                semantic_chunk_retriever = semantic_store.as_retriever()
                # Wrap the base retriever so the LLM generates several query
                # variants and the union of their results is returned.
                semantic_mquery_retriever = MultiQueryRetriever.from_llm(
                    retriever=semantic_chunk_retriever,
                    llm=gpt4_model
                )
                cls._chain = (
                    # INVOKE CHAIN WITH: {"question": "<<SOME USER QUESTION>>"}
                    # "question": populated by getting the value of the "question" key
                    # "context": populated by piping the "question" value into the multi-query retriever
                    {"context": itemgetter("question") | semantic_mquery_retriever, "question": itemgetter("question")}
                    # "context": reassigned from the "context" key of the previous step,
                    # passing the retrieved documents through unchanged
                    | RunnablePassthrough.assign(context=itemgetter("context"))
                    # "response": the "context" and "question" values format the prompt,
                    # which is piped into the LLM; the retrieved "context" is carried
                    # along so callers can inspect the source documents
                    | {"response": rag_prompt | gpt4_model, "context": itemgetter("context")}
                )
                _logger.info("\t_chain constructed")
        return cls._chain
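
# Minimal usage sketch (illustrative, not part of the original module). It
# assumes the models and embeddings wired up in `globals` are configured, and
# relies only on the chain's contract documented above: invoke with a
# "question" key; the LLM answer lands under "response" (a chat message with
# a `.content` attribute) and the retrieved documents under "context".
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    chain = SemanticRAGChainFactory.get_semantic_rag_chain()
    result = chain.invoke({"question": "What was Meta's total revenue?"})
    print(result["response"].content)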