import pickle |
from io import StringIO |
import re |
import zipfile |
import os |
import plotly.graph_objects as go |
from io import StringIO |
import numpy as np |
import pandas as pd |
from PIL import Image |
import json |
from matplotlib import pyplot as plt |
import pathlib as pl |
import matplotlib as mpl |
from streamlit.runtime.uploaded_file_manager import UploadedFile |
from tqdm.auto import tqdm |
import time |
import requests |
from icecream import ic |
from matplotlib import font_manager |
from multi_proc_funcs import ( |
add_boxes_to_ax, |
add_text_to_ax, |
matplotlib_plot_df, |
save_trial_to_json, |
sigmoid, |
) |
import emreading_funcs as emf |
# Make icecream debug output include the calling file, line and function.
ic.configureOutput(includeContext=True)

# Output folder for generated plots. It has to be bound before
# TEMP_FIGURE_STIMULUS_PATH, which is derived from it.
PLOTS_FOLDER = pl.Path("plots")
TEMP_FIGURE_STIMULUS_PATH = PLOTS_FOLDER / "temp_matplotlib_plot_stimulus.png"

# Names of all fonts known to matplotlib's font manager.
all_fonts = [x.name for x in font_manager.fontManager.ttflist]

# Select matplotlib's non-interactive Agg backend.
mpl.use("agg")

DIST_MODELS_FOLDER = pl.Path("models")

# NOTE(review): presumably per-channel (RGB) ImageNet normalisation statistics,
# as the names suggest - confirm against the code that consumes them.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]
# Event keywords mapped to a human-readable description ("Descr") and the
# layout of the corresponding event line ("Pattern").
names_dict = dict(
    [
        ("SSACC", dict(Descr="Start of Saccade", Pattern="SSACC <eye > <stime>")),
        (
            "ESACC",
            dict(
                Descr="End of Saccade",
                Pattern="ESACC <eye > <stime> <etime > <dur> <sxp > <syp> <exp > <eyp> <ampl > <pv >",
            ),
        ),
        ("SFIX", dict(Descr="Start of Fixation", Pattern="SFIX <eye > <stime>")),
        (
            "EFIX",
            dict(Descr="End of Fixation", Pattern="EFIX <eye > <stime> <etime > <dur> <axp > <ayp> <aps >"),
        ),
        ("SBLINK", dict(Descr="Start of Blink", Pattern="SBLINK <eye > <stime>")),
        ("EBLINK", dict(Descr="End of Blink", Pattern="EBLINK <eye > <stime> <etime > <dur>")),
        ("DISPLAY ON", dict(Descr="Actual start of Trial", Pattern="DISPLAY ON")),
    ]
)

# Keywords of metadata messages; index 2 ("FRAMERATE") is looked up by position
# when parsing the frame rate.
metadata_strs = [
    "DISPLAY COORDS",
    "GAZE_COORDS",
    "FRAMERATE",
]
"start": "start_time", |
"stop": "end_time", |
"xs": "x", |
"ys": "y", |
} |
# NOTE(review): presumably column names to drop from EMreading-derived tables -
# confirm against the code that uses this list.
EMREADING_COLS_DROPLIST = [
    "hasText",
    "char_trial",
]
"sub": "subject", |
"item": "item", |
"condition": "condition", |
"SFIX": "start_time", |
"EFIX": "end_time", |
"xPos": "x", |
"yPos": "y", |
"fix_number": "fixation_number", |
"fix_dur": "duration", |
"wordID": "on_word_EM", |
"outOfBnds": "out_of_bounds", |
"outsideText": "out_of_text_area", |
} |
def download_url(url, target_filename, max_retries=4, timeout=None):
    """Download ``url`` and write the response body to ``target_filename``.

    The request is retried when an exception is raised (the wait grows by two
    seconds per attempt). A response with a status code other than 200 is not
    retried.

    Args:
        url: Address to fetch with ``requests.get``.
        target_filename: Path of the file the response content is written to.
        max_retries: Maximum number of attempts before giving up.
        timeout: Passed through to ``requests.get``; ``None`` keeps the
            library default of waiting indefinitely.

    Returns:
        0 on success, -1 on failure.
    """
    for attempt in range(1, max_retries + 1):
        try:
            r = requests.get(url, timeout=timeout)
            if r.status_code != 200:
                ic(f"Download failed due to unsuccessful response from server: {r.status_code}")
                return -1
            # Context manager guarantees the handle is flushed and closed.
            with open(target_filename, "wb") as target_file:
                target_file.write(r.content)
            return 0
        except Exception as e:
            if attempt < max_retries:
                wait_seconds = 2 * attempt
                # Announce the wait before sleeping so the message is timely.
                ic(f"Download failed due to an error; will try again in {wait_seconds} seconds:", e)
                time.sleep(wait_seconds)
            else:
                ic(f"Failed after all attempts ({url}). Error details:\n{e}")
                return -1
    # Only reached when max_retries < 1, i.e. no attempt was made.
    return -1
def asc_to_trial_ids(
    asc_file, close_gap_between_words, paragraph_trials_only, ias_files, trial_start_keyword, end_trial_at_keyword
):
    """Parse an asc file and index the resulting trials by their trial id.

    The file is handed to ``file_to_trials_and_lines`` using the
    "ISO-8859-15" encoding. The trial indices taken into account are the
    entries of ``trials_dict["paragraph_trials"]`` when ``paragraph_trials_only``
    is set and that key exists, otherwise ``range(trials_dict["max_trial_idx"])``.

    Returns:
        Tuple ``(trials_by_ids, lines, trials_dict)`` where ``trials_by_ids``
        maps each selected trial's ``"trial_id"`` to its trial dict.
    """
    trials_dict, lines = file_to_trials_and_lines(
        asc_file,
        "ISO-8859-15",
        close_gap_between_words=close_gap_between_words,
        paragraph_trials_only=paragraph_trials_only,
        uploaded_ias_files=ias_files,
        trial_start_keyword=trial_start_keyword,
        end_trial_at_keyword=end_trial_at_keyword,
    )

    if paragraph_trials_only and "paragraph_trials" in trials_dict:
        selected_indices = trials_dict["paragraph_trials"]
    else:
        # NOTE(review): range() excludes max_trial_idx itself - confirm whether
        # the trial stored under that index is meant to be left out.
        selected_indices = range(trials_dict["max_trial_idx"])

    trials_by_ids = {}
    for trial_index in selected_indices:
        trial = trials_dict[trial_index]
        trials_by_ids[trial["trial_id"]] = trial
    return trials_by_ids, lines, trials_dict
def get_trials_list(
    asc_file, close_gap_between_words, paragraph_trials_only, ias_files, trial_start_keyword, end_trial_at_keyword
):
    """Parse an asc file into trials and save a slimmed-down metadata overview.

    The overview is written via ``save_trial_to_json`` to
    ``RESULTS_FOLDER / "<asc file stem>_metadata_overview.json"``. It contains
    every entry of the trials dict, but nested dicts have a fixed set of keys
    removed before saving.

    Returns:
        Tuple ``(trial_keys, trials_by_ids, lines, asc_file, trials_dict)``.
    """
    # Objects exposing ``.name`` are reduced to that name before the stem is taken.
    source_name = asc_file.name if hasattr(asc_file, "name") else asc_file
    file_stem = pl.Path(source_name).stem

    trials_by_ids, lines, trials_dict = asc_to_trial_ids(
        asc_file,
        close_gap_between_words=close_gap_between_words,
        paragraph_trials_only=paragraph_trials_only,
        ias_files=ias_files,
        trial_start_keyword=trial_start_keyword,
        end_trial_at_keyword=end_trial_at_keyword,
    )
    trial_keys = list(trials_by_ids)
    overview_path = RESULTS_FOLDER / f"{file_stem}_metadata_overview.json"

    # Keys that are left out of the per-trial dicts in the overview file.
    offloaded_keys = {
        "gaze_df",
        "dffix",
        "chars_df",
        "saccade_df",
        "x_char_unique",
        "line_heights",
        "chars_list",
        "words_list",
        "dffix_sacdf_popEye",
        "fixdf_popEye",
        "sacdf_popEye",
        "combined_df",
        "events_df",
    }

    trials_dict_cut_down = {}
    for outer_key, outer_value in trials_dict.items():
        if not isinstance(outer_value, dict):
            trials_dict_cut_down[outer_key] = outer_value
            continue
        trials_dict_cut_down[outer_key] = {
            prop: val for prop, val in outer_value.items() if prop not in offloaded_keys
        }

    save_trial_to_json(trials_dict_cut_down, savename=overview_path)
    return trial_keys, trials_by_ids, lines, asc_file, trials_dict
