|
"""Mostly adapted from https://github.com/martin-vasilev/EMreading |
|
Mostly deprecated in favour of alternative methods."""
|
|
|
from icecream import ic |
|
from io import StringIO |
|
import re |
|
import numpy as np |
|
import pandas as pd |
|
|
|
|
|
def assign_chars_to_words(df):
    """Concatenate characters into word strings and number each character within its word."""
    df.reset_index(inplace=True, names="index_temp")
    df["wordID"] = ""
    df["char_word"] = -1

    # Iterate over the actual sentence ids rather than assuming they run 0..n-1,
    # which the original positional loop silently did.
    for sent_id in df["sent"].unique():
        word_list = df[df["sent"] == sent_id]["word"].unique()
        for word_id in word_list:
            cols = df[(df["sent"] == sent_id) & (df["word"] == word_id)].index
            df.loc[cols, "wordID"] = "".join(df["char"].loc[cols])
            df.loc[cols, "char_word"] = list(range(len(cols)))
    df.set_index("index_temp", inplace=True)
    return df
|
|
|
|
|
def round_and_int(value): |
|
if not pd.isna(value): |
|
return int(round(value)) |
|
else: |
|
return None |
|
|
|
|
|
def get_coord_map(coords, x=1920, y=1080): |
|
""" |
|
Original R version: |
|
```R |
|
# Use stimuli information to create a coordinate map_arr for each pixel on the screen |
|
# This makes it possible to find exactly what participants were fixating |
|
coord_map_arr<- function(coords, x=resolution_x, y= resolution_y){ |
|
|
|
coords$id<- 1:nrow(coords) |
|
map_arr<- data.frame(matrix(NA, nrow = y, ncol = x)) |
|
|
|
for(i in 1:nrow(coords)){ |
|
map_arr[coords$y1[i]:coords$y2[i],coords$x1[i]:coords$x2[i]]<- coords$id[i] |
|
|
|
} |
|
|
|
return(map_arr) |
|
}``` |
|
""" |
|
coords.reset_index(drop=True, inplace=True) |
|
y1 = coords["char_ymin"].map(round_and_int) |
|
y2 = coords["char_ymax"].map(round_and_int) |
|
x1 = coords["char_xmin"].map(round_and_int) |
|
x2 = coords["char_xmax"].map(round_and_int) |
|
coords["id"] = np.arange(len(coords)) |
|
map_arr = np.full((y, x), np.nan) |
|
|
|
for i in range(len(coords)): |
|
map_arr[y1[i] : y2[i] + 1, x1[i] : x2[i] + 1] = coords["id"].iloc[i] |
|
|
|
|
return map_arr |
|
|
|
|
|
def get_char_num_for_each_line(df): |
|
df.reset_index(inplace=True, names="index_temp") |
|
df["line_char"] = np.nan |
|
unq_line = df["assigned_line"].unique() |
|
for i in unq_line: |
|
assigned_line = df[df["assigned_line"] == i].index |
|
df.loc[assigned_line, "line_char"] = range(len(assigned_line)) |
|
df.set_index("index_temp", inplace=True) |
|
return df |
|
|
|
|
|
def parse_fix( |
|
file, |
|
trial_db, |
|
): |
|
|
|
indexrange = list(range(trial_db["trial_start_idx"], trial_db["trial_end_idx"] + 1)) |
|
|
|
    sfix_stamps = [i for i in indexrange if re.search(r"(?i)SFIX", file[i])]

    efix_stamps = [i for i in indexrange if re.search(r"(?i)EFIX", file[i])]
|
|
|
    if len(sfix_stamps) > (len(efix_stamps) + 1):
        ic(f"SFIX/EFIX count mismatch in parse_fix: {len(sfix_stamps) - len(efix_stamps)}")
|
|
|
    if not sfix_stamps or not efix_stamps:
        return None
|
    # Trim stray markers until every SFIX has a matching EFIX (capped at 25 passes).
    for _ in range(25):
        if efix_stamps[0] < sfix_stamps[0]:
            efix_stamps = efix_stamps[1:]  # EFIX before the first SFIX: drop it
        elif efix_stamps[-1] <= sfix_stamps[-1]:
            sfix_stamps = sfix_stamps[:-1]  # trailing SFIX with no EFIX: drop it
        elif efix_stamps[0] >= sfix_stamps[0]:
            sfix_stamps = sfix_stamps[1:]
        if not (len(efix_stamps) != len(sfix_stamps) and len(efix_stamps) > 1 and len(sfix_stamps) > 1):
            break
|
|
|
    def parse_sacc(string):
        # Assumes the value used as the saccade duration is the third whitespace-
        # separated field of the ESACC line, as in the adapted EMreading code;
        # verify against your ASC output.
        return float(string.split()[2])

    # An ESACC line directly above an SFIX marks the incoming saccade.
    esacc_flag = [file[f - 1] if "ESACC" in file[f - 1] else None for f in sfix_stamps]
    saccDur = [None if k is None else parse_sacc(k) for k in esacc_flag]
|
|
|
    s_time = [int(file[s].strip().split(" ")[-1]) for s in sfix_stamps]
    # The end time is read from the sample line immediately preceding the EFIX marker.
    e_time = [int(file[s - 1].strip().split(" ")[0]) for s in efix_stamps]
    if len(s_time) != len(e_time):
        if s_time[-1] > e_time[-1]:
            s_time = s_time[:-1]

    fixDur = [e - s for e, s in zip(e_time, s_time)]
    assert not (np.asarray(fixDur) < 0).any(), "negative fixation duration"
