# Scraped page header (not part of the program):
# zmbfeng -- commit 1832c6c, "missing first page fixed"
import streamlit as st
import pdf2image
import utils
import numpy as np
import cv2
import os
import io
from PIL import Image
import re
import shutil
import zipfile
from io import BytesIO
temp_figure_dir="pdf_figures/"
temp_table_dir="pdf_tables/"
temp_textbox_dir="pdf_textbox/"
import time
# get https://github.com/oschwartz10612/poppler-windows/releases/tag/v22.01.0-0
# poppler-utils:
# Installed: 22.02.0-2ubuntu0.4
# install https://github.com/UB-Mannheim/tesseract/wiki
#page extraction disabled
def clean_filename(filename, replace_char=' '):
    """Return *filename* made safe to use as a single file name.

    Characters that the current platform forbids in file names are replaced
    with *replace_char*, runs of spaces are collapsed to one space, the result
    is stripped, and any remaining newline becomes an underscore.

    Args:
        filename: Proposed file name (not a path); may be ``None``.
        replace_char: Replacement for each forbidden character.

    Returns:
        The cleaned name, or ``None`` when *filename* is ``None``, empty or
        whitespace only.
    """
    # Check for empty filename or None
    if not filename or filename.isspace():
        return None

    cleaned_name = filename.strip()  # Trim whitespace from the ends

    # Platform-specific checks and clean-up
    if os.name == 'nt':  # Windows
        # Deliberately NOT a raw string: in a raw string "\0" is a backslash
        # followed by the digit "0", which would strip every "0" from file
        # names (e.g. "page_10") and never match the NUL character.
        invalid_chars = '<>:"/\\|?*\0'
        invalid_names = {"CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4",
                         "COM5", "COM6", "COM7", "COM8", "COM9", "LPT1", "LPT2",
                         "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"}
        # Blank out a reserved device name used as the base name.
        base_name, _, ext = cleaned_name.partition('.')
        if base_name.upper() in invalid_names:
            cleaned_name = replace_char * len(base_name) + '.' + ext
    else:  # POSIX (Linux, macOS, etc.)
        invalid_chars = '/\0'

    # Remove invalid characters
    for char in invalid_chars:
        cleaned_name = cleaned_name.replace(char, replace_char)

    # Collapse repeated spaces and strip leading/trailing whitespace
    cleaned_name = re.sub(' +', ' ', cleaned_name).strip()
    # Newlines inside the name (e.g. from OCR'd captions) become underscores
    cleaned_name = re.sub(r'[\n]', '_', cleaned_name)
    return cleaned_name
def manage_temp_to_be_zipped_directory(directory_path):
    """Leave *directory_path* existing and empty.

    An existing directory is deleted together with its contents and then
    created again; a missing one is simply created.  Each step is reported
    on stdout.
    """
    already_present = os.path.exists(directory_path)
    if already_present:
        # Wipe whatever a previous extraction left behind.
        shutil.rmtree(directory_path)
        print(f"Directory '{directory_path}' was removed.")

    os.makedirs(directory_path)

    if already_present:
        print(f"Directory '{directory_path}' was recreated.")
    else:
        print(f"Directory '{directory_path}' was created.")
def zip_directory(directory_path):
    """Pack every file found under *directory_path* into an in-memory ZIP.

    Files are stored under their bare file name (``arcname``), so the
    directory structure is flattened inside the archive.

    Returns:
        A ``BytesIO`` holding the deflate-compressed archive, rewound to
        position 0 so it can be read immediately.
    """
    archive_bytes = BytesIO()
    archive = zipfile.ZipFile(archive_bytes, 'w', zipfile.ZIP_DEFLATED)
    with archive:
        for folder, _subfolders, file_names in os.walk(directory_path):
            for file_name in file_names:
                archive.write(os.path.join(folder, file_name), arcname=file_name)
    archive_bytes.seek(0)
    return archive_bytes
