Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pdf2image | |
import utils | |
import numpy as np | |
import cv2 | |
import os | |
import io | |
from PIL import Image | |
import re | |
import shutil | |
import zipfile | |
from io import BytesIO | |
# Scratch directories (relative to the working directory) that receive the
# extracted figure / table / textbox images before each one is zipped for
# download. The trailing slash matters: the script builds file paths by plain
# string concatenation (directory + file name).
temp_figure_dir="pdf_figures/"
temp_table_dir="pdf_tables/"
temp_textbox_dir="pdf_textbox/"
import time | |
# get https://github.com/oschwartz10612/poppler-windows/releases/tag/v22.01.0-0 | |
# poppler-utils: | |
# Installed: 22.02.0-2ubuntu0.4 | |
# install https://github.com/UB-Mannheim/tesseract/wiki | |
#page extraction disabled | |
def clean_filename(filename, replace_char=' ', *, platform=None):
    """Return *filename* with characters that are illegal in file names replaced.

    Args:
        filename: Candidate file name (not a path). ``None``, the empty string
            and whitespace-only strings are rejected.
        replace_char: Text substituted for every illegal character.
        platform: Which rule set to apply, using ``os.name`` values (``'nt'``
            for Windows, anything else for POSIX). Defaults to the host's
            ``os.name``. Pass ``'nt'`` explicitly to produce names that are
            safe on Windows regardless of where this code runs.

    Returns:
        The cleaned name, or ``None`` when *filename* is empty or blank.
    """
    # Check for empty filename or None
    if not filename or filename.isspace():
        return None

    if platform is None:
        platform = os.name

    cleaned_name = filename.strip()  # Trim whitespace from the ends

    if platform == 'nt':  # Windows
        # Deliberately NOT a raw string: '\\' is a single backslash and '\0'
        # is the NUL character. As a raw string, '\0' would be a backslash
        # plus the digit zero, and every "0" in the name would be replaced.
        invalid_chars = '<>:"/\\|?*\0'
        invalid_names = {"CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4",
                         "COM5", "COM6", "COM7", "COM8", "COM9", "LPT1", "LPT2",
                         "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"}
        # A reserved device name before the first dot is blanked out with the
        # replacement character, keeping whatever follows the dot.
        base_name, _, ext = cleaned_name.partition('.')
        if base_name.upper() in invalid_names:
            cleaned_name = replace_char * len(base_name) + '.' + ext
    else:  # POSIX (Linux, macOS, etc.)
        invalid_chars = '/\0'

    # Replace invalid characters
    for char in invalid_chars:
        cleaned_name = cleaned_name.replace(char, replace_char)

    # Collapse runs of spaces (replacements may have created some) and trim
    # again, then make any embedded newline visible as an underscore.
    cleaned_name = re.sub(' +', ' ', cleaned_name).strip()
    return cleaned_name.replace('\n', '_')
def manage_temp_to_be_zipped_directory(directory_path):
    """Leave *directory_path* existing and empty, logging what was done.

    A missing directory is created. An existing one is deleted together with
    all of its contents and then created again.
    """
    if not os.path.exists(directory_path):
        # Nothing to clear out: just create it.
        os.makedirs(directory_path)
        print(f"Directory '{directory_path}' was created.")
        return

    # Wipe the previous run's contents, then start over with a fresh directory.
    shutil.rmtree(directory_path)
    print(f"Directory '{directory_path}' was removed.")
    os.makedirs(directory_path)
    print(f"Directory '{directory_path}' was recreated.")
def zip_directory(directory_path):
    """Pack every file under *directory_path* into an in-memory ZIP archive.

    Entries are named by their path relative to *directory_path*, so files
    that share a name in different subdirectories stay distinct. For a flat
    directory the entry names are simply the file names.

    Args:
        directory_path: Root directory to archive. A missing or empty
            directory yields a valid, empty archive.

    Returns:
        A ``BytesIO`` holding the deflate-compressed archive, rewound to
        position 0 so it can be read or handed to a download widget directly.
    """
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for root, dirs, files in os.walk(directory_path):
            # Sorting in place makes os.walk descend in a stable order, so the
            # archive layout does not depend on filesystem enumeration order.
            dirs.sort()
            for file_name in sorted(files):
                file_path = os.path.join(root, file_name)
                zip_file.write(
                    file_path,
                    arcname=os.path.relpath(file_path, directory_path),
                )
    zip_buffer.seek(0)
    return zip_buffer