|
x = [float(file[fidx].split("\t")[3]) for fidx in efix_stamps] |
|
y = [float(file[fidx].split("\t")[4]) for fidx in efix_stamps] |
|
    blink_stamp = [index for index in indexrange if "EBLINK" in file[index]]
    blink_time = [float(file[index].strip().replace("\t", " ").split(" ")[2]) - 1 for index in blink_stamp]
    # Flag the fixation during which each blink started; discard blinks that
    # precede the first fixation (searchsorted would otherwise wrap to index -1).
    index = np.searchsorted(s_time, blink_time, side="right") - 1
    index = index[index >= 0]
    blink = np.zeros(len(s_time))
    blink[index] = -1
|
raw_fix = pd.DataFrame( |
|
{"s_time": s_time, "e_time": e_time, "fixDur": fixDur, "saccDur": saccDur, "x": x, "y": y, "blink": blink} |
|
) |
|
return raw_fix |
|
|
|
|
|
def process_fix_EM(fix, coords_map, coords, SL): |
|
resolution_y, resolution_x = coords_map.shape |
|
loc = None |
|
raw_fix = pd.DataFrame() |
|
num_fixations = len(fix) |
|
SFIX = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
EFIX = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
x = np.full(num_fixations, np.nan) |
|
y = np.full(num_fixations, np.nan) |
|
fix_num = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
fix_dur = np.full(num_fixations, None) |
|
sent = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
line = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
word = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
char_trial = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
char_line = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
word_line = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
max_sent = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
max_word = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
regress = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
blink = pd.array([None] * num_fixations, dtype=pd.BooleanDtype()) |
|
outOfBnds = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
outsideText = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
wordID = np.full(num_fixations, None) |
|
land_pos = pd.array([None] * num_fixations, dtype=pd.Int64Dtype()) |
|
sacc_len = np.full(num_fixations, np.nan) |
|
|
|
max_sentence = coords["in_sentence_number"].max() |
|
|
|
curr_sent = np.zeros((max_sentence + 1, 2)) |
|
curr_sent[: max_sentence + 1, 0] = np.arange(0, max_sentence + 1) |
|
|
|
    # Coerce a string-typed index column to numeric (a Series is never an
    # instance of str, so the original isinstance check could never fire).
    if coords["index"].dtype == object:
        coords["index"] = pd.to_numeric(coords["index"], errors="coerce")
|
|
|
    for j in range(len(fix)):
        if (fix["y"][j] > 0) and (fix["x"][j] > 0) and (fix["y"][j] <= resolution_y) and (fix["x"][j] <= resolution_x):
            # Clamp so a fixation exactly on the bottom/right edge still indexes
            # the map; the map is laid out as [row = y, col = x].
            loc = coords_map[min(round(fix["y"][j]), resolution_y - 1), min(round(fix["x"][j]), resolution_x - 1)]
            loc = None if pd.isnull(loc) else int(loc)  # int, so it can index coords
        else:
            loc = None
|
|
|
fix_num[j] = j |
|
fix_dur[j] = fix["duration"][j] |
|
SFIX[j] = fix["start_uncorrected"][j] |
|
EFIX[j] = fix["stop_uncorrected"][j] |
|
x[j] = fix["x"][j] |
|
y[j] = fix["y"][j] |
|
blink[j] = fix["blink"][j] |
|
|
|
if x[j] < 1 or x[j] > resolution_x or y[j] < 1 or y[j] > resolution_y: |
|
outOfBnds[j] = 1 |
|
else: |
|
outOfBnds[j] = 0 |
|
outsideText[j] = 1 if loc is None else 0 |
|
|
|
if fix["x"][j] < 0: |
|
loc = None |
|
outOfBnds[j] = 1 |
|
outsideText[j] = 1 |
|
|
|
if loc is not None: |
|
sent[j] = coords["in_sentence_number"][loc] |
|
line[j] = coords["assigned_line"][loc] |
|
word[j] = coords["in_word_number"][loc] |
|
word_line[j] = coords["wordline"][loc] |
|
char_trial[j] = coords["index"][loc] + 1 |
|
char_line[j] = coords["letline"][loc] |
|
wordID[j] = coords["in_word"][loc] |
|
land_pos[j] = coords["letword"][loc] |
|
|
|
if j > 0 and not pd.isna(char_trial[j]) and not pd.isna(char_trial[j - 1]): |
|
sacc_len[j] = abs(char_trial[j] - char_trial[j - 1]) |
|
else: |
|
sacc_len[j] = np.nan |
|
else: |
|
sent[j] = np.nan |
|
line[j] = np.nan |
|
word[j] = np.nan |
|
word_line[j] = np.nan |
|
char_trial[j] = np.nan |
|
char_line[j] = np.nan |
|
wordID[j] = np.nan |
|
land_pos[j] = np.nan |
|
sacc_len[j] = np.nan |
|
|
|
if SL: |
|
if loc is not None: |
|
                if j == 0:
                    max_sent[j] = sent[j]
                else:
                    # Carry the running maximum forward, then bump it if this
                    # fixation landed on a later sentence. (The original ternary
                    # had two identical branches.)
                    max_sent[j] = max_sent[j - 1]
                    if not (pd.isna(max_sent[j]) or pd.isna(sent[j])) and sent[j] > max_sent[j]:
                        max_sent[j] = sent[j]