def is_new_pdf_upload(uploaded_file):
    """Tell whether *uploaded_file* differs from the last upload seen.

    The previous upload is remembered in
    ``st.session_state.last_pdf_uploaded_file`` as a ``{'name', 'size'}``
    dict.  The record is (re)written whenever this returns ``True``.

    Returns:
        ``True`` on the first upload of the session or when the name or the
        size changed; ``False`` when both match the remembered upload.
    """
    if 'last_pdf_uploaded_file' not in st.session_state:
        # First upload of this session.
        st.session_state.last_pdf_uploaded_file = {'name': uploaded_file.name, 'size': uploaded_file.size}
        return True

    remembered = st.session_state.last_pdf_uploaded_file
    same_file = (uploaded_file.name == remembered['name']
                 and uploaded_file.size == remembered['size'])
    if same_file:
        return False

    st.session_state.last_pdf_uploaded_file = {'name': uploaded_file.name, 'size': uploaded_file.size}
    return True
# Store current file details in session state
# Centered page title.  Raw HTML is needed for the centering and font size,
# hence unsafe_allow_html below.
big_text = """
<div style='text-align: center;'>
<h1 style='font-size: 30px;'>Locked PDF Ingestion</h1>
</div>
"""
# Display the styled text
st.markdown(big_text, unsafe_allow_html=True)
# First script run of this session only: set the initialisation flag and the
# path of the bundled sample PDF.
if 'is_initialized' not in st.session_state:
    st.session_state['is_initialized'] = True
    pdf_path = 'uploaded_pdf/data_sheet.pdf'
# page_count = utils.get_pdf_page_count(pdf_path)
# print("page_count=",page_count)
# page_count=5
# print("new page_count=",page_count)
# read_pdf_progress_bar = st.progress(0)
# st.session_state.color_image_list = []
# st.session_state.gray_image_np_list = []
# for page_number in range(page_count):
# image = pdf2image.convert_from_path(pdf_path, first_page=page_number+1, last_page=page_number+1)
# st.session_state.color_image_list.append(image[0])
# progress_percentage = (page_number) / (page_count-1)
# read_pdf_progress_bar.progress(progress_percentage)
# gray_pdf_image_np_list = []
# read_pdf_progress_bar.progress(0)
# for index, image in enumerate(st.session_state.color_image_list):
# image_np = np.array(image)
# st.session_state.gray_image_np_list.append(cv2.cvtColor(np.array(image_np), cv2.COLOR_BGR2GRAY))
# progress_percentage = (index) / (page_count - 1)
# read_pdf_progress_bar.progress(progress_percentage)
# # cv2.line(st.session_state.gray_image_np_list[37], (174, 227), (174, 1790), 0, 2)
# # cv2.line(st.session_state.gray_image_np_list[37], (1550, 227), (1550, 1790), 0, 2)
# # cv2.line(st.session_state.gray_image_np_list[38], (226,227),(226,1444), 0,3)
# # cv2.line(st.session_state.gray_image_np_list[38], (1601,227),(1601,1444), 0,2)
# st.session_state.img_index = 0
# st.session_state.stop_button_clicked=False
# # st.image(st.session_state.gray_image_np_list[38])
# PDF upload widget; the value is None until the user has picked a file.
uploaded_locked_pdf_file = st.file_uploader(
    "Upload a locked pdf",
    type=['pdf'],
)
# Link to a sample document the user can download and then upload above.
st.markdown(
    '<a href="https://ikmtechnology.github.io/ikmtechnology/data_sheet.pdf" target="_blank">Sample 1 download and then upload to above</a>',
    unsafe_allow_html=True,
)
# Persist a newly uploaded PDF to disk, record its path and page count in the
# session, drop results of any previous extraction and restart the script.
if uploaded_locked_pdf_file is not None:
    if is_new_pdf_upload(uploaded_locked_pdf_file):
        save_path = './uploaded_videos'
        os.makedirs(save_path, exist_ok=True)

        # The file name comes from the client: keep only its last path
        # component so it cannot point outside save_path.
        safe_pdf_name = os.path.basename(uploaded_locked_pdf_file.name.replace('\\', '/'))
        if safe_pdf_name in ('', '.', '..'):
            safe_pdf_name = 'uploaded.pdf'
        saved_pdf_path = os.path.join(save_path, safe_pdf_name)

        with open(saved_pdf_path, "wb") as f:
            f.write(uploaded_locked_pdf_file.getbuffer())  # Write the file to the specified location
        st.success(f'Saved file {safe_pdf_name} in {save_path}')

        st.session_state.uploaded_pdf_path = saved_pdf_path
        st.session_state.page_count = utils.get_pdf_page_count(st.session_state.uploaded_pdf_path)
        print("page_count=", st.session_state.page_count)

        # Results shown further down belong to the previous document.
        if 'extracted_text' in st.session_state:
            del st.session_state.extracted_text
        st.rerun()