def is_new_pdf_upload(uploaded_file):
    """Report whether *uploaded_file* differs from the upload seen last time.

    Two uploads count as the same when both their ``name`` and ``size`` match
    the values remembered in ``st.session_state.last_pdf_uploaded_file``.
    Whenever the upload is considered new (including the very first one in a
    session), the remembered name/size pair is updated to the current file.

    Returns:
        ``True`` for a first or changed upload, ``False`` for a repeat.
    """
    current_details = {'name': uploaded_file.name, 'size': uploaded_file.size}

    # First upload in this session: nothing to compare against yet.
    if 'last_pdf_uploaded_file' not in st.session_state:
        st.session_state.last_pdf_uploaded_file = current_details
        return True

    remembered = st.session_state.last_pdf_uploaded_file
    unchanged = (uploaded_file.name == remembered['name']
                 and uploaded_file.size == remembered['size'])
    if unchanged:
        return False

    # A different file: remember it for the next comparison.
    st.session_state.last_pdf_uploaded_file = current_details
    return True
# Centered page title, rendered as raw HTML.
# The font size unit is "px"; a bare "30x" is not a valid CSS length and the
# declaration would be ignored by the browser.
big_text = """
<div style='text-align: center;'>
<h1 style='font-size: 30px;'>Locked PDF Ingestion</h1>
</div>
"""
# Display the styled text
st.markdown(big_text, unsafe_allow_html=True)

# One-time per-session setup: the flag is stored in session state so this
# branch is skipped on later reruns of the script within the same session.
if 'is_initialized' not in st.session_state:
    pdf_path = 'uploaded_pdf/data_sheet.pdf'
    st.session_state['is_initialized'] = True
# page_count = utils.get_pdf_page_count(pdf_path) | |
# print("page_count=",page_count) | |
# page_count=5 | |
# print("new page_count=",page_count) | |
# read_pdf_progress_bar = st.progress(0) | |
# st.session_state.color_image_list = [] | |
# st.session_state.gray_image_np_list = [] | |
# for page_number in range(page_count): | |
# image = pdf2image.convert_from_path(pdf_path, first_page=page_number+1, last_page=page_number+1) | |
# st.session_state.color_image_list.append(image[0]) | |
# progress_percentage = (page_number) / (page_count-1) | |
# read_pdf_progress_bar.progress(progress_percentage) | |
# gray_pdf_image_np_list = [] | |
# read_pdf_progress_bar.progress(0) | |
# for index, image in enumerate(st.session_state.color_image_list): | |
# image_np = np.array(image) | |
# st.session_state.gray_image_np_list.append(cv2.cvtColor(np.array(image_np), cv2.COLOR_BGR2GRAY)) | |
# progress_percentage = (index) / (page_count - 1) | |
# read_pdf_progress_bar.progress(progress_percentage) | |
# # cv2.line(st.session_state.gray_image_np_list[37], (174, 227), (174, 1790), 0, 2) | |
# # cv2.line(st.session_state.gray_image_np_list[37], (1550, 227), (1550, 1790), 0, 2) | |
# # cv2.line(st.session_state.gray_image_np_list[38], (226,227),(226,1444), 0,3) | |
# # cv2.line(st.session_state.gray_image_np_list[38], (1601,227),(1601,1444), 0,2) | |
# st.session_state.img_index = 0 | |
# st.session_state.stop_button_clicked=False | |
# # st.image(st.session_state.gray_image_np_list[38]) | |
# PDF-only file picker. The script treats a None result as "nothing uploaded".
uploaded_locked_pdf_file = st.file_uploader(label="Upload a locked pdf", type=['pdf'])

