GazeGenie / process_asc_files_in_multi_p.py
hugpv's picture
initial commit
da572bf
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import json
from sys import platform as _platform
from functools import partial
import multiprocessing
import os
from tqdm.auto import tqdm
from multi_proc_funcs import DIST_MODELS_FOLDER, process_trial_choice, set_up_models
import sys
import pandas as pd
def get_cpu_count():
if os.sys.platform in ("linux", "linux2", "darwin"):
return os.cpu_count()
elif os.sys.platform == "win32":
return multiprocessing.cpu_count()
else:
return 1
def process_asc_files_in_multi_proc(
algo_choice,
choice_handle_short_and_close_fix,
discard_fixations_without_sfix,
discard_far_out_of_text_fix,
x_thres_in_chars,
y_thresh_in_heights,
short_fix_threshold,
merge_distance_threshold,
discard_long_fix,
discard_long_fix_threshold,
discard_blinks,
measures_to_calculate_multi_asc,
include_coords_multi_asc,
sent_measures_to_calculate_multi_asc,
trials_by_ids,
classic_algos_cfg,
models_dict,
fix_cols_to_add_multi_asc,
):
funcc = partial(
process_trial_choice,
algo_choice=algo_choice,
choice_handle_short_and_close_fix=choice_handle_short_and_close_fix,
for_multi=True,
discard_fixations_without_sfix=discard_fixations_without_sfix,
discard_far_out_of_text_fix=discard_far_out_of_text_fix,
x_thres_in_chars=x_thres_in_chars,
y_thresh_in_heights=y_thresh_in_heights,
short_fix_threshold=short_fix_threshold,
merge_distance_threshold=merge_distance_threshold,
discard_long_fix=discard_long_fix,
discard_long_fix_threshold=discard_long_fix_threshold,
discard_blinks=discard_blinks,
measures_to_calculate_multi_asc=measures_to_calculate_multi_asc,
include_coords_multi_asc=include_coords_multi_asc,
sent_measures_to_calculate_multi_asc=sent_measures_to_calculate_multi_asc,
classic_algos_cfg=classic_algos_cfg,
models_dict=models_dict,
fix_cols_to_add=fix_cols_to_add_multi_asc,
)
workers = min(len(trials_by_ids), 32, get_cpu_count() - 1)
with multiprocessing.Pool(workers) as pool:
out = pool.map(funcc, trials_by_ids.values())
return out
def make_json_compatible(obj):
if isinstance(obj, dict):
return {k: make_json_compatible(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [make_json_compatible(v) for v in obj]
elif isinstance(obj, pd.DataFrame):
return obj.to_dict(orient="records")
elif isinstance(obj, pd.Series):
return obj.to_dict()
else:
return obj
def main():
try:
input_data = sys.stdin.buffer.read()
(
algo_choice,
choice_handle_short_and_close_fix,
discard_fixations_without_sfix,
discard_far_out_of_text_fix,
x_thres_in_chars,
y_thresh_in_heights,
short_fix_threshold,
merge_distance_threshold,
discard_long_fix,
discard_long_fix_threshold,
discard_blinks,
measures_to_calculate_multi_asc,
include_coords_multi_asc,
sent_measures_to_calculate_multi_asc,
trials_by_ids,
classic_algos_cfg,
models_dict,
fix_cols_to_add_multi_asc,
) = json.loads(input_data)
if (
"DIST" in algo_choice
or "Wisdom_of_Crowds_with_DIST" in algo_choice
or "DIST-Ensemble" in algo_choice
or "Wisdom_of_Crowds_with_DIST_Ensemble" in algo_choice
):
del models_dict # Needed to stop pickling from failing for multiproc
models_dict = set_up_models(DIST_MODELS_FOLDER)
else:
models_dict = {}
out = process_asc_files_in_multi_proc(
algo_choice,
choice_handle_short_and_close_fix,
discard_fixations_without_sfix,
discard_far_out_of_text_fix,
x_thres_in_chars,
y_thresh_in_heights,
short_fix_threshold,
merge_distance_threshold,
discard_long_fix,
discard_long_fix_threshold,
discard_blinks,
measures_to_calculate_multi_asc,
include_coords_multi_asc,
sent_measures_to_calculate_multi_asc,
trials_by_ids,
classic_algos_cfg,
models_dict,
fix_cols_to_add_multi_asc,
)
out2 = []
for dffix, trial in out:
dffix = dffix.to_dict("records")
trial = make_json_compatible(trial)
out2.append((dffix, trial))
json_data_out = json.dumps(out2)
sys.stdout.flush()
print(json_data_out)
except Exception as e:
print(json.dumps({"error": str(e)}))
if __name__ == "__main__":
main()