# Vertical guide lines drawn onto specific pages before analysis, keyed by the
# actual (1-based) PDF page number.  Each entry is a pair of end points.
# NOTE(review): the coordinates look hand-measured for the sample data sheet
# (presumably to close open table borders) -- confirm before relying on them
# for other documents.
PAGE_GUIDE_LINES = {
    34: (((223, 414), (223, 1185)), ((1527, 414), (1527, 1185))),
    35: (((176, 248), (176, 1760)), ((1551, 248), (1551, 1760))),
    36: (((225, 229), (225, 1703)), ((1601, 229), (1601, 1703))),
    37: (((173, 227), (173, 1790)), ((1550, 227), (1550, 1790))),
    38: (((222, 227), (222, 1444)), ((1600, 227), (1600, 1444))),
    39: (((175, 227), (175, 1229)), ((1551, 227), (1551, 1229))),
}


def _save_page_images(page_index, figures, tables, textboxes):
    """Write one page's figure, table and textbox crops into the temp dirs.

    Figures and tables are sequences whose items hold a label at [0] and an
    image array at [1]; textboxes are plain image arrays numbered from 1.
    """
    for item in figures or []:
        file_name = clean_filename(f"page_{page_index+1}_{item[0]}.png")
        Image.fromarray(item[1]).save(os.path.join(temp_figure_dir, file_name))
    for item in tables or []:
        file_name = clean_filename(f"page_{page_index+1}_{item[0]}.png")
        Image.fromarray(item[1]).save(os.path.join(temp_table_dir, file_name))
    for textbox_number, textbox_np in enumerate(textboxes or [], start=1):
        file_name = clean_filename(f"page_{page_index+1}_textbox_{textbox_number}.png")
        Image.fromarray(textbox_np).save(os.path.join(temp_textbox_dir, file_name))