def _collapse_diffs(diffs, allow_multiple_values):
    """Reduce an array of unique differences to a single spacing value.

    Returns 0 when there are no differences, the value itself when there is
    exactly one, the full array when ``allow_multiple_values`` is set, and the
    minimum otherwise.
    """
    if len(diffs) == 0:
        return 0
    if len(diffs) == 1:
        return diffs[0]
    if allow_multiple_values:
        return diffs
    return np.min(diffs)


def calc_xdiff_ydiff(line_xcoords_no_pad, line_ycoords_no_pad, line_heights, allow_multiple_values=False):
    """Derive the spacing between consecutive x and y coordinates.

    Args:
        line_xcoords_no_pad: Sequence of x coordinates.
        line_ycoords_no_pad: Sequence of y coordinates.
        line_heights: Sequence whose first entry is returned as the y spacing
            when all y coordinates are identical.
        allow_multiple_values: If True and more than one distinct difference
            exists, return all distinct differences instead of the minimum.

    Returns:
        Tuple ``(x_diff, y_diff)``. When only one distinct y coordinate exists
        the result is ``(x_diff, line_heights[0])`` without rounding; otherwise
        both values are rounded to two decimals.
    """
    # A single x coordinate yields no differences; treat that as a spacing of 0
    # (the same convention the y axis uses) instead of failing in np.min.
    x_diff = _collapse_diffs(np.unique(np.diff(line_xcoords_no_pad)), allow_multiple_values)

    if np.unique(line_ycoords_no_pad).shape[0] == 1:
        return x_diff, line_heights[0]

    y_diff = _collapse_diffs(np.unique(np.diff(line_ycoords_no_pad)), allow_multiple_values)
    return np.round(x_diff, decimals=2), np.round(y_diff, decimals=2)
def add_words(chars_list):
    """Group character dicts into words and sentences.

    A word ends at a space, at the last character, or when the next character
    is on a different ``assigned_line``. A sentence ends at ".", "!" or "?" or
    at the last character. The character dicts are annotated in place with
    ``in_word_number``, ``in_word``, ``num_letters_from_start_of_word``,
    ``in_sentence_number`` and ``in_sentence``.

    Args:
        chars_list: List of dicts with at least the keys ``char``,
            ``char_xmin``, ``char_xmax``, ``char_ymin``, ``char_ymax`` and
            ``assigned_line`` (``char_x_center`` / ``char_y_center`` are read
            when the text ends in a trailing space).

    Returns:
        Tuple ``(words_list, chars_list_reconstructed)``; the second element
        holds the same (mutated) dict objects as ``chars_list``. Empty input
        yields ``([], [])``.
    """
    sentence_end_chars = (".", "!", "?")
    chars_list_reconstructed = []
    words_list = []
    sentence_list = []
    if not chars_list:
        return words_list, chars_list_reconstructed

    sentence_start_idx = 0
    sentence_num = 0
    word_start_idx = 0
    last_idx = len(chars_list) - 1

    for idx, char_dict in enumerate(chars_list):
        is_last_char = idx == last_idx
        line_change_on_next_char = (
            not is_last_char and char_dict["assigned_line"] != chars_list[idx + 1]["assigned_line"]
        )
        chars_list_reconstructed.append(char_dict)

        if char_dict["char"] == " " or is_last_char or line_change_on_next_char:
            first_char = chars_list_reconstructed[word_start_idx]
            if char_dict["char"] == " " and idx > 0:
                # The terminating space is not part of the word.
                word_end_idx = idx
                word_xmax = chars_list_reconstructed[idx - 1]["char_xmax"]
            else:
                # Also covers a word boundary on the very first character, so a
                # one-letter first word is kept instead of being turned into " ".
                word_end_idx = idx + 1
                word_xmax = char_dict["char_xmax"]
            word = "".join(c["char"] for c in chars_list_reconstructed[word_start_idx:word_end_idx])
            word_xmin = first_char["char_xmin"]
            word_ymin = first_char["char_ymin"]
            word_ymax = first_char["char_ymax"]
            word_dict = dict(
                word_number=len(words_list),
                word=word,
                word_length=len(word),
                word_xmin=word_xmin,
                word_xmax=word_xmax,
                word_ymin=word_ymin,
                word_ymax=word_ymax,
                word_x_center=round((word_xmax - word_xmin) / 2 + word_xmin, ndigits=2),
                word_y_center=round((word_ymax - word_ymin) / 2 + word_ymin, ndigits=2),
                assigned_line=first_char["assigned_line"],
            )
            if len(word) > 0 and word != " ":
                words_list.append(word_dict)
                for letter_idx, member in enumerate(chars_list_reconstructed[word_start_idx:]):
                    if member["char"] == " ":
                        # Provisional: points at the following word; refined
                        # by the pass over spaces after this loop.
                        member["in_word_number"] = len(words_list)
                        member["in_word"] = " "
                        member["num_letters_from_start_of_word"] = 0
                    else:
                        member["in_word_number"] = len(words_list) - 1
                        member["in_word"] = word
                        member["num_letters_from_start_of_word"] = letter_idx
            word_start_idx = idx + 1

        if char_dict["char"] in sentence_end_chars or is_last_char:
            if idx != sentence_start_idx or not sentence_list:
                # Regular case; ``not sentence_list`` also covers a text whose
                # very first character already ends a sentence.
                chars_df_temp = pd.DataFrame(chars_list_reconstructed[sentence_start_idx:])
                line_texts = []
                for _, subdf in chars_df_temp.groupby("assigned_line"):
                    line_text = "_".join(subdf.char.values)
                    line_text = line_text.replace("_ _", " ")
                    line_text = line_text.replace("_", "")
                    line_texts.append(line_text.strip())
                sentence_text = " ".join(line_texts)
                sentence_list.append(dict(sentence_num=sentence_num, sentence_text=sentence_text))
                for c in chars_list_reconstructed[sentence_start_idx:]:
                    c["in_sentence_number"] = sentence_num
                    c["in_sentence"] = sentence_text
                sentence_start_idx = len(chars_list_reconstructed)
                sentence_num += 1
            else:
                # A sentence end directly after another one (e.g. "..", "?!"):
                # attach the character to the previous sentence.
                sentence_list[-1]["sentence_text"] += char_dict["char"]
                char_dict["in_sentence_number"] = sentence_list[-1]["sentence_num"]
                char_dict["in_sentence"] = sentence_list[-1]["sentence_text"]
                # The character is consumed; the next sentence starts after it.
                sentence_start_idx = len(chars_list_reconstructed)

    # Spaces inherit the word of the character that follows them on the same
    # line. Iterating backwards lets runs of spaces propagate correctly.
    for cidx in range(len(chars_list_reconstructed) - 2, -1, -1):
        current = chars_list_reconstructed[cidx]
        following = chars_list_reconstructed[cidx + 1]
        if (
            current["char"] == " "
            and current["assigned_line"] == following["assigned_line"]
            and "in_word_number" in following
            and "in_word" in following
        ):
            current["in_word_number"] = following["in_word_number"]
            current["in_word"] = following["in_word"]

    if not words_list:
        # Nothing but whitespace: there is no last word to reconcile.
        return words_list, chars_list_reconstructed

    last_char = chars_list_reconstructed[-1]
    last_word = words_list[-1]
    if last_word["word"][-1] != last_char["char"]:
        if last_char["char"] in sentence_end_chars:
            # Merge trailing punctuation into the last word.
            merged_word = last_word["word"] + last_char["char"]
            words_list[-1] = dict(
                word_number=last_word["word_number"],
                word=merged_word,
                word_length=len(merged_word),
                word_xmin=last_word["word_xmin"],
                word_xmax=last_char["char_xmax"],
                word_ymin=last_word["word_ymin"],
                word_ymax=last_word["word_ymax"],
                assigned_line=last_word["assigned_line"],
            )
            merged = words_list[-1]
            merged["word_x_center"] = round(
                (merged["word_xmax"] - merged["word_xmin"]) / 2 + merged["word_xmin"], ndigits=2
            )
            merged["word_y_center"] = round(
                (merged["word_ymax"] - merged["word_ymin"]) / 2 + merged["word_ymin"], ndigits=2
            )
        else:
            # The last character becomes a word of its own.
            words_list.append(
                dict(
                    word_number=len(words_list),
                    word=last_char["char"],
                    word_length=1,
                    word_xmin=last_char["char_xmin"],
                    word_xmax=last_char["char_xmax"],
                    word_ymin=last_char["char_ymin"],
                    word_ymax=last_char["char_ymax"],
                    word_x_center=last_char["char_x_center"],
                    word_y_center=last_char["char_y_center"],
                    assigned_line=last_char["assigned_line"],
                )
            )
        last_char["in_word_number"] = len(words_list) - 1
        last_char["in_word"] = words_list[-1]["word"]
        last_char["num_letters_from_start_of_word"] = 0
        if len(sentence_list) > 0:
            last_char["in_sentence_number"] = sentence_num - 1
            last_char["in_sentence"] = sentence_list[-1]["sentence_text"]
        else:
            ic(f"Warning Sentence list empty: {sentence_list}")
    return words_list, chars_list_reconstructed
