File size: 10,217 Bytes
004a744
 
 
 
 
 
5c621a1
004a744
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1464729
 
004a744
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e062a30
20f0ac1
004a744
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5568e6f
004a744
 
 
 
 
 
7e9c9f7
5d57f7a
004a744
 
20f0ac1
 
cae91d6
20f0ac1
5d57f7a
004a744
afe2042
20f0ac1
004a744
20f0ac1
 
4ebe9a5
 
004a744
20f0ac1
5d57f7a
5c621a1
 
4ebe9a5
2e44a68
20f0ac1
 
 
 
1464729
20f0ac1
5e22f3f
20f0ac1
5e22f3f
20f0ac1
 
61cf92f
 
1adfa10
 
20f0ac1
 
 
 
 
 
 
 
 
 
1464729
1ab6079
5e22f3f
1464729
20f0ac1
 
 
1adfa10
 
 
 
cae91d6
 
20f0ac1
846b634
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import spacy
from spacy.language import Language
from spacy.lang.it import Italian
import re
from transformers import pipeline
from gradio.inputs import File
import gradio as gr
from pdf2image import convert_from_path
import pytesseract
import tempfile
import os
from gradio.inputs import Dropdown
import gradio as gr
import tempfile
import os
from pdf2image import convert_from_path
import pytesseract
import fitz
from pdf2image import convert_from_bytes


def preprocess_punctuation(text):
  """Collect the unique dotted abbreviations found in *text*.

  Matches short letter/dot sequences ending in a period that is NOT followed
  by a capitalised word (so genuine sentence-final periods are skipped).
  These are later registered as spaCy tokenizer special cases so they do not
  break sentence segmentation.
  """
  abbreviation_re = r'(?<![a-z])[a-zA-Z\.]{1,4}(?:\.[a-zA-Z\.]{1,4})*\.(?!\s*[A-Z])'
  # Deduplicate while returning a plain list (order is unspecified, as before).
  return list(set(re.findall(abbreviation_re, text)))


def preprocess_text(text):
  """Normalise vertical whitespace: collapse blank lines (including
  whitespace-only lines) down to a single newline."""
  for blank_run in (r'\n\s*\n', r'\n{2,}'):
      text = re.sub(blank_run, '\n', text)
  return text



@Language.component('custom_tokenizer')
def custom_tokenizer(doc):
    """spaCy pipeline component: never start a new sentence after a colon."""
    # Walk every token but the last; the token following ':' stays in-sentence.
    for token in doc[:-1]:
        if token.text != ":":
            continue
        doc[token.i + 1].is_sent_start = False
    return doc



def get_sentences(text, dictionary = None):
  """Split *text* into cleaned sentences with the Italian spaCy pipeline.

  Abbreviations detected by preprocess_punctuation() are registered as
  tokenizer special cases so they do not trigger false sentence breaks.
  Each sentence is stripped of surrounding spaces/newlines and internal runs
  of spaces are collapsed; empty results are dropped.

  NOTE(review): *dictionary* is accepted but never used — kept only for
  backward compatibility with existing callers.
  """
  nlp = spacy.load("it_core_news_lg")  # large Italian model
  nlp.add_pipe("custom_tokenizer", before="parser")

  # Keep dotted abbreviations together as single tokens.
  for abbreviation in preprocess_punctuation(text):
    nlp.tokenizer.add_special_case(
        abbreviation,
        [{spacy.symbols.ORTH: abbreviation, spacy.symbols.NORM: abbreviation}],
    )

  strip_chars = ''.join(set([' ', '\n']))
  cleaned = []
  for sentence in nlp(text).sents:
    normalized = ' '.join(
        filter(None, sentence.text.lstrip(strip_chars).rstrip(strip_chars).split(' '))
    )
    if normalized != '':
      cleaned.append(normalized)
  return cleaned