# Page-range selection and the extraction run; only shown once a PDF has been
# uploaded and its page count is known.
if 'page_count' in st.session_state:
    page_count = st.session_state.page_count
    st.write(f"total page count = {page_count}")

    if page_count > 1:
        # Both sliders cover 1..page_count inclusive; the selected end page is
        # itself extracted.
        st.session_state.start_page_to_extract = st.slider(
            'Start page number:', min_value=1, max_value=page_count,
            value=1, key='num_pages_to_extract_slider')
        st.session_state.end_page_to_extract = st.slider(
            'End page number', min_value=1, max_value=page_count,
            value=page_count, key='num_pages_to_extract_slider2')
    else:
        # A slider needs a non-empty range; a one-page document has no choice.
        st.session_state.start_page_to_extract = 1
        st.session_state.end_page_to_extract = 1

    start_page = st.session_state.start_page_to_extract
    end_page = st.session_state.end_page_to_extract
    st.write(f"pages to extract: {start_page} to {end_page}")

    # The button's own state from the triggering click is used to render it
    # disabled while the extraction below is running.
    st.session_state.running = bool(
        'run_button' in st.session_state and st.session_state.run_button)

    read_pdf_progress_bar = st.progress(0)
    if st.button('Extract Pages', disabled=st.session_state.running, key='run_button'):
        if start_page > end_page:
            st.error("Start page number must not be greater than end page number.")
        else:
            st.session_state.color_image_list = []
            st.session_state.gray_image_np_list = []
            st.session_state.pdf_figures_image_list = []
            st.session_state.pdf_tables_image_list = []
            st.session_state.pdf_textbox_image_list = []
            st.session_state.pdf_text_list = []

            # Step 1: rasterise the selected pages one at a time.
            pages_to_extract = range(start_page, end_page + 1)
            total_pages = len(pages_to_extract)
            for pages_done, page_number in enumerate(pages_to_extract, start=1):
                image = pdf2image.convert_from_path(
                    st.session_state.uploaded_pdf_path,
                    first_page=page_number, last_page=page_number)
                st.session_state.color_image_list.append(image[0])
                read_pdf_progress_bar.progress(pages_done / total_pages)

            # Step 2: convert to grayscale and draw the per-page guide lines.
            read_pdf_progress_bar.progress(0)
            for index, image in enumerate(st.session_state.color_image_list):
                # NOTE(review): pdf2image yields PIL images, which are normally
                # RGB, while this conversion assumes BGR channel order; left
                # unchanged because downstream analysis may be tuned to it.
                gray_image_np = cv2.cvtColor(np.array(image), cv2.COLOR_BGR2GRAY)
                # uncomment to find xy coordinates
                # cv2.imwrite(f"gray_image_{index}.png", gray_image_np)
                for line_start, line_end in PAGE_GUIDE_LINES.get(index + start_page, ()):
                    cv2.line(gray_image_np, line_start, line_end, 0, 2)
                st.session_state.gray_image_np_list.append(gray_image_np)
                read_pdf_progress_bar.progress((index + 1) / total_pages)

            # Step 3: analyse every page and save the crops for zipping.
            manage_temp_to_be_zipped_directory(temp_figure_dir)
            manage_temp_to_be_zipped_directory(temp_table_dir)
            manage_temp_to_be_zipped_directory(temp_textbox_dir)
            read_pdf_progress_bar.progress(0)
            page_text_blocks = []
            for index, gray_pdf_image_np in enumerate(st.session_state.gray_image_np_list):
                try:
                    figures_image_list, tables_image_list, textbox_image_list, text = \
                        utils.gray_pdf_image_np_to_text(index, gray_pdf_image_np, debug=True)
                except Exception as e:
                    # Best effort: a page that cannot be analysed is recorded
                    # as empty so the per-page lists stay index-aligned with
                    # gray_image_np_list.
                    print(f"An error occurred: {e}")
                    figures_image_list, tables_image_list, textbox_image_list, text = [], [], [], None

                if textbox_image_list:
                    print("index="+str(index)+" txt book " + str(len(textbox_image_list)))
                st.session_state.pdf_figures_image_list.append(figures_image_list)
                st.session_state.pdf_tables_image_list.append(tables_image_list)
                st.session_state.pdf_textbox_image_list.append(textbox_image_list)
                st.session_state.pdf_text_list.append(text if text is not None else "")

                try:
                    _save_page_images(index, figures_image_list, tables_image_list, textbox_image_list)
                except Exception as e:
                    print(f"An error occurred: {e}")

                if text is not None:
                    page_text_blocks.append(
                        f"<Page {index+1} start>\n{text}\n<Page {index+1} end>\n")
                read_pdf_progress_bar.progress((index + 1) / total_pages)

            st.session_state.extracted_text = "".join(page_text_blocks)
            st.session_state.figure_zip_bytes = zip_directory(temp_figure_dir)
            st.session_state.table_zip_bytes = zip_directory(temp_table_dir)
            st.session_state.textbox_zip_bytes = zip_directory(temp_textbox_dir)
            # Restart the script so the results section renders and the
            # button is enabled again.
            st.rerun()
def _page_items(per_page_lists, page_index):
    """Return the entry for *page_index*, or an empty list when it is missing."""
    if page_index < len(per_page_lists):
        return per_page_lists[page_index]
    return []