def read_ias_file(ias_file, prefix):
    """Read a tab-separated interest-area file into a DataFrame.

    Columns 1-6 of each row are used as number, xmin, ymin, xmax, ymax and
    text. Derived columns (``width``, ``x_center``, ``y_center``,
    ``assigned_line``) are added and ``{prefix}_number`` is renumbered from 0.
    When an area's xmax equals the xmin of the next area on the first line,
    areas are taken to include one trailing space and ``{prefix}_xmax`` is
    reduced by the average per-character width.

    Args:
        ias_file: A streamlit ``UploadedFile`` or anything ``pd.read_csv``
            accepts.
        prefix: Column name prefix (for example "word").

    Returns:
        The resulting ``pandas.DataFrame``.
    """
    if isinstance(ias_file, UploadedFile):
        raw_lines = StringIO(ias_file.getvalue().decode("utf-8")).readlines()
        ias_dicts = []
        for raw_line in raw_lines:
            stripped = raw_line.strip()
            if not stripped:
                # Blank lines are skipped, matching pd.read_csv's default.
                continue
            fields = stripped.split("\t")
            ias_dicts.append(
                {
                    f"{prefix}_number": float(fields[1]),
                    f"{prefix}_xmin": float(fields[2]),
                    f"{prefix}_xmax": float(fields[4]),
                    f"{prefix}_ymin": float(fields[3]),
                    f"{prefix}_ymax": float(fields[5]),
                    prefix: fields[6],
                }
            )
        ias_df = pd.DataFrame(ias_dicts)
    else:
        ias_df = pd.read_csv(ias_file, delimiter="\t", header=None)
        ias_df = ias_df.rename(
            {
                1: f"{prefix}_number",
                2: f"{prefix}_xmin",
                4: f"{prefix}_xmax",
                3: f"{prefix}_ymin",
                5: f"{prefix}_ymax",
                6: prefix,
            },
            axis=1,
        )

    # Areas on the first line touch each other when they include the space.
    first_line_df = ias_df[ias_df[f"{prefix}_ymin"] == ias_df.loc[0, f"{prefix}_ymin"]]
    words_include_spaces = (
        first_line_df[f"{prefix}_xmax"].values == first_line_df[f"{prefix}_xmin"].shift(-1).values
    ).any()

    ias_df[f"{prefix}_width"] = ias_df[f"{prefix}_xmax"] - ias_df[f"{prefix}_xmin"]
    if words_include_spaces:
        # str() keeps numeric tokens parsed by read_csv from breaking len().
        ias_df[f"{prefix}_length"] = ias_df[prefix].map(lambda x: len(str(x)) + 1)
        ias_df[f"{prefix}_width_per_length"] = ias_df[f"{prefix}_width"] / ias_df[f"{prefix}_length"]
        ias_df[f"{prefix}_xmax"] = (ias_df[f"{prefix}_xmax"] - ias_df[f"{prefix}_width_per_length"]).round(2)

    ias_df[f"{prefix}_x_center"] = (
        (ias_df[f"{prefix}_xmax"] - ias_df[f"{prefix}_xmin"]) / 2 + ias_df[f"{prefix}_xmin"]
    ).round(2)
    ias_df[f"{prefix}_y_center"] = (
        (ias_df[f"{prefix}_ymax"] - ias_df[f"{prefix}_ymin"]) / 2 + ias_df[f"{prefix}_ymin"]
    ).round(2)

    # Line number = rank of the area's vertical midpoint among all midpoints.
    unique_midlines = list(np.unique(ias_df[f"{prefix}_y_center"]))
    ias_df["assigned_line"] = [unique_midlines.index(x) for x in ias_df[f"{prefix}_y_center"]]
    ias_df[f"{prefix}_number"] = np.arange(ias_df.shape[0])
    return ias_df
def get_chars_list_from_words_list(ias_df, prefix="word"):
    """Split interest areas into evenly spaced per-character dicts.

    Each area's horizontal extent is divided equally among the characters of
    its text. A space character is appended after an area when the next area
    has the same ``assigned_line`` and the text does not already end in a
    space. The space inherits ``in_word_number`` / ``in_word`` from the
    preceding character via forward fill.

    Args:
        ias_df: DataFrame with the columns ``{prefix}``, ``{prefix}_xmin``,
            ``{prefix}_xmax``, ``{prefix}_ymin``, ``{prefix}_ymax``,
            ``{prefix}_y_center`` and ``assigned_line``. It is not modified.
        prefix: Column name prefix.

    Returns:
        List of character dicts (``DataFrame.to_dict("records")``); an empty
        list when no characters were produced.
    """
    # Work on a re-indexed copy so the caller's frame keeps its index.
    ias_df = ias_df.reset_index(drop=True)
    unique_midlines = list(np.unique(ias_df[f"{prefix}_y_center"]))
    line_numbers = ias_df["assigned_line"].to_numpy()
    last_row_pos = len(ias_df) - 1

    chars_list = []
    for row_pos, (_, row) in enumerate(ias_df.iterrows()):
        word = str(row[prefix])
        if not word:
            # Nothing to place; also avoids dividing by a zero length.
            continue
        xmin = row[f"{prefix}_xmin"]
        ymin = row[f"{prefix}_ymin"]
        ymax = row[f"{prefix}_ymax"]
        letter_width = (row[f"{prefix}_xmax"] - xmin) / len(word)
        y_center = round((ymax - ymin) / 2 + ymin, ndigits=2)
        # Look the line up through the frame's own midpoint value so that a
        # re-rounded float can never miss its entry in unique_midlines.
        line_number = unique_midlines.index(row[f"{prefix}_y_center"])

        for i_w, letter in enumerate(word):
            char_dict = dict(
                in_word_number=row_pos,
                in_word=word,
                char_xmin=round(xmin + i_w * letter_width, 2),
                char_xmax=round(xmin + (i_w + 1) * letter_width, 2),
                char_ymin=ymin,
                char_ymax=ymax,
                char=letter,
            )
            char_dict["char_x_center"] = round(
                (char_dict["char_xmax"] - char_dict["char_xmin"]) / 2 + char_dict["char_xmin"], ndigits=2
            )
            char_dict["char_y_center"] = y_center
            char_dict["assigned_line"] = line_number
            chars_list.append(char_dict)

        next_on_same_line = row_pos < last_row_pos and line_numbers[row_pos] == line_numbers[row_pos + 1]
        if word[-1] != " " and next_on_same_line:
            space_xmin = chars_list[-1]["char_xmax"]
            space_dict = dict(
                char_xmin=space_xmin,
                char_xmax=round(space_xmin + letter_width, 2),
                char_ymin=ymin,
                char_ymax=ymax,
                char=" ",
            )
            space_dict["char_x_center"] = round(
                (space_dict["char_xmax"] - space_dict["char_xmin"]) / 2 + space_dict["char_xmin"], ndigits=2
            )
            space_dict["char_y_center"] = y_center
            space_dict["assigned_line"] = line_number
            chars_list.append(space_dict)

    if not chars_list:
        return []

    chars_df = pd.DataFrame(chars_list)
    # Inserted spaces carry no word info; inherit it from the preceding char.
    chars_df.loc[:, ["in_word_number", "in_word"]] = chars_df.loc[:, ["in_word_number", "in_word"]].copy().ffill(axis=0)
    return chars_df.to_dict("records")
def check_values(v1, v2):
    """Compare two values for equality, treating missing values as unknown.

    Args:
        v1: First value (scalar or list).
        v2: Second value (scalar or list).

    Returns:
        None if either value is None or a scalar missing value (NaN, NA, NaT);
        otherwise True if the values compare equal and False if they do not.
    """
    for value in (v1, v2):
        # pd.isna is only asked about scalars: on a list it returns an array,
        # which cannot be used as a single truth value.
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return None
    if v1 != v2:
        return False
    return True
