documind-api-v2 / utils.py
pvanand's picture
Upload 7 files
5d42805 verified
"""
Contains utility functions for the LLM and database modules, along with some other miscellaneous functions.
"""
from turtle import clear
from pymupdf import pymupdf
#from docx import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
#import tiktoken
import base64
import hashlib
from typing import List
from openai import OpenAI
#from dotenv import load_dotenv
import os
import hashlib
from datetime import datetime
from typing import List, Optional, Dict, Any, Tuple
def generate_file_id(file_bytes: bytes) -> str:
    """Generate a 4-character hexadecimal file ID for the given file content.

    The ID is the first two bytes (16 bits) of the SHA-256 digest of the
    first 4096 bytes of ``file_bytes``, rendered as four lowercase hex
    digits. Only 65,536 distinct IDs exist, and files that share their
    first 4096 bytes always receive the same ID.

    Args:
        file_bytes: Raw content of the file.

    Returns:
        A 4-character lowercase hexadecimal string.
    """
    # Only the leading 4096 bytes take part in the hash.
    digest = hashlib.sha256(file_bytes[:4096]).digest()
    # bytes.hex() zero-pads every byte, so the result is always 4 characters.
    return digest[:2].hex()
def _detect_columns(blocks, page_width):
    """Estimate the number of text columns from the blocks' left x-coordinates.

    A gap between consecutive sorted left edges that exceeds 15% of the page
    width counts as a column boundary.
    """
    if not blocks:
        return 1
    left_edges = sorted(block[0] for block in blocks)
    min_gap = page_width * 0.15
    wide_gaps = sum(
        1 for left, right in zip(left_edges, left_edges[1:]) if right - left > min_gap
    )
    return wide_gaps + 1


