Spaces:
Runtime error
Runtime error
File size: 8,627 Bytes
14e58a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
from paddleocr import PaddleOCR
import numpy as np
import pandas as pd
# Shared PaddleOCR engine, built once at import time and used by extract_text_of_col.
# NOTE(review): use_angle_cls=True is requested here, but extract_text_of_col calls
# ocr.ocr(..., cls=False) -- confirm whether the angle classifier is meant to run.
ocr = PaddleOCR(use_angle_cls=True, lang="ch")
def filter_columns(columns: np.ndarray):
    """
    * merge neighbouring column boxes that overlap too much and return the remaining boxes
    * two neighbours are merged when (index 2 of the current box - index 0 of the next box)
      is bigger than half of the average of their (index 2 - index 0) extents
    * the merged box keeps its index 0, takes index 2 of the next box and the min / max of indices 1 / 3
    * NOTE(review): boxes look like [x1, y1, x2, y2] ordered from left to right -- confirm with the caller
    * the input is not modified, a merged copy is returned
    """
    merged = np.array(columns)
    idx = 0
    while idx < len(merged) - 1:
        cur, nxt = merged[idx], merged[idx + 1]
        threshold = ((cur[2] - cur[0]) + (nxt[2] - nxt[0])) / 2
        if (cur[2] - nxt[0]) > threshold * 0.5:
            # `cur` is a view, so the merged extent is written before the neighbour is dropped
            cur[1], cur[2], cur[3] = min(cur[1], nxt[1]), nxt[2], max(cur[3], nxt[3])
            merged = np.delete(merged, idx + 1, 0)
            # stay on the same index: the widened box has to be checked against its new neighbour
        else:
            idx += 1
    return merged
def process_text(row):
    # * join the texts of all fragments of one cell
    # * return [coor, text] where coor holds the smallest [0][0][1] and the largest [0][2][1] of the fragments
    top = bottom = None
    words = []
    for fragment in row:
        box = fragment[0]
        upper, lower = box[0][1], box[2][1]
        if top is None or upper < top:
            top = upper
        if bottom is None or lower > bottom:
            bottom = lower
        words.append(f"{fragment[1][0]} ")
    # object array so that the [None, None] placeholder of an empty row is kept as is
    coor = np.empty(2, dtype=object)
    coor[0], coor[1] = top, bottom
    return [coor, "".join(words).strip()]
def extract_text_of_col(col_img):
    """
    * extract text from the column image and calculate the average length of the row in the column
    * the average is calculated per OCR page by summing (line[0][2][1] - line[0][0][1]) of each line
      and dividing the total by the number of lines, then averaging over the pages
    * pages for which the OCR reports nothing (None) are replaced by empty lists so callers can iterate them
    * return the (normalised) OCR result, the average length and a list of
      {'bbox': [x1, y1, x2, y2], 'text': ...} entries built from the first page
    """
    result = ocr.ocr(col_img, cls=False)
    # PaddleOCR yields None instead of an empty list when no text is detected
    result = [page if page is not None else [] for page in (result or [])]

    ocr_res = []
    first_page = result[0] if result else []
    for ps, (text, _score) in first_page:
        xs = [p[0] for p in ps]
        ys = [p[1] for p in ps]
        ocr_res.append({
            'bbox': [min(xs), min(ys), max(xs), max(ys)],
            'text': text
        })

    threshold = 0
    for page in result:
        if page:
            threshold += sum(line[0][2][1] - line[0][0][1] for line in page) / len(page)
    # no pages at all: report 0 instead of dividing by zero
    average = threshold / len(result) if result else 0
    return result, average, ocr_res
def _shares_row(line, prev, threshold):
    # * True when `line` intersects `prev` vertically by more than `threshold`
    # * [0][0][1] / [0][2][1] are used as the top / bottom of a box
    #   (presumably the y of the top-left / bottom-right corner of the OCR quad -- confirm)
    line_top, line_bottom = line[0][0][1], line[0][2][1]
    prev_top, prev_bottom = prev[0][0][1], prev[0][2][1]
    if not (line_bottom > prev_top and line_top < prev_bottom):
        return False
    top_ge = line_top >= prev_top
    top_le = line_top <= prev_top
    bottom_ge = line_bottom >= prev_bottom
    bottom_le = line_bottom <= prev_bottom
    return bool(
        # line shifted towards larger values: shared part runs from line_top to prev_bottom
        (top_ge and bottom_ge and abs(line_top - prev_bottom) > threshold)
        # line shifted towards smaller values: shared part runs from prev_top to line_bottom
        or (top_le and bottom_le and abs(line_bottom - prev_top) > threshold)
        # line covers prev completely: shared part is the extent of prev
        or (top_le and bottom_ge and abs(prev_bottom - prev_top) > threshold)
        # prev covers line completely: shared part is the extent of line
        or (top_ge and bottom_le and abs(line_top - line_bottom) > threshold)
    )