def extract_numbers(text, given_strings):
    """Extract numbers that appear next to any of *given_strings* in *text*.

    For every word containing one of the given substrings, the word itself and
    its immediate neighbours (one before, one after) are scanned. Contexts
    containing a mathematical operator (+, * or /) are skipped as they are
    likely formulas rather than measurements ('-' is deliberately allowed:
    it occurs in ranges and hyphenated words).

    Returns a list aligned with the scanned context words: a float (decimal
    comma/point) or int where a number was parsed, None otherwise. The None
    entries are intentional — callers use the list's truthiness to keep
    "keyword present but no number" sentences distinguishable.
    """
    words = text.split()
    # Positions of words containing any of the given substrings.
    indices = [i for i, word in enumerate(words)
               if any(s in word for s in given_strings)]
    numbers = []
    for index in indices:
        # One word of context on each side, clamped to the text bounds.
        start = max(index - 1, 0)
        end = min(index + 2, len(words))
        context = words[start:end]
        # Skip formula-like contexts.
        if any(re.match(r'[+*/]', word) for word in context):
            continue
        for word in context:
            # Hoisted: the original recomputed these substitutions per branch.
            decimal = re.sub(r'[^0-9.,]+', '', word).replace(',', '.')
            integral = re.sub(r'[^0-9]+', '', word)
            if decimal.replace('.', '', 1).isdigit():
                numbers.append(float(decimal))
            elif integral.isdigit():
                numbers.append(int(integral))
            else:
                numbers.append(None)
    return numbers



def get_text_and_values(text, key_list):
  """Map each sentence of *text* mentioning a keyword to the numbers near it.

  Returns {sentence: numbers} for every sentence where extract_numbers()
  produced a non-empty result (entries may contain None placeholders).
  """
  info = {}
  for sentence in get_sentences(text):
    found = extract_numbers(text=sentence, given_strings=key_list)
    if found:
      info[sentence] = found
  return info


def get_useful_text(dictionary):
  """Join the dictionary's keys (the matched sentences) with a visual divider."""
  separator = '\n------------------------\n'
  return separator.join(dictionary.keys())

def get_values(dictionary):
  """Return the dictionary's values (the per-sentence number lists) as a list."""
  return [*dictionary.values()]


def initialize_qa_transformer(model):
  """Build a HuggingFace text2text-generation pipeline for the given model name."""
  return pipeline("text2text-generation", model=model)


def get_answers_unfiltered(dictionary, question, qa_pipeline):
  """Run *qa_pipeline* once per sentence (dictionary key), appending *question*.

  The prompt format is '<sentence> Domanda: <question>' (Italian QA style).
  Returns the raw pipeline outputs, one per sentence, in key order.
  """
  return [qa_pipeline(f'{sentence} Domanda: {question}') for sentence in dictionary]


def get_total(answered_values, text, keywords, raw_values, unique_values = False):
    """Combine QA answers with the raw extracted values into (values, total).

    answered_values: nested lists of pipeline output dicts (text answers).
    text:            the full document text, searched for explicit totals.
    keywords:        words ('totale', ...) that may precede an explicit total.
    raw_values:      per-sentence number lists from extract_numbers().
    unique_values:   drop duplicate answer values before totalling.

    Returns (validated numbers, grand total). An answer number is kept only if
    it is >= 5 and actually occurred in the source text (guards against QA
    hallucinations). If no keyword-marked total is found, the numbers are
    summed instead; multiple totals (several lots) are added together.
    """
    # Numbers genuinely seen near a keyword in the source sentences.
    numeric_list = [num for sublist in raw_values for num in sublist
                    if isinstance(num, (int, float))]
    answer_pattern = r'\d+(?:[.,]\d+)?'
    numbers = []
    for answer_group in answered_values:
        for answer in answer_group:
            for generated in answer.values():
                generated = generated.replace(',', '.')  # Italian decimal comma
                numbers += [float(match) for match in re.findall(answer_pattern, generated)
                            if float(match) >= 5.0 and float(match) in numeric_list]
    if unique_values:
        numbers = list(set(numbers))

    total_list = []
    found = False
    int_pattern = r'\d+'
    for keyword in keywords:
        # keyword, then up to three words, then a number.
        keyword_pattern = f'{keyword}(\\s+\\w+){{0,3}}\\s+({int_pattern})'
        match = re.search(keyword_pattern, text, re.IGNORECASE)
        if match:
            number = match.group(2)
            # BUG FIX: the original compared the *string* group against lists
            # of floats with `in`, so this branch could never fire. Compare
            # numerically instead.
            if (float(number) in numbers) and (float(number) in numeric_list):
                total_list.append(int(number))
                print(f"Found a value ({number}) for keyword '{keyword}'.")
                found = True

    # No explicit total found: fall back to summing every validated number.
    # (Avoids shadowing the builtin `sum` as the original did.)
    if not found:
        total_list.append(sum(v for v in numbers if v in numeric_list))

    # More than one total means several lots: their grand total is the answer.
    return numbers, sum(total_list)