# Link to a sample document the user can download and then feed to the uploader.
st.markdown(
    '<a href="https://ikmtechnology.github.io/ikmtechnology/data_sheet.pdf" target="_blank">'
    'Sample 1 download and then upload to above</a>',
    unsafe_allow_html=True,
)
# Persist a newly uploaded PDF to disk and reset per-document state.
# Runs only when the upload differs from the previous one, so an ordinary
# rerun with the same file in the uploader does not redo the work.
if uploaded_locked_pdf_file is not None and is_new_pdf_upload(uploaded_locked_pdf_file):
    save_path = './uploaded_videos'
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(save_path, exist_ok=True)

    # The file name comes from the client; keep only its final component so it
    # cannot point outside save_path.
    saved_pdf_name = os.path.basename(uploaded_locked_pdf_file.name)
    saved_pdf_path = os.path.join(save_path, saved_pdf_name)
    with open(saved_pdf_path, "wb") as f:
        f.write(uploaded_locked_pdf_file.getbuffer())  # Write the file to the specified location
    st.success(f'Saved file {saved_pdf_name} in {save_path}')

    st.session_state.uploaded_pdf_path = saved_pdf_path
    st.session_state.page_count = utils.get_pdf_page_count(saved_pdf_path)
    print("page_count=", st.session_state.page_count)

    # Text extracted from a previous document no longer applies.
    if 'extracted_text' in st.session_state:
        del st.session_state.extracted_text
    # Restart the script so the rest of the page is drawn from the new state.
    st.rerun()
# Vertical guide lines drawn onto specific pages before text extraction.
# Key: 1-based page number. Value: (left_x, right_x, top_y, bottom_y) in pixels
# of the rendered page image; one black, 2-px-wide vertical line is drawn at
# each x from top_y to bottom_y.
# NOTE(review): these coordinates look tuned for one particular document
# (presumably the sample data sheet) yet are applied to pages 34-39 of any
# upload - confirm that is intended.
PAGE_GUIDE_LINES = {
    34: (223, 1527, 414, 1185),
    35: (176, 1551, 248, 1760),
    36: (225, 1601, 229, 1703),
    37: (173, 1550, 227, 1790),
    38: (222, 1600, 227, 1444),
    39: (175, 1551, 227, 1229),
}