def asc_lines_to_trials_by_trail_id( |
lines: list, |
paragraph_trials_only=True, |
filename: str = "", |
close_gap_between_words=True, |
ias_files=[], |
start_trial_at_keyword="START", |
end_trial_at_keyword="END", |
) -> dict: |
if len(ias_files) > 0: |
ias_files_dict = {pl.Path(f.name).stem: f for f in ias_files} |
else: |
ias_files_dict = {} |
if hasattr(filename, "name"): |
filename = filename.name |
subject = pl.Path(filename).stem |
y_px = [] |
x_px = [] |
calibration_offset = [] |
calibration_max_error = [] |
calibration_time = [] |
calibration_avg_error = [] |
trial_var_block_lines = None |
question_answer = None |
question_correct = None |
condition = "UNKNOWN" |
item = "UNKNOWN" |
depend = "UNKNOWN" |
trial_index = None |
fps = None |
display_coords = None |
trial_var_block_idx = -1 |
trials_dict = dict(paragraph_trials=[], paragraph_trial_IDs=[]) |
trial_idx = -1 |
trial_var_block_start_idx = -1 |
removed_trial_ids = [] |
ias_file = "" |
trial_var_block_lines_list = [] |
if "\n".join(map(str.strip, lines)).find("TRIAL_VAR") != -1: |
for idx, l in enumerate(tqdm(lines, desc=f"Checking for TRIAL_VAR lines for {filename}")): |
if trial_var_block_start_idx == -1 and "MSG" not in l: |
continue |
if "TRIAL_VAR" in l: |
if trial_var_block_start_idx == -1: |
trial_var_block_start_idx = idx |
continue |
else: |
if trial_var_block_start_idx != -1: |
trial_var_block_stop_idx = idx |
trial_var_block_lines = [ |
x.strip() for x in lines[trial_var_block_start_idx:trial_var_block_stop_idx] |
] |
trial_var_block_lines_list.append(trial_var_block_lines) |
trial_var_block_start_idx = -1 |
has_trial_var_lines = len(trial_var_block_lines_list) > 0 |
else: |
has_trial_var_lines = False |
for idx, l in enumerate(lines): |
if "MSG" not in l: |
continue |
parts = l.strip().split(" ") |
if "TRIALID" in l: |
trial_id = re.split(r"[ :\t]+", l.strip())[-1] |
trial_id_timestamp = parts[1] |
trial_idx += 1 |
if trial_id[0] in ["F", "P", "E"]: |
parse_dict = emf.parse_itemID(trial_id) |
condition = parse_dict["condition"] |
item = parse_dict["item"] |
depend = parse_dict["depend"] |
else: |
parse_dict = {} |
if trial_id[0] == "F": |
trial_is = "question" |
elif trial_id[0] == "P": |
trial_is = "practice" |
else: |
if has_trial_var_lines: |
trial_var_block_idx += 1 |
trial_var_block_lines = trial_var_block_lines_list[trial_var_block_idx] |
image_lines = [s for s in trial_var_block_lines if "img" in s] |
if len(image_lines) > 0: |
item = image_lines[0].split(" ")[-1] |
cond_lines = [s for s in trial_var_block_lines if "cond" in s] |
if len(cond_lines) > 0: |
condition = cond_lines[0].split(" ")[-1] |
item_lines = [s for s in trial_var_block_lines if "item" in s] |
if len(item_lines) > 0: |
item = item_lines[0].split(" ")[-1] |
trial_index_lines = [s for s in trial_var_block_lines if "Trial_Index" in s] |
if len(trial_index_lines) > 0: |
trial_index = trial_index_lines[0].split(" ")[-1] |
question_key_lines = [s for s in trial_var_block_lines if "QUESTION_KEY_PRESSED" in s] |
if len(question_key_lines) > 0: |
question_answer = question_key_lines[0].split(" ")[-1] |
question_response_lines = [s for s in trial_var_block_lines if " RESPONSE" in s] |
if len(question_response_lines) > 0: |
question_answer = question_response_lines[0].split(" ")[-1] |
question_correct_lines = [ |
s for s in trial_var_block_lines if ("QUESTION_ACCURACY" in s) | (" ACCURACY" in s) |
] |
if len(question_correct_lines) > 0: |
question_correct = question_correct_lines[0].split(" ")[-1] |
trial_is_lines = [s for s in trial_var_block_lines if "trial" in s] |
if len(trial_is_lines) > 0: |
trial_is_line = trial_is_lines[0].split(" ")[-1] |
if "pract" in trial_is_line or "end" in trial_is_line: |
trial_is = "practice" |
trial_id = f"{trial_is}_{trial_id}" |
else: |
trial_is = "paragraph" |
trial_id = f"{condition}_{trial_is}_{trial_id}" |
trials_dict["paragraph_trials"].append(trial_idx) |
trials_dict["paragraph_trial_IDs"].append(trial_id) |
else: |
trial_is = "paragraph" |
trial_id = f"{condition}_{trial_is}_{trial_id}_{trial_idx}" |
trials_dict["paragraph_trials"].append(trial_idx) |
trials_dict["paragraph_trial_IDs"].append(trial_id) |
else: |
if len(trial_id) > 1: |
condition = trial_id[1] |
trial_is = "paragraph" |
trials_dict["paragraph_trials"].append(trial_idx) |
trials_dict["paragraph_trial_IDs"].append(trial_id) |
trials_dict[trial_idx] = dict( |
subject=subject, |
filename=filename, |
trial_idx=trial_idx, |
trial_id=trial_id, |
trial_id_idx=idx, |
trial_id_timestamp=trial_id_timestamp, |
trial_is=trial_is, |
trial_var_block_lines=trial_var_block_lines, |
seq=trial_idx, |
item=item, |
depend=depend, |
condition=condition, |
parse_dict=parse_dict, |
) |
if question_answer is not None: |
trials_dict[trial_idx]["question_answer"] = question_answer |
if question_correct is not None: |
trials_dict[trial_idx]["question_correct"] = question_correct |
if trial_index is not None: |
trials_dict[trial_idx]["trial_index"] = trial_index |
last_trial_skipped = False |
elif "TRIAL_RESULT" in l or "stop_trial" in l: |
trials_dict[trial_idx]["trial_result_idx"] = idx |
trials_dict[trial_idx]["trial_result_timestamp"] = int(parts[0].split("\t")[1]) |
if len(parts) > 2: |
trials_dict[trial_idx]["trial_result_number"] = int(parts[2]) |
elif "QUESTION_ANSWER" in l and not has_trial_var_lines: |
trials_dict[trial_idx]["question_answer_idx"] = idx |
trials_dict[trial_idx]["question_answer_timestamp"] = int(parts[0].split("\t")[1]) |
if len(parts) > 2: |
trials_dict[trial_idx]["question_answer_question_trial"] = int( |
pd.to_numeric(l.strip().split(" ")[-1].strip(), errors="coerce") |
) |
elif "KEYBOARD" in l: |
trials_dict[trial_idx]["keyboard_press_idx"] = idx |
trials_dict[trial_idx]["keyboard_press_timestamp"] = int(parts[0].split("\t")[1]) |
elif "DISPLAY COORDS" in l and display_coords is None: |
display_coords = (float(parts[-4]), float(parts[-3]), float(parts[-2]), float(parts[-1])) |
elif "GAZE_COORDS" in l and display_coords is None: |
display_coords = (float(parts[-4]), float(parts[-3]), float(parts[-2]), float(parts[-1])) |
elif "FRAMERATE" in l: |
l_idx = parts.index(metadata_strs[2]) |
fps = float(parts[l_idx + 1]) |
elif "TRIAL ABORTED" in l or "TRIAL REPEATED" in l: |
if not last_trial_skipped: |
if trial_is == "paragraph": |
trials_dict["paragraph_trials"].remove(trial_idx) |
trial_idx -= 1 |
removed_trial_ids.append(trial_id) |
last_trial_skipped = True |
elif "IAREA FILE" in l: |
ias_file = parts[-1] |
ias_file_stem = ias_file.split("/")[-1].split("\\")[-1].split(".")[0] |
trials_dict[trial_idx]["ias_file_from_asc"] = ias_file |
trials_dict[trial_idx]["ias_file"] = ias_file_stem |
if item == "UNKNOWN": |
trials_dict[trial_idx]["item"] = ias_file_stem |
if ias_file_stem in ias_files_dict: |
try: |
ias_file = ias_files_dict[ias_file_stem] |
ias_df = read_ias_file(ias_file, prefix="word") |
trials_dict[trial_idx]["words_list"] = ias_df.to_dict("records") |
trials_dict[trial_idx]["chars_list"] = get_chars_list_from_words_list(ias_df, prefix="word") |
except Exception as e: |
ic(f"Reading ias file failed") |
ic(e) |
else: |
ic(f"IAS file {ias_file_stem} not found") |
elif "CALIBRATION" in l and "MSG" in l: |
calibration_method = parts[3].strip() |
if trial_idx > -1: |
trials_dict[trial_idx]["calibration_method"] = calibration_method |
elif "VALIDATION" in l and "MSG" in l and "ABORTED" not in l: |
try: |
calibration_time_line_parts = re.split(r"[ :\t]+", l.strip()) |
calibration_time.append(float(calibration_time_line_parts[1])) |
calibration_avg_error.append(float(calibration_time_line_parts[9])) |
calibration_max_error.append(float(calibration_time_line_parts[11])) |
calibration_offset.append(float(calibration_time_line_parts[14])) |
x_px.append(float(calibration_time_line_parts[-2].split(",")[0])) |
y_px.append(float(calibration_time_line_parts[-2].split(",")[1])) |
except Exception as e: |
ic(f"parsing VALIDATION failed for line {l}") |
trials_df = pd.DataFrame([trials_dict[i] for i in range(trial_idx) if i in trials_dict]) |
if ( |
question_correct is None |
and "trial_result_number" in trials_df.columns |
and "question_answer_question_trial" in trials_df.columns |
): |
trials_df["question_answer_selection"] = trials_df["trial_result_number"].shift(-1).values |
trials_df["correct_trial_answer_would_be"] = trials_df["question_answer_question_trial"].shift(-1).values |
trials_df["question_correct"] = [ |
check_values(a, b) |
for a, b in zip(trials_df["question_answer_selection"], trials_df["correct_trial_answer_would_be"]) |
] |
for pidx, prow in trials_df.loc[trials_df.trial_is == "paragraph", :].iterrows(): |
trials_dict[pidx]["question_correct"] = prow["question_correct"] |
if prow["question_correct"] is not None: |
trials_dict[pidx]["question_answer_selection"] = prow["question_answer_selection"] |
trials_dict[pidx]["correct_trial_answer_would_be"] = prow["correct_trial_answer_would_be"] |
else: |
trials_dict[pidx]["question_answer_selection"] = None |
trials_dict[pidx]["correct_trial_answer_would_be"] = None |
if "question_correct" in trials_df.columns: |
paragraph_trials_df = trials_df.loc[trials_df.trial_is == "paragraph", :] |
overall_question_answer_value_counts = ( |
paragraph_trials_df["question_correct"].dropna().astype(int).value_counts().to_dict() |
) |
overall_question_answer_value_counts_normed = ( |
paragraph_trials_df["question_correct"].dropna().astype(int).value_counts(normalize=True).to_dict() |
) |
else: |
overall_question_answer_value_counts = None |
overall_question_answer_value_counts_normed = None |
if paragraph_trials_only: |
trials_dict_temp = trials_dict.copy() |
for k in trials_dict_temp.keys(): |
if k not in ["paragraph_trials"] + trials_dict_temp["paragraph_trials"]: |
trials_dict.pop(k) |
if len(trials_dict_temp["paragraph_trials"]): |
trial_idx = trials_dict_temp["paragraph_trials"][-1] |
else: |
return trials_dict |
trials_dict["display_coords"] = display_coords |
trials_dict["fps"] = fps |
trials_dict["max_trial_idx"] = trial_idx |
trials_dict["overall_question_answer_value_counts"] = overall_question_answer_value_counts |
trials_dict["overall_question_answer_value_counts_normed"] = overall_question_answer_value_counts_normed |
enum = ( |
trials_dict["paragraph_trials"] |
if ("paragraph_trials" in trials_dict.keys() and paragraph_trials_only) |
else range(len(trials_dict)) |
) |
for trial_idx in enum: |
if trial_idx not in trials_dict.keys(): |
continue |
if "chars_list" in trials_dict[trial_idx]: |
chars_list = trials_dict[trial_idx]["chars_list"] |
else: |
chars_list = [] |
if "display_coords" not in trials_dict[trial_idx].keys(): |
trials_dict[trial_idx]["display_coords"] = trials_dict["display_coords"] |
trials_dict[trial_idx]["overall_question_answer_value_counts"] = trials_dict[ |
"overall_question_answer_value_counts" |
] |
trials_dict[trial_idx]["overall_question_answer_value_counts_normed"] = trials_dict[ |
"overall_question_answer_value_counts_normed" |
] |
trial_start_idx = trials_dict[trial_idx]["trial_id_idx"] |
trial_end_idx = trials_dict[trial_idx]["trial_result_idx"] |
trial_lines = lines[trial_start_idx:trial_end_idx] |
if len(y_px) > 0: |
trials_dict[trial_idx]["y_px"] = y_px |
trials_dict[trial_idx]["x_px"] = x_px |
if "calibration_method" not in trials_dict[trial_idx]: |
trials_dict[trial_idx]["calibration_method"] = calibration_method |
trials_dict[trial_idx]["calibration_offset"] = calibration_offset |
trials_dict[trial_idx]["calibration_max_error"] = calibration_max_error |
trials_dict[trial_idx]["calibration_time"] = calibration_time |
trials_dict[trial_idx]["calibration_avg_error"] = calibration_avg_error |
for idx, l in enumerate(trial_lines): |
parts = l.strip().split(" ") |
if "START" in l and " MSG" not in l: |
trials_dict[trial_idx]["text_end_idx"] = trial_start_idx + idx |
trials_dict[trial_idx]["start_idx"] = trial_start_idx + idx + 7 |
trials_dict[trial_idx]["start_time"] = int(parts[0].split("\t")[1]) |
elif "END" in l and "ENDBUTTON" not in l and " MSG" not in l: |
trials_dict[trial_idx]["end_idx"] = trial_start_idx + idx - 2 |
trials_dict[trial_idx]["end_time"] = int(parts[0].split("\t")[1]) |
elif "MSG" not in l: |
continue |
elif "ENDBUTTON" in l: |
trials_dict[trial_idx]["endbutton_idx"] = trial_start_idx + idx |
trials_dict[trial_idx]["endbutton_time"] = int(parts[0].split("\t")[1]) |
elif "SYNCTIME" in l: |
trials_dict[trial_idx]["synctime"] = trial_start_idx + idx |
trials_dict[trial_idx]["synctime_time"] = int(parts[0].split("\t")[1]) |
elif start_trial_at_keyword in l: |
trials_dict[trial_idx][f"{start_trial_at_keyword}_line_idx"] = trial_start_idx + idx |
trials_dict[trial_idx][f"{start_trial_at_keyword}_time"] = int(parts[0].split("\t")[1]) |
elif "GAZE TARGET OFF" in l: |
trials_dict[trial_idx]["gaze_targ_off_time"] = int(parts[0].split("\t")[1]) |
elif "GAZE TARGET ON" in l: |
trials_dict[trial_idx]["gaze_targ_on_time"] = int(parts[0].split("\t")[1]) |
trials_dict[trial_idx]["gaze_targ_on_time_idx"] = trial_start_idx + idx |
elif "DISPLAY_SENTENCE" in l: |
trials_dict[trial_idx]["gaze_targ_on_time"] = int(parts[0].split("\t")[1]) |
trials_dict[trial_idx]["gaze_targ_on_time_idx"] = trial_start_idx + idx |
elif "DISPLAY TEXT" in l: |
trials_dict[trial_idx]["text_start_idx"] = trial_start_idx + idx |
elif "REGION CHAR" in l: |
rg_idx = parts.index("CHAR") |
if len(parts[rg_idx:]) > 8: |
char = " " |
idx_correction = 1 |
elif len(parts[rg_idx:]) == 3: |
char = " " |
if "REGION CHAR" not in trial_lines[idx + 1]: |
parts = trial_lines[idx + 1].strip().split(" ") |
idx_correction = -rg_idx - 4 |
else: |
char = parts[rg_idx + 3] |
idx_correction = 0 |
try: |
char_dict = { |
"char": char, |
"char_xmin": float(parts[rg_idx + 4 + idx_correction]), |
"char_ymin": float(parts[rg_idx + 5 + idx_correction]), |
"char_xmax": float(parts[rg_idx + 6 + idx_correction]), |
"char_ymax": float(parts[rg_idx + 7 + idx_correction]), |
} |
char_dict["char_y_center"] = round( |
(char_dict["char_ymax"] - char_dict["char_ymin"]) / 2 + char_dict["char_ymin"], ndigits=2 |
) |
char_dict["char_x_center"] = round( |
(char_dict["char_xmax"] - char_dict["char_xmin"]) / 2 + char_dict["char_xmin"], ndigits=2 |
) |
chars_list.append(char_dict) |
except Exception as e: |
ic(f"char_dict creation failed for parts {parts}") |
ic(e) |
if start_trial_at_keyword == "SYNCTIME" and "synctime_time" in trials_dict[trial_idx]: |
trials_dict[trial_idx]["trial_start_time"] = trials_dict[trial_idx]["synctime_time"] |
trials_dict[trial_idx]["trial_start_idx"] = trials_dict[trial_idx]["synctime"] |
elif start_trial_at_keyword == "GAZE TARGET ON" and "gaze_targ_on_time" in trials_dict[trial_idx]: |
trials_dict[trial_idx]["trial_start_time"] = trials_dict[trial_idx]["gaze_targ_on_time"] |
trials_dict[trial_idx]["trial_start_idx"] = trials_dict[trial_idx]["gaze_targ_on_time_idx"] |
elif start_trial_at_keyword == "START": |
trials_dict[trial_idx]["trial_start_time"] = trials_dict[trial_idx]["start_time"] |
trials_dict[trial_idx]["trial_start_idx"] = trials_dict[trial_idx]["start_idx"] |
elif f"{start_trial_at_keyword}_time" in trials_dict[trial_idx]: |
trials_dict[trial_idx]["trial_start_time"] = trials_dict[trial_idx][f"{start_trial_at_keyword}_time"] |
trials_dict[trial_idx]["trial_start_idx"] = trials_dict[trial_idx][f"{start_trial_at_keyword}_line_idx"] |
else: |
trials_dict[trial_idx]["trial_start_time"] = trials_dict[trial_idx]["start_time"] |
trials_dict[trial_idx]["trial_start_idx"] = trials_dict[trial_idx]["start_idx"] |
if end_trial_at_keyword == "ENDBUTTON" and "endbutton_time" in trials_dict[trial_idx]: |
trials_dict[trial_idx]["trial_end_time"] = trials_dict[trial_idx]["endbutton_time"] |
trials_dict[trial_idx]["trial_end_idx"] = trials_dict[trial_idx]["endbutton_idx"] |
elif end_trial_at_keyword == "END" and "end_idx" in trials_dict[trial_idx]: |
trials_dict[trial_idx]["trial_end_time"] = trials_dict[trial_idx]["end_time"] |
trials_dict[trial_idx]["trial_end_idx"] = trials_dict[trial_idx]["end_idx"] |
elif end_trial_at_keyword == "KEYBOARD" and "keyboard_press_idx" in trials_dict[trial_idx]: |
trials_dict[trial_idx]["trial_end_idx"] = trials_dict[trial_idx]["keyboard_press_idx"] |
else: |
trials_dict[trial_idx]["trial_end_idx"] = trials_dict[trial_idx]["trial_result_idx"] |
if trials_dict[trial_idx]["trial_end_idx"] < trials_dict[trial_idx]["trial_start_idx"]: |
raise ValueError(f"trial_start_idx is larger than trial_end_idx for trial_idx {trial_idx}") |
if len(chars_list) > 0: |
line_ycoords = [] |
for idx in range(len(chars_list)): |
chars_list[idx]["char_y_center"] = round( |
(chars_list[idx]["char_ymax"] - chars_list[idx]["char_ymin"]) / 2 + chars_list[idx]["char_ymin"], |
ndigits=2, |
) |
if chars_list[idx]["char_y_center"] not in line_ycoords: |
line_ycoords.append(chars_list[idx]["char_y_center"]) |
for idx in range(len(chars_list)): |
chars_list[idx]["assigned_line"] = line_ycoords.index(chars_list[idx]["char_y_center"]) |
letter_width_avg = np.mean( |
[x["char_xmax"] - x["char_xmin"] for x in chars_list if x["char_xmax"] > x["char_xmin"]] |
) |
line_heights = [round(abs(x["char_ymax"] - x["char_ymin"]), 3) for x in chars_list] |
line_xcoords_all = [x["char_x_center"] for x in chars_list] |
line_xcoords_no_pad = np.unique(line_xcoords_all) |
line_ycoords_all = [x["char_y_center"] for x in chars_list] |
line_ycoords_no_pad = np.unique(line_ycoords_all) |
trials_dict[trial_idx]["x_char_unique"] = list(line_xcoords_no_pad) |
trials_dict[trial_idx]["y_char_unique"] = list(line_ycoords_no_pad) |
x_diff, y_diff = calc_xdiff_ydiff( |
line_xcoords_no_pad, line_ycoords_no_pad, line_heights, allow_multiple_values=False |
) |
trials_dict[trial_idx]["x_diff"] = float(x_diff) |
trials_dict[trial_idx]["y_diff"] = float(y_diff) |
trials_dict[trial_idx]["num_char_lines"] = len(line_ycoords_no_pad) |
trials_dict[trial_idx]["letter_width_avg"] = letter_width_avg |
trials_dict[trial_idx]["line_heights"] = line_heights |
words_list_from_func, chars_list_reconstructed = add_words(chars_list) |
words_list = words_list_from_func |
if close_gap_between_words: |
for widx in range(1, len(words_list)): |
if words_list[widx]["assigned_line"] == words_list[widx - 1]["assigned_line"]: |
word_sep_half_width = (words_list[widx]["word_xmin"] - words_list[widx - 1]["word_xmax"]) / 2 |
words_list[widx - 1]["word_xmax"] = words_list[widx - 1]["word_xmax"] + word_sep_half_width |
words_list[widx]["word_xmin"] = words_list[widx]["word_xmin"] - word_sep_half_width |
else: |
chars_df = pd.DataFrame(chars_list_reconstructed) |
chars_df.loc[ |
chars_df["char"] == " ", ["in_word", "in_word_number", "num_letters_from_start_of_word"] |
] = pd.NA |
chars_list_reconstructed = chars_df.to_dict("records") |
trials_dict[trial_idx]["words_list"] = words_list |
trials_dict[trial_idx]["chars_list"] = chars_list_reconstructed |
return trials_dict |
def get_lines_from_file(uploaded_file, asc_encoding="ISO-8859-15"):
    """Return the text lines of an .asc file given as a path or an upload.

    Args:
        uploaded_file: Either a filesystem path (``str`` or ``pathlib.Path``)
            or an object whose ``getvalue()`` returns the raw file bytes.
        asc_encoding: Encoding used to decode the file contents.

    Returns:
        list[str]: The lines of the file. Lines read from a path come from
        ``readlines()`` and keep their line endings; in-memory content is
        split on the newline character, so those lines carry no trailing
        newline.
    """
    if not isinstance(uploaded_file, (str, pl.Path)):
        # In-memory upload: decode the whole buffer once, then split it.
        decoded_text = uploaded_file.getvalue().decode(asc_encoding)
        return decoded_text.split("\n")
    with open(uploaded_file, "r", encoding=asc_encoding) as asc_file:
        return asc_file.readlines()
