Spaces:
Runtime error
Runtime error
from paddleocr import PaddleOCR | |
import numpy as np | |
import pandas as pd | |
ocr = PaddleOCR(use_angle_cls=True, lang="ch") | |
def filter_columns(columns: np.ndarray): | |
for idx, col in enumerate(columns): | |
if idx >= len(columns) - 1: | |
break | |
nxt = columns[idx + 1] | |
threshold = ((col[2] - col[0]) + (nxt[2] - nxt[0])) / 2 | |
if (col[2] - columns[idx + 1][0]) > threshold * 0.5: | |
col[1], col[2], col[3] = min(col[1], nxt[1]), nxt[2], max(col[3], nxt[3]) | |
columns = np.delete(columns, idx + 1, 0) | |
idx -= 1 | |
return columns | |
def process_text(row): | |
# * concatenate the text of the cell and return the coordinates and the text of the cell | |
coor = np.array([None, None]) | |
text = "" | |
for txt in row: | |
coor[0], coor[1] = ( | |
txt[0][0][1] if coor[0] is None or txt[0][0][1] < coor[0] else coor[0], | |
txt[0][2][1] if coor[1] is None or txt[0][2][1] > coor[1] else coor[1], | |
) | |
text += f"{txt[1][0]} " | |
text = text.strip() | |
row = [coor, text] | |
return row | |
def extract_text_of_col(col_img): | |
"""' | |
* extract text from the column image and calculate the average length of the row in the column | |
* the average is calculated by summing the length of each row then divide the total by the number of rows inside the column | |
* return the text and the average length | |
""" | |
result = ocr.ocr(col_img, cls=False) | |
ocr_res = [] | |
for ps, (text, score) in result[0]: | |
x1 = min(p[0] for p in ps) | |
y1 = min(p[1] for p in ps) | |
x2 = max(p[0] for p in ps) | |
y2 = max(p[1] for p in ps) | |
word_info = { | |
'bbox': [x1, y1, x2, y2], | |
'text': text | |
} | |
ocr_res.append(word_info) | |
threshold = 0 | |
print(result) | |
for idx in range(len(result)): | |
summ = 0 | |
length = len(result[idx]) | |
for line in result[idx]: | |
summ += line[0][2][1] - line[0][0][1] | |
if length > 0: | |
threshold += summ / len(result[idx]) | |
return result, threshold / len(result),ocr_res | |
def prepare_cols(result, threshold): | |
""" | |
** columns are seperated ** | |
* add each element from the extracted text to its row according to the coordinate intersection with respect to the average length of the row | |
* the intersection is True if the intersected part is bigger than the threshold number (ex: half of the average length of the row) | |
* return the column of the arranged rows | |
""" | |
col = [] | |
for idx in range(len(result)): | |
row = [] | |
for i, line in enumerate(result[idx]): | |
if i == 0: | |
row.append(line) | |
if i == len(result[idx]) - 1: | |
col.append(process_text(row)) | |
continue | |
if ( | |
line[0][0][1] >= row[-1][0][0][1] and line[0][2][1] >= row[-1][0][2][1] | |
) and ( | |
line[0][2][1] > row[-1][0][0][1] | |
and line[0][0][1] < row[-1][0][2][1] | |
and (abs(line[0][0][1] - row[-1][0][2][1]) > threshold) | |
): | |
row.append(line) | |
elif ( | |
line[0][0][1] <= row[-1][0][0][1] and line[0][2][1] <= row[-1][0][2][1] | |
) and ( | |
line[0][2][1] > row[-1][0][0][1] | |
and line[0][0][1] < row[-1][0][2][1] | |
and (abs(line[0][2][1] - row[-1][0][0][1]) > threshold) | |
): | |
row.append(line) | |
elif ( | |
line[0][0][1] <= row[-1][0][0][1] and line[0][2][1] >= row[-1][0][2][1] | |
) and ( | |
line[0][2][1] > row[-1][0][0][1] | |
and line[0][0][1] < row[-1][0][2][1] | |
and (abs(row[-1][0][2][1] - row[-1][0][0][1]) > threshold) | |
): | |
row.append(line) | |
elif ( | |
line[0][0][1] >= row[-1][0][0][1] and line[0][2][1] <= row[-1][0][2][1] | |
) and ( | |
line[0][2][1] > row[-1][0][0][1] | |
and line[0][0][1] < row[-1][0][2][1] | |
and (abs(line[0][0][1] - line[0][2][1]) > threshold) | |
): | |
row.append(line) | |
elif ( | |
line[0][0][1] == row[-1][0][0][1] and line[0][2][1] == row[-1][0][2][1] | |
) and ( | |
line[0][2][1] > row[-1][0][0][1] | |
and line[0][0][1] < row[-1][0][2][1] | |
and (abs(line[0][2][1] - row[-1][0][0][1]) > threshold) | |
): | |
row.append(line) | |
else: | |
col.append(process_text(row)) | |
row = [line] | |
if i == len(result[idx]) - 1: | |
col.append(process_text(row)) | |
return col | |
def prepare_coordinates(cols): | |
""" | |
* find the column with the maximum number of rows | |
* create a key value pair in which the key is the coordinates of each row in the column with the highest number of rows | |
and the value is an empty numpy array which has length of number of detected columns | |
""" | |
max_col = max(cols, key=len) | |
array = np.empty(len(cols), dtype=object) | |
array.fill(np.nan) | |
coor_dict = {tuple(k[0]): array for k in max_col} | |
return coor_dict | |
def process_cols(cols, threshold): | |
coor_dict = prepare_coordinates(cols) | |
""" | |
* loop over each element inside each column and find the right place for it inside the dataframe by using the coordinates intersection with respect to the average length of the row | |
* the intersection is True if the intersected part is bigger than the threshold number (ex: half of the average length of the row) | |
""" | |
for idx, col in enumerate(cols): | |
for element in col: | |
for coor, row in coor_dict.items(): | |
if (coor[0] >= element[0][0] and coor[1] >= element[0][1]) and ( | |
(coor[1] > element[0][0]) | |
and (coor[0] < element[0][1]) | |
and (abs(coor[0] - element[0][1]) > threshold) | |
): | |
new = row.copy() | |
new[idx] = element[1] | |
coor_dict[coor] = new | |
elif (coor[0] <= element[0][0] and coor[1] <= element[0][1]) and ( | |
(coor[1] > element[0][0]) | |
and (coor[0] < element[0][1]) | |
and (abs(coor[1] - element[0][0]) > threshold) | |
): | |
new = row.copy() | |
new[idx] = element[1] | |
coor_dict[coor] = new | |
elif (coor[0] >= element[0][0] and coor[1] <= element[0][1]) and ( | |
(coor[1] > element[0][0]) | |
and (coor[0] < element[0][1]) | |
and (abs(coor[1] - coor[0]) > threshold) | |
): | |
new = row.copy() | |
new[idx] = element[1] | |
coor_dict[coor] = new | |
elif (coor[0] <= element[0][0] and coor[1] >= element[0][1]) and ( | |
(coor[1] > element[0][0]) | |
and (coor[0] < element[0][1]) | |
and (abs(element[0][1] - element[0][0]) > threshold) | |
): | |
new = row.copy() | |
new[idx] = element[1] | |
coor_dict[coor] = new | |
data = [row for row in coor_dict.values()] | |
return data | |
def valid_row(row): | |
return ( | |
(row[0] is not np.nan) | |
or (row[-1] is not np.nan) | |
or (row[-2] is not np.nan) | |
or (row[-3] is not np.nan) | |
) | |
def finalize_data(data: list, page_enumeration: int): | |
idx = 0 | |
while idx <= len(data) - 1: | |
row = data[idx] | |
if valid_row(row) and row[0] is np.nan: | |
# * add the date to the valid row if it's empty | |
try: | |
row[0] = data[idx - 1][0] | |
data[idx] = row | |
except: | |
data.pop(idx) | |
idx = (idx - 1) if idx > 0 else idx | |
continue | |
if not valid_row(row): | |
if idx == 0: | |
data.pop(idx) | |
continue | |
for i, col in enumerate(row): | |
# * merge description to the previous row if the current row is not valid | |
if (col is not None) and (col is not np.nan): | |
data[idx - 1][i] = str(data[idx - 1][i]) + f" {col}" | |
data.pop(idx) | |
idx -= 1 | |
continue | |
idx += 1 | |
page_idx = ["page"] + [page_enumeration for i in range(len(data) - 1)] | |
data: pd.DataFrame = pd.DataFrame(data) | |
data.insert(0, "page", page_idx) | |
return data | |