from icecream import ic |
from matplotlib import pyplot as plt |
import pathlib as pl |
import json |
from PIL import Image |
from torch.utils.data.dataloader import DataLoader as dl |
import matplotlib.patches as patches |
from torch.utils.data import Dataset as torch_dset |
import torchvision.transforms.functional as tvfunc |
import einops as eo |
from collections.abc import Iterable |
import numpy as np |
import pandas as pd |
from matplotlib import font_manager |
from matplotlib.font_manager import FontProperties |
from matplotlib.patches import Rectangle |
from tqdm.auto import tqdm |
import torch as t |
import plotly.express as px |
import copy |
import yaml |
import classic_correction_algos as calgo |
import analysis_funcs as anf |
import models |
import popEye_funcs as pf |
from loss_functions import corn_label_from_logits |
import torch.multiprocessing |
torch.multiprocessing.set_sharing_strategy('file_system') |
ic.configureOutput(includeContext=True) |
PLOTS_FOLDER = pl.Path("plots") |
event_strs = [ |
"EFIX", |
"EFIX R", |
"EFIX L", |
"SSACC", |
"ESACC", |
"SFIX", |
"MSG", |
"INPUT", |
"END", |
"START", |
] |
AVAILABLE_FONTS = [x.name for x in font_manager.fontManager.ttflist] |
COLORS = px.colors.qualitative.Alphabet |
RESULTS_FOLDER = pl.Path("results") |
PLOTS_FOLDER = pl.Path("plots") |
DIST_MODELS_FOLDER = pl.Path("models") |
IMAGENET_MEAN = [0.485, 0.456, 0.406] |
IMAGENET_STD = [0.229, 0.224, 0.225] |
"letternum", |
"letter", |
"on_word_number", |
"on_word", |
"on_sentence", |
"num_words_in_sentence", |
"on_sentence_num", |
"word_land", |
"line_let", |
"line_word", |
"sac_in", |
"sac_out", |
"word_launch", |
"word_refix", |
"word_reg_in", |
"word_reg_out", |
"sentence_reg_in", |
"word_firstskip", |
"word_run", |
"sentence_run", |
"word_run_fix", |
"word_cland", |
] |
"angle_incoming", |
"angle_outgoing", |
"line_let_from_last_letter", |
"sentence_word", |
"line_let_previous", |
"line_let_next", |
"sentence_refix", |
"word_reg_out_to", |
"word_reg_in_from", |
"sentence_reg_out", |
"sentence_reg_in_from", |
"sentence_reg_out_to", |
"sentence_firstskip", |
"word_runid", |
"sentence_runid", |
"word_fix", |
"sentence_fix", |
"sentence_run_fix", |
] |
class DSet(torch_dset): |
def __init__( |
self, |
in_sequence: t.Tensor, |
chars_center_coords_padded: t.Tensor, |
out_categories: t.Tensor, |
trialslist: list, |
padding_list: list = None, |
padding_at_end: bool = False, |
return_images_for_conv: bool = False, |
im_partial_string: str = "fixations_chars_channel_sep", |
input_im_shape=[224, 224], |
) -> None: |
super().__init__() |
self.in_sequence = in_sequence |
self.chars_center_coords_padded = chars_center_coords_padded |
self.out_categories = out_categories |
self.padding_list = padding_list |
self.padding_at_end = padding_at_end |
self.trialslist = trialslist |
self.return_images_for_conv = return_images_for_conv |
self.input_im_shape = input_im_shape |
if return_images_for_conv: |
self.im_partial_string = im_partial_string |
self.plot_files = [ |
str(x["plot_file"]).replace("fixations_words", im_partial_string) for x in self.trialslist |
] |
def __getitem__(self, index): |
if self.return_images_for_conv: |
im = Image.open(self.plot_files[index]) |
if [im.size[1], im.size[0]] != self.input_im_shape: |
im = tvfunc.resize(im, self.input_im_shape) |
im = tvfunc.normalize(tvfunc.to_tensor(im), IMAGENET_MEAN, IMAGENET_STD) |
if self.chars_center_coords_padded is not None: |
if self.padding_list is not None: |
attention_mask = t.ones(self.in_sequence[index].shape[:-1], dtype=t.long) |
if self.padding_at_end: |
if self.padding_list[index] > 0: |
attention_mask[-self.padding_list[index] :] = 0 |
else: |
attention_mask[: self.padding_list[index]] = 0 |
if self.return_images_for_conv: |
return ( |
self.in_sequence[index], |
self.chars_center_coords_padded[index], |
im, |
attention_mask, |
self.out_categories[index], |
) |
return ( |
self.in_sequence[index], |
self.chars_center_coords_padded[index], |
attention_mask, |
self.out_categories[index], |
) |
else: |
if self.return_images_for_conv: |
return ( |
self.in_sequence[index], |
self.chars_center_coords_padded[index], |
im, |
self.out_categories[index], |
) |
else: |
return (self.in_sequence[index], self.chars_center_coords_padded[index], self.out_categories[index]) |
if self.padding_list is not None: |
attention_mask = t.ones(self.in_sequence[index].shape[:-1], dtype=t.long) |
if self.padding_at_end: |
if self.padding_list[index] > 0: |
attention_mask[-self.padding_list[index] :] = 0 |
else: |
attention_mask[: self.padding_list[index]] = 0 |
if self.return_images_for_conv: |
return (self.in_sequence[index], im, attention_mask, self.out_categories[index]) |
else: |
return (self.in_sequence[index], attention_mask, self.out_categories[index]) |
if self.return_images_for_conv: |
return (self.in_sequence[index], im, self.out_categories[index]) |
else: |
return (self.in_sequence[index], self.out_categories[index]) |
def __len__(self): |
if isinstance(self.in_sequence, t.Tensor): |
return self.in_sequence.shape[0] |
else: |
return len(self.in_sequence) |
def remove_compile_from_model(model): |
if hasattr(model.project, "_orig_mod"): |
model.project = model.project._orig_mod |
model.chars_conv = model.chars_conv._orig_mod |
model.chars_classifier = model.chars_classifier._orig_mod |
model.layer_norm_in = model.layer_norm_in._orig_mod |
model.bert_model = model.bert_model._orig_mod |
model.linear = model.linear._orig_mod |
return model |
def remove_compile_from_dict(state_dict): |
for key in list(state_dict.keys()): |
newkey = key.replace("._orig_mod.", ".") |
state_dict[newkey] = state_dict.pop(key) |
return state_dict |
def load_model(model_file, cfg): |
try: |
model_loaded = t.load(model_file, map_location="cpu", weights_only=True) |
if "hyper_parameters" in model_loaded.keys(): |
model_cfg_temp = model_loaded["hyper_parameters"]["cfg"] |
else: |
model_cfg_temp = cfg |
model_state_dict = model_loaded["state_dict"] |
except Exception as e: |
ic(e) |
ic(f"Failed to load {model_file}") |
return None |
model = models.LitModel( |
[1, 500, 3], |
model_cfg_temp["hidden_dim_bert"], |
model_cfg_temp["num_attention_heads"], |
model_cfg_temp["n_layers_BERT"], |
model_cfg_temp["loss_function"], |
1e-4, |
model_cfg_temp["weight_decay"], |
model_cfg_temp, |
model_cfg_temp["use_lr_warmup"], |
model_cfg_temp["use_reduce_on_plateau"], |
track_gradient_histogram=model_cfg_temp["track_gradient_histogram"], |
register_forw_hook=model_cfg_temp["track_activations_via_hook"], |
char_dims=model_cfg_temp["char_dims"], |
) |
model = remove_compile_from_model(model) |
model_state_dict = remove_compile_from_dict(model_state_dict) |
with t.no_grad(): |
model.load_state_dict(model_state_dict, strict=False) |
model.eval() |
model.freeze() |
return model |
def find_and_load_model(model_date: str): |
model_cfg_file = list(DIST_MODELS_FOLDER.glob(f"*{model_date}*.yaml")) |
if len(model_cfg_file) == 0: |
ic(f"No model cfg yaml found for {model_date}") |
return None, None |
model_cfg_file = model_cfg_file[0] |
with open(model_cfg_file) as f: |
model_cfg = yaml.safe_load(f) |
model_file = list(pl.Path("models").glob(f"*{model_date}*.ckpt"))[0] |
model = load_model(model_file, model_cfg) |
return model, model_cfg |
def set_up_models(dist_models_folder): |
out_dict = {} |
dist_models_with_norm = list(dist_models_folder.glob("*normalize_by_line_height_and_width_True*.ckpt")) |
dist_models_without_norm = list(dist_models_folder.glob("*normalize_by_line_height_and_width_False*.ckpt")) |
DIST_MODEL_DATE_WITH_NORM = dist_models_with_norm[0].stem.split("_")[1] |
models_without_norm_df = [find_and_load_model(m_file.stem.split("_")[1]) for m_file in dist_models_without_norm] |
models_with_norm_df = [find_and_load_model(m_file.stem.split("_")[1]) for m_file in dist_models_with_norm] |
model_cfg_without_norm_df = [x[1] for x in models_without_norm_df if x[1] is not None][0] |
model_cfg_with_norm_df = [x[1] for x in models_with_norm_df if x[1] is not None][0] |
models_without_norm_df = [x[0] for x in models_without_norm_df if x[0] is not None] |
models_with_norm_df = [x[0] for x in models_with_norm_df if x[0] is not None] |
ensemble_model_avg = models.EnsembleModel( |
models_without_norm_df, models_with_norm_df, learning_rate=0.0058, use_simple_average=True |
) |
out_dict["ensemble_model_avg"] = ensemble_model_avg |
out_dict["model_cfg_without_norm_df"] = model_cfg_without_norm_df |
out_dict["model_cfg_with_norm_df"] = model_cfg_with_norm_df |
single_DIST_model, single_DIST_model_cfg = find_and_load_model(model_date=DIST_MODEL_DATE_WITH_NORM) |
out_dict["single_DIST_model"] = single_DIST_model |
out_dict["single_DIST_model_cfg"] = single_DIST_model_cfg |
return out_dict |
def reorder_columns( |
df, |
cols=[ |
"subject", |
"trial_id", |
"item", |
"condition", |
"fixation_number", |
"num", |
"word_number", |
"sentence_number", |
"duration", |
"start_uncorrected", |
"stop_uncorrected", |
"start_time", |
"end_time", |
"corrected_start_time", |
"corrected_end_time", |
"dX", |
"dY", |
], |
): |
existing_cols = [col for col in cols if col in df.columns] |
other_cols = [col for col in df.columns if col not in cols] |
return df[existing_cols + other_cols] |
def nan_or_int_minus_one(x): |
if not pd.isna(x): |
return int(x - 1.0) |
else: |
return pd.NA |
def add_popEye_cols_to_chars_df(chars_df): |
if "letternum" not in chars_df.columns or "letline" not in chars_df.columns: |
chars_df.reset_index(drop=False, inplace=True) |
chars_df.rename({"index": "letternum"}, axis=1, inplace=True) |
chars_df.loc[:, "letline"] = -1 |
chars_df["wordline"] = ( |
chars_df.groupby("assigned_line")["in_word_number"].rank(method="dense").map(nan_or_int_minus_one) |
) |
chars_df["wordsent"] = ( |
chars_df.groupby("in_sentence_number")["in_word_number"].rank(method="dense").map(nan_or_int_minus_one) |
) |
chars_df["letword"] = ( |
chars_df.groupby("in_word_number")["letternum"].rank(method="dense").map(nan_or_int_minus_one) |
) |
for line_idx in chars_df.assigned_line.unique(): |
chars_df.loc[chars_df.assigned_line == line_idx, "letline"] = ( |
chars_df.loc[chars_df.assigned_line == line_idx, "char"].reset_index().index |
) |
return chars_df |
def add_boxes_to_ax( |
chars_list, |
ax, |
font_to_use="DejaVu Sans Mono", |
fontsize=21, |
prefix="char", |
box_annotations: list = None, |
edgecolor="grey", |
linewidth=0.8, |
): |
if box_annotations is None: |
enum = chars_list |
else: |
enum = zip(chars_list, box_annotations) |
for v in enum: |
if box_annotations is not None: |
v, annot_text = v |
x0, y0 = v[f"{prefix}_xmin"], v[f"{prefix}_ymin"] |
xdiff, ydiff = v[f"{prefix}_xmax"] - v[f"{prefix}_xmin"], v[f"{prefix}_ymax"] - v[f"{prefix}_ymin"] |
ax.add_patch(Rectangle((x0, y0), xdiff, ydiff, edgecolor=edgecolor, facecolor="none", lw=linewidth, alpha=0.4)) |
if box_annotations is not None: |
ax.annotate( |
str(annot_text), |
(x0 + xdiff / 2, y0), |
horizontalalignment="center", |
verticalalignment="center", |
fontproperties=FontProperties(family=font_to_use, style="normal", size=fontsize / 1.5), |
) |
def add_text_to_ax( |
chars_list, |
ax, |
font_to_use="DejaVu Sans Mono", |
fontsize=21, |
prefix="char", |
): |
font_props = FontProperties(family=font_to_use, style="normal", size=fontsize) |
enum = chars_list |
for v in enum: |
ax.text( |
v[f"{prefix}_x_center"], |
v[f"{prefix}_y_center"], |
v[prefix], |
horizontalalignment="center", |
verticalalignment="center", |
fontproperties=font_props, |
) |
def set_font_from_chars_list(trial): |
if "chars_list" in trial: |
chars_df = pd.DataFrame(trial["chars_list"]) |
line_diffs = np.diff(chars_df.char_y_center.unique()) |
y_diffs = np.unique(line_diffs) |
if len(y_diffs) == 1: |
y_diff = y_diffs[0] |
else: |
y_diff = np.min(y_diffs) |
y_diff = round(y_diff * 2) / 2 |
else: |
y_diff = 1 / 0.333 * 18 |
font_size = y_diff * 0.333 |
return round((font_size) * 4, ndigits=0) / 4 |
def get_plot_props(trial, available_fonts): |
if "font" in trial.keys(): |
font = trial["font"] |
font_size = trial["font_size"] |
if font not in available_fonts: |
font = "DejaVu Sans Mono" |
else: |
font = "DejaVu Sans Mono" |
font_size = 21 |
dpi = 96 |
if "display_coords" in trial.keys() and trial["display_coords"] is not None: |
screen_res = (trial["display_coords"][2], trial["display_coords"][3]) |
else: |
screen_res = (1920, 1080) |
return font, font_size, dpi, screen_res |
def get_font_and_font_size_from_trial(trial): |
font_face, font_size, dpi, screen_res = get_plot_props(trial, AVAILABLE_FONTS) |
if font_size is None and "font_size" in trial: |
font_size = trial["font_size"] |
elif font_size is None: |
font_size = set_font_from_chars_list(trial) |
return font_face, font_size |
def sigmoid(x): |
return 1 / (1 + np.exp(-1 * x)) |
def matplotlib_plot_df( |
dffix, |
trial, |
algo_choice, |
dffix_no_clean=None, |
desired_dpi=300, |
fix_to_plot=[], |
stim_info_to_plot=["Characters", "Word boxes"], |
box_annotations: list = None, |
font=None, |
use_duration_arrow_sizes=True, |
): |
chars_df = pd.DataFrame(trial["chars_list"]) if "chars_list" in trial else None |
if chars_df is not None: |
font_face, font_size = get_font_and_font_size_from_trial(trial) |
font_size = font_size * 0.65 |
else: |
ic("No character or word information available to plot") |
if "display_coords" in trial: |
desired_width_in_pixels = trial["display_coords"][2] + 1 |
desired_height_in_pixels = trial["display_coords"][3] + 1 |
else: |
desired_width_in_pixels = 1920 |
desired_height_in_pixels = 1080 |
figure_width = desired_width_in_pixels / desired_dpi |
figure_height = desired_height_in_pixels / desired_dpi |
fig = plt.figure(figsize=(figure_width, figure_height), dpi=desired_dpi) |
ax = fig.add_subplot(1, 1, 1) |
fig.subplots_adjust(bottom=0) |
fig.subplots_adjust(top=1) |
fig.subplots_adjust(right=1) |
fig.subplots_adjust(left=0) |
if font is None: |
if "font" in trial and trial["font"] in AVAILABLE_FONTS: |
font_to_use = trial["font"] |
else: |
font_to_use = "DejaVu Sans Mono" |
else: |
font_to_use = font |
if "font_size" in trial: |
font_size = trial["font_size"] |
else: |
font_size = 20 |
if "Words" in stim_info_to_plot and "words_list" in trial: |
add_text_to_ax( |
trial["words_list"], |
ax, |
font_to_use, |
prefix="word", |
fontsize=font_size / 3.89, |
) |
if "Word boxes" in stim_info_to_plot and "words_list" in trial: |
add_boxes_to_ax( |
trial["words_list"], |
ax, |
font_to_use, |
prefix="word", |
fontsize=font_size / 3.89, |
box_annotations=box_annotations, |
edgecolor="black", |
linewidth=0.9, |
) |
if "Characters" in stim_info_to_plot and "chars_list" in trial: |
add_text_to_ax( |
trial["chars_list"], |
ax, |
font_to_use, |
prefix="char", |
fontsize=font_size / 3.89, |
) |
if "Character boxes" in stim_info_to_plot and "chars_list" in trial: |
add_boxes_to_ax( |
trial["chars_list"], |
ax, |
font_to_use, |
prefix="char", |
fontsize=font_size / 3.89, |
box_annotations=box_annotations, |
) |
if "Uncorrected Fixations" in fix_to_plot and dffix_no_clean is None: |
if use_duration_arrow_sizes and "duration" in dffix.columns: |
duration_scaled = dffix.duration - dffix.duration.min() |
duration_scaled = (((duration_scaled / duration_scaled.max()) - 0.5) * 3).values |
durations = sigmoid(duration_scaled) * 50 * 0.5 |
if use_duration_arrow_sizes: |
ax.plot( |
dffix.x, |
dffix.y, |
label="Raw fixations", |
color="blue", |
alpha=0.5, |
) |
add_arrow_annotations(dffix, "y", ax, "blue", durations[:-1]) |
else: |
ax.plot( |
dffix.x, |
dffix.y, |
label="Remaining fixations", |
color="blue", |
alpha=0.5, |
) |
add_arrow_annotations(dffix, "y", ax, "blue", 4) |
if dffix_no_clean is not None and "Uncorrected Fixations" in fix_to_plot: |
ax.plot( |
dffix_no_clean.x, |
dffix_no_clean.y, |
label="All fixations", |
color="k", |
alpha=0.5, |
lw=1, |
) |
add_arrow_annotations(dffix_no_clean, "y", ax, "k", 4) |
if "was_discarded_due_blinks" in dffix_no_clean.columns and dffix_no_clean["was_discarded_due_blinks"].any(): |
discarded_blink_fix = dffix_no_clean.loc[dffix_no_clean["was_discarded_due_blinks"], :].copy() |
ax.scatter( |
discarded_blink_fix.x, |
discarded_blink_fix.y, |
s=12, |
label="Discarded due to blinks", |
lw=1.5, |
edgecolors="orange", |
facecolors="none", |
) |
if ( |
"was_discarded_due_to_long_duration" in dffix_no_clean.columns |
and dffix_no_clean["was_discarded_due_to_long_duration"].any() |
): |
discarded_long_fix = dffix_no_clean.loc[dffix_no_clean["was_discarded_due_to_long_duration"], :].copy() |
ax.scatter( |
discarded_long_fix.x, |
discarded_long_fix.y, |
s=18, |
label="Overly long fixations", |
lw=0.8, |
edgecolors="purple", |
facecolors="none", |
) |
if "was_merged" in dffix_no_clean.columns: |
merged_fix = dffix_no_clean.loc[dffix_no_clean["was_merged"], :].copy() |
if not merged_fix.empty: |
ax.scatter( |
merged_fix.x, |
merged_fix.y, |
s=7, |
label="Merged short fixations", |
lw=1, |
edgecolors="red", |
facecolors="none", |
) |
if "was_discarded_outside_text" in dffix_no_clean.columns: |
was_discarded_outside_text_fix = dffix_no_clean.loc[dffix_no_clean["was_discarded_outside_text"], :].copy() |
if not was_discarded_outside_text_fix.empty: |
ax.scatter( |
was_discarded_outside_text_fix.x, |
was_discarded_outside_text_fix.y, |
s=8, |
label="Outside text fixations", |
lw=1.2, |
edgecolors="blue", |
facecolors="none", |
) |
if "was_discarded_short_fix" in dffix_no_clean.columns: |
was_discarded_short_fix_fix = dffix_no_clean.loc[dffix_no_clean["was_discarded_short_fix"], :].copy() |
if not was_discarded_short_fix_fix.empty: |
ax.scatter( |
was_discarded_short_fix_fix.x, |
was_discarded_short_fix_fix.y, |
label="Discarded short fixations", |
s=9, |
lw=1.5, |
edgecolors="green", |
facecolors="none", |
) |
if "Corrected Fixations" in fix_to_plot: |
if isinstance(algo_choice, list): |
algo_choices = algo_choice |
repeats = range(len(algo_choice)) |
else: |
algo_choices = [algo_choice] |
repeats = range(1) |
for algoIdx in repeats: |
algo_choice = algo_choices[algoIdx] |
if f"y_{algo_choice}" in dffix.columns: |
ax.plot( |
dffix.x, |
dffix.loc[:, f"y_{algo_choice}"], |
label=algo_choice, |
color=COLORS[algoIdx], |
alpha=0.6, |
linewidth=0.6, |
) |
add_arrow_annotations(dffix, f"y_{algo_choice}", ax, COLORS[algoIdx], 6) |
ax.set_xlim((0, desired_width_in_pixels)) |
ax.set_ylim((0, desired_height_in_pixels)) |
ax.invert_yaxis() |
if "Corrected Fixations" in fix_to_plot or "Uncorrected Fixations" in fix_to_plot: |
ax.legend(prop={"size": 5}) |
return fig, desired_width_in_pixels, desired_height_in_pixels |
def add_arrow_annotations(dffix, y_col, ax, color, size): |
x = dffix.x.values |
y = dffix.loc[:, y_col].values |
x = x[:-1] |
y = y[:-1] |
dX = -(x[1:] - x[:-1]) |
dY = -(y[1:] - y[:-1]) |
xpos = x[1:] |
ypos = y[1:] |
if isinstance(size, Iterable): |
use_size_idx = True |
else: |
use_size_idx = False |
s = size |
for fidx, (X, Y, dX, dY) in enumerate(zip(xpos, ypos, dX, dY)): |
if use_size_idx: |
s = size[fidx] |
ax.annotate( |
"", |
xytext=(X + 0.001 * dX, Y + 0.001 * dY), |
xy=(X, Y), |
arrowprops=dict(arrowstyle="fancy", color=color), |
size=s, |
alpha=0.3, |
) |
def plot_saccade_df(fix_df, sac_df, trial, show_numbers=False, add_lines_to_fix_df=False): |
stim_only_fig, _, _ = matplotlib_plot_df( |
fix_df, |
trial, |
None, |
dffix_no_clean=None, |
desired_dpi=300, |
fix_to_plot=[], |
stim_info_to_plot=["Characters", "Word boxes"], |
box_annotations=None, |
font=None, |
) |
if stim_only_fig is None: |
fig, ax = plt.subplots(1, 1, figsize=(8, 5), dpi=150) |
invert_ax_needed = True |
else: |
fig = stim_only_fig |
ax = fig.axes[0] |
invert_ax_needed = False |
def plot_arrow(x1, y1, x2, y2, scale_factor): |
"""Plot an arrow from (x1,y1) to (x2,y2) with adjustable size""" |
ax.arrow( |
x1, |
y1, |
(x2 - x1), |
(y2 - y1), |
color="k", |
alpha=0.7, |
length_includes_head=True, |
width=3 * scale_factor, |
head_width=15 * scale_factor, |
head_length=15 * scale_factor, |
) |
xs = sac_df["xs"].values |
ys = sac_df["ys"].values |
xe = sac_df["xe"].values |
ye = sac_df["ye"].values |
extent = np.sqrt((xs.min() - xe.max()) ** 2 + (ys.min() - ye.max()) ** 2) |
scale_factor = 0.0005 * extent |
for i in range(len(xs)): |
plot_arrow(xs[i], ys[i], xe[i], ye[i], scale_factor=scale_factor) |
if add_lines_to_fix_df: |
plotfunc = ax.plot |
else: |
plotfunc = ax.scatter |
if "x" in fix_df.columns: |
plotfunc(fix_df["x"], fix_df["y"], marker=".") |
else: |
plotfunc(fix_df["xs"], fix_df["ys"], marker=".") |
if invert_ax_needed: |
ax.invert_yaxis() |
if show_numbers: |
size = 8 * scale_factor |
xytext = ( |
1, |
-1, |
) |
for index, row in fix_df.iterrows(): |
ax.annotate( |
index, |
xy=(row["x"], row["y"]), |
textcoords="offset points", |
ha="center", |
xytext=xytext, |
va="bottom", |
color="k", |
size=size, |
) |
for index, row in sac_df.iterrows(): |
ax.annotate( |
index, |
xy=(row["xs"], row["ys"]), |
textcoords="offset points", |
ha="center", |
xytext=xytext, |
va="top", |
color="r", |
size=size, |
) |
return fig |
def get_events_df_from_lines_and_trial_selection(trial, trial_lines, discard_fixations_without_sfix): |
line_dicts = [] |
fixations_dicts = [] |
events_dicts = [] |
blink_started = False |
fixation_started = False |
esac_count = 0 |
efix_count = 0 |
sfix_count = 0 |
sblink_count = 0 |
eblink_times = [] |
eye_to_use = "R" |
for l in trial_lines: |
if "EFIX R" in l: |
eye_to_use = "R" |
break |
elif "EFIX L" in l: |
eye_to_use = "L" |
break |
for l in trial_lines: |
parts = [x.strip() for x in l.split("\t")] |
if f"EFIX {eye_to_use}" in l: |
efix_count += 1 |
if fixation_started: |
had_SFIX_before_it = True |
if parts[1] == "." and parts[2] == ".": |
continue |
fixation_started = False |
else: |
had_SFIX_before_it = False |
fix_dict = { |
"fixation_number": efix_count, |
"start_time": float(pd.to_numeric(parts[0].split()[-1].strip(), errors="coerce")), |
"end_time": float(pd.to_numeric(parts[1].strip(), errors="coerce")), |
"duration": float(pd.to_numeric(parts[2].strip(), errors="coerce")), |
"x": float(pd.to_numeric(parts[3].strip(), errors="coerce")), |
"y": float(pd.to_numeric(parts[4].strip(), errors="coerce")), |
"pupil_size": float(pd.to_numeric(parts[5].strip(), errors="coerce")), |
"had_SFIX_before_it": had_SFIX_before_it, |
"msg": "FIX", |
} |
if not discard_fixations_without_sfix or had_SFIX_before_it: |
fixations_dicts.append(fix_dict) |
events_dicts.append( |
{ |
"num": efix_count - 1, |
"start": float(pd.to_numeric(parts[0].split()[-1].strip(), errors="coerce")), |
"stop": float(pd.to_numeric(parts[1].strip(), errors="coerce")), |
"duration": float(pd.to_numeric(parts[2].strip(), errors="coerce")), |
"xs": float(pd.to_numeric(parts[3].strip(), errors="coerce")), |
"xe": None, |
"ys": float(pd.to_numeric(parts[4].strip(), errors="coerce")), |
"ye": None, |
"ampl": None, |
"pv": None, |
"pupil_size": float(pd.to_numeric(parts[5].strip(), errors="coerce")), |
"msg": "FIX", |
} |
) |
if len(fixations_dicts) >= 2: |
assert fixations_dicts[-1]["start_time"] > fixations_dicts[-2]["start_time"], "start times not in order" |
elif f"SFIX {eye_to_use}" in l: |
sfix_count += 1 |
fixation_started = True |
elif f"SBLINK {eye_to_use}" in l: |
sblink_count += 1 |
blink_started = True |
elif f"EBLINK {eye_to_use}" in l: |
blink_started = False |
blink_dict = { |
"num": len(eblink_times), |
"start": float(pd.to_numeric(parts[0].split()[-1].strip(), errors="coerce")), |
"stop": float(pd.to_numeric(parts[1].strip(), errors="coerce")), |
"duration": float(pd.to_numeric(parts[2].strip(), errors="coerce")), |
"xs": None, |
"xe": None, |
"ys": None, |
"ye": None, |
"ampl": None, |
"pv": None, |
"pupil_size": None, |
"msg": "BLINK", |
} |
events_dicts.append(blink_dict) |
eblink_times.append(float(pd.to_numeric(parts[-1], errors="coerce"))) |
elif "ESACC" in l: |
sac_dict = { |
"num": esac_count, |
"start": float(pd.to_numeric(parts[0].split()[-1].strip(), errors="coerce")), |
"stop": float(pd.to_numeric(parts[1].strip(), errors="coerce")), |
"duration": float(pd.to_numeric(parts[2].strip(), errors="coerce")), |
"xs": float(pd.to_numeric(parts[3].strip(), errors="coerce")), |
"ys": float(pd.to_numeric(parts[4].strip(), errors="coerce")), |
"xe": float(pd.to_numeric(parts[5].strip(), errors="coerce")), |
"ye": float(pd.to_numeric(parts[6].strip(), errors="coerce")), |
"ampl": float(pd.to_numeric(parts[7].strip(), errors="coerce")), |
"pv": float(pd.to_numeric(parts[8].strip(), errors="coerce")), |
"pupil_size": None, |
"msg": "SAC", |
} |
events_dicts.append(sac_dict) |
esac_count += 1 |
if not blink_started and not any([True for x in event_strs if x in l]): |
if len(parts) < 3 or (parts[1] == "." and parts[2] == "."): |
continue |
line_dicts.append( |
{ |
"idx": float(pd.to_numeric(parts[0].strip(), errors="coerce")), |
"x": float(pd.to_numeric(parts[1].strip(), errors="coerce")), |
"y": float(pd.to_numeric(parts[2].strip(), errors="coerce")), |
"p": float(pd.to_numeric(parts[3].strip(), errors="coerce")), |
"part_of_fixation": fixation_started, |
"fixation_number": sfix_count, |
"part_of_blink": blink_started, |
"blink_number": sblink_count, |
} |
) |
trial["eblink_times"] = eblink_times |
df = pd.DataFrame(line_dicts) |
df["x_smoothed"] = np.convolve(df.x, np.ones((5,)) / 5, mode="same") |
df["y_smoothed"] = np.convolve(df.y, np.ones((5,)) / 5, mode="same") |
df["time"] = df["idx"] - df["idx"].iloc[0] |
df = pf.compute_velocity(df) |
events_df = pd.DataFrame(events_dicts) |
events_df["start_uncorrected"] = events_df.start |
events_df["stop_uncorrected"] = events_df.stop |
events_df["start"] = events_df.start - trial["trial_start_time"] |
events_df["stop"] = events_df.stop - trial["trial_start_time"] |
events_df["start"] = events_df["start"].clip(0, events_df["start"].max()) |
events_df.sort_values(by="start", inplace=True) |
events_df.reset_index(drop=True, inplace=True) |
events_df = pf.event_long(events_df) |
events_df["duration"] = events_df["stop"] - events_df["start"] |
trial["efix_count"] = efix_count |
trial["eye_to_use"] = eye_to_use |
trial["sfix_count"] = sfix_count |
trial["sblink_count"] = sblink_count |
return trial, df, events_df |
def add_default_font_and_character_props_to_state(trial): |
chars_list = trial["chars_list"] |
chars_df = pd.DataFrame(trial["chars_list"]) |
line_diffs = np.diff(chars_df.char_y_center.unique()) |
y_diffs = np.unique(line_diffs) |
if len(y_diffs) > 1: |
y_diff = np.min(y_diffs) |
else: |
y_diff = y_diffs[0] |
y_diff = round(y_diff * 2) / 2 |
x_txt_start = chars_list[0]["char_xmin"] |
y_txt_start = chars_list[0]["char_y_center"] |
font_face, font_size = get_font_and_font_size_from_trial(trial) |
line_height = y_diff |
return y_diff, x_txt_start, y_txt_start, font_face, font_size, line_height |
def get_raw_events_df_and_trial(trial, discard_fixations_without_sfix): |
fname = pl.Path(trial["filename"]).stem |
trial_id = trial["trial_id"] |
trial_lines = trial.pop("trial_lines") |
trial["plot_file"] = str(PLOTS_FOLDER.joinpath(f"{fname}_{trial_id}_2ndInput_chars_channel_sep.png")) |
trial, df, events_df = get_events_df_from_lines_and_trial_selection( |
trial, trial_lines, discard_fixations_without_sfix |
) |
trial["gaze_df"] = df |
font, font_size, dpi, screen_res = get_plot_props(trial, AVAILABLE_FONTS) |
trial["font"] = font |
trial["font_size"] = font_size |
trial["dpi"] = dpi |
trial["screen_res"] = screen_res |
if "chars_list" in trial: |
chars_df = pd.DataFrame(trial["chars_list"]) |
chars_df = add_popEye_cols_to_chars_df(chars_df) |
if "index" not in chars_df.columns: |
chars_df.reset_index(inplace=True) |
trial["chars_df"] = chars_df.to_dict() |
trial["y_char_unique"] = list(chars_df.char_y_center.sort_values().unique()) |
return reorder_columns(events_df), trial |
def get_outlier_indeces( |
dffix, chars_df, x_thres_in_chars, y_thresh_in_heights, xcol, ycol, letter_width_avg, line_heights_avg |
): |
indeces_out = [] |
for linenum, line_chars_subdf in chars_df.groupby("assigned_line"): |
left = line_chars_subdf["char_xmin"].min() |
right = line_chars_subdf["char_xmax"].max() |
top = line_chars_subdf["char_ymin"].min() |
bottom = line_chars_subdf["char_ymax"].max() |
left_min = left - (x_thres_in_chars * letter_width_avg) |
right_max = right + (x_thres_in_chars * letter_width_avg) |
top_max = top - (line_heights_avg * y_thresh_in_heights) |
bottom_min = bottom + (line_heights_avg * y_thresh_in_heights) |
indeces_out_line = [] |
indeces_out_line.extend(list(dffix.loc[dffix[xcol] < left_min, :].index)) |
indeces_out_line.extend(list(dffix.loc[dffix[xcol] > right_max, :].index)) |
indeces_out_line.extend(list(dffix.loc[dffix[ycol] < top_max, :].index)) |
indeces_out_line.extend(list(dffix.loc[dffix[ycol] > bottom_min, :].index)) |
indeces_out_line_set = set(indeces_out_line) |
indeces_out.append(indeces_out_line_set) |
return list(set.intersection(*indeces_out)) |
def get_distance_between_fixations_in_characters_and_recalc_duration( |
fix, letter_width_avg, start_colname="start", stop_colname="stop", xcol="xs" |
): |
fix.reset_index(drop=True, inplace=True) |
fix.loc[:, "duration"] = fix[stop_colname] - fix[start_colname] |
fix.loc[:, "distance_in_char_widths"] = 0.0 |
for i in range(1, len(fix)): |
fix.loc[i, "distance_in_char_widths"] = np.round( |
np.abs(fix.loc[i, xcol] - fix.loc[i - 1, xcol]) / letter_width_avg, decimals=3 |
) |
return fix |
def clean_fixations_popeye_no_sacc(fix, trial, duration_threshold, distance_threshold): |
if "letter_width_avg" in trial: |
letter_width_avg = trial["letter_width_avg"] |
else: |
letter_width_avg = 12 |
stop_time_col, start_time_col = get_time_cols(fix) |
if "xs" in fix.columns: |
x_colname = "xs" |
y_colname = "ys" |
else: |
x_colname = "x" |
y_colname = "y" |
if "blink" not in fix.columns: |
fix["blink"] = 0 |
fix.dropna(subset=[x_colname, y_colname], how="any", axis=0, inplace=True) |
fix.reset_index(drop=True, inplace=True) |
fix = get_distance_between_fixations_in_characters_and_recalc_duration( |
fix, letter_width_avg, start_time_col, stop_time_col, x_colname |
) |
fix["num"] = np.arange(len(fix), dtype=int) |
i = 0 |
while i <= len(fix) - 1: |
merge_before = False |
merge_after = False |
if fix["duration"].iloc[i] <= duration_threshold: |
if i > 1: |
if ( |
fix["duration"].iloc[i - 1] > duration_threshold |
and fix["blink"].iloc[i - 1] == 0 |
and fix["distance_in_char_widths"].iloc[i] <= distance_threshold |
): |
merge_before = True |
if i < len(fix) - 1: |
if ( |
fix["duration"].iloc[i + 1] > duration_threshold |
and fix["blink"].iloc[i + 1] == 0 |
and fix["distance_in_char_widths"].iloc[i + 1] <= distance_threshold |
): |
merge_after = True |
if merge_before and not merge_after: |
merge = -1 |
elif not merge_before and merge_after: |
merge = 1 |
elif not merge_before and not merge_after: |
merge = 0 |
elif merge_before and merge_after: |
if fix["duration"].iloc[i - 1] >= fix["duration"].iloc[i + 1]: |
merge = -1 |
else: |
merge = 1 |
else: |
merge = 0 |
if merge == 0: |
i += 1 |
elif merge == -1: |
fix.loc[i - 1, stop_time_col] = fix.loc[i, stop_time_col] |
fix.loc[i - 1, x_colname] = round((fix.loc[i - 1, x_colname] + fix.loc[i, x_colname]) / 2) |
fix.loc[i - 1, y_colname] = round((fix.loc[i - 1, y_colname] + fix.loc[i, y_colname]) / 2) |
fix = fix.drop(i, axis=0) |
fix.reset_index(drop=True, inplace=True) |
start = fix[start_time_col].iloc[i - 1] |
stop = fix[stop_time_col].iloc[i - 1] |
fix = get_distance_between_fixations_in_characters_and_recalc_duration( |
fix, letter_width_avg, start_time_col, stop_time_col, x_colname |
) |
elif merge == 1: |
fix.loc[i + 1, start_time_col] = fix.loc[i, start_time_col] |
fix.loc[i + 1, x_colname] = round((fix.loc[i, x_colname] + fix.loc[i + 1, x_colname]) / 2) |
fix.loc[i + 1, y_colname] = round((fix.loc[i, y_colname] + fix.loc[i + 1, y_colname]) / 2) |
fix.drop(index=i, inplace=True) |
fix.reset_index(drop=True, inplace=True) |
start = fix.loc[i, start_time_col] |
stop = fix.loc[i, stop_time_col] |
fix = get_distance_between_fixations_in_characters_and_recalc_duration( |
fix, letter_width_avg, start_time_col, stop_time_col, x_colname |
) |
fix.loc[:, "num"] = np.arange(len(fix), dtype=int) |
if fix.iloc[-1]["duration"] < duration_threshold: |
fix = fix.iloc[:-1] |
trial["last_fixation_was_discarded_because_too_short"] = True |
else: |
trial["last_fixation_was_discarded_because_too_short"] = False |
fix.reset_index(drop=True, inplace=True) |
return fix.copy() |
def clean_dffix_own( |
trial: dict, |
choice_handle_short_and_close_fix: str, |
discard_far_out_of_text_fix, |
x_thres_in_chars, |
y_thresh_in_heights, |
short_fix_threshold, |
merge_distance_threshold: float, |
discard_long_fix: bool, |
discard_long_fix_threshold: int, |
discard_blinks: bool, |
dffix: pd.DataFrame, |
): |
dffix = dffix.dropna(how="all", axis=1).copy() |
if dffix.empty: |
return dffix, trial |
dffix = dffix.rename( |
{ |
k: v |
for k, v in { |
"xs": "x", |
"ys": "y", |
"num": "fixation_number", |
}.items() |
if v not in dffix.columns |
}, |
axis=1, |
) |
stop_time_col, start_time_col = get_time_cols(dffix) |
add_time_cols(dffix, stop_time_col, start_time_col) |
if "dffix_no_clean" not in trial: |
trial["dffix_no_clean"] = ( |
dffix.copy() |
) |
add_time_cols(trial["dffix_no_clean"], stop_time_col, start_time_col) |
trial["dffix_no_clean"]["was_merged"] = False |
trial["dffix_no_clean"]["was_discarded_short_fix"] = False |
trial["dffix_no_clean"]["was_discarded_outside_text"] = False |
num_fix_before_clean = trial["dffix_no_clean"].shape[0] |
trial["Fixation Cleaning Stats"] = {} |
trial["Fixation Cleaning Stats"]["Number of fixations before cleaning"] = num_fix_before_clean |
trial["Fixation Cleaning Stats"]["Discard fixation before or after blinks"] = discard_blinks |
if discard_blinks and "blink" in dffix.columns: |
trial["dffix_no_clean"]["was_discarded_due_blinks"] = False |
dffix = dffix[dffix["blink"] == False].copy() |
trial["dffix_no_clean"].loc[ |
~trial["dffix_no_clean"]["start_time"].isin(dffix["start_time"]), "was_discarded_due_blinks" |
] = True |
trial["Fixation Cleaning Stats"]["Number of discarded fixations due to blinks"] = ( |
num_fix_before_clean - dffix.shape[0] |
) |
trial["Fixation Cleaning Stats"]["Number of discarded fixations due to blinks (%)"] = round( |
100 |
* (trial["Fixation Cleaning Stats"]["Number of discarded fixations due to blinks"] / num_fix_before_clean), |
2, |
) |
trial["Fixation Cleaning Stats"]["Discard long fixations"] = discard_long_fix |
if discard_long_fix and not dffix.empty: |
dffix_before_long_fix_removal = dffix.copy() |
trial["dffix_no_clean"]["was_discarded_due_to_long_duration"] = False |
dffix = dffix[dffix["duration"] < discard_long_fix_threshold].copy() |
dffix_after_long_fix_removal = dffix.copy() |
trial["dffix_no_clean"].loc[ |
( |
~trial["dffix_no_clean"]["start_time"].isin(dffix_after_long_fix_removal["start_time"]) |
& (trial["dffix_no_clean"]["start_time"].isin(dffix_before_long_fix_removal["start_time"])) |
), |
"was_discarded_due_to_long_duration", |
] = True |
trial["Fixation Cleaning Stats"]["Number of discarded long fixations"] = num_fix_before_clean - dffix.shape[0] |
trial["Fixation Cleaning Stats"]["Number of discarded long fixations (%)"] = round( |
100 * (trial["Fixation Cleaning Stats"]["Number of discarded long fixations"] / num_fix_before_clean), 2 |
) |
num_fix_before_merge = dffix.shape[0] |
trial["Fixation Cleaning Stats"]["How short and close fixations were handled"] = choice_handle_short_and_close_fix |
if ( |
choice_handle_short_and_close_fix == "Merge" or choice_handle_short_and_close_fix == "Merge then discard" |
) and not dffix.empty: |
dffix_before_merge = dffix.copy() |
dffix = clean_fixations_popeye_no_sacc(dffix, trial, short_fix_threshold, merge_distance_threshold) |
dffix_after_merge = dffix.copy() |
trial["dffix_no_clean"].loc[ |
(~trial["dffix_no_clean"]["start_time"].isin(dffix_after_merge["start_time"])) |
& (trial["dffix_no_clean"]["start_time"].isin(dffix_before_merge["start_time"])), |
"was_merged", |
] = True |
if trial["last_fixation_was_discarded_because_too_short"]: |
trial["dffix_no_clean"].iloc[-1, trial["dffix_no_clean"].columns.get_loc("was_merged")] = False |
trial["dffix_no_clean"].iloc[-1, trial["dffix_no_clean"].columns.get_loc("was_discarded_short_fix")] = True |
trial["Fixation Cleaning Stats"]["Number of merged fixations"] = ( |
num_fix_before_merge - dffix_after_merge.shape[0] |
) |
trial["Fixation Cleaning Stats"]["Number of merged fixations (%)"] = round( |
100 * (trial["Fixation Cleaning Stats"]["Number of merged fixations"] / num_fix_before_merge), 2 |
) |
if not dffix.empty: |
dffix.reset_index(drop=True, inplace=True) |
dffix.loc[:, "fixation_number"] = np.arange(dffix.shape[0]) |
trial["x_thres_in_chars"], trial["y_thresh_in_heights"] = x_thres_in_chars, y_thresh_in_heights |
if "chars_list" in trial and not dffix.empty: |
indeces_out = get_outlier_indeces( |
dffix, |
pd.DataFrame(trial["chars_list"]), |
x_thres_in_chars, |
y_thresh_in_heights, |
"x", |
"y", |
trial["letter_width_avg"], |
np.mean(trial["line_heights"]), |
) |
else: |
indeces_out = [] |
dffix["is_far_out_of_text_uncorrected"] = "in" |
if len(indeces_out) > 0: |
times_out = dffix.loc[indeces_out, "start_time"].copy() |
dffix.loc[indeces_out, "is_far_out_of_text_uncorrected"] = "out" |
trial["Fixation Cleaning Stats"]["Far out of text fixations were discarded"] = discard_far_out_of_text_fix |
if discard_far_out_of_text_fix and len(indeces_out) > 0: |
num_fix_before_clean_via_discard_far_out_of_text_fix = dffix.shape[0] |
trial["dffix_no_clean"].loc[ |
trial["dffix_no_clean"]["start_time"].isin(times_out), "was_discarded_outside_text" |
] = True |
dffix = dffix.loc[dffix["is_far_out_of_text_uncorrected"] == "in", :].reset_index(drop=True).copy() |
trial["Fixation Cleaning Stats"]["Number of discarded far-out-of-text fixations"] = ( |
num_fix_before_clean_via_discard_far_out_of_text_fix - dffix.shape[0] |
) |
trial["Fixation Cleaning Stats"]["Number of discarded far-out-of-text fixations (%)"] = round( |
100 |
* ( |
trial["Fixation Cleaning Stats"]["Number of discarded far-out-of-text fixations"] |
/ num_fix_before_clean_via_discard_far_out_of_text_fix |
), |
2, |
) |
dffix = dffix.drop(columns="is_far_out_of_text_uncorrected") |
if ( |
choice_handle_short_and_close_fix == "Discard" |
or choice_handle_short_and_close_fix == "Merge then discard" |
and not dffix.empty |
): |
num_fix_before_clean_via_discard_short = dffix.shape[0] |
times_out = dffix.loc[(dffix["duration"] < short_fix_threshold), "start_time"].copy() |
if len(times_out) > 0: |
trial["dffix_no_clean"].loc[ |
trial["dffix_no_clean"]["start_time"].isin(times_out), "was_discarded_short_fix" |
] = True |
dffix = dffix[(dffix["duration"] >= short_fix_threshold)].reset_index(drop=True).copy() |
trial["Fixation Cleaning Stats"]["Number of discarded short fixations"] = ( |
num_fix_before_clean_via_discard_short - dffix.shape[0] |
) |
trial["Fixation Cleaning Stats"]["Number of discarded short fixations (%)"] = round( |
100 |
* (trial["Fixation Cleaning Stats"]["Number of discarded short fixations"]) |
/ num_fix_before_clean_via_discard_short, |
2, |
) |
trial["Fixation Cleaning Stats"]["Total number of discarded and merged fixations"] = ( |
num_fix_before_clean - dffix.shape[0] |
) |
trial["Fixation Cleaning Stats"]["Total number of discarded and merged fixations (%)"] = round( |
100 * trial["Fixation Cleaning Stats"]["Total number of discarded and merged fixations"] / num_fix_before_clean, |
2, |
) |
if not dffix.empty: |
droplist = ["num", "msg"] |
if discard_blinks: |
droplist += ["blink", "blink_before", "blink_after"] |
for col in droplist: |
if col in dffix.columns: |
dffix = dffix.drop(col, axis=1) |
if "start" in dffix.columns: |
dffix = dffix.drop(axis=1, labels=["start", "stop"]) |
if "corrected_start_time" not in dffix.columns: |
min_start_time = min(dffix["start_uncorrected"]) |
dffix["corrected_start_time"] = dffix["start_uncorrected"] - min_start_time |
dffix["corrected_end_time"] = dffix["stop_uncorrected"] - min_start_time |
assert all(np.diff(dffix["corrected_start_time"]) > 0), "start times not in order" |
dffix_no_clean_fig, _, _ = matplotlib_plot_df( |
dffix, |
trial, |
None, |
trial["dffix_no_clean"], |
box_annotations=None, |
fix_to_plot=["Uncorrected Fixations"], |
stim_info_to_plot=["Characters", "Word boxes"], |
) |
savename = f"{trial['subject']}_{trial['trial_id']}_clean_compare.png" |
dffix_no_clean_fig.savefig(RESULTS_FOLDER.joinpath(savename), dpi=300, bbox_inches="tight") |
plt.close(dffix_no_clean_fig) |
dffix_clean_fig, _, _ = matplotlib_plot_df( |
dffix, |
trial, |
None, |
None, |
box_annotations=None, |
fix_to_plot=["Uncorrected Fixations"], |
stim_info_to_plot=["Characters", "Word boxes"], |
use_duration_arrow_sizes=False, |
) |
savename = f"{trial['subject']}_{trial['trial_id']}_after_clean.png" |
dffix_clean_fig.savefig(RESULTS_FOLDER.joinpath(savename), dpi=300, bbox_inches="tight") |
plt.close(dffix_clean_fig) |
if "item" not in dffix.columns and "item" in trial: |
dffix.insert(loc=0, column="item", value=trial["item"]) |
if "condition" not in dffix.columns and "condition" in trial: |
dffix.insert(loc=0, column="condition", value=trial["condition"]) |
if "subject" not in dffix.columns and "subject" in trial: |
dffix.insert(loc=0, column="subject", value=trial["subject"]) |
if "trial_id" not in dffix.columns and "trial_id" in trial: |
dffix.insert(loc=0, column="trial_id", value=trial["trial_id"]) |
dffix = reorder_columns(dffix) |
return dffix, trial |
def add_time_cols(dffix, stop_time_col, start_time_col): |
if "start_time" not in dffix.columns: |
dffix["start_time"] = dffix[start_time_col] |
if "end_time" not in dffix.columns: |
dffix["end_time"] = dffix[stop_time_col] |
if "duration" not in dffix.columns: |
dffix["duration"] = dffix["end_time"] - dffix["start_time"] |
def get_time_cols(dffix): |
if "stop" in dffix.columns: |
stop_time_col = "stop" |
elif "end_time" in dffix.columns: |
stop_time_col = "end_time" |
elif "corrected_end_time" in dffix.columns: |
stop_time_col = "corrected_end_time" |
if "start" in dffix.columns: |
start_time_col = "start" |
elif "start_time" in dffix.columns: |
start_time_col = "start_time" |
elif "corrected_start_time" in dffix.columns: |
start_time_col = "corrected_start_time" |
return stop_time_col, start_time_col |
def trial_to_dfs( |
trial: dict, |
discard_fixations_without_sfix, |
choice_handle_short_and_close_fix, |
discard_far_out_of_text_fix, |
x_thres_in_chars, |
y_thresh_in_heights, |
short_fix_threshold, |
merge_distance_threshold, |
discard_long_fix, |
discard_long_fix_threshold, |
discard_blinks, |
): |
events_df, trial = get_raw_events_df_and_trial(trial, discard_fixations_without_sfix) |
dffix, trial = clean_dffix_own( |
trial, |
choice_handle_short_and_close_fix, |
discard_far_out_of_text_fix, |
x_thres_in_chars, |
y_thresh_in_heights, |
short_fix_threshold, |
merge_distance_threshold, |
discard_long_fix, |
discard_long_fix_threshold, |
discard_blinks, |
events_df[events_df["msg"] == "FIX"].copy(), |
) |
dffix = dffix.dropna(how="all", axis=1).copy() |
trial["dffix"] = dffix |
trial["events_df"] = events_df |
return dffix, trial |
def get_all_measures( |
trial, |
dffix, |
prefix, |
use_corrected_fixations=True, |
correction_algo="Wisdom_of_Crowds", |
measures_to_calculate=["initial_landing_position"], |
include_coords=False, |
save_to_csv=False, |
): |
stim_df = pd.DataFrame(trial[f"{prefix}s_list"]) |
if f"{prefix}_number" not in stim_df.columns: |
stim_df[f"{prefix}_number"] = np.arange(stim_df.shape[0]) |
if use_corrected_fixations: |
dffix_copy = copy.deepcopy(dffix) |
dffix_copy["y"] = dffix_copy[f"y_{correction_algo}"] |
else: |
dffix_copy = dffix |
correction_algo = "uncorrected" |
res_dfs = [] |
for measure in measures_to_calculate: |
if hasattr(anf, f"{measure}_own"): |
function = getattr(anf, f"{measure}_own") |
result = function(trial, dffix_copy, prefix, correction_algo) |
res_dfs.append(result) |
dfs_list = [df for df in [stim_df] + res_dfs if not df.empty] |
own_measure_df = stim_df |
if len(dfs_list) > 1: |
for df in dfs_list[1:]: |
droplist = [col for col in df.columns if (col != f"{prefix}_number" and col in stim_df.columns)] |
own_measure_df = own_measure_df.merge(df.drop(columns=droplist), how="left", on=[f"{prefix}_number"]) |
first_column = own_measure_df.pop(prefix) |
own_measure_df.insert(0, prefix, first_column) |
wordfirst = pf.aggregate_words_firstrun(dffix_copy, correction_algo, measures_to_calculate) |
wordtmp = pf.aggregate_words(dffix_copy, pd.DataFrame(trial["words_list"]), correction_algo, measures_to_calculate) |
out = pf.combine_words( |
dffix_copy, |
wordfirst=wordfirst, |
wordtmp=wordtmp, |
algo_choice=correction_algo, |
measures_to_calculate=measures_to_calculate, |
) |
extra_cols = list(set(out.columns) - set(own_measure_df.columns)) |
cols_to_add = ["word_number"] + extra_cols |
own_measure_df = pd.merge(own_measure_df, out.loc[:, cols_to_add], on="word_number", how="left") |
first_cols = [ |
"subject", |
"trial_id", |
"item", |
"condition", |
"question_correct", |
"word_number", |
"word", |
] |
for col in first_cols: |
if col in trial and col not in own_measure_df.columns: |
own_measure_df.insert(loc=0, column=col, value=trial[col]) |
own_measure_df = own_measure_df.dropna(how="all", axis=1).copy() |
if not include_coords: |
word_cols = ["word_xmin", "word_xmax", "word_ymax", "word_xmin", "word_ymin", "word_x_center", "word_y_center"] |
own_measure_df = own_measure_df.drop(columns=word_cols) |
own_measure_df = reorder_columns(own_measure_df) |
if "question_correct" in own_measure_df.columns: |
own_measure_df = own_measure_df.drop(columns=["question_correct"]) |
if save_to_csv: |
own_measure_df.to_csv( |
RESULTS_FOLDER / f"{trial['subject']}_{trial['trial_id']}_{correction_algo}_word_measures.csv" |
) |
return own_measure_df |
def add_line_overlaps_to_sample(trial, sample): |
char_df = pd.DataFrame(trial["chars_list"]) |
line_overlaps = [] |
for arr in sample: |
y_val = arr[1] |
line_overlap = t.tensor(-1, dtype=t.float32) |
for idx, (x1, x2) in enumerate(zip(char_df.char_ymin.unique(), char_df.char_ymax.unique())): |
if x1 <= y_val <= x2: |
line_overlap = t.tensor(idx, dtype=t.float32) |
break |
line_overlaps.append(line_overlap) |
line_olaps_tensor = t.stack(line_overlaps, dim=0) |
sample = t.cat([sample, line_olaps_tensor.unsqueeze(1)], dim=1) |
return sample |
def norm_coords_by_letter_min_x_y( |
sample_idx: int, |
trialslist: list, |
samplelist: list, |
chars_center_coords_list: list = None, |
): |
chars_df = pd.DataFrame(trialslist[sample_idx]["chars_list"]) |
trialslist[sample_idx]["x_char_unique"] = list(chars_df.char_xmin.unique()) |
min_x_chars = chars_df.char_xmin.min() |
min_y_chars = chars_df.char_ymin.min() |
norm_vector_substract = t.zeros( |
(1, samplelist[sample_idx].shape[1]), dtype=samplelist[sample_idx].dtype, device=samplelist[sample_idx].device |
) |
norm_vector_substract[0, 0] = norm_vector_substract[0, 0] + 1 * min_x_chars |
norm_vector_substract[0, 1] = norm_vector_substract[0, 1] + 1 * min_y_chars |
samplelist[sample_idx] = samplelist[sample_idx] - norm_vector_substract |
if chars_center_coords_list is not None: |
norm_vector_substract = norm_vector_substract.squeeze(0)[:2] |
if chars_center_coords_list[sample_idx].shape[-1] == norm_vector_substract.shape[-1] * 2: |
chars_center_coords_list[sample_idx][:, :2] -= norm_vector_substract |
chars_center_coords_list[sample_idx][:, 2:] -= norm_vector_substract |
else: |
chars_center_coords_list[sample_idx] -= norm_vector_substract |
return trialslist, samplelist, chars_center_coords_list |
def norm_coords_by_letter_positions( |
sample_idx: int, |
trialslist: list, |
samplelist: list, |
meanlist: list = None, |
stdlist: list = None, |
return_mean_std_lists=False, |
norm_by_char_averages=False, |
chars_center_coords_list: list = None, |
add_normalised_values_as_features=False, |
): |
chars_df = pd.DataFrame(trialslist[sample_idx]["chars_list"]) |
trialslist[sample_idx]["x_char_unique"] = list(chars_df.char_xmin.unique()) |
min_x_chars = chars_df.char_xmin.min() |
max_x_chars = chars_df.char_xmax.max() |
norm_vector_multi = t.ones( |
(1, samplelist[sample_idx].shape[1]), dtype=samplelist[sample_idx].dtype, device=samplelist[sample_idx].device |
) |
if norm_by_char_averages: |
chars_list = trialslist[sample_idx]["chars_list"] |
char_widths = np.asarray([x["char_xmax"] - x["char_xmin"] for x in chars_list]) |
char_heights = np.asarray([x["char_ymax"] - x["char_ymin"] for x in chars_list]) |
char_widths_average = np.mean(char_widths[char_widths > 0]) |
char_heights_average = np.mean(char_heights[char_heights > 0]) |
norm_vector_multi[0, 0] = norm_vector_multi[0, 0] * char_widths_average |
norm_vector_multi[0, 1] = norm_vector_multi[0, 1] * char_heights_average |
else: |
line_height = min(np.unique(trialslist[sample_idx]["line_heights"])) |
line_width = max_x_chars - min_x_chars |
norm_vector_multi[0, 0] = norm_vector_multi[0, 0] * line_width |
norm_vector_multi[0, 1] = norm_vector_multi[0, 1] * line_height |
assert ~t.any(t.isnan(norm_vector_multi)), "Nan found in char norming vector" |
norm_vector_multi = norm_vector_multi.squeeze(0) |
if add_normalised_values_as_features: |
norm_vector_multi = norm_vector_multi[norm_vector_multi != 1] |
normed_features = samplelist[sample_idx][:, : norm_vector_multi.shape[0]] / norm_vector_multi |
samplelist[sample_idx] = t.cat([samplelist[sample_idx], normed_features], dim=1) |
else: |
samplelist[sample_idx] = samplelist[sample_idx] / norm_vector_multi |
if chars_center_coords_list is not None: |
norm_vector_multi = norm_vector_multi[:2] |
if chars_center_coords_list[sample_idx].shape[-1] == norm_vector_multi.shape[-1] * 2: |
chars_center_coords_list[sample_idx][:, :2] /= norm_vector_multi |
chars_center_coords_list[sample_idx][:, 2:] /= norm_vector_multi |
else: |
chars_center_coords_list[sample_idx] /= norm_vector_multi |
if return_mean_std_lists: |
mean_val = samplelist[sample_idx].mean(axis=0).cpu().numpy() |
meanlist.append(mean_val) |
std_val = samplelist[sample_idx].std(axis=0).cpu().numpy() |
stdlist.append(std_val) |
assert ~any(pd.isna(mean_val)), "Nan found in mean_val" |
assert ~any(pd.isna(mean_val)), "Nan found in std_val" |
return trialslist, samplelist, meanlist, stdlist, chars_center_coords_list |
return trialslist, samplelist, chars_center_coords_list |
def get_fig_ax(screen_res, dpi, words_df, x_margin, y_margin, dffix=None, prefix="word"): |
fig = plt.figure(figsize=(screen_res[0] / dpi, screen_res[1] / dpi), dpi=dpi) |
ax = plt.Axes(fig, [0.0, 0.0, 1.0, 1.0]) |
ax.set_axis_off() |
if dffix is not None: |
ax.set_ylim((dffix.y.min(), dffix.y.max())) |
ax.set_xlim((dffix.x.min(), dffix.x.max())) |
else: |
ax.set_ylim((words_df[f"{prefix}_y_center"].min() - y_margin, words_df[f"{prefix}_y_center"].max() + y_margin)) |
ax.set_xlim((words_df[f"{prefix}_x_center"].min() - x_margin, words_df[f"{prefix}_x_center"].max() + x_margin)) |
ax.invert_yaxis() |
fig.add_axes(ax) |
return fig, ax |
def get_save_path(fpath, fname_ending): |
save_path = PLOTS_FOLDER.joinpath(f"{fpath.stem}_{fname_ending}.png") |
return save_path |
def save_im_load_convert(fpath, fig, fname_ending, mode): |
save_path = get_save_path(fpath, fname_ending) |
fig.savefig(save_path) |
im = Image.open(save_path).convert(mode) |
im.save(save_path) |
return im |
def plot_text_boxes_fixations( |
fpath, |
dpi, |
screen_res, |
set_font_size: bool, |
font_size: int, |
dffix=None, |
trial=None, |
): |
if isinstance(fpath, str): |
fpath = pl.Path(fpath) |
prefix = "char" |
if dffix is None: |
dffix = pd.read_csv(fpath) |
if trial is None: |
json_fpath = str(fpath).replace("_fixations.csv", "_trial.json") |
with open(json_fpath, "r") as f: |
trial = json.load(f) |
words_df = pd.DataFrame(trial[f"{prefix}s_list"]) |
x_right = words_df[f"{prefix}_xmin"] |
x_left = words_df[f"{prefix}_xmax"] |
y_top = words_df[f"{prefix}_ymax"] |
y_bottom = words_df[f"{prefix}_ymin"] |
if f"{prefix}_x_center" not in words_df.columns: |
words_df[f"{prefix}_x_center"] = (words_df[f"{prefix}_xmax"] - words_df[f"{prefix}_xmin"]) / 2 + words_df[ |
f"{prefix}_xmin" |
] |
words_df[f"{prefix}_y_center"] = (words_df[f"{prefix}_ymax"] - words_df[f"{prefix}_ymin"]) / 2 + words_df[ |
f"{prefix}_ymin" |
] |
x_margin = words_df[f"{prefix}_x_center"].mean() / 8 |
y_margin = words_df[f"{prefix}_y_center"].mean() / 4 |
times = dffix.corrected_start_time - dffix.corrected_start_time.min() |
times = times / times.max() |
times = np.linspace(0.25, 1, len(times)) |
if set_font_size: |
font = "monospace" |
else: |
font_size = trial["font_size"] * 27 // dpi |
font_props = FontProperties(family=font, style="normal", size=font_size) |
fig, ax = get_fig_ax(screen_res, dpi, words_df, x_margin, y_margin, prefix=prefix) |
ax.scatter(words_df[f"{prefix}_x_center"], words_df[f"{prefix}_y_center"], s=1, facecolor="k", alpha=0.01) |
for idx in range(len(x_left)): |
ax.text( |
words_df[f"{prefix}_x_center"][idx], |
words_df[f"{prefix}_y_center"][idx], |
words_df[prefix][idx], |
horizontalalignment="center", |
verticalalignment="center", |
fontproperties=font_props, |
) |
fname_ending = f"{prefix}s_grey" |
words_grey_im = save_im_load_convert(fpath, fig, fname_ending, "L") |
plt.close("all") |
fig, ax = get_fig_ax(screen_res, dpi, words_df, x_margin, y_margin, prefix=prefix) |
ax.scatter(words_df[f"{prefix}_x_center"], words_df[f"{prefix}_y_center"], s=1, facecolor="k", alpha=0.1) |
for idx in range(len(x_left)): |
xdiff = x_right[idx] - x_left[idx] |
ydiff = y_top[idx] - y_bottom[idx] |
rect = patches.Rectangle( |
(x_left[idx] - 1, y_bottom[idx] - 1), xdiff, ydiff, alpha=0.9, linewidth=1, edgecolor="k", facecolor="grey" |
) |
ax.add_patch(rect) |
fname_ending = f"{prefix}_boxes_grey" |
word_boxes_grey_im = save_im_load_convert(fpath, fig, fname_ending, "L") |
plt.close("all") |
fig, ax = get_fig_ax(screen_res, dpi, words_df, x_margin, y_margin, prefix=prefix) |
ax.scatter(dffix.x, dffix.y, facecolor="k", alpha=times) |
fname_ending = "fix_scatter_grey" |
fix_scatter_grey_im = save_im_load_convert(fpath, fig, fname_ending, "L") |
plt.close("all") |
arr_combo = np.stack( |
[ |
np.asarray(words_grey_im), |
np.asarray(word_boxes_grey_im), |
np.asarray(fix_scatter_grey_im), |
], |
axis=2, |
) |
im_combo = Image.fromarray(arr_combo) |
fname_ending = f"{prefix}s_channel_sep" |
im_combo.save(fpath) |
return im_combo |
def prep_data_for_dist(model_cfg, dffix, trial): |
if isinstance(dffix, dict): |
dffix = dffix["value"] |
sample_tensor = t.tensor(dffix.loc[:, model_cfg["sample_cols"]].to_numpy(), dtype=t.float32) |
if model_cfg["add_line_overlap_feature"]: |
sample_tensor = add_line_overlaps_to_sample(trial, sample_tensor) |
has_nans = t.any(t.isnan(sample_tensor)) |
assert not has_nans, "NaNs found in sample tensor" |
samplelist_eval = [sample_tensor] |
trialslist_eval = [trial] |
chars_center_coords_list_eval = None |
if model_cfg["norm_coords_by_letter_min_x_y"]: |
for sample_idx, _ in enumerate(samplelist_eval): |
trialslist_eval, samplelist_eval, chars_center_coords_list_eval = norm_coords_by_letter_min_x_y( |
sample_idx, |
trialslist_eval, |
samplelist_eval, |
chars_center_coords_list=chars_center_coords_list_eval, |
) |
if model_cfg["normalize_by_line_height_and_width"]: |
meanlist_eval, stdlist_eval = [], [] |
for sample_idx, _ in enumerate(samplelist_eval): |
( |
trialslist_eval, |
samplelist_eval, |
meanlist_eval, |
stdlist_eval, |
chars_center_coords_list_eval, |
) = norm_coords_by_letter_positions( |
sample_idx, |
trialslist_eval, |
samplelist_eval, |
meanlist_eval, |
stdlist_eval, |
return_mean_std_lists=True, |
norm_by_char_averages=model_cfg["norm_by_char_averages"], |
chars_center_coords_list=chars_center_coords_list_eval, |
add_normalised_values_as_features=model_cfg["add_normalised_values_as_features"], |
) |
sample_tensor = samplelist_eval[0] |
sample_means = t.tensor(model_cfg["sample_means"], dtype=t.float32) |
sample_std = t.tensor(model_cfg["sample_std"], dtype=t.float32) |
sample_tensor = (sample_tensor - sample_means) / sample_std |
sample_tensor = sample_tensor.unsqueeze(0) |
if not pl.Path(trial["plot_file"]).exists(): |
plot_text_boxes_fixations( |
fpath=trial["plot_file"], |
dpi=250, |
screen_res=(1024, 768), |
set_font_size=True, |
font_size=4, |
dffix=dffix, |
trial=trial, |
) |
val_set = DSet( |
sample_tensor, |
None, |
t.zeros((1, sample_tensor.shape[1])), |
trialslist_eval, |
padding_list=[0], |
padding_at_end=model_cfg["padding_at_end"], |
return_images_for_conv=True, |
im_partial_string=model_cfg["im_partial_string"], |
input_im_shape=model_cfg["char_plot_shape"], |
) |
val_loader = dl(val_set, batch_size=1, shuffle=False, num_workers=0) |
return val_loader, val_set |
def fold_in_seq_dim(out, y=None): |
batch_size, seq_len, num_classes = out.shape |
out = eo.rearrange(out, "b s c -> (b s) c", s=seq_len) |
if y is None: |
return out, None |
if len(y.shape) > 2: |
y = eo.rearrange(y, "b s c -> (b s) c", s=seq_len) |
else: |
y = eo.rearrange(y, "b s -> (b s)", s=seq_len) |
return out, y |
def logits_to_pred(out, y=None): |
seq_len = out.shape[1] |
out, y = fold_in_seq_dim(out, y) |
preds = corn_label_from_logits(out) |
preds = eo.rearrange(preds, "(b s) -> b s", s=seq_len) |
if y is not None: |
y = eo.rearrange(y.squeeze(), "(b s) -> b s", s=seq_len) |
y = y |
return preds, y |
def get_DIST_preds(dffix, trial, models_dict): |
algo_choice = "DIST" |
model = models_dict["single_DIST_model"] |
loader, dset = prep_data_for_dist(models_dict["single_DIST_model_cfg"], dffix, trial) |
batch = next(iter(loader)) |
if "cpu" not in str(model.device): |
batch = [x.cuda() for x in batch] |
try: |
out = model(batch) |
preds, y = logits_to_pred(out, y=None) |
if len(trial["y_char_unique"]) < 1: |
y_char_unique = pd.DataFrame(trial["chars_list"]).char_y_center.sort_values().unique() |
else: |
y_char_unique = trial["y_char_unique"] |
num_lines = trial["num_char_lines"] - 1 |
preds = t.clamp(preds, 0, num_lines).squeeze().cpu().numpy() |
y_pred_DIST = [y_char_unique[idx] for idx in preds] |
dffix[f"line_num_{algo_choice}"] = preds |
dffix[f"y_{algo_choice}"] = np.round(y_pred_DIST, decimals=2) |
dffix[f"y_{algo_choice}_correction"] = (dffix.loc[:, f"y_{algo_choice}"] - dffix.loc[:, "y"]).round(2) |
except Exception as e: |
ic(f"Exception on model(batch) for DIST \n{e}") |
return dffix |
def get_DIST_ensemble_preds( |
dffix, |
trial, |
model_cfg_without_norm_df, |
model_cfg_with_norm_df, |
ensemble_model_avg, |
): |
algo_choice = "DIST-Ensemble" |
loader_without_norm, dset_without_norm = prep_data_for_dist(model_cfg_without_norm_df, dffix, trial) |
loader_with_norm, dset_with_norm = prep_data_for_dist(model_cfg_with_norm_df, dffix, trial) |
batch_without_norm = next(iter(loader_without_norm)) |
batch_with_norm = next(iter(loader_with_norm)) |
out = ensemble_model_avg((batch_without_norm, batch_with_norm)) |
preds, y = logits_to_pred(out[0]["out_avg"], y=None) |
if len(trial["y_char_unique"]) < 1: |
y_char_unique = pd.DataFrame(trial["chars_list"]).char_y_center.sort_values().unique() |
else: |
y_char_unique = trial["y_char_unique"] |
num_lines = trial["num_char_lines"] - 1 |
preds = t.clamp(preds, 0, num_lines).squeeze().cpu().numpy() |
y_pred_DIST = [y_char_unique[idx] for idx in preds] |
dffix[f"line_num_{algo_choice}"] = preds |
dffix[f"y_{algo_choice}"] = np.round(y_pred_DIST, decimals=1) |
dffix[f"y_{algo_choice}_correction"] = (dffix.loc[:, f"y_{algo_choice}"] - dffix.loc[:, "y"]).round(1) |
return dffix |
def get_EDIST_preds_with_model_check(dffix, trial, models_dict): |
dffix = get_DIST_ensemble_preds( |
dffix, |
trial, |
models_dict["model_cfg_without_norm_df"], |
models_dict["model_cfg_with_norm_df"], |
models_dict["ensemble_model_avg"], |
) |
return dffix |
def get_all_classic_preds(dffix, trial, classic_algos_cfg): |
corrections = [] |
for algo, classic_params in copy.deepcopy(classic_algos_cfg).items(): |
dffix = calgo.apply_classic_algo(dffix, trial, algo, classic_params) |
corrections.append(np.asarray(dffix.loc[:, f"y_{algo}"])) |
return dffix, corrections |
def apply_woc(dffix, trial, corrections, algo_choice): |
corrected_Y = calgo.wisdom_of_the_crowd(corrections) |
dffix.loc[:, f"y_{algo_choice}"] = corrected_Y |
dffix[f"y_{algo_choice}_correction"] = (dffix.loc[:, f"y_{algo_choice}"] - dffix.loc[:, "y"]).round(1) |
corrected_line_nums = [trial["y_char_unique"].index(y) for y in corrected_Y] |
dffix.loc[:, f"line_num_y_{algo_choice}"] = corrected_line_nums |
dffix.loc[:, f"line_num_{algo_choice}"] = corrected_line_nums |
return dffix |
def apply_correction_algo(dffix, algo_choice, trial, models_dict, classic_algos_cfg): |
if algo_choice == "DIST": |
dffix = get_DIST_preds(dffix, trial, models_dict=models_dict) |
elif algo_choice == "DIST-Ensemble": |
dffix = get_EDIST_preds_with_model_check(dffix, trial, models_dict=models_dict) |
elif algo_choice == "Wisdom_of_Crowds_with_DIST": |
dffix, corrections = get_all_classic_preds(dffix, trial, classic_algos_cfg) |
dffix = get_DIST_preds(dffix, trial, models_dict=models_dict) |
for _ in range(3): |
corrections.append(np.asarray(dffix.loc[:, "y_DIST"])) |
dffix = apply_woc(dffix, trial, corrections, algo_choice) |
elif algo_choice == "Wisdom_of_Crowds_with_DIST_Ensemble": |
dffix, corrections = get_all_classic_preds(dffix, trial, classic_algos_cfg) |
dffix = get_EDIST_preds_with_model_check(dffix, trial, models_dict=models_dict) |
for _ in range(3): |
corrections.append(np.asarray(dffix.loc[:, "y_DIST-Ensemble"])) |
dffix = apply_woc(dffix, trial, corrections, algo_choice) |
elif algo_choice == "Wisdom_of_Crowds": |
dffix, corrections = get_all_classic_preds(dffix, trial, classic_algos_cfg) |
dffix = apply_woc(dffix, trial, corrections, algo_choice) |
else: |
algo_cfg = classic_algos_cfg[algo_choice] |
dffix = calgo.apply_classic_algo(dffix, trial, algo_choice, algo_cfg) |
dffix[f"y_{algo_choice}_correction"] = (dffix.loc[:, f"y_{algo_choice}"] - dffix.loc[:, "y"]).round(1) |
dffix = dffix.copy() |
return dffix |
def add_popEye_cols_to_dffix(dffix, algo_choice, chars_df, trial, xcol, cols_to_add: list): |
""" |
Required for word or sentence measures: |
- letternum |
- letter |
- on_word_number |
- on_word |
- on_sentence |
- num_words_in_sentence |
- on_sentence_num |
- word_land |
- line_let |
- line_word |
- sac_in |
- sac_out |
- word_launch |
- word_refix |
- word_reg_in |
- word_reg_out |
- sentence_reg_in |
- word_firstskip |
- word_run |
- sentence_run |
- word_run_fix |
- word_cland |
Optional: |
- line_let_from_last_letter |
- sentence_word |
- line_let_previous |
- line_let_next |
- sentence_refix |
- word_reg_out_to |
- word_reg_in_from |
- sentence_reg_out |
- sentence_reg_in_from |
- sentence_reg_out_to |
- sentence_firstskip |
- word_runid |
- sentence_runid |
- word_fix |
- sentence_fix |
""" |
if "angle_incoming" in cols_to_add: |
x_diff_incoming = dffix[xcol].values - dffix[xcol].shift(1).values |
y_diff_incoming = dffix["y"].values - dffix["y"].shift(1).values |
angle_incoming = np.arctan2(y_diff_incoming, x_diff_incoming) * (180 / np.pi) |
dffix["angle_incoming"] = angle_incoming |
if "angle_outgoing" in cols_to_add: |
x_diff_outgoing = dffix[xcol].shift(-1).values - dffix[xcol].values |
y_diff_outgoing = dffix["y"].shift(-1).values - dffix["y"].values |
angle_outgoing = np.arctan2(y_diff_outgoing, x_diff_outgoing) * (180 / np.pi) |
dffix["angle_outgoing"] = angle_outgoing |
dffix[f"line_change_{algo_choice}"] = np.concatenate( |
([0], np.diff(dffix[f"line_num_{algo_choice}"])), axis=0 |
).astype(int) |
for i in list(dffix.index): |
if dffix.loc[i, f"line_num_{algo_choice}"] > -1 and not pd.isna(dffix.loc[i, f"line_num_{algo_choice}"]): |
selected_stimmat = chars_df[ |
chars_df["assigned_line"] == dffix.loc[i, f"line_num_{algo_choice}"] |
].reset_index() |
selected_stimmat.loc[:, "letword"] = selected_stimmat.groupby("in_word_number")["letternum"].rank() |
letters_on_line = selected_stimmat.shape[0] |
out = dffix.loc[i, xcol] - selected_stimmat["char_x_center"] |
min_idx = out.abs().idxmin() |
dffix.loc[i, f"letternum_{algo_choice}"] = selected_stimmat.loc[min_idx, "letternum"] |
dffix.loc[i, f"letter_{algo_choice}"] = selected_stimmat.loc[min_idx, "char"] |
dffix.loc[i, f"line_let_{algo_choice}"] = selected_stimmat.loc[min_idx, "letline"] |
if "line_let_from_last_letter" in cols_to_add: |
dffix.loc[i, f"line_let_from_last_letter_{algo_choice}"] = ( |
letters_on_line - dffix.loc[i, f"line_let_{algo_choice}"] |
) |
word_min_idx = min_idx |
if ( |
selected_stimmat.loc[min_idx, "char"] == " " |
and (min_idx - 1) in selected_stimmat.index |
and (min_idx + 1) in selected_stimmat.index |
): |
dist_to_previous_letter = np.abs( |
dffix.loc[i, xcol] - selected_stimmat.loc[min_idx - 1, "char_x_center"] |
) |
dist_to_following_letter = np.abs( |
dffix.loc[i, xcol] - selected_stimmat.loc[min_idx + 1, "char_x_center"] |
) |
if dist_to_previous_letter < dist_to_following_letter: |
word_min_idx = min_idx - 1 |
if not pd.isna(selected_stimmat.loc[min_idx, "in_word_number"]): |
dffix.loc[i, f"on_word_number_{algo_choice}"] = selected_stimmat.loc[word_min_idx, "in_word_number"] |
dffix.loc[i, f"on_word_{algo_choice}"] = selected_stimmat.loc[word_min_idx, "in_word"] |
dffix.loc[i, f"word_land_{algo_choice}"] = selected_stimmat.loc[ |
word_min_idx, "num_letters_from_start_of_word" |
] |
dffix.loc[i, f"line_word_{algo_choice}"] = selected_stimmat.loc[word_min_idx, "wordline"] |
if "sentence_word" in cols_to_add: |
dffix.loc[i, f"sentence_word_{algo_choice}"] = selected_stimmat.loc[word_min_idx, "wordsent"] |
dffix.loc[i, "num_words_in_sentence"] = len(selected_stimmat.loc[word_min_idx, "in_sentence"].split(" ")) |
dffix.loc[i, f"on_sentence_num_{algo_choice}"] = selected_stimmat.loc[word_min_idx, "in_sentence_number"] |
dffix.loc[i, f"on_sentence_{algo_choice}"] = selected_stimmat.loc[word_min_idx, "in_sentence"] |
if "line_let_previous" in cols_to_add: |
dffix[f"line_let_previous_{algo_choice}"] = dffix[f"line_let_{algo_choice}"].shift(-1) |
if "line_let_next" in cols_to_add: |
dffix[f"line_let_next_{algo_choice}"] = dffix[f"line_let_{algo_choice}"].shift(1) |
dffix = pf.compute_saccade_length(dffix, chars_df, algo_choice) |
dffix = pf.compute_launch_distance(dffix, algo_choice) |
dffix = pf.compute_refixation(dffix, algo_choice) |
dffix = pf.compute_regression(dffix, algo_choice) |
dffix = pf.compute_firstskip(dffix, algo_choice) |
dffix = pf.compute_run(dffix, algo_choice) |
dffix = pf.compute_landing_position(dffix, algo_choice) |
dffix = dffix.loc[:, ~dffix.columns.duplicated()] |
return dffix |
def export_dataframe(df: pd.DataFrame, csv_name: str): |
if isinstance(df, dict): |
df = df["value"] |
df.to_csv(csv_name) |
return csv_name |
def _convert_to_json(obj): |
if isinstance(obj, (int, float, str, bool)): |
return obj |
elif isinstance(obj, dict): |
return {k: _convert_to_json(v) for k, v in obj.items()} |
elif isinstance(obj, list) or isinstance(obj, tuple): |
return [_convert_to_json(item) for item in obj] |
elif isinstance(obj, dict): |
return {k: _convert_to_json(val) for k, val in obj.items()} |
elif hasattr(obj, "to_dict"): |
return _convert_to_json(obj.to_dict()) |
elif hasattr(obj, "tolist"): |
return _convert_to_json(obj.tolist()) |
elif obj is None: |
return None |
else: |
raise TypeError(f"Object of type {type(obj)} is not JSON serializable") |
def save_trial_to_json(trial, savename): |
filtered_trial = {} |
for key, value in trial.items(): |
try: |
filtered_trial[key] = _convert_to_json(value) |
except TypeError as e: |
ic(f"Warning: Skipping non-serializable value for key '{key}' due to error: {e}") |
with open(savename, "w", encoding="utf-8") as f: |
json.dump(filtered_trial, f, ensure_ascii=False, indent=4) |
def export_trial(trial: dict): |
trial_id = trial["trial_id"] |
savename = RESULTS_FOLDER.joinpath(pl.Path(trial["filename"]).stem) |
trial_name = f"{savename}_{trial_id}_trial_info.json" |
filtered_trial = copy.deepcopy(trial) |
_ = [filtered_trial.pop(k) for k in list(filtered_trial.keys()) if isinstance(filtered_trial[k], pd.DataFrame)] |
_ = [ |
filtered_trial.pop(k) |
for k in list(filtered_trial.keys()) |
if k |
in [ |
"words_list", |
"chars_list", |
"chars_df_alt", |
"EMReading_fix", |
"chars_df", |
"dffix_sacdf_popEye", |
"fixdf_popEye", |
"sacdf_popEye", |
"saccade_df", |
"combined_df", |
"own_sentence_measures_dfs_for_algo", |
"own_word_measures_dfs_for_algo", |
] |
] |
filtered_trial["line_heights"] = list(np.unique(filtered_trial["line_heights"])) |
save_trial_to_json(filtered_trial, trial_name) |
return trial_name |
def add_cols_from_trial(trial, df, cols=["item", "condition", "trial_id", "subject"]): |
for col in cols: |
if col not in df.columns: |
df.insert(loc=0, column=col, value=trial[col]) |
def correct_df( |
dffix, |
algo_choice, |
trial, |
for_multi, |
is_outside_of_streamlit, |
classic_algos_cfg, |
models_dict, |
measures_to_calculate_multi_asc=[], |
include_coords_multi_asc=False, |
sent_measures_to_calc_multi=[], |
fix_cols_to_add=[], |
): |
if is_outside_of_streamlit: |
stqdm = tqdm |
else: |
from stqdm import stqdm |
if isinstance(dffix, dict): |
dffix = dffix["value"] |
if "x" not in dffix.keys() or "x" not in dffix.keys(): |
ic(f"x or y not in dffix") |
ic(dffix.columns) |
return dffix |
if isinstance(algo_choice, list): |
algo_choices = algo_choice |
repeats = range(len(algo_choice)) |
else: |
algo_choices = [algo_choice] |
repeats = range(1) |
chars_df = pd.DataFrame(trial["chars_df"]) if "chars_df" in trial else pd.DataFrame(trial["chars_list"]) |
if for_multi: |
own_word_measures_dfs_for_algo = [] |
own_sentence_measures_dfs_for_algo = [] |
trial["average_y_corrections"] = [] |
for algoIdx in stqdm(repeats, desc="Applying line-assignment algorithms"): |
algo_choice = algo_choices[algoIdx] |
dffix = apply_correction_algo(dffix, algo_choice, trial, models_dict, classic_algos_cfg) |
average_y_correction = (dffix[f"y_{algo_choice}"] - dffix["y"]).mean().round(1) |
trial["average_y_corrections"].append({"Algorithm": algo_choice, "average_y_correction": average_y_correction}) |
fig, desired_width_in_pixels, desired_height_in_pixels = matplotlib_plot_df( |
dffix, |
trial, |
algo_choice, |
None, |
box_annotations=None, |
fix_to_plot=["Uncorrected Fixations", "Corrected Fixations"], |
stim_info_to_plot=["Characters", "Word boxes"], |
) |
savename = f"{trial['subject']}_{trial['trial_id']}_corr_{algo_choice}_fix.png" |
fig.savefig(RESULTS_FOLDER.joinpath(savename), dpi=300) |
plt.close(fig) |
dffix = add_popEye_cols_to_dffix(dffix, algo_choice, chars_df, trial, "x", cols_to_add=fix_cols_to_add) |
if for_multi and len(measures_to_calculate_multi_asc) > 0 and dffix.shape[0] > 1: |
own_word_measures = get_all_measures( |
trial, |
dffix, |
prefix="word", |
use_corrected_fixations=True, |
correction_algo=algo_choice, |
measures_to_calculate=measures_to_calculate_multi_asc, |
include_coords=include_coords_multi_asc, |
) |
own_word_measures_dfs_for_algo.append(own_word_measures) |
sent_measures_multi = pf.compute_sentence_measures( |
dffix, pd.DataFrame(trial["chars_df"]), algo_choice, sent_measures_to_calc_multi |
) |
own_sentence_measures_dfs_for_algo.append(sent_measures_multi) |
if for_multi and len(own_word_measures_dfs_for_algo) > 0: |
words_df = ( |
pd.DataFrame(trial["chars_df"]) |
.drop_duplicates(subset="in_word_number", keep="first") |
.loc[:, ["in_word_number", "in_word"]] |
.rename({"in_word_number": "word_number", "in_word": "word"}, axis=1) |
.reset_index(drop=True) |
) |
add_cols_from_trial(trial, words_df, cols=["item", "condition", "trial_id", "subject"]) |
words_df["subject_trialID"] = [f"{id}_{num}" for id, num in zip(words_df["subject"], words_df["trial_id"])] |
words_df = words_df.merge( |
own_word_measures_dfs_for_algo[0], |
how="left", |
on=["subject", "trial_id", "item", "condition", "word_number", "word"], |
) |
for word_measure_df in own_word_measures_dfs_for_algo[1:]: |
words_df = words_df.merge( |
word_measure_df, how="left", on=["subject", "trial_id", "item", "condition", "word_number", "word"] |
) |
words_df = reorder_columns(words_df, ["subject", "trial_id", "item", "condition", "word_number", "word"]) |
sentence_df = ( |
pd.DataFrame(trial["chars_df"]) |
.drop_duplicates(subset="in_sentence_number", keep="first") |
.loc[ |
:, |
[ |
"in_sentence_number", |
"in_sentence", |
], |
] |
.rename({"in_sentence_number": "sentence_number", "in_sentence": "sentence"}, axis=1) |
.reset_index(drop=True) |
) |
add_cols_from_trial(trial, sentence_df, cols=["item", "condition", "trial_id", "subject"]) |
sentence_df["subject_trialID"] = [ |
f"{id}_{num}" for id, num in zip(sentence_df["subject"], sentence_df["trial_id"]) |
] |
sentence_df = sentence_df.merge( |
own_sentence_measures_dfs_for_algo[0], |
how="left", |
on=["item", "condition", "trial_id", "subject", "sentence_number", "sentence"], |
) |
for sent_measure_df in own_sentence_measures_dfs_for_algo[1:]: |
sentence_df = sentence_df.merge( |
sent_measure_df, |
how="left", |
on=["subject", "trial_id", "item", "condition", "sentence_number", "sentence", "number_of_words"], |
) |
sentence_df = reorder_columns( |
sentence_df, ["subject", "trial_id", "item", "condition", "sentence_number", "sentence", "number_of_words"] |
) |
trial["own_word_measures_dfs_for_algo"] = words_df |
trial["own_sentence_measures_dfs_for_algo"] = sentence_df |
dffix = reorder_columns(dffix) |
if for_multi: |
return dffix |
else: |
fix_cols_to_keep = [ |
c |
for c in dffix.columns |
if ( |
(any([lname in c for lname in ALL_FIX_MEASURES]) and any([lname in c for lname in fix_cols_to_add])) |
or (not any([lname in c for lname in ALL_FIX_MEASURES])) |
) |
] |
savename = RESULTS_FOLDER.joinpath(pl.Path(trial["filename"]).stem) |
csv_name = f"{savename}_{trial['trial_id']}_corrected_fixations.csv" |
csv_name = export_dataframe(dffix.loc[:, fix_cols_to_keep].copy(), csv_name) |
export_trial(trial) |
return dffix |
def process_trial_choice( |
trial: dict, |
algo_choice: str, |
choice_handle_short_and_close_fix, |
for_multi, |
discard_fixations_without_sfix, |
discard_far_out_of_text_fix, |
x_thres_in_chars, |
y_thresh_in_heights, |
short_fix_threshold, |
merge_distance_threshold, |
discard_long_fix, |
discard_long_fix_threshold, |
discard_blinks, |
measures_to_calculate_multi_asc, |
include_coords_multi_asc, |
sent_measures_to_calculate_multi_asc, |
classic_algos_cfg, |
models_dict, |
fix_cols_to_add, |
): |
dffix, trial = trial_to_dfs( |
trial=trial, |
choice_handle_short_and_close_fix=choice_handle_short_and_close_fix, |
discard_fixations_without_sfix=discard_fixations_without_sfix, |
discard_far_out_of_text_fix=discard_far_out_of_text_fix, |
x_thres_in_chars=x_thres_in_chars, |
y_thresh_in_heights=y_thresh_in_heights, |
short_fix_threshold=short_fix_threshold, |
discard_long_fix=discard_long_fix, |
discard_long_fix_threshold=discard_long_fix_threshold, |
merge_distance_threshold=merge_distance_threshold, |
discard_blinks=discard_blinks, |
) |
if "chars_list" in trial: |
chars_df = pd.DataFrame(trial["chars_df"]) |
trial["chars_df"] = chars_df.to_dict() |
trial["y_char_unique"] = list(chars_df.char_y_center.sort_values().unique()) |
if algo_choice is not None and ("chars_list" in trial or "words_list" in trial): |
if dffix.shape[0] > 1: |
dffix = correct_df( |
dffix, |
algo_choice, |
trial, |
for_multi=for_multi, |
is_outside_of_streamlit=False, |
classic_algos_cfg=classic_algos_cfg, |
models_dict=models_dict, |
measures_to_calculate_multi_asc=measures_to_calculate_multi_asc, |
include_coords_multi_asc=include_coords_multi_asc, |
sent_measures_to_calc_multi=sent_measures_to_calculate_multi_asc, |
fix_cols_to_add=fix_cols_to_add, |
) |
saccade_df = get_saccade_df(dffix, trial, algo_choice, trial.pop("events_df")) |
trial["saccade_df"] = saccade_df.to_dict() |
fig = plot_saccade_df(dffix, saccade_df, trial, True, False) |
fig.savefig(RESULTS_FOLDER / f"{trial['subject']}_{trial['trial_id']}_saccades.png") |
plt.close(fig) |
else: |
ic( |
f"π¨ Only {dffix.shape[0]} fixation left after processing. saccade_df not created for trial {trial['trial_id']} π¨" |
) |
else: |
ic("π¨ Stimulus information needed for fixation line-assignment π¨") |
for c in ["gaze_df", "dffix"]: |
if c in trial: |
trial.pop(c) |
return dffix, trial |
def get_saccade_df(dffix, trial, algo_choices, events_df): |
if not isinstance(algo_choices, list): |
algo_choices = [algo_choices] |
sac_df_as_detected = events_df[events_df["msg"] == "SAC"].copy() |
last_sacc_stop_time = sac_df_as_detected["stop_uncorrected"].iloc[-1] |
dffix_after_last_sacc = dffix.loc[dffix["start_uncorrected"] > last_sacc_stop_time, :].copy() |
if not dffix_after_last_sacc.empty: |
dffix_before_last_sacc = dffix.loc[dffix["start_uncorrected"] < last_sacc_stop_time, :].copy() |
dffix = pd.concat([dffix_before_last_sacc, dffix_after_last_sacc.iloc[[0], :]], axis=0) |
sac_df_as_detected = sac_df_as_detected[sac_df_as_detected["start"] >= dffix["end_time"].iloc[0]] |
sac_df_as_detected = sac_df_as_detected[sac_df_as_detected["stop"] <= dffix["start_time"].iloc[-1]] |
sac_index_keep = [ |
i for i, row in sac_df_as_detected.iterrows() if np.abs(row["start"] - dffix["start_time"].values).min() < 100 |
] |
sac_df_as_detected = sac_df_as_detected.loc[sac_index_keep, :] |
starts = pd.Series(dffix["start_time"].values, dffix["start_time"]) |
ends = pd.Series(dffix["end_time"].values, dffix["end_time"]) |
starts_reind = starts.reindex(sac_df_as_detected["stop"], method="bfill").dropna() |
ends_reind = ends.reindex(sac_df_as_detected["start"], method="ffill").dropna() |
sac_df_as_detected_start_indexed = sac_df_as_detected.copy().set_index("start") |
saccade_df = ( |
sac_df_as_detected_start_indexed.loc[ends_reind.index, :] |
.reset_index(drop=False) |
.rename({"start": "start_time", "stop": "end_time"}, axis=1) |
) |
saccade_df = pf.get_angle_and_eucl_dist(saccade_df) |
dffix_start_indexed = dffix.copy().set_index("start_time") |
dffix_end_indexed = dffix.copy().set_index("end_time") |
for algo_choice in algo_choices: |
saccade_df[f"ys_{algo_choice}"] = dffix_end_indexed.loc[ends_reind.values, f"y_{algo_choice}"].values |
saccade_df[f"ye_{algo_choice}"] = dffix_start_indexed.loc[starts_reind.values, f"y_{algo_choice}"].values |
saccade_df = pf.get_angle_and_eucl_dist(saccade_df, algo_choice) |
saccade_df[f"lines_{algo_choice}"] = dffix_end_indexed.loc[ends_reind.values, f"line_num_{algo_choice}"].values |
saccade_df[f"linee_{algo_choice}"] = dffix_start_indexed.loc[ |
starts_reind.values, f"line_num_{algo_choice}" |
].values |
saccade_df[f"line_word_s_{algo_choice}"] = dffix_end_indexed.loc[ |
ends_reind.values, f"line_word_{algo_choice}" |
].values |
saccade_df[f"line_word_e_{algo_choice}"] = dffix_start_indexed.loc[ |
starts_reind.values, f"line_word_{algo_choice}" |
].values |
saccade_df[f"lets_{algo_choice}"] = dffix_end_indexed.loc[ends_reind.values, f"letternum_{algo_choice}"].values |
saccade_df[f"lete_{algo_choice}"] = dffix_start_indexed.loc[ |
starts_reind.values, f"letternum_{algo_choice}" |
].values |
blink_df = events_df[events_df["msg"] == "BLINK"] |
for i in range(len(saccade_df)): |
if saccade_df.loc[i, "start_time"] in blink_df["start"]: |
saccade_df.loc[i, "blink"] = True |
saccade_df = pf.compute_non_line_dependent_saccade_measures(saccade_df, trial) |
for algo_choice in algo_choices: |
saccade_df = pf.compute_saccade_measures(saccade_df, trial, algo_choice) |
if "msg" in saccade_df.columns: |
saccade_df = saccade_df.drop(axis=1, labels=["msg"]) |
saccade_df = reorder_columns(saccade_df) |
return saccade_df.dropna(how="all", axis=1).copy() |