def file_to_trials_and_lines(
    uploaded_file,
    asc_encoding: str = "ISO-8859-15",
    close_gap_between_words=True,
    paragraph_trials_only=True,
    uploaded_ias_files=None,
    trial_start_keyword="START",
    end_trial_at_keyword="END",
):
    """Parse an .asc file into a trials dictionary and attach the stimulus text.

    The file is read with ``get_lines_from_file`` and split into trials by
    ``asc_lines_to_trials_by_trail_id``. Every selected trial that has a
    non-empty ``"chars_list"`` additionally receives the keys
    ``"sentence_list"``, ``"line_list"``, ``"text"`` and ``"max_line"``.

    Args:
        uploaded_file: Path or upload object accepted by ``get_lines_from_file``.
        asc_encoding: Encoding used to decode the file.
        close_gap_between_words: Forwarded to the trial parser.
        paragraph_trials_only: Forwarded to the trial parser; when true and a
            ``"paragraph_trials"`` entry exists, only those trials get the
            text fields added.
        uploaded_ias_files: Forwarded to the trial parser as ``ias_files``.
            ``None`` (the default) is passed on as an empty list.
        trial_start_keyword: Forwarded as ``start_trial_at_keyword``.
        end_trial_at_keyword: Forwarded as ``end_trial_at_keyword``.

    Returns:
        tuple: ``(trials_dict, lines)`` where ``lines`` are the raw file lines.
    """
    # Build a fresh list per call instead of sharing one mutable default.
    ias_files = [] if uploaded_ias_files is None else uploaded_ias_files

    lines = get_lines_from_file(uploaded_file, asc_encoding=asc_encoding)
    trials_dict = asc_lines_to_trials_by_trail_id(
        lines,
        paragraph_trials_only,
        uploaded_file,
        close_gap_between_words=close_gap_between_words,
        ias_files=ias_files,
        start_trial_at_keyword=trial_start_keyword,
        end_trial_at_keyword=end_trial_at_keyword,
    )

    # Derive the list of paragraph trials if the parser did not provide one.
    if "paragraph_trials" not in trials_dict and "trial_is" in trials_dict[0]:
        trials_dict["paragraph_trials"] = [
            k for k in range(trials_dict["max_trial_idx"]) if trials_dict[k]["trial_is"] == "paragraph"
        ]

    if paragraph_trials_only and "paragraph_trials" in trials_dict:
        trial_keys = trials_dict["paragraph_trials"]
    else:
        trial_keys = range(trials_dict["max_trial_idx"])

    for k in trial_keys:
        trial = trials_dict[k]
        if "chars_list" not in trial or len(trial["chars_list"]) == 0:
            # Nothing to reconstruct text from; an empty list would otherwise
            # fail when looking up its last element.
            continue
        chars_list = trial["chars_list"]

        # Take the maximum over all characters rather than trusting the last
        # one, so every assigned line is guaranteed to have a bucket.
        max_line = max(char["assigned_line"] for char in chars_list)
        chars_per_line = [[] for _ in range(max_line + 1)]
        for char in chars_list:
            chars_per_line[char["assigned_line"]].append(char["char"])
        line_list = ["".join(line_chars) for line_chars in chars_per_line]

        full_text = "".join(char["char"] for char in chars_list)
        # Zero-width split after "." or "?", with lookbehinds that skip
        # patterns such as single capital initials and short abbreviations.
        sentences = re.split(r"(?<!\w\.\w.)(?<![A-Z]\.)(?<![A-Z][a-z]\.)(?<=\.|\?)", full_text)

        trial["sentence_list"] = [s for s in sentences if len(s) > 0]
        trial["line_list"] = line_list
        trial["text"] = "\n".join(line_list)
        trial["max_line"] = max_line
    return trials_dict, lines