|
|
|
                if j == 0:
                    max_word[j] = abs(word[j])
                    curr_sent[sent[j] - 1, 1] = abs(word[j])
                else:
                    # Start from the furthest word reached so far in this sentence,
                    # then advance it if this fixation went further right. NB: the
                    # `sent[j] - 1` row indexing may be an off-by-one carried over
                    # from the 1-based R original (get_return_sweeps indexes the
                    # same kind of table without the offset).
                    max_word[j] = curr_sent[sent[j] - 1, 1]
                    if not (pd.isna(word[j]) or pd.isna(max_word[j])) and abs(word[j]) > curr_sent[sent[j] - 1, 1]:
                        max_word[j] = abs(word[j])
                        curr_sent[sent[j] - 1, 1] = abs(word[j])
|
|
|
if not (pd.isna(word[j]) or pd.isna(max_word[j])) and abs(word[j]) < max_word[j]: |
|
regress[j] = 1 |
|
else: |
|
regress[j] = 0 |
|
|
|
                if j > 0 and not pd.isna(word[j]):
                    if pd.isna(regress[j - 1]):
                        regress[j] = np.nan
                    else:
                        # Refixating the furthest word during an ongoing regression
                        # still counts as regressive; drop NA entries before the
                        # membership check (np.unique chokes on pd.NA).
                        if abs(word[j]) == max_word[j] and regress[j - 1] == 1 and word[j] in word[:j].dropna().tolist():
                            regress[j] = 1
|
|
|
raw_fix = pd.DataFrame( |
|
{ |
|
"start_uncorrected": SFIX, |
|
"stop_uncorrected": EFIX, |
|
"x": x, |
|
"y": y, |
|
"fixation_number": fix_num, |
|
"on_sentence_number_EM": sent, |
|
"line_EM": line, |
|
"word_EM": word, |
|
"word_line_EM": word_line, |
|
"char_trial_EM": char_trial, |
|
"char_line_EM": char_line, |
|
"regress_EM": regress, |
|
"wordID_EM": wordID, |
|
"land_pos_EM": land_pos, |
|
"sacc_len_EM": sacc_len, |
|
"blink_EM": blink, |
|
"outOfBnds_EM": outOfBnds, |
|
"outsideText_EM": outsideText, |
|
} |
|
) |
|
|
|
fix2 = fix.merge( |
|
raw_fix, |
|
on=[ |
|
"start_uncorrected", |
|
"stop_uncorrected", |
|
"x", |
|
"y", |
|
"fixation_number", |
|
], |
|
how="left", |
|
) |
|
return fix2 |
|
|
|
|
|
def RS(i, rawfix, coords, reqYthresh, reqXthresh, Ythresh, Xthresh, threshSimilar):
    """Score the likelihood that fixation i follows a return sweep.

    `threshSimilar` is kept for API compatibility with the R original but is
    currently unused.
    """
    if i == 0:
        return 0

    # Letter width/height estimated from the first character box.
    lw = coords["char_xmax"][0] - coords["char_xmin"][0]
    lh = coords["char_ymax"][0] - coords["char_ymin"][0]
    meetXthresh = False
    meetYthresh = False

    leftSacc = rawfix["x"][i] < rawfix["x"][i - 1]
    downSacc = rawfix["y"][i] > rawfix["y"][i - 1]

    if downSacc and reqYthresh:
        Ydiff = lh * Ythresh
        trueYdiff = rawfix["y"][i] - rawfix["y"][i - 1]
        meetYthresh = trueYdiff >= Ydiff

    if leftSacc and reqXthresh:
        Xdiff = lw * Xthresh
        trueXdiff = rawfix["x"][i - 1] - rawfix["x"][i]
        meetXthresh = trueXdiff >= Xdiff

    # One point for a leftward saccade, two for a downward one, plus one per
    # satisfied optional threshold; the score is normalised to [0, 1].
    maxPoints = 1 + 2
    if reqYthresh:
        maxPoints += 1
    if reqXthresh:
        maxPoints += 1

    currPoints = 0
    if leftSacc:
        currPoints += 1 / maxPoints
        if meetXthresh:
            currPoints += 1 / maxPoints

    if downSacc:
        currPoints += 2 * (1 / maxPoints)
        if meetYthresh:
            currPoints += 1 / maxPoints

    return round(currPoints, 2)
|
|
|
|
|
def reMap(rawfix, i, coords_map, coords, newY=None):
    """Re-assign fixation i to the text via the coordinate map, optionally after moving it to newY."""
    rawfix.set_index("fixation_number", inplace=True)
    assert i in rawfix.index, "Not in index"
    rawfix.loc[i, "reAligned"] = True
    rawfix.loc[i, "previous_line"] = rawfix.loc[i, "line_EM"]
    rawfix.loc[i, "previous_y"] = rawfix.loc[i, "y"]
    if newY is not None:
        rawfix.loc[i, "y"] = newY
    loc = coords_map[round(rawfix.loc[i, "y"]), round(rawfix.loc[i, "x"])]
    if pd.isnull(loc):
        # Restore the index on the early exit as well; the original returned with
        # fixation_number still set as the index here.
        return rawfix.reset_index(drop=False, names=["fixation_number"])
    loc = int(loc)
    rawfix.loc[i, "on_sentence_number_EM"] = coords["in_sentence_number"][loc]
    rawfix.loc[i, "word_EM"] = coords["in_word_number"][loc]
    rawfix.loc[i, "line_EM"] = coords["assigned_line"][loc]

    return rawfix.reset_index(drop=False, names=["fixation_number"])
