File size: 4,961 Bytes
da572bf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
import json
import multiprocessing
import os
import sys
import traceback
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from functools import partial
from sys import platform as _platform

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from multi_proc_funcs import DIST_MODELS_FOLDER, process_trial_choice, set_up_models
def get_cpu_count():
    """Return the number of CPUs to plan work around; always an int >= 1.

    On Linux/macOS this is ``os.cpu_count()`` and on Windows
    ``multiprocessing.cpu_count()``; on any other platform it conservatively
    returns 1. ``os.cpu_count()`` returns ``None`` and
    ``multiprocessing.cpu_count()`` raises ``NotImplementedError`` when the
    count cannot be determined; both cases fall back to 1 so callers can do
    arithmetic on the result safely.
    """
    if sys.platform in ("linux", "linux2", "darwin"):
        return os.cpu_count() or 1
    if sys.platform == "win32":
        try:
            return multiprocessing.cpu_count()
        except NotImplementedError:
            return 1
    return 1
def process_asc_files_in_multi_proc(
    algo_choice,
    choice_handle_short_and_close_fix,
    discard_fixations_without_sfix,
    discard_far_out_of_text_fix,
    x_thres_in_chars,
    y_thresh_in_heights,
    short_fix_threshold,
    merge_distance_threshold,
    discard_long_fix,
    discard_long_fix_threshold,
    discard_blinks,
    measures_to_calculate_multi_asc,
    include_coords_multi_asc,
    sent_measures_to_calculate_multi_asc,
    trials_by_ids,
    classic_algos_cfg,
    models_dict,
    fix_cols_to_add_multi_asc,
):
    """Run ``process_trial_choice`` on every trial using a process pool.

    Every argument except ``trials_by_ids`` is forwarded unchanged as a
    keyword argument to ``process_trial_choice``. ``for_multi=True`` is
    always added, and ``fix_cols_to_add_multi_asc`` is passed under the name
    ``fix_cols_to_add``.

    Args:
        trials_by_ids: Mapping whose *values* are the per-trial inputs passed
            as the first positional argument to ``process_trial_choice``;
            the keys are not used here.
        models_dict: Bound into the task callable, so it is pickled and sent
            to the worker processes; it must therefore be picklable.

    Returns:
        A list of ``process_trial_choice`` results in the iteration order of
        ``trials_by_ids.values()``; an empty list when there are no trials.
    """
    # Nothing to do: avoid starting a pool (Pool(0) would raise ValueError).
    if not trials_by_ids:
        return []

    funcc = partial(
        process_trial_choice,
        algo_choice=algo_choice,
        choice_handle_short_and_close_fix=choice_handle_short_and_close_fix,
        for_multi=True,
        discard_fixations_without_sfix=discard_fixations_without_sfix,
        discard_far_out_of_text_fix=discard_far_out_of_text_fix,
        x_thres_in_chars=x_thres_in_chars,
        y_thresh_in_heights=y_thresh_in_heights,
        short_fix_threshold=short_fix_threshold,
        merge_distance_threshold=merge_distance_threshold,
        discard_long_fix=discard_long_fix,
        discard_long_fix_threshold=discard_long_fix_threshold,
        discard_blinks=discard_blinks,
        measures_to_calculate_multi_asc=measures_to_calculate_multi_asc,
        include_coords_multi_asc=include_coords_multi_asc,
        sent_measures_to_calculate_multi_asc=sent_measures_to_calculate_multi_asc,
        classic_algos_cfg=classic_algos_cfg,
        models_dict=models_dict,
        fix_cols_to_add=fix_cols_to_add_multi_asc,
    )
    # Leave one core for the parent, cap at 32, and never start more workers
    # than there are trials. The outer max(1, ...) keeps the pool valid on a
    # single-core machine, where cpu_count - 1 == 0; `or 1` guards a None count.
    cpu_count = get_cpu_count() or 1
    workers = max(1, min(len(trials_by_ids), 32, cpu_count - 1))
    with multiprocessing.Pool(workers) as pool:
        return pool.map(funcc, trials_by_ids.values())