# Results of the last extraction: download buttons followed by a per-page
# preview of the detected figures, tables and textboxes.
if 'extracted_text' in st.session_state:
    # The uploader may have been cleared while results are still in the
    # session, so fall back to the path stored when the PDF was saved.
    if uploaded_locked_pdf_file is not None:
        source_pdf_name = uploaded_locked_pdf_file.name
    elif 'uploaded_pdf_path' in st.session_state:
        source_pdf_name = os.path.basename(st.session_state.uploaded_pdf_path)
    else:
        source_pdf_name = "extraction.pdf"
    # Strip the extension itself (whatever its letter case) rather than
    # replacing the first ".pdf" substring.
    download_stem = os.path.splitext(source_pdf_name)[0]

    st.download_button(label="Download Extraction txt File",
                       data=st.session_state.extracted_text,
                       file_name=download_stem + ".txt",
                       mime="text/plain")
    st.download_button(
        label="Download Figures ZIP",
        data=st.session_state.figure_zip_bytes,
        file_name=download_stem + "_figures.zip",
        mime="application/zip"
    )
    st.download_button(
        label="Download Tables ZIP",
        data=st.session_state.table_zip_bytes,
        file_name=download_stem + "_tables.zip",
        mime="application/zip"
    )
    st.download_button(
        label="Download Textbox ZIP",
        data=st.session_state.textbox_zip_bytes,
        file_name=download_stem + "_textbox.zip",
        mime="application/zip"
    )

    for index, gray_pdf_image_np in enumerate(st.session_state.gray_image_np_list):
        # Figures and tables share one shape: label at [0], image array at [1].
        labelled_sections = (
            ("no figures", _page_items(st.session_state.pdf_figures_image_list, index)),
            ("no tables", _page_items(st.session_state.pdf_tables_image_list, index)),
        )
        for empty_message, labelled_images in labelled_sections:
            if not labelled_images:
                st.write(empty_message)
                continue
            for labelled_image in labelled_images:
                st.write(labelled_image[0])
                st.image(Image.fromarray(labelled_image[1]))

        page_textboxes = _page_items(st.session_state.pdf_textbox_image_list, index)
        if not page_textboxes:
            st.write("no textbox")
        else:
            for text_box_index, pdf_textbox_image in enumerate(page_textboxes):
                st.write("text box "+str(text_box_index))
                st.image(Image.fromarray(pdf_textbox_image))
# for index, gray_pdf_image_np in enumerate(st.session_state.gray_image_np_list[0:5], start=0):
# print("index="+str(index))
#
# text=utils.gray_pdf_image_np_to_text(index,gray_pdf_image_np, debug=True)
# st.write(text)
#if 'img_index' not in st.session_state:
# if st.button("Stop"):
# st.session_state.stop_button_clicked = True
# st.write(str(st.session_state.img_index+1) +"/" + str(len(st.session_state.color_image_list)))
# st.image(st.session_state.gray_image_np_list[st.session_state.img_index], use_column_width=True)
# if not st.session_state.stop_button_clicked:
# if st.session_state.img_index < len(st.session_state.color_image_list) - 1:
# st.session_state.img_index += 1
# time.sleep(3)
# st.rerun()
# col1, col2 = st.columns(2)
# with col1:
# if st.button("Previous"):
# print("Previous pressed")
# # Decrease index, wrap around if it goes below 0
# print("st.session_state.img_index =", str(st.session_state.img_index))
# if st.session_state.img_index > 0:
# print("case 1 before st.session_state.img_index =",str(st.session_state.img_index))
# st.session_state.img_index -= 1
# print("case 2 after st.session_state.img_index =", str(st.session_state.img_index))
# else:
# print("case 2 st.session_state.img_index =", str(st.session_state.img_index))
# st.session_state.img_index = len(st.session_state.color_image_list) - 1
# with col2:
# if st.button("Next"):
#
# print("Next pressed")
# # Increase index, wrap around if it goes past the last image
# if st.session_state.img_index < len(st.session_state.color_image_list) - 1:
# st.session_state.img_index += 1
#
# else:
# st.session_state.img_index = 0
# #
# total_pages = 100
# print(f"total_pages = {total_pages}")
# st.write(f"total_pages = {total_pages}")
# for page_number in range(total_pages):
# pdf_image_list = convert_from_path(pdf_path)
# images = convert_from_path(pdf_path, first_page=page_number + 1, last_page=page_number + 1)
# progress = (page_number + 1) / total_pages * 100
# print(f"Progress: {progress:.2f}%")
# print("done")