DocsGPT / app.py
nnpy's picture
Upload 3 files
d988646 verified
import os
from io import BytesIO
from PIL import Image
import google.generativeai as genai
import google.ai.generativelanguage as glm
from langchain.vectorstores import Chroma
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import streamlit as st
st.title("DocsGPT")
genai.configure(api_key=os.environ['GOOGLE_API_KEY'])
st.markdown(
"""
<style>
.css-1jc7ptx, .e1ewe7hr3, .viewerBadge_container__1QSob,
.styles_viewerBadge__1yB5_, .viewerBadge_link__1S137,
.viewerBadge_text__1JaDK {
display: none;
}
</style>
""",
unsafe_allow_html=True
)
rag = glm.Tool(
function_declarations=[
glm.FunctionDeclaration(
name='vector_search',
description="Returns the content of the document user attached. Make sure that your not passing query as a question use like **keywords** instead. Use this function to search for contents in the user attached or uploaded documents to you. Try not to completly paste the user question as query, instead use keywords.",
parameters=glm.Schema(
type=glm.Type.OBJECT,
properties={
'query': glm.Schema(type=glm.Type.STRING),
},
required=['query']
)
)
]
)
gemini = genai.GenerativeModel('gemini-pro', tools=[rag])
gemini_vision = genai.GenerativeModel('gemini-pro-vision')
class rawkn:
def __init__(self, text):
self.text = text
def get_relevant_documents(self, query):
return self.text
def loader_data(files, include_getting_real):
file_type = files[0].type if len(files) > 0 else "application/pdf"
total_content = ''
num_pages = 0
if include_getting_real:
files.append("./getting_real_basecamp.pdf")
for file in files:
if file_type == "application/pdf":
pdf_reader = PdfReader(file)
content = ''
for page in pdf_reader.pages:
num_pages += 1
content += page.extract_text()
for img in page.images:
try:
image_stream = BytesIO(img.data)
img = Image.open(image_stream)
img_desc = gemini_vision.generate_content(["Generate a detailed description of the image. If it is a flow chart, please create a flowchart that exactly as it is. If it is table, try to create a table exactly like in the image. write all the text in the image it it contains any text. Clearly explain the image in more detailed.\nAlso make sure give a nice heading to the image contant.", img]).candidates[0].content.parts[0].text
print("***************************")
print(img_desc)
print("***************************")
content += "Image content:\n" + img_desc
except:
print("cannot extract image")
if file_type == "text/plain":
content = file.read()
content = content.decode("utf-8")
total_content += content
if num_pages <= 2:
chunk_size = 500
elif num_pages <= 3:
chunk_size = 1000
elif num_pages <= 5:
chunk_size = 2000
elif num_pages <= 10:
chunk_size = 3000
else:
chunk_size = 4000
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=0)
texts = text_splitter.split_text(total_content)
try:
embeddings = GoogleGenerativeAIEmbeddings(model = "models/embedding-001")
vector_store = Chroma.from_texts(texts, embeddings).as_retriever()
st.session_state.knowledge = vector_store
st.session_state.chat.history.append(glm.Content(
parts=[glm.Part(
text=f"Now i've uploaded some files.\nHere are the list of documents you have access to:\n{[i.name if type(i) != str else i for i in files]}"
)],
role="user"
)
)
st.session_state.chat.history.append(glm.Content(
parts=[glm.Part(
text=f"Sure! Ask me anything about the documents you have uploaded. I can help you with that."
)],
role="model"
)
)
except:
st.session_state.knowledge = rawkn(total_content)
if "history" not in st.session_state:
st.session_state.history = []
if "knowledge" not in st.session_state:
st.session_state.knowledge = None
if "chat" not in st.session_state:
st.session_state.chat = gemini.start_chat(history=[glm.Content(
parts=[glm.Part(
text="Your name is DocsGPT. You are very helpful and can assist with documents uploaded by the user. Use the vector_search tool/function to search for contents in the user attached or uploaded documents to you.\nYou have access to all documents uploaded by the user."
)],
role="user"
),
glm.Content(
parts=[glm.Part(
text="Sure, i can do that for you."
)],
role="model"
)])
for history in st.session_state.history:
with st.chat_message(history["role"]):
st.markdown(history["text"])
with st.sidebar:
st.title("Knowledge")
st.markdown("""### Tips to use DocsGPT:
- Upload your documents [pdf, txt] to DocsGPT and make sure to click on the process button.
- wait for a second and then start chatting with DocsGPT.
- While asking questions to DocsGPT about your uploaded files, please refer your uploaded files as *Document*, *Docs*, *attached or uploaded docs*, so the model can easily understands what you are referring to.""")
files = st.file_uploader("Upload a file", accept_multiple_files=True, type=["pdf", "txt"])
include_getting_real = st.checkbox("Include getting-real?")
process = st.button("Process")
if process and files:
with st.spinner('loading your file. This may take a while...'):
loader_data(files, include_getting_real)
elif process and include_getting_real:
with st.spinner('loading your file. This may take a while...'):
loader_data([], include_getting_real)
if prompt := st.chat_input("Enter your message..."):
st.session_state.history.append({"role": "user", "text": prompt})
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
message_placeholder = st.empty()
response = st.session_state.chat.send_message(prompt)
if response.candidates[0].content.parts[0].text == '':
args = response.candidates[0].content.parts[0].function_call.args['query']
if st.session_state.knowledge is not None:
print("searching for ", args)
related_docs = str(st.session_state.knowledge.get_relevant_documents(args))
print(related_docs)
else:
related_docs = 'No knowledge documents loaded'
response = st.session_state.chat.send_message(
glm.Content(
parts=[glm.Part(
function_response = glm.FunctionResponse(
name='vector_search',
response={'rag': related_docs},
)
)]
)
).candidates[0].content.parts[0].text
else:
response = response.candidates[0].content.parts[0].text
print(st.session_state.chat.history)
message_placeholder.markdown(response)
st.session_state.history.append({"role": "assistant", "text": response})