tabel_ocr / process.py
Raj-Master's picture
Create process.py
14e58a7
from paddleocr import PaddleOCR
import numpy as np
import pandas as pd
# Shared PaddleOCR engine, built once when this module is imported and reused
# by extract_text_of_col. It is configured with use_angle_cls=True and lang="ch".
ocr = PaddleOCR(use_angle_cls=True, lang="ch")
def filter_columns(columns: np.ndarray):
    """
    * merge neighbouring column boxes that overlap too much and return the merged boxes
    * each box is read as box[0]..box[3]; box[2] - box[0] is used as its width
      (presumably [x1, y1, x2, y2] - TODO confirm against the caller)
    * two neighbours are merged when box[2] of the left one exceeds box[0] of the right one
      by more than half of their average width
    * the merged box keeps box[0] of the left one, takes box[2] of the right one and spans
      the min of box[1] / max of box[3] of both
    * assumes the boxes are already ordered left to right - TODO confirm
    * the input is not modified; a new array is returned
    """
    # Work on a copy so the caller's array is never mutated.
    merged = np.array(columns)
    idx = 0
    # Index-driven loop: the array shrinks while merging, so iterating it directly
    # would keep walking over stale rows.
    while idx < len(merged) - 1:
        col, nxt = merged[idx], merged[idx + 1]
        threshold = ((col[2] - col[0]) + (nxt[2] - nxt[0])) / 2
        if (col[2] - nxt[0]) > threshold * 0.5:
            col[1], col[2], col[3] = min(col[1], nxt[1]), nxt[2], max(col[3], nxt[3])
            merged = np.delete(merged, idx + 1, 0)
            # Stay on the same index: the widened box may also overlap its new neighbour.
            continue
        idx += 1
    return merged
def process_text(row):
    """
    * collapse the OCR lines of one cell into a single entry
    * every item of row is indexed as item[0][0][1] / item[0][2][1] for its two y values
      and item[1][0] for its text
    * return [coor, text] where coor is a 2-element object array holding the smallest first
      y value and the largest second y value, and text is the texts joined by single spaces
    * an empty row yields [array([None, None]), ""]
    """
    tops = [item[0][0][1] for item in row]
    bottoms = [item[0][2][1] for item in row]

    span = np.array([None, None])
    if tops:
        span[0] = min(tops)
        span[1] = max(bottoms)

    joined = " ".join(f"{item[1][0]}" for item in row).strip()
    return [span, joined]
def extract_text_of_col(col_img):
    """
    * run the module-level OCR engine on the column image (with cls=False)
    * for every page of the OCR result, average the height of its lines, where the height of a
      line is line[0][2][1] - line[0][0][1]; pages without lines contribute 0
    * the returned threshold is the sum of those per-page averages divided by the number of pages
      (0 when the OCR result is empty)
    * also build, from the first page only, a list of {'bbox': [x1, y1, x2, y2], 'text': text}
      where the bbox is the min/max of the detected polygon points
    * return (result, threshold, ocr_res); a page reported as None by the OCR engine is
      returned as an empty list so that callers can iterate every page
    """
    raw = ocr.ocr(col_img, cls=False)
    # PaddleOCR can report a page with no detected text as None (and, depending on the
    # version, an empty overall result); normalise both so the loops below cannot crash.
    result = [page if page is not None else [] for page in (raw or [])]

    ocr_res = []
    first_page = result[0] if result else []
    for ps, (text, _score) in first_page:
        xs = [p[0] for p in ps]
        ys = [p[1] for p in ps]
        ocr_res.append({
            'bbox': [min(xs), min(ys), max(xs), max(ys)],
            'text': text
        })

    threshold = 0
    for page in result:
        if page:
            threshold += sum(line[0][2][1] - line[0][0][1] for line in page) / len(page)

    # Guard the final average against an empty OCR result.
    average = threshold / len(result) if result else 0
    return result, average, ocr_res
def prepare_cols(result, threshold):
    """
    ** columns are separated **
    * walk the OCR lines of every page in order and group consecutive lines into rows
    * a line joins the current row when its y span (line[0][0][1] .. line[0][2][1]) strictly
      overlaps the span of the last line of that row and the overlapping part is bigger than
      threshold (ex: half of the average height of a row)
    * otherwise the current row is closed with process_text and the line starts a new row
    * return the list of processed rows of the column
    """

    def _same_row(prev, cur):
        # Compare the vertical spans of two OCR lines.
        prev_top, prev_bottom = prev[0][0][1], prev[0][2][1]
        cur_top, cur_bottom = cur[0][0][1], cur[0][2][1]
        if not (cur_bottom > prev_top and cur_top < prev_bottom):
            return False
        # Size of the shared part, whichever way the two spans are arranged.
        shared = abs(min(cur_bottom, prev_bottom) - max(cur_top, prev_top))
        return shared > threshold

    col = []
    for lines in result:
        row = []
        last = len(lines) - 1
        for i, line in enumerate(lines):
            if i > 0 and not _same_row(row[-1], line):
                # The line does not belong to the open row: flush it and start over.
                col.append(process_text(row))
                row = [line]
            else:
                row.append(line)
            if i == last:
                col.append(process_text(row))
    return col
