tutor_dev / modules /dataloader /data_loader.py
XThomasBU
updates
60929fd
raw
history blame
16.9 kB
import os
import re
import requests
import pysrt
from langchain_community.document_loaders import (
Docx2txtLoader,
YoutubeLoader,
TextLoader,
)
from langchain.schema import Document
import logging
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai.embeddings import OpenAIEmbeddings
import json
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin
import html2text
import bs4
import PyPDF2
from modules.dataloader.pdf_readers.base import PDFReader
from modules.dataloader.pdf_readers.llama import LlamaParser
from modules.dataloader.pdf_readers.gpt import GPTParser
from modules.dataloader.helpers import get_metadata
from modules.config.constants import TIMEOUT
logger = logging.getLogger(__name__)
BASE_DIR = os.getcwd()
class HTMLReader:
def __init__(self):
pass
def read_url(self, url):
response = requests.get(url, timeout=TIMEOUT)
if response.status_code == 200:
return response.text
else:
logger.warning(f"Failed to download HTML from URL: {url}")
return None
def check_links(self, base_url, html_content):
soup = bs4.BeautifulSoup(html_content, "html.parser")
for link in soup.find_all("a"):
href = link.get("href")
if not href or href.startswith("#"):
continue
elif not href.startswith("https"):
href = href.replace("http", "https")
absolute_url = urljoin(base_url, href)
link["href"] = absolute_url
resp = requests.head(absolute_url, timeout=TIMEOUT)
if resp.status_code != 200:
logger.warning(
f"Link {absolute_url} is broken. Status code: {resp.status_code}"
)
return str(soup)
def html_to_md(self, url, html_content):
html_processed = self.check_links(url, html_content)
markdown_content = html2text.html2text(html_processed)
return markdown_content
def read_html(self, url):
html_content = self.read_url(url)
if html_content:
return self.html_to_md(url, html_content)
else:
return None
class FileReader:
def __init__(self, logger, kind):
self.logger = logger
self.kind = kind
if kind == "llama":
self.pdf_reader = LlamaParser()
elif kind == "gpt":
self.pdf_reader = GPTParser()
else:
self.pdf_reader = PDFReader()
self.web_reader = HTMLReader()
self.logger.info(
f"Initialized FileReader with {kind} PDF reader and HTML reader"
)
def extract_text_from_pdf(self, pdf_path):
text = ""
with open(pdf_path, "rb") as file:
reader = PyPDF2.PdfReader(file)
num_pages = len(reader.pages)
for page_num in range(num_pages):
page = reader.pages[page_num]
text += page.extract_text()
return text
def read_pdf(self, temp_file_path: str):
documents = self.pdf_reader.parse(temp_file_path)
return documents
def read_txt(self, temp_file_path: str):
loader = TextLoader(temp_file_path, autodetect_encoding=True)
return loader.load()
def read_docx(self, temp_file_path: str):
loader = Docx2txtLoader(temp_file_path)
return loader.load()
def read_srt(self, temp_file_path: str):
subs = pysrt.open(temp_file_path)
text = ""
for sub in subs:
text += sub.text
return [Document(page_content=text)]
def read_youtube_transcript(self, url: str):
loader = YoutubeLoader.from_youtube_url(
url, add_video_info=True, language=["en"], translation="en"
)
return loader.load()
def read_html(self, url: str):
return [Document(page_content=self.web_reader.read_html(url))]
def read_tex_from_url(self, tex_url):
response = requests.get(tex_url, timeout=TIMEOUT)
if response.status_code == 200:
return [Document(page_content=response.text)]
else:
self.logger.error(f"Failed to fetch .tex file from URL: {tex_url}")
return None
class ChunkProcessor:
def __init__(self, config, logger):
self.config = config
self.logger = logger
self.document_data = {}
self.document_metadata = {}
self.document_chunks_full = []
# TODO: Fix when reparse_files is False
if not config["vectorstore"]["reparse_files"]:
self.load_document_data()
if config["splitter_options"]["use_splitter"]:
if config["splitter_options"]["chunking_mode"] == "fixed":
if config["splitter_options"]["split_by_token"]:
self.splitter = (
RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=config["splitter_options"]["chunk_size"],
chunk_overlap=config["splitter_options"]["chunk_overlap"],
separators=config["splitter_options"]["chunk_separators"],
disallowed_special=(),
)
)
else:
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=config["splitter_options"]["chunk_size"],
chunk_overlap=config["splitter_options"]["chunk_overlap"],
separators=config["splitter_options"]["chunk_separators"],
disallowed_special=(),
)
else:
self.splitter = SemanticChunker(
OpenAIEmbeddings(), breakpoint_threshold_type="percentile"
)
else:
self.splitter = None
self.logger.info("ChunkProcessor instance created")
def remove_delimiters(self, document_chunks: list):
for chunk in document_chunks:
for delimiter in self.config["splitter_options"]["delimiters_to_remove"]:
chunk.page_content = re.sub(delimiter, " ", chunk.page_content)
return document_chunks
def remove_chunks(self, document_chunks: list):
front = self.config["splitter_options"]["front_chunk_to_remove"]
end = self.config["splitter_options"]["last_chunks_to_remove"]
for _ in range(front):
del document_chunks[0]
for _ in range(end):
document_chunks.pop()
return document_chunks
def process_chunks(
self, documents, file_type="txt", source="", page=0, metadata={}
):
# TODO: Clear up this pipeline of re-adding metadata
documents = [Document(page_content=documents, source=source, page=page)]
if (
file_type == "pdf"
and self.config["splitter_options"]["chunking_mode"] == "fixed"
):
document_chunks = documents
else:
document_chunks = self.splitter.split_documents(documents)
# add the source and page number back to the metadata
for chunk in document_chunks:
chunk.metadata["source"] = source
chunk.metadata["page"] = page
# add the metadata extracted from the document
for key, value in metadata.items():
chunk.metadata[key] = value
if self.config["splitter_options"]["remove_leftover_delimiters"]:
document_chunks = self.remove_delimiters(document_chunks)
if self.config["splitter_options"]["remove_chunks"]:
document_chunks = self.remove_chunks(document_chunks)
return document_chunks
def chunk_docs(self, file_reader, uploaded_files, weblinks):
addl_metadata = get_metadata(
*self.config["metadata"]["metadata_links"], self.config
) # For any additional metadata
# remove already processed files if reparse_files is False
if not self.config["vectorstore"]["reparse_files"]:
total_documents = len(uploaded_files) + len(weblinks)
uploaded_files = [
file_path
for file_path in uploaded_files
if file_path not in self.document_data
]
weblinks = [link for link in weblinks if link not in self.document_data]
print(
f"Total documents to process: {total_documents}, Documents already processed: {total_documents - len(uploaded_files) - len(weblinks)}"
)
with ThreadPoolExecutor() as executor:
executor.map(
self.process_file,
uploaded_files,
range(len(uploaded_files)),
[file_reader] * len(uploaded_files),
[addl_metadata] * len(uploaded_files),
)
executor.map(
self.process_weblink,
weblinks,
range(len(weblinks)),
[file_reader] * len(weblinks),
[addl_metadata] * len(weblinks),
)
document_names = [
f"{file_name}_{page_num}"
for file_name, pages in self.document_data.items()
for page_num in pages.keys()
]
documents = [
page for doc in self.document_data.values() for page in doc.values()
]
document_metadata = [
page for doc in self.document_metadata.values() for page in doc.values()
]
self.save_document_data()
self.logger.info(
f"Total document chunks extracted: {len(self.document_chunks_full)}"
)
return self.document_chunks_full, document_names, documents, document_metadata
def process_documents(
self, documents, file_path, file_type, metadata_source, addl_metadata
):
file_data = {}
file_metadata = {}
for doc in documents:
# if len(doc.page_content) <= 400: # better approach to filter out non-informative documents
# continue
page_num = doc.metadata.get("page", 0)
file_data[page_num] = doc.page_content
# Create a new dictionary for metadata in each iteration
metadata = addl_metadata.get(file_path, {}).copy()
metadata["page"] = page_num
metadata["source"] = file_path
file_metadata[page_num] = metadata
if self.config["vectorstore"]["db_option"] not in ["RAGatouille"]:
document_chunks = self.process_chunks(
doc.page_content,
file_type,
source=file_path,
page=page_num,
metadata=metadata,
)
self.document_chunks_full.extend(document_chunks)
self.document_data[file_path] = file_data
self.document_metadata[file_path] = file_metadata
def process_file(self, file_path, file_index, file_reader, addl_metadata):
print(f"Processing file {file_index + 1} : {file_path}")
file_name = os.path.basename(file_path)
file_type = file_name.split(".")[-1]
read_methods = {
"pdf": file_reader.read_pdf,
"txt": file_reader.read_txt,
"docx": file_reader.read_docx,
"srt": file_reader.read_srt,
"tex": file_reader.read_tex_from_url,
}
if file_type not in read_methods:
self.logger.warning(f"Unsupported file type: {file_type}")
return
try:
if file_path in self.document_data:
self.logger.warning(f"File {file_name} already processed")
documents = [
Document(page_content=content)
for content in self.document_data[file_path].values()
]
else:
documents = read_methods[file_type](file_path)
self.process_documents(
documents, file_path, file_type, "file", addl_metadata
)
except Exception as e:
self.logger.error(f"Error processing file {file_name}: {str(e)}")
def process_weblink(self, link, link_index, file_reader, addl_metadata):
if link in self.document_data:
return
self.logger.info(f"Reading link {link_index + 1} : {link}")
try:
if "youtube" in link:
documents = file_reader.read_youtube_transcript(link)
else:
documents = file_reader.read_html(link)
self.process_documents(documents, link, "txt", "link", addl_metadata)
except Exception as e:
self.logger.error(f"Error Reading link {link_index + 1} : {link}: {str(e)}")
def save_document_data(self):
if not os.path.exists(f"{self.config['log_chunk_dir']}/docs"):
os.makedirs(f"{self.config['log_chunk_dir']}/docs")
self.logger.info(
f"Creating directory {self.config['log_chunk_dir']}/docs for document data"
)
self.logger.info(
f"Saving document content to {self.config['log_chunk_dir']}/docs/doc_content.json"
)
if not os.path.exists(f"{self.config['log_chunk_dir']}/metadata"):
os.makedirs(f"{self.config['log_chunk_dir']}/metadata")
self.logger.info(
f"Creating directory {self.config['log_chunk_dir']}/metadata for document metadata"
)
self.logger.info(
f"Saving document metadata to {self.config['log_chunk_dir']}/metadata/doc_metadata.json"
)
with open(
f"{self.config['log_chunk_dir']}/docs/doc_content.json", "w"
) as json_file:
json.dump(self.document_data, json_file, indent=4)
with open(
f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json", "w"
) as json_file:
json.dump(self.document_metadata, json_file, indent=4)
def load_document_data(self):
try:
with open(
f"{self.config['log_chunk_dir']}/docs/doc_content.json", "r"
) as json_file:
self.document_data = json.load(json_file)
with open(
f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json", "r"
) as json_file:
self.document_metadata = json.load(json_file)
self.logger.info(
f"Loaded document content from {self.config['log_chunk_dir']}/docs/doc_content.json. Total documents: {len(self.document_data)}"
)
except FileNotFoundError:
self.logger.warning(
f"Document content not found in {self.config['log_chunk_dir']}/docs/doc_content.json"
)
self.document_data = {}
self.document_metadata = {}
class DataLoader:
def __init__(self, config, logger=None):
self.file_reader = FileReader(
logger=logger, kind=config["llm_params"]["pdf_reader"]
)
self.chunk_processor = ChunkProcessor(config, logger=logger)
def get_chunks(self, uploaded_files, weblinks):
return self.chunk_processor.chunk_docs(
self.file_reader, uploaded_files, weblinks
)
if __name__ == "__main__":
import yaml
import argparse
parser = argparse.ArgumentParser(description="Process some links.")
parser.add_argument(
"--links", nargs="+", required=True, help="List of links to process."
)
parser.add_argument(
"--config_file", type=str, help="Path to the main config file", required=True
)
parser.add_argument(
"--project_config_file",
type=str,
help="Path to the project config file",
required=True,
)
args = parser.parse_args()
links_to_process = args.links
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
with open(args.config_file, "r") as f:
config = yaml.safe_load(f)
with open(args.project_config_file, "r") as f:
project_config = yaml.safe_load(f)
# Combine project config with the main config
config.update(project_config)
STORAGE_DIR = os.path.join(BASE_DIR, config["vectorstore"]["data_path"])
uploaded_files = [
os.path.join(STORAGE_DIR, file)
for file in os.listdir(STORAGE_DIR)
if file != "urls.txt"
]
data_loader = DataLoader(config, logger=logger)
# Just for testing
(
document_chunks,
document_names,
documents,
document_metadata,
) = data_loader.get_chunks(
links_to_process,
[],
)
print(document_names[:5])
print(len(document_chunks))