# Page-range selection and the extraction run. Only shown once a PDF has been
# uploaded and its page count is known.
if 'page_count' in st.session_state:
    total_pages = st.session_state.page_count
    st.write(f"total page count = {total_pages}")

    # Both sliders use 1-based, inclusive page numbers covering the whole
    # document, so the value shown as "End page" is the last page extracted.
    if total_pages > 1:
        st.session_state.start_page_to_extract = st.slider(
            'Start page number:', min_value=1, max_value=total_pages,
            value=1, key='num_pages_to_extract_slider')
        st.session_state.end_page_to_extract = st.slider(
            'End page number', min_value=1, max_value=total_pages,
            value=total_pages, key='num_pages_to_extract_slider2')
    else:
        # A single-page document leaves nothing to choose, and a slider whose
        # minimum equals its maximum is degenerate, so skip the widgets.
        st.session_state.start_page_to_extract = 1
        st.session_state.end_page_to_extract = 1

    start_page = st.session_state.start_page_to_extract
    end_page = st.session_state.end_page_to_extract
    pages_requested = max(end_page - start_page + 1, 0)
    st.write(f"num of pages to extract {pages_requested}")

    # 'run_button' is the button's own widget key: it is True during the run
    # triggered by a click, which renders the button disabled while working.
    st.session_state.running = bool(st.session_state.get('run_button', False))
    read_pdf_progress_bar = st.progress(0)
    extract_clicked = st.button('Extract Pages', disabled=st.session_state.running, key='run_button')

    if extract_clicked and pages_requested == 0:
        st.warning("Start page must not be greater than end page.")
    elif extract_clicked:
        st.session_state.color_image_list = []
        st.session_state.gray_image_np_list = []
        st.session_state.pdf_figures_image_list = []
        st.session_state.pdf_tables_image_list = []
        st.session_state.pdf_textbox_image_list = []
        st.session_state.pdf_text_list = []

        # Step 1: render the selected pages to images, one page per call.
        for pages_done, page_number in enumerate(range(start_page, end_page + 1), start=1):
            rendered = pdf2image.convert_from_path(
                st.session_state.uploaded_pdf_path,
                first_page=page_number, last_page=page_number)
            st.session_state.color_image_list.append(rendered[0])
            read_pdf_progress_bar.progress(pages_done / pages_requested)

        # Step 2: convert each page to grayscale and add the guide lines.
        read_pdf_progress_bar.progress(0)
        rendered_count = len(st.session_state.color_image_list)
        for index, image in enumerate(st.session_state.color_image_list):
            # NOTE(review): pdf2image returns PIL images, which are normally in
            # RGB channel order; COLOR_BGR2GRAY is kept as originally written -
            # confirm whether COLOR_RGB2GRAY was intended.
            gray_image_np = cv2.cvtColor(np.array(image), cv2.COLOR_BGR2GRAY)
            # index counts from the first extracted page, so adding start_page
            # gives the page's real 1-based number in the document.
            guide = PAGE_GUIDE_LINES.get(index + start_page)
            if guide is not None:
                left_x, right_x, top_y, bottom_y = guide
                cv2.line(gray_image_np, (left_x, top_y), (left_x, bottom_y), 0, 2)
                cv2.line(gray_image_np, (right_x, top_y), (right_x, bottom_y), 0, 2)
            st.session_state.gray_image_np_list.append(gray_image_np)
            read_pdf_progress_bar.progress((index + 1) / rendered_count)

        # Step 3: extract text / figures / tables / textboxes from every page
        # and save the image crops into freshly emptied scratch directories.
        manage_temp_to_be_zipped_directory(temp_figure_dir)
        manage_temp_to_be_zipped_directory(temp_table_dir)
        manage_temp_to_be_zipped_directory(temp_textbox_dir)
        page_text_chunks = []
        for index, gray_pdf_image_np in enumerate(st.session_state.gray_image_np_list):
            # Labels and file names number pages from the start of the
            # extracted range (1, 2, ...), not by position in the document.
            page_label = index + 1
            try:
                figures_image_list, tables_image_list, textbox_image_list, text = \
                    utils.gray_pdf_image_np_to_text(index, gray_pdf_image_np, debug=True)
                page_failed = False
            except Exception as e:
                # Best effort: one bad page must not abort the whole run.
                print(f"An error occurred: {e}")
                figures_image_list, tables_image_list, textbox_image_list, text = [], [], [], ""
                page_failed = True

            if textbox_image_list:
                print("index="+str(index)+" txt book " + str(len(textbox_image_list)))

            # Always append exactly one entry per page - even for a failed
            # page - so these lists stay index-aligned with gray_image_np_list.
            st.session_state.pdf_figures_image_list.append(figures_image_list)
            st.session_state.pdf_tables_image_list.append(tables_image_list)
            st.session_state.pdf_textbox_image_list.append(textbox_image_list)
            st.session_state.pdf_text_list.append(text)

            try:
                # Figure and table items are indexed as [0] = label used in the
                # file name, [1] = image array; textbox items are bare arrays.
                for pdf_figure_text_image in figures_image_list or []:
                    cleaned_image_file_name = clean_filename(
                        f"page_{page_label}_{pdf_figure_text_image[0]}.png")
                    Image.fromarray(pdf_figure_text_image[1]).save(
                        temp_figure_dir + cleaned_image_file_name)
                for pdf_table_text_image in tables_image_list or []:
                    cleaned_image_file_name = clean_filename(
                        f"page_{page_label}_{pdf_table_text_image[0]}.png")
                    Image.fromarray(pdf_table_text_image[1]).save(
                        temp_table_dir + cleaned_image_file_name)
                for textbox_index, pdf_textbox_image in enumerate(textbox_image_list or [], start=1):
                    cleaned_image_file_name = clean_filename(
                        f"page_{page_label}_textbox_{textbox_index}.png")
                    Image.fromarray(pdf_textbox_image).save(
                        temp_textbox_dir + cleaned_image_file_name)
            except Exception as e:
                # A crop that cannot be saved should not discard the page text.
                print(f"An error occurred: {e}")

            if not page_failed:
                page_text_chunks.append(
                    f"<Page {page_label} start>\n{text}\n<Page {page_label} end>\n")
            read_pdf_progress_bar.progress((index + 1) / rendered_count)

        st.session_state.extracted_text = "".join(page_text_chunks)

        # Step 4: bundle the saved crops for the download buttons, then rerun
        # so the results section is rendered from session state.
        st.session_state.figure_zip_bytes = zip_directory(temp_figure_dir)
        st.session_state.table_zip_bytes = zip_directory(temp_table_dir)
        st.session_state.textbox_zip_bytes = zip_directory(temp_textbox_dir)
        st.rerun()