|
|
|
|
|
def reAlign(rawfix, coords, coords_map, RSpar):

    nlines = coords["assigned_line"].max()
    letterHeight = coords["char_ymax"][0] - coords["char_ymin"][0]
    # Per-line bounding boxes: column "1" is the line number, "2" the coordinate.
    # (The original also computed scalar ystart/yend here and immediately shadowed
    # them with these frames.)
    xstart = pd.DataFrame(columns=["1", "2"])
    xstart["1"] = np.arange(nlines + 1)
    ystart = pd.DataFrame(columns=["1", "2"])
    ystart["1"] = np.arange(nlines + 1)
    xend = pd.DataFrame(columns=["1", "2"])
    xend["1"] = np.arange(nlines + 1)
    yend = pd.DataFrame(columns=["1", "2"])
    yend["1"] = np.arange(nlines + 1)
    rawfix["previous_x"] = np.nan

    for i in coords["assigned_line"].unique():
        a = coords[coords["assigned_line"] == i]
        xstart.loc[i, "2"] = a["char_xmin"].min()
        xend.loc[i, "2"] = a["char_xmax"].max()
        ystart.loc[i, "2"] = a["char_ymin"].min()
        yend.loc[i, "2"] = a["char_ymax"].max()  # was .min(), which collapsed the line's lower edge
|
|
|
lineCenter = ystart["2"] + letterHeight / 2 |
|
|
|
rawfix["prob_return_sweep"] = np.nan |
|
rawfix["prob_interline_saccade"] = np.nan |
|
rawfix["reAligned"] = False |
|
rawfix["previous_y"] = np.nan |
|
rawfix["previous_line"] = np.nan |
|
|
|
for i in range(rawfix.shape[0]): |
|
rawfix.loc[i, "prob_return_sweep"] = RS( |
|
i, |
|
rawfix, |
|
coords, |
|
reqYthresh=True, |
|
reqXthresh=True, |
|
Ythresh=RSpar[0], |
|
Xthresh=RSpar[1], |
|
threshSimilar=RSpar[2], |
|
) |
|
|
|
if i > 0: |
|
if (rawfix["prob_return_sweep"][i] < 1) & (rawfix["y"][i] > rawfix["y"][i - 1] + letterHeight / 2): |
|
rawfix.loc[i, "prob_return_sweep"] = 1 |
|
|
|
rawfix.loc[i, "previous_x"] = rawfix["x"][i] |
|
rawfix.loc[i, "previous_y"] = rawfix["y"][i] |
|
|
|
if i > 0: |
|
if rawfix["y"][i] < rawfix["y"][i - 1] - letterHeight / 2: |
|
rawfix.loc[i, "prob_interline_saccade"] = 1 |
|
else: |
|
rawfix.loc[i, "prob_interline_saccade"] = 0 |
|
|
|
RsweepFix = np.sort( |
|
np.concatenate( |
|
(np.where(rawfix["prob_return_sweep"] == 1)[0], np.where(rawfix["prob_interline_saccade"] == 1)[0]) |
|
) |
|
) |
|
|
|
    # Walk the fixations pass by pass; the extra final iteration covers the
    # segment after the last sweep, which the original's unreachable
    # `i >= len(RsweepFix)` branch was meant to handle.
    for i in range(len(RsweepFix) + 1):
        if len(RsweepFix) == 0:
            break

        if i == 0:
            linePass = rawfix.loc[: RsweepFix[0] - 1]

        elif i == len(RsweepFix):
            linePass = rawfix.loc[RsweepFix[-1] :]

        else:
            linePass = rawfix.loc[RsweepFix[i - 1] : RsweepFix[i] - 1]
|
|
|
if linePass.shape[0] == 1: |
|
continue |
|
|
|
avgYpos = linePass["y"].mean(skipna=True) |
|
whichLine = min(range(len(lineCenter)), key=lambda index: abs(lineCenter[index] - avgYpos)) |
|
linePass.reset_index(inplace=True, drop=True) |
|
for j in range(linePass.shape[0]): |
|
onLine = (linePass["y"][j] >= ystart["2"][whichLine]) & (linePass["y"][j] <= yend["2"][whichLine]) |
|
|
|
if not onLine: |
|
if linePass["y"][j] < ystart["2"][whichLine]: |
|
rawfix = reMap( |
|
rawfix, linePass.loc[j, "fixation_number"], coords_map, coords, newY=ystart["2"][whichLine] + 5 |
|
) |
|
else: |
|
rawfix = reMap( |
|
rawfix, linePass.loc[j, "fixation_number"], coords_map, coords, newY=yend["2"][whichLine] - 5 |
|
) |
|
rawfix.loc[linePass.loc[j, "fixation_number"], "reAligned"] = True |
|
else: |
|
rawfix.loc[linePass.loc[j, "fixation_number"], "reAligned"] = False |
|
|
|
return rawfix |
|
|
|
|
|
def cleanData( |
|
raw_fix, |
|
algo_choice, |
|
removeBlinks=True, |
|
combineNearbySmallFix=True, |
|
combineMethod="char", |
|
combineDist=1, |
|
removeSmallFix=True, |
|
smallFixCutoff=80, |
|
remove_duration_outliers=True, |
|
outlierMethod="ms", |
|
outlierCutoff=800, |
|
keepRS=False, |
|
): |
|
|
|
if combineNearbySmallFix: |
|
nbefore = raw_fix.shape[0] |
|
which_comb = [] |
|
|
|
        for i in range(nbefore):  # note: enumerate(raw_fix) would iterate columns, not rows