def prepare_coordinates(cols):
    """
    * find the column with the maximum number of rows
    * create a key value pair in which the key is the coordinates (tuple of entry[0]) of each row
      in that column and the value is an object array with one slot per detected column
    * every slot is filled with np.nan through ndarray.fill, as valid_row compares slots
      with `is np.nan`
    * each key gets its OWN array, so writing into one row never leaks into the others
    * raises ValueError (from max) when cols is empty
    """
    max_col = max(cols, key=len)
    n_cols = len(cols)

    def _blank_row():
        # A fresh array per row; sharing one array between all keys would alias every row.
        blank = np.empty(n_cols, dtype=object)
        blank.fill(np.nan)
        return blank

    return {tuple(entry[0]): _blank_row() for entry in max_col}
def process_cols(cols, threshold):
    """
    * build the row skeleton with prepare_coordinates, then loop over each element inside each
      column and find the right place for it by intersecting its coordinates (element[0]) with
      the coordinates of every skeleton row
    * the intersection is True if the two spans strictly overlap and the intersected part is
      bigger than the threshold number (ex: half of the average length of the row)
    * a matching row is replaced by a copy that holds the element text (element[1]) at the
      index of the column
    * return the list of rows
    """
    coor_dict = prepare_coordinates(cols)

    def _hits(coor, span):
        # coor is the (first, second) key of a skeleton row, span the pair of the element.
        low, high = span[0], span[1]
        if not (coor[1] > low and coor[0] < high):
            return False
        return abs(min(coor[1], high) - max(coor[0], low)) > threshold

    for idx, col in enumerate(cols):
        for element in col:
            for coor, row in coor_dict.items():
                if _hits(coor, element[0]):
                    # Copy before writing so the previously stored array is left untouched.
                    updated = row.copy()
                    updated[idx] = element[1]
                    coor_dict[coor] = updated

    return list(coor_dict.values())
def valid_row(row):
    """
    * a row is valid when at least one of its first, last, second to last or third to last
      slots is not the np.nan object (identity check)
    * the slots are inspected in that order and the check stops at the first hit
    """
    inspected = (0, -1, -2, -3)
    return any(row[pos] is not np.nan for pos in inspected)
def finalize_data(data: list, page_enumeration: int):
    """
    * clean the rows in place (data is mutated) and return them as a DataFrame
    * a valid row (see valid_row) whose first slot is np.nan takes the first slot of the
      previous row (the original comment calls this slot the date); when there is no previous
      row the row is dropped
    * a row that is not valid is merged into the previous row: every slot that is neither None
      nor np.nan is appended, after a space, to the string form of the same slot of the previous
      row; a non valid first row is dropped
    * a "page" column is inserted at position 0: the first row gets the literal "page" and
      every other row gets page_enumeration
    * empty data gives an empty frame that only has the "page" column
    """
    idx = 0
    while idx < len(data):
        row = data[idx]
        if valid_row(row) and row[0] is np.nan:
            if idx == 0:
                # There is no previous row to inherit from. data[idx - 1] would silently wrap
                # around to the LAST row instead of failing, so handle it explicitly and drop
                # the row, which is what the old catch-all handler was written to do.
                data.pop(idx)
                continue
            # * add the date to the valid row if it's empty
            row[0] = data[idx - 1][0]
        if not valid_row(row):
            if idx == 0:
                data.pop(idx)
                continue
            for i, col in enumerate(row):
                # * merge description to the previous row if the current row is not valid
                if (col is not None) and (col is not np.nan):
                    data[idx - 1][i] = str(data[idx - 1][i]) + f" {col}"
            data.pop(idx)
            # Step back so the row that just received the merged text is visited again.
            idx -= 1
            continue
        idx += 1

    # One label per remaining row; nothing to label when every row was dropped.
    page_idx = (["page"] + [page_enumeration] * (len(data) - 1)) if data else []
    frame = pd.DataFrame(data)
    frame.insert(0, "page", page_idx)
    return frame