# Results section: download buttons plus an on-page preview of everything
# extracted. Shown whenever an extraction result is held in session state.
if 'extracted_text' in st.session_state:
    # Name the downloads after the PDF saved on disk rather than the uploader
    # widget, which is None once the user clears it. splitext swaps only the
    # final extension, whatever its letter case.
    download_stem = os.path.splitext(
        os.path.basename(st.session_state.uploaded_pdf_path))[0]

    st.download_button(label="Download Extraction txt File",
                       data=st.session_state.extracted_text,
                       file_name=download_stem + ".txt",
                       mime="text/plain")

    zip_downloads = (
        ("Download Figures ZIP", st.session_state.figure_zip_bytes, "_figures.zip"),
        ("Download Tables ZIP", st.session_state.table_zip_bytes, "_tables.zip"),
        ("Download Textbox ZIP", st.session_state.textbox_zip_bytes, "_textbox.zip"),
    )
    for button_label, zip_bytes, name_suffix in zip_downloads:
        st.download_button(
            label=button_label,
            data=zip_bytes,
            file_name=download_stem + name_suffix,
            mime="application/zip"
        )

    # Walk the per-page result lists in lockstep. zip() stops at the shortest
    # list, so a page whose processing failed part-way cannot cause an
    # IndexError here.
    per_page_results = zip(
        st.session_state.pdf_figures_image_list,
        st.session_state.pdf_tables_image_list,
        st.session_state.pdf_textbox_image_list,
    )
    for page_figures, page_tables, page_textboxes in per_page_results:
        # Figure and table items are indexed as [0] = label, [1] = image array.
        if not page_figures:
            st.write("no figures")
        else:
            for pdf_figure_text_image in page_figures:
                st.write(pdf_figure_text_image[0])
                st.image(Image.fromarray(pdf_figure_text_image[1]))

        if not page_tables:
            st.write("no tables")
        else:
            for pdf_table_text_image in page_tables:
                st.write(pdf_table_text_image[0])
                st.image(Image.fromarray(pdf_table_text_image[1]))

        # Textbox items are bare image arrays, captioned by their position.
        if not page_textboxes:
            st.write("no textbox")
        else:
            for text_box_index, pdf_textbox_image in enumerate(page_textboxes):
                st.write("text box "+str(text_box_index))
                st.image(Image.fromarray(pdf_textbox_image))
# for index, gray_pdf_image_np in enumerate(st.session_state.gray_image_np_list[0:5], start=0): | |
# print("index="+str(index)) | |
# | |
# text=utils.gray_pdf_image_np_to_text(index,gray_pdf_image_np, debug=True) | |
# st.write(text) | |
#if 'img_index' not in st.session_state: | |
# if st.button("Stop"): | |
# st.session_state.stop_button_clicked = True | |
# st.write(str(st.session_state.img_index+1) +"/" + str(len(st.session_state.color_image_list))) | |
# st.image(st.session_state.gray_image_np_list[st.session_state.img_index], use_column_width=True) | |
# if not st.session_state.stop_button_clicked: | |
# if st.session_state.img_index < len(st.session_state.color_image_list) - 1: | |
# st.session_state.img_index += 1 | |
# time.sleep(3) | |
# st.rerun() | |
# col1, col2 = st.columns(2) | |
# with col1: | |
# if st.button("Previous"): | |
# print("Previous pressed") | |
# # Decrease index, wrap around if it goes below 0 | |
# print("st.session_state.img_index =", str(st.session_state.img_index)) | |
# if st.session_state.img_index > 0: | |
# print("case 1 before st.session_state.img_index =",str(st.session_state.img_index)) | |
# st.session_state.img_index -= 1 | |
# print("case 2 after st.session_state.img_index =", str(st.session_state.img_index)) | |
# else: | |
# print("case 2 st.session_state.img_index =", str(st.session_state.img_index)) | |
# st.session_state.img_index = len(st.session_state.color_image_list) - 1 | |
# with col2: | |
# if st.button("Next"): | |
# | |
# print("Next pressed") | |
# # Increase index, wrap around if it goes past the last image | |
# if st.session_state.img_index < len(st.session_state.color_image_list) - 1: | |
# st.session_state.img_index += 1 | |
# | |
# else: | |
# st.session_state.img_index = 0 | |
# # | |
# total_pages = 100 | |
# print(f"total_pages = {total_pages}") | |
# st.write(f"total_pages = {total_pages}") | |
# for page_number in range(total_pages): | |
# pdf_image_list = convert_from_path(pdf_path) | |
# images = convert_from_path(pdf_path, first_page=page_number + 1, last_page=page_number + 1) | |
# progress = (page_number + 1) / total_pages * 100 | |
# print(f"Progress: {progress:.2f}%") | |
# print("done") |