|
prev_line_same = False |
|
next_line_same = False |
|
|
|
if (i > 0) and (i < nbefore - 1): |
|
if combineMethod == "char": |
|
if ( |
|
pd.isna(raw_fix[f"letternum_{algo_choice}"][i]) |
|
or pd.isna(raw_fix[f"letternum_{algo_choice}"][i - 1]) |
|
or pd.isna(raw_fix[f"letternum_{algo_choice}"][i + 1]) |
|
): |
|
continue |
|
|
|
if raw_fix["duration"][i] < smallFixCutoff: |
|
if ( |
|
not pd.isna(raw_fix[f"line_num_{algo_choice}"][i]) |
|
and not pd.isna(raw_fix[f"line_num_{algo_choice}"][i - 1]) |
|
and not pd.isna(raw_fix[f"line_num_{algo_choice}"][i + 1]) |
|
): |
|
|
|
if raw_fix[f"line_num_{algo_choice}"][i] == raw_fix[f"line_num_{algo_choice}"][i - 1]: |
|
prev_line_same = True |
|
if raw_fix[f"line_num_{algo_choice}"][i] == raw_fix[f"line_num_{algo_choice}"][i + 1]: |
|
next_line_same = True |
|
|
|
if combineMethod == "char": |
|
prev = abs(raw_fix[f"letternum_{algo_choice}"][i] - raw_fix[f"letternum_{algo_choice}"][i - 1]) |
|
after = abs(raw_fix[f"letternum_{algo_choice}"][i] - raw_fix[f"letternum_{algo_choice}"][i + 1]) |
|
|
|
else: |
|
prev = abs(round(raw_fix["x"][i]) - round(raw_fix["x"][i - 1])) |
|
after = abs(round(raw_fix["x"][i]) - round(raw_fix["x"][i + 1])) |
|
|
|
                    if prev <= combineDist:
                        which_comb.append(i)

                        if prev_line_same:
                            # Fold the small fixation's duration into its left
                            # neighbour (.loc avoids pandas chained-assignment
                            # pitfalls).
                            raw_fix.loc[i - 1, "duration"] += raw_fix.loc[i, "duration"]

                            if keepRS and (raw_fix.loc[i, "Rtn_sweep"] == 1):
                                raw_fix.loc[i - 1, "Rtn_sweep"] = 1

                    if after <= combineDist:
                        which_comb.append(i)

                        if next_line_same:
                            raw_fix.loc[i + 1, "duration"] += raw_fix.loc[i, "duration"]

                            if keepRS and (raw_fix.loc[i, "Rtn_sweep"] == 1):
                                raw_fix.loc[i + 1, "Rtn_sweep"] = 1
|
|
|
which_comb = list(set(which_comb)) |
|
|
|
if len(which_comb) > 0: |
|
raw_fix = raw_fix.drop(labels=which_comb, axis=0) |
|
nstart = raw_fix.shape[0] |
|
|
|
    if removeBlinks:
        # `blink` may be stored as 0/1 flags; cast to bool before inverting.
        raw_fix = raw_fix[~raw_fix["blink"].astype(bool)].copy()
        nblink = nstart - raw_fix.shape[0]
|
|
|
    if remove_duration_outliers:
        if outlierMethod == "ms":
            # Filter with a boolean mask; np.where() positions would be misread
            # as labels once earlier rows have been dropped.
            raw_fix = raw_fix[~(raw_fix["duration"] > outlierCutoff)].copy()
        elif outlierMethod == "std":
            subM = raw_fix["duration"].mean()
            subSTD = raw_fix["duration"].std()
            cutoff = subM + outlierCutoff * subSTD
            # The original only counted these outliers; drop them as well.
            raw_fix = raw_fix[~(raw_fix["duration"] > cutoff)].copy()
|
|
|
return raw_fix.reset_index(drop=True) |
|
|
|
|
|
def get_space(s): |
|
if len(s) == 0 or s == " ": |
|
return 1 |
|
else: |
|
return None |
|
|
|
|
|
def get_num(string): |
|
strr = "".join([i for i in string if i.isdigit()]) |
|
if len(strr) > 0: |
|
return int(strr) |
|
else: |
|
ic(string) |
|
return strr |
|
|
|
|
|
def parse_itemID(trialid): |
|
I = re.search(r"I", trialid).start() |
|
condition = get_num(trialid[:I]) |
|
|
|
D = re.search(r"D", trialid).start() |
|
item = get_num(trialid[I + 1 : D]) |
|
depend = get_num(trialid[D:]) |
|
|
|
E = trialid[0] |
|
|
|
return {"trialid": trialid, "condition": condition, "item": item, "depend": depend, "trial_is": E} |
|
|
|
|
|
def get_coord(str_input):
    # Keep only stimulus REGION lines (DELAY/BUTTON messages share the prefix).
    string = "\n".join(
        [l.split("\t")[1].strip() for l in str_input if (("DELAY" not in l) & ("BUTTON" not in l) & ("REGION" in l))]
    )

    # Assumed REGION field layout, inferred from the drop list below: the 4th
    # field is the character and fields 6-9 its bounding box. The original read
    # everything as X1..X11 and then referenced named columns that were never
    # created; verify this mapping against your stimulus files.
    df = pd.read_table(
        StringIO(string),
        sep=" ",
        names=["X1", "X2", "X3", "char", "X5", "char_xmin", "char_ymin", "char_xmax", "char_ymax", "X10", "X11"],
    )
    df.loc[:, ["char_xmin", "char_ymin", "char_xmax", "char_ymax", "X11"]] = df[
        ["char_xmin", "char_ymin", "char_xmax", "char_ymax", "X11"]
    ].apply(pd.to_numeric, errors="coerce")
    df.char = df.char.fillna("")

    # Space rows have an empty char field, which shifts the remaining fields one
    # column to the left during parsing; shift them back and flag the row.
    if "space" not in df.columns:
        df["space"] = None
    for i in df[df["char"] == ""].index:
        df.at[i, "space"] = 1
        df.at[i, "char_xmin"], df.at[i, "char_ymin"] = df.at[i, "char_ymin"], df.at[i, "char_xmax"]
        df.at[i, "char_ymin"], df.at[i, "char_xmax"] = df.at[i, "char_xmax"], df.at[i, "char_ymax"]
        df.at[i, "char_xmax"], df.at[i, "char_ymax"] = df.at[i, "char_ymax"], df.at[i, "X11"]
    df = df.drop(columns=["X1", "X2", "X3", "X5"])
    return df
