|
""" |
|
Mostly adapted from https://github.com/jwcarr/eyekit/blob/350d055eecaa1581b03db5a847424825ffbb10f6/eyekit/_snap.py |
|
""" |
|
|
|
import os

# OMP_NUM_THREADS must be set before scikit-learn (and its OpenMP-backed
# dependencies) is imported for it to take effect; among other things this
# avoids the KMeans memory-leak warning on Windows with MKL.
os.environ["OMP_NUM_THREADS"] = "1"

import numpy as np
from icecream import ic
from sklearn.cluster import KMeans

ic.configureOutput(includeContext=True)
|
|
|
|
|
def apply_classic_algo(
    dffix,
    trial,
    algo="slice",
    algo_params=None,
):
    if algo_params is None:
        # Avoid a shared mutable default argument (the "compare" branch used
        # to write into it). These defaults are tuned for the "slice" algorithm.
        algo_params = dict(x_thresh=192, y_thresh=32, w_thresh=32, n_thresh=90)
|
fixation_array = dffix.loc[:, ["x", "y"]].values |
|
y_diff = trial["y_diff"] |
|
if "y_char_unique" in trial: |
|
midlines = trial["y_char_unique"] |
|
else: |
|
midlines = trial["y_midline"] |
|
    if len(midlines) == 1 or fixation_array.shape[0] <= 2:
        # A single line, or too few fixations to correct: snap every
        # fixation to the first midline.
        corrected_fix_y_vals = np.ones((fixation_array.shape[0])) * midlines[0]
|
|
|
else: |
|
if algo == "slice": |
|
corrected_fix_y_vals = slice(fixation_array, midlines, line_height=y_diff, **algo_params) |
|
elif algo == "warp": |
|
word_center_list = [(word["word_x_center"], word["word_y_center"]) for word in trial["words_list"]] |
|
corrected_fix_y_vals = warp(fixation_array, word_center_list) |
|
elif algo == "chain": |
|
corrected_fix_y_vals = chain(fixation_array, midlines, **algo_params) |
|
elif algo == "cluster": |
|
corrected_fix_y_vals = cluster(fixation_array, midlines) |
|
elif algo == "merge": |
|
corrected_fix_y_vals = merge(fixation_array, midlines, **algo_params) |
|
elif algo == "regress": |
|
corrected_fix_y_vals = regress(fixation_array, midlines, **algo_params) |
|
elif algo == "segment": |
|
corrected_fix_y_vals = segment(fixation_array, midlines, **algo_params) |
|
elif algo == "split": |
|
corrected_fix_y_vals = split(fixation_array, midlines, **algo_params) |
|
elif algo == "stretch": |
|
corrected_fix_y_vals = stretch(fixation_array, midlines, **algo_params) |
|
elif algo == "attach": |
|
corrected_fix_y_vals = attach(fixation_array, midlines) |
|
elif algo == "compare": |
|
word_center_list = [(word["word_x_center"], word["word_y_center"]) for word in trial["words_list"]] |
|
            # Cap n_nearest_lines at the number of alternative lines, without
            # mutating the caller's algo_params dict.
            algo_params = {**algo_params, "n_nearest_lines": min(algo_params["n_nearest_lines"], len(midlines) - 1)}
            corrected_fix_y_vals = compare(fixation_array, np.array(word_center_list), **algo_params)
|
else: |
|
raise NotImplementedError(f"{algo} not implemented") |
|
    corrected_fix_y_vals = np.round(corrected_fix_y_vals, decimals=2)
    # Map each corrected y value back to a line index. Using argmin over the
    # midlines actually used (rather than list.index on trial["y_char_unique"])
    # avoids a KeyError on trials without "y_char_unique" and is robust to
    # rounding.
    midlines_arr = np.asarray(midlines, dtype=float)
    corrected_line_nums = [int(np.argmin(np.abs(midlines_arr - y))) for y in corrected_fix_y_vals]
    # Copy before adding columns so the caller's frame is left untouched.
    dffix = dffix.copy()
    dffix[f"y_{algo}"] = corrected_fix_y_vals
    dffix[f"line_num_{algo}"] = corrected_line_nums
    return dffix
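
# A minimal usage sketch for apply_classic_algo on synthetic data. The column
# names ("x", "y") and trial keys ("y_diff", "y_char_unique") follow the code
# above; _demo_apply_classic_algo itself is a hypothetical helper, not part of
# the adapted eyekit API. Note that the default algo_params are tuned for
# "slice", so other algorithms need their own parameter dict.
def _demo_apply_classic_algo():
    import pandas as pd  # only needed for this sketch

    dffix = pd.DataFrame({"x": [10.0, 60.0, 15.0, 70.0], "y": [98.0, 103.0, 148.0, 152.0]})
    trial = {"y_diff": 50.0, "y_char_unique": [100.0, 150.0]}
    # Expected: y_chain == [100, 100, 150, 150], line_num_chain == [0, 0, 1, 1]
    return apply_classic_algo(dffix, trial, algo="chain", algo_params={"x_thresh": 192, "y_thresh": 32})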
|
|
|
|
|
def slice(fixation_XY, midlines, line_height: float, x_thresh=192, y_thresh=32, w_thresh=32, n_thresh=90): |
|
""" |
|
Form a set of runs and then reduce the set to *m* by repeatedly merging |
|
those that appear to be on the same line. Merged sequences are then |
|
assigned to text lines in positional order. Default params: |
|
`x_thresh=192`, `y_thresh=32`, `w_thresh=32`, `n_thresh=90`. Requires |
|
NumPy. Original method by [Glandorf & Schroeder (2021)](https://doi.org/10.1016/j.procs.2021.09.069). |
|
""" |
|
fixation_XY = np.array(fixation_XY, dtype=float) |
|
line_Y = np.array(midlines, dtype=float) |
|
proto_lines, phantom_proto_lines = {}, {} |
|
|
|
dist_X = abs(np.diff(fixation_XY[:, 0])) |
|
dist_Y = abs(np.diff(fixation_XY[:, 1])) |
|
end_run_indices = list(np.where(np.logical_or(dist_X > x_thresh, dist_Y > y_thresh))[0] + 1) |
|
run_starts = [0] + end_run_indices |
|
run_ends = end_run_indices + [len(fixation_XY)] |
|
runs = [list(range(start, end)) for start, end in zip(run_starts, run_ends)] |
|
|
|
longest_run_i = np.argmax([fixation_XY[run[-1], 0] - fixation_XY[run[0], 0] for run in runs]) |
|
proto_lines[0] = runs.pop(longest_run_i) |
|
|
|
while runs: |
|
merger_on_this_iteration = False |
|
for proto_line_i, direction in [(min(proto_lines), -1), (max(proto_lines), 1)]: |
|
|
|
proto_lines[proto_line_i + direction] = [] |
|
|
|
if proto_lines[proto_line_i]: |
|
proto_line_XY = fixation_XY[proto_lines[proto_line_i]] |
|
else: |
|
proto_line_XY = phantom_proto_lines[proto_line_i] |
|
|
|
run_differences = np.zeros(len(runs)) |
|
for run_i, run in enumerate(runs): |
|
y_diffs = [y - proto_line_XY[np.argmin(abs(proto_line_XY[:, 0] - x)), 1] for x, y in fixation_XY[run]] |
|
run_differences[run_i] = np.mean(y_diffs) |
|
|
|
merge_into_current = list(np.where(abs(run_differences) < w_thresh)[0]) |
|
|
|
merge_into_adjacent = list( |
|
np.where( |
|
np.logical_and( |
|
run_differences * direction >= w_thresh, |
|
run_differences * direction < n_thresh, |
|
) |
|
)[0] |
|
) |
|
|
|
for index in merge_into_current: |
|
proto_lines[proto_line_i].extend(runs[index]) |
|
for index in merge_into_adjacent: |
|
proto_lines[proto_line_i + direction].extend(runs[index]) |
|
|
|
if not merge_into_adjacent: |
|
average_x, average_y = np.mean(proto_line_XY, axis=0) |
|
adjacent_y = average_y + line_height * direction |
|
phantom_proto_lines[proto_line_i + direction] = np.array([[average_x, adjacent_y]]) |
|
|
|
for index in sorted(merge_into_current + merge_into_adjacent, reverse=True): |
|
del runs[index] |
|
merger_on_this_iteration = True |
|
|
|
if not merger_on_this_iteration: |
|
break |
|
|
|
for run in runs: |
|
best_pl_distance = np.inf |
|
        best_pl_assignment = None
|
for proto_line_i in proto_lines: |
|
if proto_lines[proto_line_i]: |
|
proto_line_XY = fixation_XY[proto_lines[proto_line_i]] |
|
else: |
|
proto_line_XY = phantom_proto_lines[proto_line_i] |
|
y_diffs = [y - proto_line_XY[np.argmin(abs(proto_line_XY[:, 0] - x)), 1] for x, y in fixation_XY[run]] |
|
pl_distance = abs(np.mean(y_diffs)) |
|
if pl_distance < best_pl_distance: |
|
best_pl_distance = pl_distance |
|
                best_pl_assignment = proto_line_i
|
        proto_lines[best_pl_assignment].extend(run)
|
|
|
while len(proto_lines) > len(line_Y): |
|
top, bot = min(proto_lines), max(proto_lines) |
|
if len(proto_lines[top]) < len(proto_lines[bot]): |
|
proto_lines[top + 1].extend(proto_lines[top]) |
|
del proto_lines[top] |
|
else: |
|
proto_lines[bot - 1].extend(proto_lines[bot]) |
|
del proto_lines[bot] |
|
|
|
for line_i, proto_line_i in enumerate(sorted(proto_lines)): |
|
fixation_XY[proto_lines[proto_line_i], 1] = line_Y[line_i] |
|
return fixation_XY[:, 1] |
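
# A minimal sketch of calling slice() directly on synthetic fixations over two
# text lines (three fixations per line). _demo_slice is a hypothetical helper,
# not part of the adapted eyekit API.
def _demo_slice():
    fixations = np.array(
        [[10.0, 98.0], [60.0, 101.0], [110.0, 99.0], [12.0, 149.0], [62.0, 151.0], [112.0, 148.0]]
    )
    midlines = [100.0, 150.0]
    # Expected: array([100., 100., 100., 150., 150., 150.])
    return slice(fixations, midlines, line_height=50.0)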
|
|
|
|
|
def attach(fixation_XY, line_Y):
    """
    Assign each fixation to the vertically closest line of text. Requires
    NumPy.
    """
    n = len(fixation_XY)
|
for fixation_i in range(n): |
|
line_i = np.argmin(abs(line_Y - fixation_XY[fixation_i, 1])) |
|
fixation_XY[fixation_i, 1] = line_Y[line_i] |
|
return fixation_XY[:, 1] |
|
|
|
|
|
def chain(fixation_XY, midlines, x_thresh=192, y_thresh=32): |
|
""" |
|
Chain consecutive fixations that are sufficiently close to each other, and |
|
then assign chains to their closest text lines. Default params: |
|
`x_thresh=192`, `y_thresh=32`. Requires NumPy. Original method |
|
implemented in [popEye](https://github.com/sascha2schroeder/popEye/). |
|
""" |
|
    fixation_XY = np.array(fixation_XY)
|
line_Y = np.array(midlines) |
|
dist_X = abs(np.diff(fixation_XY[:, 0])) |
|
dist_Y = abs(np.diff(fixation_XY[:, 1])) |
|
end_chain_indices = list(np.where(np.logical_or(dist_X > x_thresh, dist_Y > y_thresh))[0] + 1) |
|
end_chain_indices.append(len(fixation_XY)) |
|
start_of_chain = 0 |
|
for end_of_chain in end_chain_indices: |
|
mean_y = np.mean(fixation_XY[start_of_chain:end_of_chain, 1]) |
|
line_i = np.argmin(abs(line_Y - mean_y)) |
|
fixation_XY[start_of_chain:end_of_chain, 1] = line_Y[line_i] |
|
start_of_chain = end_of_chain |
|
return fixation_XY[:, 1] |
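
# A sketch of chain() on two short reading lines. attach(), cluster(), merge(),
# segment(), split(), and stretch() share the same (fixation_XY, midlines)
# calling convention, so this example carries over to them. _demo_chain is a
# hypothetical helper.
def _demo_chain():
    fixations = np.array([[10.0, 98.0], [60.0, 103.0], [15.0, 148.0], [70.0, 152.0]])
    # The 45 px y jump between fixations 1 and 2 exceeds y_thresh=32 and
    # starts a new chain. Expected: array([100., 100., 150., 150.])
    return chain(fixations, [100.0, 150.0])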
|
|
|
|
|
def cluster(fixation_XY, line_Y):
    """
    Cluster the fixations' y values into *m* groups with k-means and assign
    each cluster to a text line in vertical order. Requires scikit-learn.
    """
    m = len(line_Y)
    fixation_Y = fixation_XY[:, 1].reshape(-1, 1)
    if fixation_Y.shape[0] < m:
        # Fewer fixations than lines: k-means cannot form m clusters, so fall
        # back to assigning everything to the first line.
        ic(f"CLUSTER failed because of low number of fixations: {fixation_XY.shape}")
        ic("Assigned all fixations to first line")
        return np.ones_like(fixation_XY[:, 1]) * line_Y[0]
|
clusters = KMeans(m, n_init=100, max_iter=300).fit_predict(fixation_Y) |
|
centers = [fixation_Y[clusters == i].mean() for i in range(m)] |
|
ordered_cluster_indices = np.argsort(centers) |
|
for fixation_i, cluster_i in enumerate(clusters): |
|
line_i = np.where(ordered_cluster_indices == cluster_i)[0][0] |
|
fixation_XY[fixation_i, 1] = line_Y[line_i] |
|
return fixation_XY[:, 1] |
|
|
|
|
|
def compare(fixation_XY, word_XY, x_thresh=512, n_nearest_lines=3):
    """
    Split the fixation sequence into gaze lines at large return sweeps
    (leftward jumps of more than `x_thresh`), compare each gaze line against
    the `n_nearest_lines` vertically closest text lines using dynamic time
    warping over the x coordinates, and assign the gaze line to the
    best-matching text line. Default params: `x_thresh=512`,
    `n_nearest_lines=3`. Requires NumPy.
    """
    line_Y = np.unique(word_XY[:, 1])
|
n = len(fixation_XY) |
|
diff_X = np.diff(fixation_XY[:, 0]) |
|
end_line_indices = list(np.where(diff_X < -x_thresh)[0] + 1) |
|
end_line_indices.append(n) |
|
start_of_line = 0 |
|
for end_of_line in end_line_indices: |
|
gaze_line = fixation_XY[start_of_line:end_of_line] |
|
mean_y = np.mean(gaze_line[:, 1]) |
|
lines_ordered_by_proximity = np.argsort(abs(line_Y - mean_y)) |
|
nearest_line_I = lines_ordered_by_proximity[:n_nearest_lines] |
|
line_costs = np.zeros(n_nearest_lines) |
|
for candidate_i in range(n_nearest_lines): |
|
candidate_line_i = nearest_line_I[candidate_i] |
|
text_line = word_XY[word_XY[:, 1] == line_Y[candidate_line_i]] |
|
dtw_cost, dtw_path = dynamic_time_warping(gaze_line[:, 0:1], text_line[:, 0:1]) |
|
line_costs[candidate_i] = dtw_cost |
|
line_i = nearest_line_I[np.argmin(line_costs)] |
|
fixation_XY[start_of_line:end_of_line, 1] = line_Y[line_i] |
|
start_of_line = end_of_line |
|
return fixation_XY[:, 1] |
|
|
|
|
|
def merge(fixation_XY, midlines, text_right_to_left=False, y_thresh=32, gradient_thresh=0.1, error_thresh=20): |
|
""" |
|
Form a set of progressive sequences and then reduce the set to *m* by |
|
repeatedly merging those that appear to be on the same line. Merged |
|
sequences are then assigned to text lines in positional order. Default |
|
params: `y_thresh=32`, `gradient_thresh=0.1`, `error_thresh=20`. Requires |
|
NumPy. Original method by [Špakov et al. (2019)](https://doi.org/10.3758/s13428-018-1120-x). |
|
""" |
|
    fixation_XY = np.array(fixation_XY)
|
line_Y = np.array(midlines) |
|
diff_X = np.diff(fixation_XY[:, 0]) |
|
dist_Y = abs(np.diff(fixation_XY[:, 1])) |
|
if text_right_to_left: |
|
sequence_boundaries = list(np.where(np.logical_or(diff_X > 0, dist_Y > y_thresh))[0] + 1) |
|
else: |
|
sequence_boundaries = list(np.where(np.logical_or(diff_X < 0, dist_Y > y_thresh))[0] + 1) |
|
sequence_starts = [0] + sequence_boundaries |
|
sequence_ends = sequence_boundaries + [len(fixation_XY)] |
|
sequences = [list(range(start, end)) for start, end in zip(sequence_starts, sequence_ends)] |
|
for min_i, min_j, remove_constraints in [ |
|
(3, 3, False), |
|
(1, 3, False), |
|
(1, 1, False), |
|
(1, 1, True), |
|
]: |
|
while len(sequences) > len(line_Y): |
|
best_merger = None |
|
best_error = np.inf |
|
for i in range(len(sequences) - 1): |
|
if len(sequences[i]) < min_i: |
|
continue |
|
for j in range(i + 1, len(sequences)): |
|
if len(sequences[j]) < min_j: |
|
continue |
|
candidate_XY = fixation_XY[sequences[i] + sequences[j]] |
|
gradient, intercept = np.polyfit(candidate_XY[:, 0], candidate_XY[:, 1], 1) |
|
residuals = candidate_XY[:, 1] - (gradient * candidate_XY[:, 0] + intercept) |
|
error = np.sqrt(sum(residuals**2) / len(candidate_XY)) |
|
if remove_constraints or (abs(gradient) < gradient_thresh and error < error_thresh): |
|
if error < best_error: |
|
best_merger = (i, j) |
|
best_error = error |
|
if best_merger is None: |
|
break |
|
merge_i, merge_j = best_merger |
|
merged_sequence = sequences[merge_i] + sequences[merge_j] |
|
sequences.append(merged_sequence) |
|
del sequences[merge_j], sequences[merge_i] |
|
mean_Y = [fixation_XY[sequence, 1].mean() for sequence in sequences] |
|
ordered_sequence_indices = np.argsort(mean_Y) |
|
for line_i, sequence_i in enumerate(ordered_sequence_indices): |
|
fixation_XY[sequences[sequence_i], 1] = line_Y[line_i] |
|
return fixation_XY[:, 1] |
|
|
|
|
|
def regress( |
|
fixation_XY, |
|
midlines, |
|
slope_bounds=(-0.1, 0.1), |
|
offset_bounds=(-50, 50), |
|
std_bounds=(1, 20), |
|
): |
|
""" |
|
Find *m* regression lines that best fit the fixations and group fixations |
|
according to best fit regression lines, and then assign groups to text |
|
lines in positional order. Default params: `slope_bounds=(-0.1, 0.1)`, |
|
`offset_bounds=(-50, 50)`, `std_bounds=(1, 20)`. Requires SciPy. |
|
Original method by [Cohen (2013)](https://doi.org/10.3758/s13428-012-0280-3). |
|
""" |
|
try: |
|
import numpy as np |
|
from scipy.optimize import minimize |
|
from scipy.stats import norm |
|
except ModuleNotFoundError as e: |
|
e.msg = "The regress method requires SciPy." |
|
raise |
|
fixation_XY = np.array(fixation_XY) |
|
line_Y = np.array(midlines) |
|
density = np.zeros((len(fixation_XY), len(line_Y))) |
|
|
|
def fit_lines(params): |
|
k = slope_bounds[0] + (slope_bounds[1] - slope_bounds[0]) * norm.cdf(params[0]) |
|
o = offset_bounds[0] + (offset_bounds[1] - offset_bounds[0]) * norm.cdf(params[1]) |
|
s = std_bounds[0] + (std_bounds[1] - std_bounds[0]) * norm.cdf(params[2]) |
|
predicted_Y_from_slope = fixation_XY[:, 0] * k |
|
line_Y_plus_offset = line_Y + o |
|
for line_i in range(len(line_Y)): |
|
fit_Y = predicted_Y_from_slope + line_Y_plus_offset[line_i] |
|
density[:, line_i] = norm.logpdf(fixation_XY[:, 1], fit_Y, s) |
|
return -sum(density.max(axis=1)) |
|
|
|
best_fit = minimize(fit_lines, [0, 0, 0], method="powell") |
|
fit_lines(best_fit.x) |
|
return line_Y[density.argmax(axis=1)] |
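
# A sketch of regress() on well-separated synthetic lines. The result depends
# on the Powell optimization, so treat the expected output as the typical
# case, not a guarantee. _demo_regress is a hypothetical helper.
def _demo_regress():
    fixations = np.array([[10.0, 99.0], [80.0, 102.0], [12.0, 151.0], [85.0, 149.0]])
    # Typically: array([100., 100., 150., 150.])
    return regress(fixations, [100.0, 150.0])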
|
|
|
|
|
def segment(fixation_XY, midlines, text_right_to_left=False): |
|
""" |
|
Segment fixation sequence into *m* subsequences based on *m*–1 most-likely |
|
return sweeps, and then assign subsequences to text lines in chronological |
|
order. Requires NumPy. Original method by |
|
[Abdulin & Komogortsev (2015)](https://doi.org/10.1109/BTAS.2015.7358786). |
|
""" |
|
    fixation_XY = np.array(fixation_XY)
|
line_Y = np.array(midlines) |
|
diff_X = np.diff(fixation_XY[:, 0]) |
|
saccades_ordered_by_length = np.argsort(diff_X) |
|
if text_right_to_left: |
|
line_change_indices = saccades_ordered_by_length[-(len(line_Y) - 1) :] |
|
else: |
|
line_change_indices = saccades_ordered_by_length[: len(line_Y) - 1] |
|
current_line_i = 0 |
|
for fixation_i in range(len(fixation_XY)): |
|
fixation_XY[fixation_i, 1] = line_Y[current_line_i] |
|
if fixation_i in line_change_indices: |
|
current_line_i += 1 |
|
return fixation_XY[:, 1] |
|
|
|
|
|
def split(fixation_XY, midlines, text_right_to_left=False): |
|
""" |
|
Split fixation sequence into subsequences based on best candidate return |
|
sweeps, and then assign subsequences to closest text lines. Requires |
|
SciPy. Original method by [Carr et al. (2022)](https://doi.org/10.3758/s13428-021-01554-0). |
|
""" |
|
try: |
|
import numpy as np |
|
from scipy.cluster.vq import kmeans2 |
|
except ModuleNotFoundError as e: |
|
e.msg = "The split method requires SciPy." |
|
raise |
|
fixation_XY = np.array(fixation_XY) |
|
line_Y = np.array(midlines) |
|
diff_X = np.array(np.diff(fixation_XY[:, 0]), dtype=float).reshape(-1, 1) |
|
centers, clusters = kmeans2(diff_X, 2, iter=100, minit="++", missing="raise") |
|
if text_right_to_left: |
|
sweep_marker = np.argmax(centers) |
|
else: |
|
sweep_marker = np.argmin(centers) |
|
end_line_indices = list(np.where(clusters == sweep_marker)[0] + 1) |
|
end_line_indices.append(len(fixation_XY)) |
|
start_of_line = 0 |
|
for end_of_line in end_line_indices: |
|
mean_y = np.mean(fixation_XY[start_of_line:end_of_line, 1]) |
|
line_i = np.argmin(abs(line_Y - mean_y)) |
|
        fixation_XY[start_of_line:end_of_line, 1] = line_Y[line_i]
|
start_of_line = end_of_line |
|
return fixation_XY[:, 1] |
|
|
|
|
|
def stretch(fixation_XY, midlines, stretch_bounds=(0.9, 1.1), offset_bounds=(-50, 50)): |
|
""" |
|
Find a stretch factor and offset that results in a good alignment between |
|
the fixations and lines of text, and then assign the transformed fixations |
|
to the closest text lines. Default params: `stretch_bounds=(0.9, 1.1)`, |
|
`offset_bounds=(-50, 50)`. Requires SciPy. |
|
Original method by [Lohmeier (2015)](http://www.monochromata.de/master_thesis/ma1.3.pdf). |
|
""" |
|
try: |
|
import numpy as np |
|
from scipy.optimize import minimize |
|
except ModuleNotFoundError as e: |
|
e.msg = "The stretch method requires SciPy." |
|
raise |
|
fixation_Y = np.array(fixation_XY)[:, 1] |
|
line_Y = np.array(midlines) |
|
n = len(fixation_Y) |
|
corrected_Y = np.zeros(n) |
|
|
|
def fit_lines(params): |
|
candidate_Y = fixation_Y * params[0] + params[1] |
|
for fixation_i in range(n): |
|
line_i = np.argmin(abs(line_Y - candidate_Y[fixation_i])) |
|
corrected_Y[fixation_i] = line_Y[line_i] |
|
return sum(abs(candidate_Y - corrected_Y)) |
|
|
|
best_fit = minimize(fit_lines, [1, 0], method="powell", bounds=[stretch_bounds, offset_bounds]) |
|
fit_lines(best_fit.x) |
|
return corrected_Y |
|
|
|
|
|
def warp(fixation_XY, word_center_list): |
|
""" |
|
Map fixations to word centers using [Dynamic Time |
|
Warping](https://en.wikipedia.org/wiki/Dynamic_time_warping). This finds a |
|
monotonically increasing mapping between fixations and words with the |
|
shortest overall distance, effectively resulting in *m* subsequences. |
|
Fixations are then assigned to the lines that their mapped words belong |
|
to, effectively assigning subsequences to text lines in chronological |
|
order. Requires NumPy. |
|
Original method by [Carr et al. (2022)](https://doi.org/10.3758/s13428-021-01554-0). |
|
""" |
|
    fixation_XY = np.array(fixation_XY)
    word_XY = np.array(word_center_list)
|
n1 = len(fixation_XY) |
|
n2 = len(word_XY) |
|
cost = np.zeros((n1 + 1, n2 + 1)) |
|
cost[0, :] = np.inf |
|
cost[:, 0] = np.inf |
|
cost[0, 0] = 0 |
|
for fixation_i in range(n1): |
|
for word_i in range(n2): |
|
distance = np.sqrt(sum((fixation_XY[fixation_i] - word_XY[word_i]) ** 2)) |
|
cost[fixation_i + 1, word_i + 1] = distance + min( |
|
cost[fixation_i, word_i + 1], |
|
cost[fixation_i + 1, word_i], |
|
cost[fixation_i, word_i], |
|
) |
|
    cost = cost[1:, 1:]
    warping_path = [[] for _ in range(n1)]
    # Backtrack from the bottom-right corner; set the indices explicitly
    # rather than relying on the leftover loop variables.
    fixation_i, word_i = n1 - 1, n2 - 1
    while fixation_i > 0 or word_i > 0:
|
warping_path[fixation_i].append(word_i) |
|
possible_moves = [np.inf, np.inf, np.inf] |
|
if fixation_i > 0 and word_i > 0: |
|
possible_moves[0] = cost[fixation_i - 1, word_i - 1] |
|
if fixation_i > 0: |
|
possible_moves[1] = cost[fixation_i - 1, word_i] |
|
if word_i > 0: |
|
possible_moves[2] = cost[fixation_i, word_i - 1] |
|
best_move = np.argmin(possible_moves) |
|
if best_move == 0: |
|
fixation_i -= 1 |
|
word_i -= 1 |
|
elif best_move == 1: |
|
fixation_i -= 1 |
|
else: |
|
word_i -= 1 |
|
warping_path[0].append(0) |
|
for fixation_i, words_mapped_to_fixation_i in enumerate(warping_path): |
|
candidate_Y = list(word_XY[words_mapped_to_fixation_i, 1]) |
|
fixation_XY[fixation_i, 1] = max(set(candidate_Y), key=candidate_Y.count) |
|
return fixation_XY[:, 1] |
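
# A sketch of warp(), which takes word centers rather than line midlines. The
# coordinates below are synthetic and _demo_warp is a hypothetical helper.
def _demo_warp():
    fixations = np.array([[12.0, 97.0], [55.0, 104.0], [14.0, 149.0], [58.0, 153.0]])
    word_centers = [(10.0, 100.0), (55.0, 100.0), (12.0, 150.0), (60.0, 150.0)]
    # Each fixation maps onto the nearest word in reading order.
    # Expected: array([100., 100., 150., 150.])
    return warp(fixations, word_centers)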
|
|
|
|
|
def dynamic_time_warping(sequence1, sequence2): |
|
n1 = len(sequence1) |
|
n2 = len(sequence2) |
|
dtw_cost = np.zeros((n1 + 1, n2 + 1)) |
|
dtw_cost[0, :] = np.inf |
|
dtw_cost[:, 0] = np.inf |
|
dtw_cost[0, 0] = 0 |
|
for i in range(n1): |
|
for j in range(n2): |
|
this_cost = np.sqrt(sum((sequence1[i] - sequence2[j]) ** 2)) |
|
dtw_cost[i + 1, j + 1] = this_cost + min(dtw_cost[i, j + 1], dtw_cost[i + 1, j], dtw_cost[i, j]) |
|
    dtw_cost = dtw_cost[1:, 1:]
    dtw_path = [[] for _ in range(n1)]
    # Backtrack from the bottom-right corner of the cost matrix.
    i, j = n1 - 1, n2 - 1
    while i > 0 or j > 0:
|
dtw_path[i].append(j) |
|
possible_moves = [np.inf, np.inf, np.inf] |
|
if i > 0 and j > 0: |
|
possible_moves[0] = dtw_cost[i - 1, j - 1] |
|
if i > 0: |
|
possible_moves[1] = dtw_cost[i - 1, j] |
|
if j > 0: |
|
possible_moves[2] = dtw_cost[i, j - 1] |
|
best_move = np.argmin(possible_moves) |
|
if best_move == 0: |
|
i -= 1 |
|
j -= 1 |
|
elif best_move == 1: |
|
i -= 1 |
|
else: |
|
j -= 1 |
|
dtw_path[0].append(0) |
|
return dtw_cost[-1, -1], dtw_path |
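
# A tiny illustration of the DTW helper on 1-D sequences; _demo_dtw is a
# hypothetical helper. path[i] lists the indices of sequence2 aligned with
# sequence1[i].
def _demo_dtw():
    seq1 = np.array([[1.0], [2.0], [3.0]])
    seq2 = np.array([[1.0], [3.0]])
    cost, path = dynamic_time_warping(seq1, seq2)
    # Expected: cost == 1.0 (the middle point matches [1.0] at distance 1)
    # and path == [[0], [0], [1]]
    return cost, path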
|
|
|
|
|
def wisdom_of_the_crowd(assignments): |
|
""" |
|
For each fixation, choose the y-value with the most votes across multiple |
|
algorithms. In the event of a tie, the left-most algorithm is given |
|
priority. |
|
""" |
|
assignments = np.column_stack(assignments) |
|
correction = [] |
|
for row in assignments: |
|
candidates = list(row) |
|
candidate_counts = {y: candidates.count(y) for y in set(candidates)} |
|
best_count = max(candidate_counts.values()) |
|
best_candidates = [y for y, c in candidate_counts.items() if c == best_count] |
|
if len(best_candidates) == 1: |
|
correction.append(best_candidates[0]) |
|
else: |
|
for y in row: |
|
if y in best_candidates: |
|
correction.append(y) |
|
break |
|
return correction |
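
# A sketch of combining several algorithms' corrected y values by majority
# vote; _demo_wisdom_of_the_crowd is a hypothetical helper.
def _demo_wisdom_of_the_crowd():
    y_chain = np.array([100.0, 100.0, 150.0])
    y_cluster = np.array([100.0, 150.0, 150.0])
    y_attach = np.array([100.0, 100.0, 150.0])
    # The middle fixation gets two votes for 100 and one for 150.
    # Expected: [100.0, 100.0, 150.0]
    return wisdom_of_the_crowd([y_chain, y_cluster, y_attach])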
|
|