def prepare_cols(result, threshold):
    """
    ** columns are separated **
    * add each element from the extracted text to its row according to the coordinate intersection
      with the previous element of the row
    * the intersection is True if the intersected part is bigger than the threshold number
      (ex: half of the average length of the row)
    * pages that are None or empty are skipped
    * return the column of the arranged rows (one process_text() entry per row)
    """
    col = []
    for page in result:
        if not page:
            continue
        row = []
        for line in page:
            if row and not _shares_row(line, row[-1], threshold):
                # the line starts a new row: flush the finished one first
                col.append(process_text(row))
                row = []
            row.append(line)
        # flush the row that was still open when the page ended
        col.append(process_text(row))
    return col
def prepare_coordinates(cols):
    """
    * pick the column that holds the most rows (the first one on a tie)
    * build a dict whose keys are the coordinates (as tuples) of every row of that column
    * every key maps to the same object array of length len(cols) filled with np.nan
    """
    longest = max(cols, key=len)
    blank = np.empty(len(cols), dtype=object)
    blank.fill(np.nan)
    # all keys deliberately share the one placeholder array
    return dict.fromkeys((tuple(entry[0]) for entry in longest), blank)
def process_cols(cols, threshold):
    """
    * loop over each element inside each column and find the right place for it inside the table
      by intersecting its coordinates with the row coordinates from prepare_coordinates()
    * the intersection is True if the intersected part is bigger than the threshold number
      (ex: half of the average length of the row)
    * return the list of row arrays, one per row coordinate
    """
    coor_dict = prepare_coordinates(cols)
    for col_no, column in enumerate(cols):
        for element in column:
            cell_top, cell_bottom = element[0][0], element[0][1]
            for anchor in list(coor_dict):
                anchor_top, anchor_bottom = anchor[0], anchor[1]
                if not (anchor_bottom > cell_top and anchor_top < cell_bottom):
                    continue
                top_ge = anchor_top >= cell_top
                top_le = anchor_top <= cell_top
                bottom_ge = anchor_bottom >= cell_bottom
                bottom_le = anchor_bottom <= cell_bottom
                matched = (
                    (top_ge and bottom_ge and abs(anchor_top - cell_bottom) > threshold)
                    or (top_le and bottom_le and abs(anchor_bottom - cell_top) > threshold)
                    or (top_ge and bottom_le and abs(anchor_bottom - anchor_top) > threshold)
                    or (top_le and bottom_ge and abs(cell_bottom - cell_top) > threshold)
                )
                if matched:
                    # copy before writing: the row arrays start out as one shared placeholder
                    updated = coor_dict[anchor].copy()
                    updated[col_no] = element[1]
                    coor_dict[anchor] = updated
    return list(coor_dict.values())
def valid_row(row):
    """
    * a row is valid when its first cell or one of its last three cells is filled
    * a cell counts as empty only when it is the np.nan object itself
      (the placeholder written by prepare_coordinates)
    * rows with fewer than three cells are checked on the cells they have instead of raising IndexError
    """
    cells = list(row[:1]) + list(row[-3:])
    return any(cell is not np.nan for cell in cells)
def finalize_data(data: list, page_enumeration: int) -> pd.DataFrame:
    """
    * clean the rows in place and return them as a DataFrame with a leading "page" column
    * a row that is not valid (see valid_row) is merged cell by cell into the previous row and removed;
      a leading row that is not valid is simply removed
    * a valid row with an empty first cell inherits the first cell of the previous row;
      the very first row has no previous row and is left unchanged
    * the "page" column holds the text "page" for the first row and page_enumeration for all other rows
    * NOTE: `data` is modified (rows are popped and cells rewritten)
    """
    idx = 0
    while idx < len(data):
        row = data[idx]
        if not valid_row(row):
            if idx > 0:
                previous = data[idx - 1]
                for i, col in enumerate(row):
                    # * merge description to the previous row if the current row is not valid
                    if (col is not None) and (col is not np.nan):
                        prev_cell = previous[i]
                        if prev_cell is None or prev_cell is np.nan:
                            # nothing to append to: avoid producing the text "nan ..."
                            previous[i] = f"{col}"
                        else:
                            previous[i] = str(prev_cell) + f" {col}"
            # the following row moves into this position, so idx is kept
            data.pop(idx)
            continue
        if row[0] is np.nan and idx > 0:
            # * add the date to the valid row if it's empty
            # (idx > 0 keeps data[idx - 1] from wrapping around to the last row)
            row[0] = data[idx - 1][0]
        idx += 1
    if data:
        page_idx = ["page"] + [page_enumeration] * (len(data) - 1)
    else:
        # no rows left: the column must be empty too, otherwise insert() fails on the length
        page_idx = []
    frame = pd.DataFrame(data)
    frame.insert(0, "page", page_idx)
    return frame
|