|
|
|
|
|
def map_sent(df):
    # Sentence boundaries are sentence-final punctuation characters.
    sent_bnd = df[(df.char == ".") | (df.char == "?") | (df.char == "!")].index.tolist()

    if len(sent_bnd) > 0:
        sent = pd.Series([-1] * len(df))

        for i, eidx in enumerate(sent_bnd):
            # Start one past the previous boundary so the punctuation mark stays
            # with its own sentence.
            sidx = sent_bnd[i - 1] + 1 if i > 0 else 0
            if i == len(sent_bnd) - 1:
                sent.loc[sidx:] = len(sent_bnd) - 1
            else:
                sent.loc[sidx:eidx] = i
        df["sent"] = sent
    else:
        df["sent"] = 0  # single unterminated sentence; numbering is 0-based elsewhere
    return df
|
|
|
|
|
def map_line(df): |
|
df = df[~pd.isnull(df["char_ymin"])].reset_index(names="index_temp") |
|
|
|
lines = sorted(set(df["char_ymin"].values)) |
|
|
|
assigned_line = np.array([], dtype=int) |
|
|
|
    for i, line_y in enumerate(lines):
        loc_lines = np.where(df["char_ymin"].values == line_y)[0]
        assigned_line = np.concatenate((assigned_line, np.full(len(loc_lines), fill_value=i)))
        # Flag the final character of each line (labels equal positions here
        # thanks to the reset_index above).
        df.loc[len(assigned_line) - 1, "space"] = 2
|
|
|
df["assigned_line"] = assigned_line |
|
df.set_index("index_temp", inplace=True) |
|
|
|
return df |
|
|
|
|
|
def map_words(df):
    curr_sent, curr_line, curr_word = 0, 0, 0
|
|
|
for i in df.index: |
|
newSent = curr_sent != df.loc[i, "sent"] |
|
newLine = curr_line != df.loc[i, "assigned_line"] |
|
|
|
df.loc[i, "word"] = curr_word |
|
if df.loc[i, "char"] == "" and not newSent: |
|
curr_word += 1 |
|
df.loc[i, "word"] = curr_word |
|
|
|
elif newLine: |
|
if df.loc[i, "char"] != ".": |
|
curr_word += 1 |
|
df.loc[i, "word"] = curr_word |
|
curr_line += 1 |
|
|
|
elif newSent: |
|
curr_sent += 1 |
|
curr_word = 0 |
|
df.loc[i, "word"] = curr_word |
|
|
|
return df |
|
|
|
|
|
def get_return_sweeps(raw_fix_new, coords, algo_choice): |
|
currentSent = 0 |
|
currentLine = 0 |
|
maxLine = 0 |
|
inReg = False |
|
|
|
curr_sent = np.zeros((max(coords["in_sentence_number"]) + 1, 4)) |
|
curr_sent[:, 0] = np.arange(0, max(coords["in_sentence_number"]) + 1) |
|
|
|
    diff_sent = coords["in_sentence_number"].diff().fillna(0)
    # Word number at each sentence transition, plus the final word of the text
    # (boolean mask instead of np.where, which .loc cannot take as a row indexer).
    last_words = coords.loc[diff_sent == 1, "in_word_number"]
    curr_sent[:, 2] = np.append(last_words.values, coords["in_word_number"].iloc[-1])