def extractor_clean(text, k_words, transformer, question, total_kwords, return_text = False):
  """End-to-end extraction: sentences -> QA answers -> validated totals.

  Returns (values, return_text, reference_text) when return_text is truthy,
  or (values, return_text) when it is False. NOTE(review): the second element
  is the *flag* itself — pdf_ocr() relies on this exact tuple shape.
  """
  dictionary = get_text_and_values(text, k_words)
  raw = get_values(dictionary)
  qa = initialize_qa_transformer(transformer)
  val = get_answers_unfiltered(dictionary, question = question, qa_pipeline = qa)
  values = get_total(answered_values = val, raw_values = raw, text = text,
                     keywords = total_kwords, unique_values = True)
  if return_text:
    return values, return_text, get_useful_text(dictionary)
  elif return_text == False:
    return values, return_text



def pdf_ocr(file, model_t, question):
    """Extract area (mq) values from the PDF at path *file*.

    Reads the embedded text layer with PyMuPDF; if the PDF has no text layer
    (a scanned document), falls back to Tesseract OCR (Italian) on rendered
    pages. Returns (per-property value listing, total string, reference text).

    Fixes vs original: removed an unused TemporaryDirectory wrapper and the
    dead locals `num_pages` and `quest`.
    """
    with open(file, "rb") as f:
        content = f.read()

    # Prefer the embedded text layer: fast and exact.
    with fitz.open(stream=content, filetype="pdf") as doc:
        text = "".join(page.get_text() for page in doc)

    if not text:
        # Scanned PDF: render pages to images and OCR them in Italian.
        images = convert_from_bytes(content)
        text = "".join(pytesseract.image_to_string(img, lang='ita') for img in images)
        del images  # free the page bitmaps promptly

    # Keywords used to locate area mentions in Italian listings.
    ks = ('mq', 'MQ', 'Mq', 'metri quadri', 'm2')
    totalK = ['totale', 'complessivo', 'complessiva']

    extracted = extractor_clean(text=text, k_words=ks, transformer=model_t,
                                question=question, total_kwords=totalK, return_text=True)
    # extractor_clean returns ((numbers, total), flag, reference_text).
    (values_output, total_value), _flag, text_output = extracted
    sor_values = sorted(values_output)
    total_output = f'{total_value}  Mq'

    immobile_values = '\n'.join(
        f'{i + 1}. Immobile :  {value}  Mq\n' for i, value in enumerate(sor_values)
    )
    return immobile_values, total_output, text_output


def ocr_interface(pdf_file, model_t='it5/it5-base-question-answering', question="Quanti metri quadri misura l'immobile?"):
    """Gradio callback: run pdf_ocr on the uploaded file and pass through its
    three outputs (values listing, total, reference text)."""
    return pdf_ocr(pdf_file.name, model_t, question)


# Start the Gradio UI (soft theme). Layout: one "Extractor" tab with a PDF
# upload, model/question dropdowns, three read-only output boxes, and an
# "Extract" button wired to ocr_interface().
with gr.Blocks(theme=gr.themes.Soft()) as demo:

    gr.Markdown(
    '''
    # PDF Mq Extractor
    Demo for ITAL-IA
    ''')
    with gr.Tab("Extractor"):
      with gr.Row():
        # PDF upload widget; ocr_interface reads its .name (temp file path).
        pdf_input = gr.components.File(label="PDF File")
     
      with gr.Row():
          # QA model choice (it5 base vs small) and the fixed question prompt.
          model_input = gr.components.Dropdown(['it5/it5-base-question-answering', 'it5/it5-small-question-answering'],
                                               value='it5/it5-base-question-answering', label = 'Select model')
          question_input = gr.components.Dropdown(["Quanti metri quadri misura l'immobile?"],
                                                  value = "Quanti metri quadri misura l'immobile?", label = 'Question')
      
      with gr.Column():
          gr.Markdown(
          '''
          # Output values
          Values extracted from the pdf document
          ''')
      
      with gr.Row():

          # Outputs: the reference sentences, the sorted per-property areas,
          # and the computed total.
          text_output = gr.components.Textbox(label="Ref. Text")
          values_output = gr.components.Textbox(label="Area Values - sorted by value")
          total_output = gr.components.Textbox(label="Total")
          
      with gr.Row():
          extract_button = gr.Button("Extract")


    # Wire the button: (pdf, model, question) -> (values, total, reference text).
    extract_button.click(fn = ocr_interface,
                         inputs=[pdf_input, model_input, question_input], outputs=[values_output, total_output, text_output])

    # Pre-cached example PDFs; these files must exist next to this script.
    gr.Examples(['Example1(scannedDoc).pdf', 'Example2.pdf', 'Example3Large.pdf'], inputs = pdf_input, 
                cache_examples = True, fn = ocr_interface, outputs = [values_output, total_output, text_output])


demo.launch()