|
import os |
|
import re |
|
import requests |
|
import pysrt |
|
from langchain_community.document_loaders import ( |
|
PyMuPDFLoader, |
|
Docx2txtLoader, |
|
YoutubeLoader, |
|
WebBaseLoader, |
|
TextLoader, |
|
) |
|
from langchain_community.document_loaders import UnstructuredMarkdownLoader |
|
from llama_parse import LlamaParse |
|
from langchain.schema import Document |
|
import logging |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from ragatouille import RAGPretrainedModel |
|
from langchain.chains import LLMChain |
|
from langchain_community.llms import OpenAI |
|
from langchain import PromptTemplate |
|
import json |
|
from concurrent.futures import ThreadPoolExecutor |
|
from urllib.parse import urljoin |
|
import html2text |
|
import bs4 |
|
import tempfile |
|
import PyPDF2 |
|
|
|
try: |
|
from modules.dataloader.helpers import get_metadata, download_pdf_from_url |
|
from modules.config.constants import OPENAI_API_KEY, LLAMA_CLOUD_API_KEY |
|
except: |
|
from dataloader.helpers import get_metadata, download_pdf_from_url |
|
from config.constants import OPENAI_API_KEY, LLAMA_CLOUD_API_KEY |
|
|
|
logger = logging.getLogger(__name__) |
|
BASE_DIR = os.getcwd() |
|
|
|
class PDFReader: |
|
def __init__(self): |
|
pass |
|
|
|
def get_loader(self, pdf_path): |
|
loader = PyMuPDFLoader(pdf_path) |
|
return loader |
|
|
|
def get_documents(self, loader): |
|
return loader.load() |
|
|
|
class LlamaParser: |
|
def __init__(self): |
|
print("Initializing LlamaParser") |
|
self.GPT_API_KEY = OPENAI_API_KEY |
|
self.LLAMA_CLOUD_API_KEY = LLAMA_CLOUD_API_KEY |
|
self.parse_url = "https://api.cloud.llamaindex.ai/api/parsing/upload" |
|
self.headers = { |
|
'Accept': 'application/json', |
|
'Authorization': f'Bearer {LLAMA_CLOUD_API_KEY}' |
|
} |
|
self.parser = LlamaParse( |
|
api_key=LLAMA_CLOUD_API_KEY, |
|
result_type="markdown", |
|
verbose=True, |
|
language="en", |
|
gpt4o_mode=False, |
|
|
|
parsing_instruction="The provided documents are PDFs of lecture slides of deep learning material. They contain LaTeX equations, images, and text. The goal is to extract the text, images and equations from the slides. The markdown should be clean and easy to read, and any math equation should be converted to LaTeX format, between $ signs. For images, if you can, give a description and a source." |
|
) |
|
|
|
def parse(self, pdf_path): |
|
pdf_name = os.path.basename(pdf_path) |
|
|
|
if not os.path.exists(pdf_path): |
|
logger.warning(f"File {pdf_name} does not exist locally, installing temporarily...") |
|
pdf_path = download_pdf_from_url(pdf_path) |
|
|
|
documents = self.parser.load_data(pdf_path) |
|
document = [document.to_langchain_format() for document in documents][0] |
|
|
|
content = document.page_content |
|
pages = content.split("\n---\n") |
|
pages = [page.strip() for page in pages] |
|
|
|
documents = [ |
|
Document( |
|
page_content=page, |
|
metadata={"source": pdf_path, "page": i} |
|
) for i, page in enumerate(pages) |
|
] |
|
|
|
return documents |
|
|
|
def make_request(self, pdf_url): |
|
payload = { |
|
"gpt4o_mode": "false", |
|
"parsing_instruction": "The provided document is a PDF of lecture slides of deep learning material. They contain LaTeX equations, images, and text. The goal is to extract the text, images and equations from the slides and convert them to markdown format. The markdown should be clean and easy to read, and any math equation should be converted to LaTeX, between $$. For images, give a description and if you can, a source.", |
|
} |
|
|
|
files = [ |
|
('file', ('file', requests.get(pdf_url).content, 'application/octet-stream')) |
|
] |
|
|
|
response = requests.request( |
|
"POST", self.parse_url, headers=self.headers, data=payload, files=files) |
|
|
|
return response.json()['id'], response.json()['status'] |
|
|
|
async def get_result(self, job_id): |
|
url = f"https://api.cloud.llamaindex.ai/api/parsing/job/{job_id}/result/markdown" |
|
|
|
response = requests.request("GET", url, headers=self.headers, data={}) |
|
|
|
return response.json()['markdown'] |
|
|
|
async def _parse(self, pdf_path): |
|
job_id, status = self.make_request(pdf_path) |
|
|
|
while status != "SUCCESS": |
|
url = f"https://api.cloud.llamaindex.ai/api/parsing/job/{job_id}" |
|
response = requests.request("GET", url, headers=self.headers, data={}) |
|
status = response.json()["status"] |
|
|
|
result = await self.get_result(job_id) |
|
|
|
documents = [ |
|
Document( |
|
page_content=result, |
|
metadata={"source": pdf_path} |
|
) |
|
] |
|
|
|
return documents |
|
|
|
async def _parse(self, pdf_path): |
|
return await self._parse(pdf_path) |
|
|
|
class HTMLReader: |
|
def __init__(self): |
|
pass |
|
|
|
def read_url(self, url): |
|
response = requests.get(url) |
|
if response.status_code == 200: |
|
return response.text |
|
else: |
|
logger.warning(f"Failed to download HTML from URL: {url}") |
|
return None |
|
|
|
def check_links(self, base_url, html_content): |
|
soup = bs4.BeautifulSoup(html_content, "html.parser") |
|
for link in soup.find_all("a"): |
|
href = link.get("href") |
|
|
|
if not href or href.startswith("#"): |
|
continue |
|
elif not href.startswith("https"): |
|
href = href.replace("http", "https") |
|
|
|
absolute_url = urljoin(base_url, href) |
|
link['href'] = absolute_url |
|
|
|
resp = requests.head(absolute_url) |
|
if resp.status_code != 200: |
|
logger.warning(f"Link {absolute_url} is broken") |
|
logger.warning(f"Status code: {resp.status_code}") |
|
|
|
return str(soup) |
|
|
|
def html_to_md(self, url, html_content): |
|
html_processed = self.check_links(url, html_content) |
|
markdown_content = html2text.html2text(html_processed) |
|
return markdown_content |
|
|
|
def read_html(self, url): |
|
html_content = self.read_url(url) |
|
if html_content: |
|
return self.html_to_md(url, html_content) |
|
else: |
|
return None |
|
|
|
class FileReader: |
|
def __init__(self, logger, kind): |
|
self.logger = logger |
|
self.kind = kind |
|
if kind == "llama": |
|
self.pdf_reader = LlamaParser() |
|
else: |
|
self.pdf_reader = PDFReader() |
|
self.web_reader = HTMLReader() |
|
|
|
|
|
def extract_text_from_pdf(self, pdf_path): |
|
text = "" |
|
with open(pdf_path, "rb") as file: |
|
reader = PyPDF2.PdfReader(file) |
|
num_pages = len(reader.pages) |
|
for page_num in range(num_pages): |
|
page = reader.pages[page_num] |
|
text += page.extract_text() |
|
return text |
|
|
|
def read_pdf(self, temp_file_path: str): |
|
if self.kind == "llama": |
|
documents = self.pdf_reader.parse(temp_file_path) |
|
else: |
|
loader = self.pdf_reader.get_loader(temp_file_path) |
|
documents = self.pdf_reader.get_documents(loader) |
|
return documents |
|
|
|
def read_txt(self, temp_file_path: str): |
|
loader = TextLoader(temp_file_path, autodetect_encoding=True) |
|
return loader.load() |
|
|
|
def read_docx(self, temp_file_path: str): |
|
loader = Docx2txtLoader(temp_file_path) |
|
return loader.load() |
|
|
|
def read_srt(self, temp_file_path: str): |
|
subs = pysrt.open(temp_file_path) |
|
text = "" |
|
for sub in subs: |
|
text += sub.text |
|
return [Document(page_content=text)] |
|
|
|
def read_youtube_transcript(self, url: str): |
|
loader = YoutubeLoader.from_youtube_url( |
|
url, add_video_info=True, language=["en"], translation="en" |
|
) |
|
return loader.load() |
|
|
|
def read_html(self, url: str): |
|
loader = WebBaseLoader(url) |
|
return loader.load() |
|
|
|
def read_tex_from_url(self, tex_url): |
|
response = requests.get(tex_url) |
|
if response.status_code == 200: |
|
return [Document(page_content=response.text)] |
|
else: |
|
self.logger.error(f"Failed to fetch .tex file from URL: {tex_url}") |
|
return None |
|
|
|
|
|
class ChunkProcessor: |
|
def __init__(self, config, logger): |
|
self.config = config |
|
self.logger = logger |
|
|
|
self.document_data = {} |
|
self.document_metadata = {} |
|
self.document_chunks_full = [] |
|
|
|
if config["splitter_options"]["use_splitter"]: |
|
if config["splitter_options"]["split_by_token"]: |
|
self.splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( |
|
chunk_size=config["splitter_options"]["chunk_size"], |
|
chunk_overlap=config["splitter_options"]["chunk_overlap"], |
|
separators=config["splitter_options"]["chunk_separators"], |
|
disallowed_special=(), |
|
) |
|
else: |
|
self.splitter = RecursiveCharacterTextSplitter( |
|
chunk_size=config["splitter_options"]["chunk_size"], |
|
chunk_overlap=config["splitter_options"]["chunk_overlap"], |
|
separators=config["splitter_options"]["chunk_separators"], |
|
disallowed_special=(), |
|
) |
|
else: |
|
self.splitter = None |
|
self.logger.info("ChunkProcessor instance created") |
|
|
|
def remove_delimiters(self, document_chunks: list): |
|
for chunk in document_chunks: |
|
for delimiter in self.config["splitter_options"]["delimiters_to_remove"]: |
|
chunk.page_content = re.sub(delimiter, " ", chunk.page_content) |
|
return document_chunks |
|
|
|
def remove_chunks(self, document_chunks: list): |
|
front = self.config["splitter_options"]["front_chunk_to_remove"] |
|
end = self.config["splitter_options"]["last_chunks_to_remove"] |
|
for _ in range(front): |
|
del document_chunks[0] |
|
for _ in range(end): |
|
document_chunks.pop() |
|
return document_chunks |
|
|
|
def process_chunks( |
|
self, documents, file_type="txt", source="", page=0, metadata={} |
|
): |
|
documents = [Document(page_content=documents, source=source, page=page)] |
|
if ( |
|
file_type == "txt" |
|
or file_type == "docx" |
|
or file_type == "srt" |
|
or file_type == "tex" |
|
): |
|
document_chunks = self.splitter.split_documents(documents) |
|
elif file_type == "pdf": |
|
document_chunks = documents |
|
|
|
|
|
for chunk in document_chunks: |
|
chunk.metadata["source"] = source |
|
chunk.metadata["page"] = page |
|
|
|
|
|
for key, value in metadata.items(): |
|
chunk.metadata[key] = value |
|
|
|
if self.config["splitter_options"]["remove_leftover_delimiters"]: |
|
document_chunks = self.remove_delimiters(document_chunks) |
|
if self.config["splitter_options"]["remove_chunks"]: |
|
document_chunks = self.remove_chunks(document_chunks) |
|
|
|
return document_chunks |
|
|
|
def chunk_docs(self, file_reader, uploaded_files, weblinks): |
|
addl_metadata = get_metadata( |
|
"https://dl4ds.github.io/sp2024/lectures/", |
|
"https://dl4ds.github.io/sp2024/schedule/", |
|
) |
|
with ThreadPoolExecutor() as executor: |
|
executor.map( |
|
self.process_file, |
|
uploaded_files, |
|
range(len(uploaded_files)), |
|
[file_reader] * len(uploaded_files), |
|
[addl_metadata] * len(uploaded_files), |
|
) |
|
executor.map( |
|
self.process_weblink, |
|
weblinks, |
|
range(len(weblinks)), |
|
[file_reader] * len(weblinks), |
|
[addl_metadata] * len(weblinks), |
|
) |
|
|
|
document_names = [ |
|
f"{file_name}_{page_num}" |
|
for file_name, pages in self.document_data.items() |
|
for page_num in pages.keys() |
|
] |
|
documents = [ |
|
page for doc in self.document_data.values() for page in doc.values() |
|
] |
|
document_metadata = [ |
|
page for doc in self.document_metadata.values() for page in doc.values() |
|
] |
|
|
|
self.save_document_data() |
|
|
|
self.logger.info( |
|
f"Total document chunks extracted: {len(self.document_chunks_full)}" |
|
) |
|
|
|
return self.document_chunks_full, document_names, documents, document_metadata |
|
|
|
def process_documents( |
|
self, documents, file_path, file_type, metadata_source, addl_metadata |
|
): |
|
file_data = {} |
|
file_metadata = {} |
|
|
|
for doc in documents: |
|
|
|
|
|
|
|
page_num = doc.metadata.get("page", 0) |
|
file_data[page_num] = doc.page_content |
|
metadata = ( |
|
addl_metadata.get(file_path, {}) |
|
if metadata_source == "file" |
|
else {"source": file_path, "page": page_num} |
|
) |
|
file_metadata[page_num] = metadata |
|
|
|
if self.config["vectorstore"]["db_option"] not in ["RAGatouille"]: |
|
document_chunks = self.process_chunks( |
|
doc.page_content, |
|
file_type, |
|
source=file_path, |
|
page=page_num, |
|
metadata=metadata, |
|
) |
|
self.document_chunks_full.extend(document_chunks) |
|
|
|
print(f"Processed {file_path}. File_data: {file_data}") |
|
self.document_data[file_path] = file_data |
|
self.document_metadata[file_path] = file_metadata |
|
|
|
def process_file(self, file_path, file_index, file_reader, addl_metadata): |
|
file_name = os.path.basename(file_path) |
|
|
|
if file_name in self.document_data: |
|
return |
|
|
|
file_type = file_name.split(".")[-1] |
|
|
|
read_methods = { |
|
"pdf": file_reader.read_pdf, |
|
"txt": file_reader.read_txt, |
|
"docx": file_reader.read_docx, |
|
"srt": file_reader.read_srt, |
|
"tex": file_reader.read_tex_from_url, |
|
} |
|
if file_type not in read_methods: |
|
self.logger.warning(f"Unsupported file type: {file_type}") |
|
return |
|
|
|
try: |
|
documents = read_methods[file_type](file_path) |
|
|
|
self.process_documents( |
|
documents, file_path, file_type, "file", addl_metadata |
|
) |
|
except Exception as e: |
|
self.logger.error(f"Error processing file {file_name}: {str(e)}") |
|
|
|
def process_weblink(self, link, link_index, file_reader, addl_metadata): |
|
if link in self.document_data: |
|
return |
|
|
|
self.logger.info(f"Reading link {link_index + 1} : {link}") |
|
|
|
try: |
|
if "youtube" in link: |
|
documents = file_reader.read_youtube_transcript(link) |
|
else: |
|
documents = file_reader.read_html(link) |
|
|
|
self.process_documents(documents, link, "txt", "link", addl_metadata) |
|
except Exception as e: |
|
self.logger.error(f"Error Reading link {link_index + 1} : {link}: {str(e)}") |
|
|
|
def save_document_data(self): |
|
if not os.path.exists(f"{self.config['log_chunk_dir']}/docs"): |
|
os.makedirs(f"{self.config['log_chunk_dir']}/docs") |
|
self.logger.info( |
|
f"Creating directory {self.config['log_chunk_dir']}/docs for document data" |
|
) |
|
self.logger.info( |
|
f"Saving document content to {self.config['log_chunk_dir']}/docs/doc_content.json" |
|
) |
|
if not os.path.exists(f"{self.config['log_chunk_dir']}/metadata"): |
|
os.makedirs(f"{self.config['log_chunk_dir']}/metadata") |
|
self.logger.info( |
|
f"Creating directory {self.config['log_chunk_dir']}/metadata for document metadata" |
|
) |
|
self.logger.info( |
|
f"Saving document metadata to {self.config['log_chunk_dir']}/metadata/doc_metadata.json" |
|
) |
|
with open( |
|
f"{self.config['log_chunk_dir']}/docs/doc_content.json", "w" |
|
) as json_file: |
|
json.dump(self.document_data, json_file, indent=4) |
|
with open( |
|
f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json", "w" |
|
) as json_file: |
|
json.dump(self.document_metadata, json_file, indent=4) |
|
|
|
def load_document_data(self): |
|
with open( |
|
f"{self.config['log_chunk_dir']}/docs/doc_content.json", "r" |
|
) as json_file: |
|
self.document_data = json.load(json_file) |
|
with open( |
|
f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json", "r" |
|
) as json_file: |
|
self.document_metadata = json.load(json_file) |
|
|
|
|
|
class DataLoader: |
|
def __init__(self, config, logger=None): |
|
self.file_reader = FileReader(logger=logger, kind=config["llm_params"]["pdf_reader"]) |
|
self.chunk_processor = ChunkProcessor(config, logger=logger) |
|
|
|
def get_chunks(self, uploaded_files, weblinks): |
|
return self.chunk_processor.chunk_docs( |
|
self.file_reader, uploaded_files, weblinks |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
import yaml |
|
|
|
logger = logging.getLogger(__name__) |
|
logger.setLevel(logging.INFO) |
|
|
|
with open("../code/modules/config/config.yml", "r") as f: |
|
config = yaml.safe_load(f) |
|
|
|
STORAGE_DIR = os.path.join(BASE_DIR, config['vectorstore']["data_path"]) |
|
uploaded_files = [ |
|
os.path.join(STORAGE_DIR, file) for file in os.listdir(STORAGE_DIR) if file != "urls.txt" |
|
] |
|
|
|
data_loader = DataLoader(config, logger=logger) |
|
document_chunks, document_names, documents, document_metadata = ( |
|
data_loader.get_chunks( |
|
["https://dl4ds.github.io/sp2024/static_files/lectures/05_loss_functions_v2.pdf"], |
|
[], |
|
) |
|
) |
|
|
|
print(document_names[:5]) |
|
print(len(document_chunks)) |
|
|
|
|