def make_json_compatible(obj):
    """Recursively convert ``obj`` into values that ``json.dumps`` accepts.

    Conversions:
      * dict -> dict with the same keys and converted values;
      * list / tuple -> list of converted items;
      * pandas DataFrame -> list of row dicts (``orient="records"``), converted;
      * pandas Series -> dict of index label -> value, converted;
      * NumPy ndarray -> (nested) list, converted;
      * NumPy scalar (e.g. ``np.int64``) -> the equivalent Python scalar.

    Any other value is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: make_json_compatible(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        # json.dumps emits tuples as arrays anyway; recursing here makes sure
        # DataFrames/NumPy values nested inside a tuple are converted too.
        return [make_json_compatible(v) for v in obj]
    if isinstance(obj, pd.DataFrame):
        return make_json_compatible(obj.to_dict(orient="records"))
    if isinstance(obj, pd.Series):
        return make_json_compatible(obj.to_dict())
    if isinstance(obj, np.ndarray):
        # tolist() yields Python scalars; recursion handles object arrays.
        return make_json_compatible(obj.tolist())
    if isinstance(obj, np.generic):
        # json.dumps rejects e.g. np.int64 / np.bool_.
        return obj.item()
    return obj
def _needs_dist_models(algo_choice):
    """Return True if ``algo_choice`` names an algorithm that needs the DIST models.

    Uses ``in``, so it works for a single algorithm string (substring match)
    or for a list of algorithm names (membership match).
    """
    return any(
        name in algo_choice
        for name in (
            "DIST",
            "Wisdom_of_Crowds_with_DIST",
            "DIST-Ensemble",
            "Wisdom_of_Crowds_with_DIST_Ensemble",
        )
    )


def _serialize_results(out):
    """Turn ``(dffix, trial)`` result pairs into JSON-serializable pairs.

    ``dffix`` is converted with ``to_dict("records")`` and ``trial`` with
    ``make_json_compatible``.
    """
    return [
        (dffix.to_dict("records"), make_json_compatible(trial))
        for dffix, trial in out
    ]


def main():
    """Read a job from stdin, process all trials, and print the result as JSON.

    stdin must hold a JSON array of exactly 18 elements, in the same order as
    the parameters of ``process_asc_files_in_multi_proc``. On success a JSON
    list of ``[fixation_records, trial]`` pairs is printed to stdout. On any
    exception ``{"error": "<message>"}`` is printed to stdout instead, and the
    full traceback is written to stderr.
    """
    try:
        input_data = sys.stdin.buffer.read()
        (
            algo_choice,
            choice_handle_short_and_close_fix,
            discard_fixations_without_sfix,
            discard_far_out_of_text_fix,
            x_thres_in_chars,
            y_thresh_in_heights,
            short_fix_threshold,
            merge_distance_threshold,
            discard_long_fix,
            discard_long_fix_threshold,
            discard_blinks,
            measures_to_calculate_multi_asc,
            include_coords_multi_asc,
            sent_measures_to_calculate_multi_asc,
            trials_by_ids,
            classic_algos_cfg,
            models_dict,
            fix_cols_to_add_multi_asc,
        ) = json.loads(input_data)
        # The models_dict received over stdin is never used: it is replaced by
        # freshly loaded models, or by an empty dict when none are needed.
        if _needs_dist_models(algo_choice):
            del models_dict  # Needed to stop pickling from failing for multiproc
            models_dict = set_up_models(DIST_MODELS_FOLDER)
        else:
            models_dict = {}
        out = process_asc_files_in_multi_proc(
            algo_choice,
            choice_handle_short_and_close_fix,
            discard_fixations_without_sfix,
            discard_far_out_of_text_fix,
            x_thres_in_chars,
            y_thresh_in_heights,
            short_fix_threshold,
            merge_distance_threshold,
            discard_long_fix,
            discard_long_fix_threshold,
            discard_blinks,
            measures_to_calculate_multi_asc,
            include_coords_multi_asc,
            sent_measures_to_calculate_multi_asc,
            trials_by_ids,
            classic_algos_cfg,
            models_dict,
            fix_cols_to_add_multi_asc,
        )
        json_data_out = json.dumps(_serialize_results(out))
        # Flush anything already buffered first so the JSON payload is not
        # interleaved with it, then flush the payload itself.
        sys.stdout.flush()
        print(json_data_out, flush=True)
    except Exception as e:
        # stdout carries the machine-readable error the caller parses; the
        # traceback goes to stderr so the failure location is not lost.
        traceback.print_exc(file=sys.stderr)
        print(json.dumps({"error": str(e)}), flush=True)
if __name__ == "__main__":
    # This guard is required because the script uses multiprocessing.Pool.
    # Under the "spawn" start method, each worker re-imports this module,
    # and it must not re-run main() on import.
    main()
|