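"""Standalone worker for processing ASC trials across multiple processes.

Reads a JSON-encoded list of processing parameters and trials from stdin,
runs ``process_trial_choice`` over every trial in a multiprocessing pool,
and writes the results back to stdout as JSON (or a JSON ``{"error": ...}``
object if processing fails).
"""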
import json
import multiprocessing
import os
import sys
from functools import partial

import pandas as pd

from multi_proc_funcs import DIST_MODELS_FOLDER, process_trial_choice, set_up_models


def get_cpu_count():
    """Return the number of usable CPUs, falling back to 1 on unknown platforms."""
    if sys.platform in ("linux", "linux2", "darwin"):
        return os.cpu_count()
    elif sys.platform == "win32":
        return multiprocessing.cpu_count()
    else:
        return 1


def process_asc_files_in_multi_proc(
    algo_choice,
    choice_handle_short_and_close_fix,
    discard_fixations_without_sfix,
    discard_far_out_of_text_fix,
    x_thres_in_chars,
    y_thresh_in_heights,
    short_fix_threshold,
    merge_distance_threshold,
    discard_long_fix,
    discard_long_fix_threshold,
    discard_blinks,
    measures_to_calculate_multi_asc,
    include_coords_multi_asc,
    sent_measures_to_calculate_multi_asc,
    trials_by_ids,
    classic_algos_cfg,
    models_dict,
    fix_cols_to_add_multi_asc,
):
    """Run ``process_trial_choice`` over all trials using a multiprocessing pool."""
    funcc = partial(
        process_trial_choice,
        algo_choice=algo_choice,
        choice_handle_short_and_close_fix=choice_handle_short_and_close_fix,
        for_multi=True,
        discard_fixations_without_sfix=discard_fixations_without_sfix,
        discard_far_out_of_text_fix=discard_far_out_of_text_fix,
        x_thres_in_chars=x_thres_in_chars,
        y_thresh_in_heights=y_thresh_in_heights,
        short_fix_threshold=short_fix_threshold,
        merge_distance_threshold=merge_distance_threshold,
        discard_long_fix=discard_long_fix,
        discard_long_fix_threshold=discard_long_fix_threshold,
        discard_blinks=discard_blinks,
        measures_to_calculate_multi_asc=measures_to_calculate_multi_asc,
        include_coords_multi_asc=include_coords_multi_asc,
        sent_measures_to_calculate_multi_asc=sent_measures_to_calculate_multi_asc,
        classic_algos_cfg=classic_algos_cfg,
        models_dict=models_dict,
        fix_cols_to_add=fix_cols_to_add_multi_asc,
    )
    # Cap the pool at the number of trials, 32 workers, or the CPU count minus one,
    # but always keep at least one worker.
    workers = max(1, min(len(trials_by_ids), 32, get_cpu_count() - 1))
    with multiprocessing.Pool(workers) as pool:
        out = pool.map(funcc, trials_by_ids.values())
    return out


def make_json_compatible(obj):
    """Recursively convert DataFrames and Series into plain dicts/lists for JSON serialization."""
    if isinstance(obj, dict):
        return {k: make_json_compatible(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [make_json_compatible(v) for v in obj]
    elif isinstance(obj, pd.DataFrame):
        return obj.to_dict(orient="records")
    elif isinstance(obj, pd.Series):
        return obj.to_dict()
    else:
        return obj


def main():
    try:
        input_data = sys.stdin.buffer.read()

        (
            algo_choice,
            choice_handle_short_and_close_fix,
            discard_fixations_without_sfix,
            discard_far_out_of_text_fix,
            x_thres_in_chars,
            y_thresh_in_heights,
            short_fix_threshold,
            merge_distance_threshold,
            discard_long_fix,
            discard_long_fix_threshold,
            discard_blinks,
            measures_to_calculate_multi_asc,
            include_coords_multi_asc,
            sent_measures_to_calculate_multi_asc,
            trials_by_ids,
            classic_algos_cfg,
            models_dict,
            fix_cols_to_add_multi_asc,
        ) = json.loads(input_data)

        if (
            "DIST" in algo_choice
            or "Wisdom_of_Crowds_with_DIST" in algo_choice
            or "DIST-Ensemble" in algo_choice
            or "Wisdom_of_Crowds_with_DIST_Ensemble" in algo_choice
        ):
            # DIST-based algorithms need the models loaded in this process, so discard
            # whatever was passed over stdin and load them from DIST_MODELS_FOLDER.
            del models_dict
            models_dict = set_up_models(DIST_MODELS_FOLDER)
        else:
            models_dict = {}

        out = process_asc_files_in_multi_proc(
            algo_choice,
            choice_handle_short_and_close_fix,
            discard_fixations_without_sfix,
            discard_far_out_of_text_fix,
            x_thres_in_chars,
            y_thresh_in_heights,
            short_fix_threshold,
            merge_distance_threshold,
            discard_long_fix,
            discard_long_fix_threshold,
            discard_blinks,
            measures_to_calculate_multi_asc,
            include_coords_multi_asc,
            sent_measures_to_calculate_multi_asc,
            trials_by_ids,
            classic_algos_cfg,
            models_dict,
            fix_cols_to_add_multi_asc,
        )

        out2 = []
        for dffix, trial in out:
            dffix = dffix.to_dict("records")
            trial = make_json_compatible(trial)
            out2.append((dffix, trial))

        json_data_out = json.dumps(out2)
        print(json_data_out)
        sys.stdout.flush()
    except Exception as e:
        # Report failures as JSON on stdout so the parent process can detect them.
        print(json.dumps({"error": str(e)}))


if __name__ == "__main__":
    main()
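
# A minimal sketch of how a parent process might invoke this worker script.
# The filename "multi_proc_worker.py" is an assumption (use this file's actual
# path); the payload must be the same 18-element argument list that main()
# unpacks above.
#
#     import json
#     import subprocess
#     import sys
#
#     payload = json.dumps([algo_choice, ..., fix_cols_to_add_multi_asc]).encode()
#     result = subprocess.run(
#         [sys.executable, "multi_proc_worker.py"],
#         input=payload,
#         capture_output=True,
#         check=True,
#     )
#     processed_trials = json.loads(result.stdout)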