File size: 3,275 Bytes
6755a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from copy import deepcopy
from itertools import product
import os
from typing import Dict, List
import logging

import pandas as pd

from .path_util import get_dir_file_map
from .signature import get_signature_of_string

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


def generate_tasks(
    path: str,
    key: str = None,
    sep: str = ",",
    exts: List[str] = None,
    subset_row: str = None,
) -> List[Dict]:
    """Build a list of task dictionaries from a directory, a CSV file or a single path.

    Three kinds of ``path`` are handled:

    * a directory: ``get_dir_file_map(path=path, exts=exts)`` is called and one
      task is produced per item of the returned mapping;
    * a file whose extension is ``.csv`` (case-insensitive): the table is read
      with pandas and every row becomes one task dictionary;
    * anything else: a single task ``{key: path}`` is returned.

    Args:
        path (str): Path of the task source (directory, CSV file, or other file).
        key (str, optional): Field name used to store the task name / path in
            the directory and single-file cases. Defaults to None.
        sep (str, optional): Field separator of the CSV table. Defaults to ",".
        exts (List[str], optional): Forwarded to ``get_dir_file_map`` when
            ``path`` is a directory. Defaults to None.
        subset_row (str, optional): Row selection such as ``"1:2_3:4"``; it is
            parsed by ``read_subset_rows`` into positional row indices and only
            applies to CSV input. Defaults to None.

    Returns:
        List[Dict]: The list of task dictionaries.
    """
    if os.path.isdir(path):
        file_map = get_dir_file_map(path=path, exts=exts)
        # NOTE(review): the second dict key is the *value* of ``path`` (the
        # directory string), not the literal "path". Kept as in the original;
        # confirm the intended key with callers.
        return [{key: k, path: v} for k, v in file_map.items()]

    # ``splitext`` returns ``(stem, ".ext")``: the extension is element [1] and
    # carries the leading dot. The previous code inspected the stem instead, so
    # CSV files were never parsed.
    ext = os.path.splitext(path)[1].lower()
    if ext != ".csv":
        return [{key: path}]

    table = pd.read_csv(path, sep=sep)
    if subset_row is not None:
        # Positional selection of the requested rows.
        table = table.iloc[read_subset_rows(subset_row)]
    return table.to_dict(orient="records")


def get_filename_from_str(string, n=100, has_signature=True, n_signature=8):
    """Derive a name from the first ``n`` characters of ``string``.

    Args:
        string: Source string.
        n: Maximum number of leading characters kept from ``string``.
        has_signature: When true, append ``_<signature>`` where the signature
            is ``get_signature_of_string(string, n_signature)``.
        n_signature: Second argument forwarded to ``get_signature_of_string``.

    Returns:
        The truncated string, optionally followed by an underscore and the
        signature.
    """
    prefix = string[:n]
    if not has_signature:
        return prefix
    # The signature is computed from the full string, not the truncated prefix.
    suffix = get_signature_of_string(string, n_signature)
    return "{}_{}".format(prefix, suffix)


def read_subset_rows(string: str) -> List:
    """Parse a row-selection string such as ``"1:2_3:4"`` into integer indices.

    The string is split on ``"_"``. Each piece is either ``"start:end"``, an
    inclusive range on both ends, or a comma-separated list of integers.

    Args:
        string (str): Row-selection expression.

    Returns:
        List: Sorted list of unique integer indices.
    """
    indices = set()
    for chunk in string.split("_"):
        if ":" not in chunk:
            indices.update(int(item) for item in chunk.split(","))
            continue
        # Both bounds are inclusive, hence the ``+ 1`` on the upper one.
        bounds = [int(item) for item in chunk.split(":")]
        first, last = bounds
        indices.update(range(first, last + 1))
    return sorted(indices)


def fiss_tasks(tasks: List[Dict], task_fission_sep: str = "|") -> List[Dict]:
    """Expand tasks whose string values contain ``task_fission_sep``.

    Every string field containing the separator is split on it, and one task is
    emitted per element of the Cartesian product of those split values. Tasks
    with no such field are passed through unchanged (the same object, not a
    copy).

    Args:
        tasks (List[Dict]): Input task dictionaries.
        task_fission_sep (str, optional): Separator marking multi-valued
            fields. Defaults to "|".

    Returns:
        List[Dict]: The expanded task list, in input order.
    """
    expanded = []
    for task in tasks:
        split_fields = [
            name
            for name, value in task.items()
            if isinstance(value, str) and task_fission_sep in value
        ]
        if not split_fields:
            expanded.append(task)
            continue
        choices = [task[name].split(task_fission_sep) for name in split_fields]
        for combo in product(*choices):
            # Deep copy so that nested values are not shared between variants.
            variant = deepcopy(task)
            variant.update(zip(split_fields, combo))
            expanded.append(variant)
    return expanded