|
for m in range(1, len(raw_fix_new)): |
|
if not (pd.isna(raw_fix_new["char_line_EM"][m - 1]) or pd.isna(raw_fix_new["char_line_EM"][m])): |
|
raw_fix_new.at[m, "sacc_len_EM"] = abs(raw_fix_new["char_line_EM"][m] - raw_fix_new["char_line_EM"][m - 1]) |
|
|
|
if not pd.isna(raw_fix_new["line_EM"][m]): |
|
currentLine = raw_fix_new["line_EM"][m] |
|
|
|
if currentLine > maxLine: |
|
maxLine = currentLine |
|
raw_fix_new.at[m, "Rtn_sweep"] = 1 |
|
|
|
if m < len(raw_fix_new) - 1: |
|
sameLine = ( |
|
not (pd.isna(raw_fix_new["line_EM"][m + 1]) or pd.isna(raw_fix_new["line_EM"][m])) |
|
and raw_fix_new["line_EM"][m + 1] == raw_fix_new["line_EM"][m] |
|
) |
|
|
|
if raw_fix_new["x"][m + 1] < raw_fix_new["x"][m]: |
|
raw_fix_new.at[m, "Rtn_sweep_type"] = "undersweep" if sameLine else None |
|
else: |
|
raw_fix_new.at[m, "Rtn_sweep_type"] = "accurate" if sameLine else None |
|
else: |
|
raw_fix_new.at[m, "Rtn_sweep_type"] = np.nan |
|
else: |
|
raw_fix_new.at[m, "Rtn_sweep"] = 0 |
|
|
|
if not pd.isna(raw_fix_new["on_sentence_number_EM"][m]): |
|
if m == 1: |
|
curr_sent[int(raw_fix_new["on_sentence_number_EM"][m]), 2] = raw_fix_new["word_EM"][m] |
|
raw_fix_new.at[m, "regress_EM"] = 0 |
|
else: |
|
if raw_fix_new["word_EM"][m] > curr_sent[int(raw_fix_new["on_sentence_number_EM"][m]), 2]: |
|
curr_sent[int(raw_fix_new["on_sentence_number_EM"][m]), 2] = raw_fix_new["word_EM"][m] |
|
inReg = False |
|
|
|
if currentSent < raw_fix_new["on_sentence_number_EM"][m]: |
|
curr_sent[currentSent, 3] = 1 |
|
currentSent = raw_fix_new["on_sentence_number_EM"][m] |
|
|
|
if ( |
|
not pd.isna(raw_fix_new["on_sentence_number_EM"][m - 1]) |
|
and raw_fix_new["on_sentence_number_EM"][m] > raw_fix_new["on_sentence_number_EM"][m - 1] |
|
): |
|
curr_sent[int(raw_fix_new["on_sentence_number_EM"][m - 1]), 3] = 1 |
|
|
|
if ( |
|
raw_fix_new["word_EM"][m] < curr_sent[int(raw_fix_new["on_sentence_number_EM"][m]), 2] |
|
and curr_sent[int(raw_fix_new["on_sentence_number_EM"][m]), 3] == 0 |
|
): |
|
raw_fix_new.at[m, "regress_EM"] = 1 |
|
inReg = True |
|
else: |
|
if curr_sent[int(raw_fix_new["on_sentence_number_EM"][m]), 3] == 0: |
|
raw_fix_new.at[m, "regress_EM"] = 0 |
|
|
|
if ( |
|
raw_fix_new["word_EM"][m] == curr_sent[int(raw_fix_new["on_sentence_number_EM"][m]), 2] |
|
and inReg |
|
): |
|
raw_fix_new.at[m, "regress_EM"] = 1 |
|
else: |
|
raw_fix_new.at[m, "regress_EM"] = 1 |
|
raw_fix_new.at[m, "regress2nd_EM"] = 1 |
|
inReg = True |
|
return raw_fix_new |
|
|
|
|
|
def word_m_EM(n2): |
|
sub_list = [] |
|
item_list = [] |
|
cond_list = [] |
|
seq_list = [] |
|
word_list = [] |
|
wordID_list = [] |
|
sent_list = [] |
|
FFD_list = [] |
|
SFD_list = [] |
|
GD_list = [] |
|
TVT_list = [] |
|
nfix1_list = [] |
|
nfix2_list = [] |
|
nfixAll_list = [] |
|
regress_list = [] |
|
o = n2["sent"].unique() |
|
for k in range(len(o)): |
|
q = n2[n2["sent"] == o[k]] |
|
r = sorted(q["word"].unique()) |
|
|
|
for l in range(len(r)): |
|
word_list.append(r[l]) |
|
sub_list.append(n2["sub"].iloc[0]) |
|
item_list.append(n2["item"].iloc[0]) |
|
seq_list.append(n2["seq"].iloc[0]) |
|
cond_list.append(n2["cond"].iloc[0]) |
|
sent_list.append(o[k]) |
|
|
|
p = q[q["word"] == r[l]] |
|
|
|
if p.shape[0] == 0: |
|
FFD_list.append(None) |
|
SFD_list.append(None) |
|
GD_list.append(None) |
|
TVT_list.append(None) |
|
nfix1_list.append(0) |
|
nfix2_list.append(0) |
|
nfixAll_list.append(0) |
|
else: |
|
p1 = p[p["regress"] == 0] |
|
p2 = p[p["regress"] == 1] |
|
|
|
if p1.shape[0] == 0: |
|
FFD_list.append(None) |
|
SFD_list.append(None) |
|
GD_list.append(None) |
|
elif p1.shape[0] == 1: |
|
FFD_list.append(p1["fix_dur"].iloc[0]) |
|
SFD_list.append(p1["fix_dur"].iloc[0]) |
|
GD_list.append(p1["fix_dur"].iloc[0]) |
|
else: |
|
FFD_list.append(p1["fix_dur"].iloc[0]) |
|
SFD_list.append(None) |
|
GD_list.append(p1["fix_dur"].sum()) |
|
|
|
TVT_list.append(p["fix_dur"].sum()) |
|
nfix1_list.append(p1.shape[0]) |
|
nfix2_list.append(p2.shape[0]) |
|
nfixAll_list.append(p1.shape[0] + p2.shape[0]) |
|
|
|
wordID_list.append(p["wordID"].iloc[0]) |
|
|
|
if nfix2_list[-1] == 0: |
|
regress_list.append(0) |
|
else: |
|
regress_list.append(1) |
|
|
|
dataT = pd.DataFrame( |
|
{ |
|
"sub": sub_list, |
|
"item": item_list, |
|
"cond": cond_list, |
|
"seq": seq_list, |
|
"word": word_list, |
|
"wordID": wordID_list, |
|
"sent": sent_list, |
|
"FFD": FFD_list, |
|
"SFD": SFD_list, |
|
"GD": GD_list, |
|
"TVT": TVT_list, |
|
"nfix1": nfix1_list, |
|
"nfix2": nfix2_list, |
|
"nfixAll": nfixAll_list, |
|
"regress": regress_list, |
|
} |
|
) |
|
|
|
    # Return the assembled frame directly; the original reset its accumulator
    # lists here and built a `dataN` that was never returned.
    return dataT