def discard_empty_str_from_list(l):
    """Return a new list holding only the entries of ``l`` with non-zero length."""
    kept = []
    for entry in l:
        if len(entry) > 0:
            kept.append(entry)
    return kept
def make_folders(gradio_temp_folder, gradio_temp_unzipped_folder, PLOTS_FOLDER):
    """Create the three working folders, leaving existing ones untouched.

    Each argument must provide ``mkdir(exist_ok=True)`` (e.g. a
    ``pathlib.Path``). The folders are created in the order in which they are
    passed; parent directories are not created.

    Returns:
        int: Always ``0``.
    """
    for folder in (gradio_temp_folder, gradio_temp_unzipped_folder, PLOTS_FOLDER):
        folder.mkdir(exist_ok=True)
    return 0
def _algo_choice_as_list(algo_choice):
    """Return ``algo_choice`` unchanged if it is a list, otherwise wrap it in one."""
    return algo_choice if isinstance(algo_choice, list) else [algo_choice]


def _duration_marker_sizes(durations, max_size, scale_factor):
    """Map durations to marker sizes via a sigmoid of the min-max scaled values."""
    shifted = durations - durations.min()
    span = shifted.max()
    # With identical durations (e.g. a single row) the span is 0 and dividing
    # would yield NaN sizes; in that case all rows are treated as the minimum.
    if span > 0:
        shifted = shifted / span
    return sigmoid((shifted - 0.5) * 3) * max_size * scale_factor