def _sort_elements_by_position(elements, num_columns, page_width):
    """Order ``(bbox, payload, kind)`` elements by column, then by top y-coordinate."""
    if num_columns == 1:
        return sorted(elements, key=lambda element: element[0][1])
    column_width = page_width / num_columns
    return sorted(
        elements,
        key=lambda element: (int(element[0][0] // column_width), element[0][1]),
    )


def process_pdf_to_chunks(
    pdf_content: bytes,
    file_name: str,
    chunk_size: int = 512,
    chunk_overlap: int = 20
) -> Tuple[List[Dict[str, Any]], str]:
    """
    Process PDF content into chunks with column layout detection and image handling.

    Each page's text blocks and images are ordered by column and vertical
    position and concatenated into one document string. Every image is
    base64-encoded and represented in the text by a ``<img_N>`` marker
    (surrounded by newlines). The document string is then split with a
    tiktoken-based (``cl100k_base``) recursive splitter.

    Args:
        pdf_content: Raw bytes of the PDF file.
        file_name: Name stored in each chunk's metadata.
        chunk_size: Chunk size passed to the splitter.
        chunk_overlap: Chunk overlap passed to the splitter.

    Returns:
        A tuple ``(chunks, document_id)``. Each chunk is a dict with ``"text"``
        and ``"metadata"`` (``created_date``, ``file_name``, ``images``,
        ``document_id`` and a ``location`` dict holding ``char_start``,
        ``char_end``, ``pages``, ``chunk_index``, ``total_chunks`` and
        ``layout``). If a chunk cannot be located in the document text,
        ``char_start`` and ``char_end`` are ``-1`` and ``pages``, ``layout``
        and ``images`` are empty.
    """
    doc_id = generate_file_id(pdf_content)
    text_parts = []
    text_length = 0  # running length of the document text assembled so far
    all_images = []
    image_spans = []  # (image index, tag start, tag end) in document coordinates
    char_to_page_map = []
    layout_info = {}

    doc = pymupdf.open(stream=pdf_content, filetype="pdf")
    try:
        # Process each page
        for page_num, page in enumerate(doc, 1):
            page_width = page.rect.width
            blocks = page.get_text_blocks()
            images = page.get_images()

            # Detect layout
            num_columns = _detect_columns(blocks, page_width)
            layout_info[page_num] = {
                "columns": num_columns,
                "width": page_width,
                "height": page.rect.height
            }

            # Create elements list with both text and images
            elements = [(block[:4], block[4], "text") for block in blocks]
            for img in images:
                try:
                    img_rects = page.get_image_rects(img[0])
                except Exception as e:
                    # Best effort: an image whose placement cannot be resolved is skipped.
                    print(f"Error processing image: {e}")
                    continue
                if img_rects and img_rects[0]:
                    elements.append((img_rects[0], img[0], "image"))

            # Emit elements in reading order
            ordered = _sort_elements_by_position(elements, num_columns, page_width)
            for _bbox, payload, kind in ordered:
                if kind == "text":
                    piece = payload
                else:
                    image_bytes = doc.extract_image(payload)["image"]
                    all_images.append(base64.b64encode(image_bytes).decode('utf-8'))
                    image_idx = len(all_images) - 1
                    piece = f"\n<img_{image_idx}>\n"
                    # Record the span of the visible tag only: the surrounding
                    # newlines may be stripped from chunk edges by the splitter.
                    image_spans.append(
                        (image_idx, text_length + 1, text_length + len(piece) - 1)
                    )
                text_parts.append(piece)
                char_to_page_map.extend([page_num] * len(piece))
                text_length += len(piece)
    finally:
        # Release the document even if page processing fails.
        doc.close()

    document_text = "".join(text_parts)

    # Create chunks
    splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        encoding_name="cl100k_base",
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    text_chunks = splitter.split_text(document_text)

    # Process chunks with metadata
    created_date = datetime.now().isoformat()
    total_chunks = len(text_chunks)
    processed_chunks = []
    search_from = 0
    for chunk_idx, chunk in enumerate(text_chunks):
        # Chunks are produced in document order, so search forward from just
        # after the previous chunk's start; a global search would map repeated
        # text (e.g. running headers) to its first occurrence.
        chunk_start = document_text.find(chunk, search_from)
        if chunk_start == -1:
            chunk_start = document_text.find(chunk)
        else:
            search_from = chunk_start + 1

        if chunk_start == -1:
            # Chunk text is not a verbatim substring: no reliable location.
            chunk_end = -1
            chunk_pages = []
            chunk_images = []
        else:
            chunk_end = chunk_start + len(chunk)
            # Get page range for this chunk
            chunk_pages = sorted(set(char_to_page_map[chunk_start:chunk_end]))
            # An image belongs to the chunk when its tag overlaps the chunk span.
            chunk_images = [
                all_images[img_idx]
                for img_idx, tag_start, tag_end in image_spans
                if tag_start < chunk_end and tag_end > chunk_start
            ]
        chunk_layouts = {page: layout_info[page] for page in chunk_pages}

        processed_chunks.append({
            "text": chunk,
            "metadata": {
                "created_date": created_date,
                "file_name": file_name,
                "images": chunk_images,
                "document_id": doc_id,
                "location": {
                    "char_start": chunk_start,
                    "char_end": chunk_end,
                    "pages": chunk_pages,
                    "chunk_index": chunk_idx,
                    "total_chunks": total_chunks,
                    "layout": chunk_layouts
                }
            }
        })

    return processed_chunks, doc_id
# import re
# import unicodedata
# from typing import Optional
# # Compile regex patterns once
# HTML_TAG_PATTERN = re.compile(r'<[^>]+>')
# MULTIPLE_NEWLINES = re.compile(r'\n\s*\n')
# MULTIPLE_SPACES = re.compile(r'\s+')
# def clean_text_for_llm(text: Optional[str]) -> str:
# """
# Efficiently clean and normalize text for LLM processing.
# """
# # Early returns
# if not text:
# return ""
# if not isinstance(text, str):
# try:
# text = str(text)
# except Exception:
# return ""
# # Single-pass character filtering
# chars = []
# prev_char = ''
# space_pending = False
# for char in text:
# # Skip null bytes and most control characters
# if char == '\0' or unicodedata.category(char).startswith('C'):
# if char not in '\n\t':
# continue
# # Convert escaped sequences
# if prev_char == '\\':
# if char == 'n':
# chars[-1] = '\n'
# continue
# if char == 't':
# chars[-1] = '\t'
# continue
# # Handle whitespace
# if char.isspace():
# if not space_pending:
# space_pending = True
# continue
# if space_pending:
# chars.append(' ')
# space_pending = False
# chars.append(char)
# prev_char = char
# # Join characters and perform remaining operations
# text = ''.join(chars)
# # Remove HTML tags
# #text = HTML_TAG_PATTERN.sub('', text)
# # Normalize Unicode in a single pass
# text = unicodedata.normalize('NFKC', text)
# # Clean up newlines
# text = MULTIPLE_NEWLINES.sub('\n', text)
# Final trim
# return text.strip()