import os |
import re |
import requests |
import pysrt |
from langchain_community.document_loaders import ( |
PyMuPDFLoader, |
Docx2txtLoader, |
YoutubeLoader, |
WebBaseLoader, |
TextLoader, |
) |
from langchain_community.document_loaders import UnstructuredMarkdownLoader |
from llama_parse import LlamaParse |
from langchain.schema import Document |
import logging |
from langchain.text_splitter import RecursiveCharacterTextSplitter |
from ragatouille import RAGPretrainedModel |
from langchain.chains import LLMChain |
from langchain_community.llms import OpenAI |
from langchain import PromptTemplate |
import json |
from concurrent.futures import ThreadPoolExecutor |
from modules.dataloader.helpers import get_metadata |
class PDFReader: |
def __init__(self): |
pass |
def get_loader(self, pdf_path): |
loader = PyMuPDFLoader(pdf_path) |
return loader |
def get_documents(self, loader): |
return loader.load() |
class FileReader: |
def __init__(self, logger): |
self.pdf_reader = PDFReader() |
self.logger = logger |
def extract_text_from_pdf(self, pdf_path): |
text = "" |
with open(pdf_path, "rb") as file: |
reader = PyPDF2.PdfReader(file) |
num_pages = len(reader.pages) |
for page_num in range(num_pages): |
page = reader.pages[page_num] |
text += page.extract_text() |
return text |
def download_pdf_from_url(self, pdf_url): |
response = requests.get(pdf_url) |
if response.status_code == 200: |
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file: |
temp_file.write(response.content) |
temp_file_path = temp_file.name |
return temp_file_path |
else: |
self.logger.error(f"Failed to download PDF from URL: {pdf_url}") |
return None |
def read_pdf(self, temp_file_path: str): |
loader = self.pdf_reader.get_loader(temp_file_path) |
documents = self.pdf_reader.get_documents(loader) |
return documents |
def read_txt(self, temp_file_path: str): |
loader = TextLoader(temp_file_path, autodetect_encoding=True) |
return loader.load() |
def read_docx(self, temp_file_path: str): |
loader = Docx2txtLoader(temp_file_path) |
return loader.load() |
def read_srt(self, temp_file_path: str): |
subs = pysrt.open(temp_file_path) |
text = "" |
for sub in subs: |
text += sub.text |
return [Document(page_content=text)] |
def read_youtube_transcript(self, url: str): |
loader = YoutubeLoader.from_youtube_url( |
url, add_video_info=True, language=["en"], translation="en" |
) |
return loader.load() |
def read_html(self, url: str): |
loader = WebBaseLoader(url) |
return loader.load() |
def read_tex_from_url(self, tex_url): |
response = requests.get(tex_url) |
if response.status_code == 200: |
return [Document(page_content=response.text)] |
else: |
self.logger.error(f"Failed to fetch .tex file from URL: {tex_url}") |
return None |
class ChunkProcessor: |
def __init__(self, config, logger): |
self.config = config |
self.logger = logger |
self.document_data = {} |
self.document_metadata = {} |
self.document_chunks_full = [] |
if config["splitter_options"]["use_splitter"]: |
if config["splitter_options"]["split_by_token"]: |
self.splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( |
chunk_size=config["splitter_options"]["chunk_size"], |
chunk_overlap=config["splitter_options"]["chunk_overlap"], |
separators=config["splitter_options"]["chunk_separators"], |
disallowed_special=(), |
) |
else: |
self.splitter = RecursiveCharacterTextSplitter( |
chunk_size=config["splitter_options"]["chunk_size"], |
chunk_overlap=config["splitter_options"]["chunk_overlap"], |
separators=config["splitter_options"]["chunk_separators"], |
disallowed_special=(), |
) |
else: |
self.splitter = None |
self.logger.info("ChunkProcessor instance created") |
def remove_delimiters(self, document_chunks: list): |
for chunk in document_chunks: |
for delimiter in self.config["splitter_options"]["delimiters_to_remove"]: |
chunk.page_content = re.sub(delimiter, " ", chunk.page_content) |
return document_chunks |
def remove_chunks(self, document_chunks: list): |
front = self.config["splitter_options"]["front_chunk_to_remove"] |
end = self.config["splitter_options"]["last_chunks_to_remove"] |
for _ in range(front): |
del document_chunks[0] |
for _ in range(end): |
document_chunks.pop() |
return document_chunks |
def process_chunks( |
self, documents, file_type="txt", source="", page=0, metadata={} |
): |
documents = [Document(page_content=documents, source=source, page=page)] |
if ( |
file_type == "txt" |
or file_type == "docx" |
or file_type == "srt" |
or file_type == "tex" |
): |
document_chunks = self.splitter.split_documents(documents) |
elif file_type == "pdf": |
document_chunks = documents |
for chunk in document_chunks: |
chunk.metadata["source"] = source |
chunk.metadata["page"] = page |
for key, value in metadata.items(): |
chunk.metadata[key] = value |
if self.config["splitter_options"]["remove_leftover_delimiters"]: |
document_chunks = self.remove_delimiters(document_chunks) |
if self.config["splitter_options"]["remove_chunks"]: |
document_chunks = self.remove_chunks(document_chunks) |
return document_chunks |
def chunk_docs(self, file_reader, uploaded_files, weblinks): |
addl_metadata = get_metadata( |
"https://dl4ds.github.io/sp2024/lectures/", |
"https://dl4ds.github.io/sp2024/schedule/", |
) |
with ThreadPoolExecutor() as executor: |
executor.map( |
self.process_file, |
uploaded_files, |
range(len(uploaded_files)), |
[file_reader] * len(uploaded_files), |
[addl_metadata] * len(uploaded_files), |
) |
executor.map( |
self.process_weblink, |
weblinks, |
range(len(weblinks)), |
[file_reader] * len(weblinks), |
[addl_metadata] * len(weblinks), |
) |
document_names = [ |
f"{file_name}_{page_num}" |
for file_name, pages in self.document_data.items() |
for page_num in pages.keys() |
] |
documents = [ |
page for doc in self.document_data.values() for page in doc.values() |
] |
document_metadata = [ |
page for doc in self.document_metadata.values() for page in doc.values() |
] |
self.save_document_data() |
self.logger.info( |
f"Total document chunks extracted: {len(self.document_chunks_full)}" |
) |
return self.document_chunks_full, document_names, documents, document_metadata |
def process_documents( |
self, documents, file_path, file_type, metadata_source, addl_metadata |
): |
file_data = {} |
file_metadata = {} |
for doc in documents: |
page_num = doc.metadata.get("page", 0) |
file_data[page_num] = doc.page_content |
metadata = ( |
addl_metadata.get(file_path, {}) |
if metadata_source == "file" |
else {"source": file_path, "page": page_num} |
) |
file_metadata[page_num] = metadata |
if self.config["vectorstore"]["db_option"] not in ["RAGatouille"]: |
document_chunks = self.process_chunks( |
doc.page_content, |
file_type, |
source=file_path, |
page=page_num, |
metadata=metadata, |
) |
self.document_chunks_full.extend(document_chunks) |
self.document_data[file_path] = file_data |
self.document_metadata[file_path] = file_metadata |
def process_file(self, file_path, file_index, file_reader, addl_metadata): |
file_name = os.path.basename(file_path) |
if file_name in self.document_data: |
return |
file_type = file_name.split(".")[-1].lower() |
self.logger.info(f"Reading file {file_index + 1}: {file_path}") |
read_methods = { |
"pdf": file_reader.read_pdf, |
"txt": file_reader.read_txt, |
"docx": file_reader.read_docx, |
"srt": file_reader.read_srt, |
"tex": file_reader.read_tex_from_url, |
} |
if file_type not in read_methods: |
self.logger.warning(f"Unsupported file type: {file_type}") |
return |
try: |
documents = read_methods[file_type](file_path) |
self.process_documents( |
documents, file_path, file_type, "file", addl_metadata |
) |
except Exception as e: |
self.logger.error(f"Error processing file {file_name}: {str(e)}") |
def process_weblink(self, link, link_index, file_reader, addl_metadata): |
if link in self.document_data: |
return |
self.logger.info(f"Reading link {link_index + 1} : {link}") |
try: |
if "youtube" in link: |
documents = file_reader.read_youtube_transcript(link) |
else: |
documents = file_reader.read_html(link) |
self.process_documents(documents, link, "txt", "link", addl_metadata) |
except Exception as e: |
self.logger.error(f"Error Reading link {link_index + 1} : {link}: {str(e)}") |
def save_document_data(self): |
if not os.path.exists(f"{self.config['log_chunk_dir']}/docs"): |
os.makedirs(f"{self.config['log_chunk_dir']}/docs") |
self.logger.info( |
f"Creating directory {self.config['log_chunk_dir']}/docs for document data" |
) |
self.logger.info( |
f"Saving document content to {self.config['log_chunk_dir']}/docs/doc_content.json" |
) |
if not os.path.exists(f"{self.config['log_chunk_dir']}/metadata"): |
os.makedirs(f"{self.config['log_chunk_dir']}/metadata") |
self.logger.info( |
f"Creating directory {self.config['log_chunk_dir']}/metadata for document metadata" |
) |
self.logger.info( |
f"Saving document metadata to {self.config['log_chunk_dir']}/metadata/doc_metadata.json" |
) |
with open( |
f"{self.config['log_chunk_dir']}/docs/doc_content.json", "w" |
) as json_file: |
json.dump(self.document_data, json_file, indent=4) |
with open( |
f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json", "w" |
) as json_file: |
json.dump(self.document_metadata, json_file, indent=4) |
def load_document_data(self): |
with open( |
f"{self.config['log_chunk_dir']}/docs/doc_content.json", "r" |
) as json_file: |
self.document_data = json.load(json_file) |
with open( |
f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json", "r" |
) as json_file: |
self.document_metadata = json.load(json_file) |
class DataLoader: |
def __init__(self, config, logger=None): |
self.file_reader = FileReader(logger=logger) |
self.chunk_processor = ChunkProcessor(config, logger=logger) |
def get_chunks(self, uploaded_files, weblinks): |
return self.chunk_processor.chunk_docs( |
self.file_reader, uploaded_files, weblinks |
) |
if __name__ == "__main__": |
import yaml |
logger = logging.getLogger(__name__) |
logger.setLevel(logging.INFO) |
with open("../code/config.yml", "r") as f: |
config = yaml.safe_load(f) |
data_loader = DataLoader(config, logger=logger) |
document_chunks, document_names, documents, document_metadata = ( |
data_loader.get_chunks( |
[], |
["https://dl4ds.github.io/sp2024/"], |
) |
) |