def _fixation_trace(
    dffix, y_values, name, mode, color, sizes, scale_factor, line_width, text_size, textposition
):
    """Build the arrow-marker scatter trace for one series of fixation y-values."""
    return go.Scatter(
        x=dffix.x * scale_factor,
        y=y_values * scale_factor,
        mode=mode,
        name=name,
        marker=dict(color=color, symbol="arrow", size=sizes.values, angleref="previous"),
        line=dict(color=color, width=line_width * scale_factor),
        text=np.arange(dffix.shape[0]),
        textposition=textposition,
        textfont=dict(family="sans serif", size=text_size * scale_factor, color=color),
        hovertext=[f"x:{x}, y:{y}, n:{num}" for num, (x, y) in enumerate(zip(dffix.x, y_values))],
        opacity=0.9,
    )


def _add_saccade_traces(
    fig, saccade_df, start_cols, end_cols, mode, color, sizes, scale_factor, legendgroup, name
):
    """Add one dashed two-point trace per saccade; only the first gets a legend entry."""
    starts = saccade_df.loc[:, start_cols].values * scale_factor
    ends = saccade_df.loc[:, end_cols].values * scale_factor
    for sidx, (start, end) in enumerate(zip(starts, ends)):
        fig.add_trace(
            go.Scatter(
                x=[start[0], end[0]],
                y=[start[1], end[1]],
                mode=mode,
                line=dict(color=color, width=1.5 * scale_factor, dash="dash"),
                showlegend=sidx == 0,
                legendgroup=legendgroup,
                name=name,
                text=sidx,
                textposition="top center",
                textfont=dict(family="sans serif", size=22 * scale_factor, color=color),
                marker=dict(
                    color=color,
                    symbol="arrow",
                    # NOTE(review): every two-point trace receives the size
                    # array of all saccades; a per-saccade size
                    # (sizes.values[sidx]) may have been intended - confirm.
                    size=sizes.values,
                    angleref="previous",
                ),
            )
        )


def plotly_plot_with_image(
    dffix,
    trial,
    algo_choice,
    saccade_df=None,
    to_plot_list=None,
    lines_in_plot="Uncorrected",
    scale_factor=0.5,
    font="DejaVu Sans Mono",
    box_annotations: list = None,
):
    """Build a plotly figure of fixations/saccades on top of the stimulus image.

    The stimulus is rendered with ``matplotlib_plot_df``, saved to
    ``TEMP_FIGURE_STIMULUS_PATH`` and, if any stimulus layer is requested,
    placed below the traces as a layout image.

    Args:
        dffix: Fixation dataframe with ``x``, ``y`` and ``duration`` columns and
            optionally ``y_<algo>`` columns holding corrected y-values.
        trial: Trial information forwarded to ``matplotlib_plot_df``.
        algo_choice: One algorithm name or a list of names; selects the
            ``y_<algo>`` / ``ys_<algo>`` / ``ye_<algo>`` columns to plot.
        saccade_df: Saccade dataframe with ``xs``, ``ys``, ``xe``, ``ye`` and
            ``duration`` columns. Saccade layers are skipped when it is None.
        to_plot_list: Names of the layers to draw. Defaults to
            ``["Uncorrected Fixations", "Corrected Fixations", "Word boxes"]``.
        lines_in_plot: ``"Uncorrected"``, ``"Corrected"`` or ``"Both"`` to
            connect the respective fixation markers with lines; any other
            value draws markers and text only.
        scale_factor: Factor applied to all coordinates, sizes and the figure
            dimensions.
        font: Font forwarded to ``matplotlib_plot_df``.
        box_annotations: Forwarded to ``matplotlib_plot_df``.

    Returns:
        plotly.graph_objects.Figure: The assembled figure.
    """
    if to_plot_list is None:
        # Fresh list per call instead of a shared mutable default.
        to_plot_list = ["Uncorrected Fixations", "Corrected Fixations", "Word boxes"]
    # Normalise once. The argument itself is never rebound, so every section
    # below iterates over the complete list of algorithms.
    algo_choices = _algo_choice_as_list(algo_choice)

    mpl_fig, img_width, img_height = matplotlib_plot_df(
        dffix,
        trial,
        algo_choice,
        None,
        desired_dpi=300,
        fix_to_plot=[],
        stim_info_to_plot=to_plot_list,
        font=font,
        box_annotations=box_annotations,
    )
    mpl_fig.savefig(TEMP_FIGURE_STIMULUS_PATH)
    plt.close(mpl_fig)

    with_lines = "markers+lines+text"
    without_lines = "markers+text"
    uncorrected_plot_mode = with_lines if lines_in_plot in ("Uncorrected", "Both") else without_lines
    corrected_plot_mode = with_lines if lines_in_plot in ("Corrected", "Both") else without_lines

    scaled_width = img_width * scale_factor
    scaled_height = img_height * scale_factor

    fig = go.Figure()
    # Invisible helper trace that pins the axis extents to the image size.
    fig.add_trace(
        go.Scatter(
            x=[0, scaled_width],
            y=[scaled_height, 0],
            mode="markers",
            marker_opacity=0,
            name="scale_helper",
            showlegend=False,
        )
    )
    fig.update_xaxes(visible=False, range=[0, scaled_width])
    # The reversed range puts the origin at the top-left, like image coordinates.
    fig.update_yaxes(visible=False, range=[scaled_height, 0], scaleanchor="x")

    stimulus_layers = ("Words", "Word boxes", "Character boxes", "Characters")
    if any(layer in to_plot_list for layer in stimulus_layers):
        # Close the file handle once plotly has taken over the image data.
        with Image.open(str(TEMP_FIGURE_STIMULUS_PATH)) as imsource:
            fig.add_layout_image(
                dict(
                    x=0,
                    sizex=scaled_width,
                    y=0,
                    sizey=scaled_height,
                    xref="x",
                    yref="y",
                    opacity=1.0,
                    layer="below",
                    sizing="stretch",
                    source=imsource,
                )
            )

    fixation_sizes = _duration_marker_sizes(dffix.duration, 50, scale_factor)

    if "Uncorrected Fixations" in to_plot_list:
        fig.add_trace(
            _fixation_trace(
                dffix,
                dffix["y"],
                name="Raw fixations",
                mode=uncorrected_plot_mode,
                color=COLORS[-1],
                sizes=fixation_sizes,
                scale_factor=scale_factor,
                line_width=2,
                text_size=23,
                textposition="top right",
            )
        )

    if "Corrected Fixations" in to_plot_list:
        for algo_idx, algo_name in enumerate(algo_choices):
            if f"y_{algo_name}" not in dffix.columns:
                continue
            fig.add_trace(
                _fixation_trace(
                    dffix,
                    dffix.loc[:, f"y_{algo_name}"],
                    name=algo_name,
                    mode=corrected_plot_mode,
                    color=COLORS[algo_idx],
                    sizes=fixation_sizes,
                    scale_factor=scale_factor,
                    line_width=1.5,
                    text_size=22,
                    textposition="top center",
                )
            )

    wants_saccades = "Saccades" in to_plot_list
    wants_snapped_saccades = "Saccades snapped to line" in to_plot_list
    if saccade_df is not None and (wants_saccades or wants_snapped_saccades):
        saccade_sizes = _duration_marker_sizes(saccade_df.duration, 65, scale_factor)

        if wants_saccades:
            _add_saccade_traces(
                fig,
                saccade_df,
                ["xs", "ys"],
                ["xe", "ye"],
                mode="markers+lines+text",
                color=COLORS[-1],
                sizes=saccade_sizes,
                scale_factor=scale_factor,
                legendgroup="1",
                name="Saccades",
            )

        if wants_snapped_saccades:
            for algo_idx, algo_name in enumerate(algo_choices):
                if f"ys_{algo_name}" not in saccade_df.columns:
                    continue
                _add_saccade_traces(
                    fig,
                    saccade_df,
                    ["xs", f"ys_{algo_name}"],
                    ["xe", f"ye_{algo_name}"],
                    mode="markers+lines",
                    color=COLORS[algo_idx],
                    sizes=saccade_sizes,
                    scale_factor=scale_factor,
                    legendgroup="2",
                    name="Saccades snapped to line",
                )

    fig.update_layout(
        plot_bgcolor=None,
        width=scaled_width,
        height=scaled_height,
        margin={"l": 0, "r": 0, "t": 0, "b": 0},
        legend=dict(orientation="h", yanchor="bottom", y=-0.1, xanchor="right", x=0.8),
    )
    return fig
