""" Contains Utility functions for LLM and Database module. Along with some other misllaneous functions. """ from turtle import clear from pymupdf import pymupdf #from docx import Document from langchain_text_splitters import RecursiveCharacterTextSplitter #import tiktoken import base64 import hashlib from typing import List from openai import OpenAI #from dotenv import load_dotenv import os import hashlib from datetime import datetime from typing import List, Optional, Dict, Any, Tuple def generate_file_id(file_bytes: bytes) -> str: """Generate a 4-character unique file ID for given file.""" hash_obj = hashlib.sha256() hash_obj.update(file_bytes[:4096]) # Still hash the first 4096 bytes # Take first 2 bytes (16 bits) and convert to base36 (alphanumeric) file_id = hex(int.from_bytes(hash_obj.digest()[:2], 'big'))[2:].zfill(4) return file_id def process_pdf_to_chunks( pdf_content: bytes, file_name: str, chunk_size: int = 512, chunk_overlap: int = 20 ) -> Tuple[List[Dict[str, Any]], str]: """ Process PDF content into chunks with column layout detection and proper image handling """ doc = pymupdf.open(stream=pdf_content, filetype="pdf") document_text = "" all_images = [] image_positions = [] char_to_page_map = [] layout_info = {} doc_id = generate_file_id(pdf_content) def detect_columns(blocks): """Detect if page has multiple columns based on text block positions""" if not blocks: return 1 x_positions = [block[0] for block in blocks] x_positions.sort() if len(x_positions) > 1: gaps = [x_positions[i+1] - x_positions[i] for i in range(len(x_positions)-1)] significant_gaps = [gap for gap in gaps if gap > page.rect.width * 0.15] return len(significant_gaps) + 1 return 1 def sort_blocks_by_position(blocks, num_columns): """Sort blocks by column and vertical position""" if num_columns == 1: return sorted(blocks, key=lambda b: b[0][1]) # b[0] is the bbox tuple, b[0][1] is y coordinate page_width = page.rect.width column_width = page_width / num_columns def get_column(block): bbox = block[0] # Get the bounding box tuple x_coord = bbox[0] # Get the x coordinate (first element) return int(x_coord // column_width) return sorted(blocks, key=lambda b: (get_column(b), b[0][1])) # Process each page for page_num, page in enumerate(doc, 1): blocks = page.get_text_blocks() images = page.get_images() # Detect layout num_columns = detect_columns(blocks) layout_info[page_num] = { "columns": num_columns, "width": page.rect.width, "height": page.rect.height } # Create elements list with both text and images elements = [(block[:4], block[4], "text") for block in blocks] # Add images to elements for img in images: try: img_rects = page.get_image_rects(img[0]) if img_rects and len(img_rects) > 0: img_bbox = img_rects[0] if img_bbox: img_data = (img_bbox, img[0], "image") elements.append(img_data) except Exception as e: print(f"Error processing image: {e}") continue # Sort elements by position sorted_elements = sort_blocks_by_position(elements, num_columns) # Process elements in order page_text = "" for element in sorted_elements: if element[2] == "text": text_content = element[1] page_text += text_content char_to_page_map.extend([page_num] * len(text_content)) else: xref = element[1] base_image = doc.extract_image(xref) image_bytes = base_image["image"] # Convert image bytes to base64 image_base64 = base64.b64encode(image_bytes).decode('utf-8') all_images.append(image_base64) # Store base64 encoded image image_marker = f"\n\n" image_positions.append((len(all_images)-1, len(document_text) + len(page_text))) page_text 
                page_text += image_marker
                char_to_page_map.extend([page_num] * len(image_marker))

        document_text += page_text

    # Create chunks
    splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        #separators=["\n\n", "\n", " ", ""],
        #keep_separator=True,
        encoding_name="cl100k_base",
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    text_chunks = splitter.split_text(document_text)

    # Process chunks with metadata
    processed_chunks = []
    for chunk_idx, chunk in enumerate(text_chunks):
        # Locate the chunk in the full text (first occurrence; approximate for repeated text)
        chunk_start = document_text.find(chunk)
        chunk_end = chunk_start + len(chunk)

        # Get page range and layout info
        chunk_pages = sorted(set(char_to_page_map[chunk_start:chunk_end]))
        chunk_layouts = {page: layout_info[page] for page in chunk_pages}

        # Get images for this chunk
        chunk_images = []
        for img_idx, img_pos in image_positions:
            if chunk_start <= img_pos <= chunk_end:
                chunk_images.append(all_images[img_idx])  # Already base64 encoded

        # Clean the chunk text
        #cleaned_chunk = clean_text_for_llm(chunk)

        chunk_dict = {
            "text": chunk,
            "metadata": {
                "created_date": datetime.now().isoformat(),
                "file_name": file_name,
                "images": chunk_images,
                "document_id": doc_id,
                "location": {
                    "char_start": chunk_start,
                    "char_end": chunk_end,
                    "pages": chunk_pages,
                    "chunk_index": chunk_idx,
                    "total_chunks": len(text_chunks),
                    "layout": chunk_layouts
                }
            }
        }
        processed_chunks.append(chunk_dict)

    return processed_chunks, doc_id


# import re
# import unicodedata
# from typing import Optional

# # Compile regex patterns once
# HTML_TAG_PATTERN = re.compile(r'<[^>]+>')
# MULTIPLE_NEWLINES = re.compile(r'\n\s*\n')
# MULTIPLE_SPACES = re.compile(r'\s+')

# def clean_text_for_llm(text: Optional[str]) -> str:
#     """
#     Efficiently clean and normalize text for LLM processing.
#     """
#     # Early returns
#     if not text:
#         return ""
#     if not isinstance(text, str):
#         try:
#             text = str(text)
#         except Exception:
#             return ""

#     # Single-pass character filtering
#     chars = []
#     prev_char = ''
#     space_pending = False
#     for char in text:
#         # Skip null bytes and most control characters
#         if char == '\0' or unicodedata.category(char).startswith('C'):
#             if char not in '\n\t':
#                 continue

#         # Convert escaped sequences
#         if prev_char == '\\':
#             if char == 'n':
#                 chars[-1] = '\n'
#                 continue
#             if char == 't':
#                 chars[-1] = '\t'
#                 continue

#         # Handle whitespace
#         if char.isspace():
#             if not space_pending:
#                 space_pending = True
#             continue
#         if space_pending:
#             chars.append(' ')
#             space_pending = False

#         chars.append(char)
#         prev_char = char

#     # Join characters and perform remaining operations
#     text = ''.join(chars)

#     # Remove HTML tags
#     #text = HTML_TAG_PATTERN.sub('', text)

#     # Normalize Unicode in a single pass
#     text = unicodedata.normalize('NFKC', text)

#     # Clean up newlines
#     text = MULTIPLE_NEWLINES.sub('\n', text)

#     # Final trim
#     return text.strip()
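

# Minimal usage sketch, assuming a locally available "example.pdf" (the path is
# hypothetical and only illustrates the call signature and return values).
if __name__ == "__main__":
    with open("example.pdf", "rb") as f:
        pdf_bytes = f.read()

    chunks, doc_id = process_pdf_to_chunks(pdf_bytes, file_name="example.pdf")
    print(f"Document {doc_id}: {len(chunks)} chunks")

    first = chunks[0]
    print("Pages covered by first chunk:", first["metadata"]["location"]["pages"])
    print("Images attached to first chunk:", len(first["metadata"]["images"]))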