|
|
|
|
|
def word_measures_EM(data, algo_choice, include_time_stamps=False): |
|
add_blanks = False |
|
|
|
if "blink" in data.columns: |
|
required_columns = ["blink", "prev_blink", "after_blink"] |
|
if all(col in data.columns for col in required_columns): |
|
if (data["blink"] + data["prev_blink"] + data["after_blink"]).sum() == 0: |
|
ic("Blinks appear to be already excluded! \n\n") |
|
else: |
|
add_blanks = True |
|
ic("There appears to be valid blink data! We will map blinks to individual words. \n\n") |
|
|
|
            regress_blinks = data[(data["blink"] == 1) & ~data["regress_EM"].isna()].index
|
|
|
if len(regress_blinks) < 1: |
|
BlinkFixTypeNotMapped = True |
|
ic( |
|
"Fixation type is not mapped for observations with blinks. Therefore, blinks can't be mapped in terms of 1st and 2nd pass reading." |
|
) |
|
ic( |
|
"Please note that, by default, blink fixation durations will also not be added to fixation duration measures for that word since it's assumed you will delete this word from analysis.\n" |
|
) |
|
ic("If you need to change this, see settings in the pre-processing function.\n\n") |
|
|
|
data_n = pd.DataFrame() |
|
|
|
o_k = sorted(np.unique(data[f"on_sentence_num_{algo_choice}"])) |
|
|
|
for k, sent_k in enumerate(o_k): |
|
q_k = data[data[f"on_sentence_num_{algo_choice}"] == sent_k] |
|
|
|
p1_k = q_k[q_k["regress_EM"] == 0].copy() |
|
p2_k = q_k[q_k["regress_EM"] == 1].copy() |
|
|
|
RS_word = np.nan |
|
check_next = False |
|
|
|
if max(data[f"line_num_{algo_choice}"]) > 1: |
|
for z, q_row in q_k.iterrows(): |
|
if not pd.isna(q_row["Rtn_sweep"]): |
|
if q_row["Rtn_sweep"] == 1: |
|
check_next = True |
|
                        RS_word = (
                            q_row[f"line_word_{algo_choice}"]
                            if not pd.isna(q_row[f"line_word_{algo_choice}"])
                            else np.nan
                        )
|
elif check_next and (pd.notna(q_row[f"line_word_{algo_choice}"])) and (q_row["regress_EM"]): |
|
break |
|
|
|
        word_l = []
        sub_l = [data.loc[0, "subject"]] * len(q_k)
        item_l = [data.loc[0, "item"]] * len(q_k)
        cond_l = [1] * len(q_k)
        blinks_l = [0] * len(q_k)

        for pos, (l, q_row) in enumerate(q_k.iterrows()):
            word_l.append(q_row[f"on_word_number_{algo_choice}"])

            if add_blanks:
                # The original indexed p1_k/p2_k with the row label in a way that
                # raises; summing this row's own blink flags is the closest
                # working interpretation.
                sum_1st_pass = q_row["blink"] + q_row["prev_blink"] + q_row["after_blink"]
                if sum_1st_pass > 0:
                    blinks_l[pos] = 1
|
|
|
        # Collect per-fixation line info; the original's first loop overwrote
        # these lists on every iteration instead of appending, and a second loop
        # then appended duplicates.
        word_line_l = []
        line_l = []
        if include_time_stamps:
            EFIX_SFD_l = [np.nan]

        for l, q_row in q_k.iterrows():
            word_line_l.append(q_row[f"line_word_{algo_choice}"])
            line_l.append(q_row[f"line_num_{algo_choice}"])

            if include_time_stamps and len(p1_k) == 1:
                # A single first-pass fixation: its end stamp is the SFD stamp.
                EFIX_SFD_l.append(p1_k["stop_uncorrected"].iloc[0])
|
|
|
        # Only the identifier columns are assembled here; the duration measures
        # (FFD/SFD/GD/TVT, fixation counts, regressions) were never computed in
        # this adaptation, and listing their names in `columns` made the
        # constructor fail on 5-tuples. See word_m_EM() for those measures.
        data_t = pd.DataFrame(
            list(
                zip(
                    sub_l,
                    item_l,
                    cond_l,
                    word_l,
                    line_l,
                )
            ),
            columns=[
                "subject",
                "item",
                "condition",
                f"on_word_number_{algo_choice}",
                f"line_num_{algo_choice}",
            ],
        )
|
|
|
if add_blanks: |
|
data_t["blinks_1stPass"] = blinks_l |
|
|
|
data_n = pd.concat([data_n, data_t], ignore_index=True) |
|
|
|
return data_n |
|
|