def plot_fix_measure(
    dffix,
    plot_choices,
    x_axis_selection,
    margin=dict(t=40, l=10, r=10, b=1),
    label_start="Fixation",
):
    """Scatter-plot selected dataframe columns against row index or start time.

    Args:
        dffix: Dataframe holding the columns named in ``plot_choices`` and, for
            ``x_axis_selection == "Start Time"``, a ``start_time`` column.
        plot_choices: Column names to plot; one trace is added per column.
        x_axis_selection: ``"Index"`` to plot against the row number or
            ``"Start Time"`` to plot against ``dffix["start_time"]``.
        margin: Plotly layout margin.
        label_start: Prefix used in the axis titles.

    Returns:
        plotly.graph_objects.Figure: Figure with one marker trace per column.

    Raises:
        ValueError: If ``x_axis_selection`` is not one of the supported values.
    """
    if x_axis_selection == "Index":
        x_label = f"{label_start} Number"
        x_nums = np.arange(dffix.shape[0])
    elif x_axis_selection == "Start Time":
        x_label = f"{label_start} Start Time"
        x_nums = dffix["start_time"]
    else:
        # Previously this fell through and failed later on unbound locals.
        raise ValueError(
            f"Unsupported x_axis_selection {x_axis_selection!r}; expected 'Index' or 'Start Time'"
        )

    # Leave the range unset for empty input; min()/max() have no valid result there.
    x_range = [x_nums.min() - 1, x_nums.max() + 1] if len(x_nums) > 0 else None

    layout = dict(
        plot_bgcolor="white",
        autosize=True,
        margin=margin,
        xaxis=dict(
            title=x_label,
            linecolor="black",
            range=x_range,
            showgrid=False,
            mirror="all",
            showline=True,
        ),
        yaxis=dict(
            title=f"{label_start} Feature",
            side="left",
            linecolor="black",
            showgrid=False,
            mirror="all",
            showline=True,
        ),
        legend=dict(orientation="v", yanchor="middle", y=0.95, xanchor="left", x=1.05),
    )
    fig = go.Figure(layout=layout)
    for pidx, plot_choice in enumerate(plot_choices):
        fig.add_trace(
            go.Scatter(
                x=x_nums,
                y=dffix.loc[:, plot_choice],
                mode="markers",
                name=plot_choice,
                marker_color=COLORS[pidx],
                marker_size=3,
                showlegend=True,
            )
        )
    fig.update_yaxes(zeroline=True, zerolinewidth=1, zerolinecolor="black")
    return fig
def plot_y_corr(dffix, algo_choice, margin=dict(t=40, l=10, r=10, b=1)):
    """Plot the per-fixation y correction of one or more algorithms.

    Args:
        dffix: Fixation dataframe with an ``x`` column and
            ``y_<algo>_correction`` columns, or a dict holding such a
            dataframe under the key ``"value"``.
        algo_choice: One algorithm name or a list of names.
        margin: Plotly layout margin.

    Returns:
        plotly.graph_objects.Figure: One marker trace per algorithm whose
        ``y_<algo>_correction`` column exists. If none exists, an empty
        figure carrying only the layout is returned.
    """
    # Unwrap before touching any dataframe attribute; the dict wrapper has no
    # ``x`` attribute, so measuring the length first would fail.
    if isinstance(dffix, dict):
        dffix = dffix["value"]
    num_datapoints = len(dffix.x)

    layout = dict(
        plot_bgcolor="white",
        autosize=True,
        margin=margin,
        xaxis=dict(
            title="Fixation Index",
            linecolor="black",
            range=[-1, num_datapoints + 1],
            showgrid=False,
            mirror="all",
            showline=True,
        ),
        yaxis=dict(
            title="y correction",
            side="left",
            linecolor="black",
            showgrid=False,
            mirror="all",
            showline=True,
        ),
        legend=dict(orientation="v", yanchor="middle", y=0.95, xanchor="left", x=1.05),
    )
    fig = go.Figure(layout=layout)

    algo_choices = algo_choice if isinstance(algo_choice, list) else [algo_choice]
    # Keep each algorithm's position in the full list so its colour does not
    # shift when another algorithm's column is missing.
    plottable = [
        (algo_idx, algo_name)
        for algo_idx, algo_name in enumerate(algo_choices)
        if f"y_{algo_name}_correction" in dffix.columns
    ]
    if not plottable:
        ic("No line-assignment column found in dataframe")
        return fig

    x_positions = np.arange(num_datapoints)
    for algo_idx, algo_name in plottable:
        fig.add_trace(
            go.Scatter(
                x=x_positions,
                y=dffix.loc[:, f"y_{algo_name}_correction"],
                mode="markers",
                name=f"{algo_name} y correction",
                marker_color=COLORS[algo_idx],
                marker_size=3,
                showlegend=True,
            )
        )
    fig.update_yaxes(zeroline=True, zerolinewidth=1, zerolinecolor="black")
    return fig
if not os.path.isdir(EXAMPLES_FOLDER): |
if not os.path.exists(EXAMPLES_ASC_ZIP_FILENAME): |
if os.path.exists(EXAMPLES_ASC_ZIP_FILENAME): |
EXAMPLE_ASC_FILES = [x for x in EXAMPLES_FOLDER_PATH.glob("*.asc")] |
if len(EXAMPLE_ASC_FILES) != 4: |
try: |
with zipfile.ZipFile(EXAMPLES_ASC_ZIP_FILENAME, "r") as zip_ref: |
zip_ref.extractall(EXAMPLES_FOLDER) |
except Exception as e: |
ic(e) |
ic(f"Extracting {EXAMPLES_ASC_ZIP_FILENAME} failed") |
EXAMPLE_ASC_FILES = [x for x in EXAMPLES_FOLDER_PATH.glob("*.asc")] |
else: |