Spaces:
No application file
No application file
File size: 3,275 Bytes
6755a2d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
from copy import deepcopy
from itertools import product
import os
from typing import Dict, List
import logging
import pandas as pd
from .path_util import get_dir_file_map
from .signature import get_signature_of_string
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
def generate_tasks(
    path: str,
    key: str = None,
    sep: str = ",",
    exts: List[str] = None,
    subset_row: str = None,
) -> List[Dict]:
    """Build a list of task dicts from a directory, a CSV file, or a single path.

    Args:
        path (str): A directory, a ``.csv`` file, or any other path.
        key (str, optional): Dict key under which the task name is stored in
            the directory and single-path cases. Defaults to None.
        sep (str, optional): Field delimiter passed to ``pd.read_csv``.
            Defaults to ",".
        exts (List[str], optional): Forwarded to ``get_dir_file_map`` when
            ``path`` is a directory. Defaults to None.
        subset_row (str, optional): Row selector such as ``"1:2_3:4"``; it is
            parsed by ``read_subset_rows`` into positional row indices and is
            only applied to CSV input. Defaults to None.

    Returns:
        List[Dict]: One dict per task. For CSV input these are the table rows
        as records; otherwise dicts built from ``key`` and the path(s).
    """
    if os.path.isdir(path):
        file_map = get_dir_file_map(path=path, exts=exts)
        # NOTE(review): the second key is the *value* of ``path`` (the
        # directory string), not the literal "path" -- confirm this is intended.
        return [{key: k, path: v} for k, v in file_map.items()]

    # splitext()[1] is the extension (with its leading dot); index 0 would be
    # the stem, which made the CSV branch unreachable for files like "x.csv".
    extension = os.path.splitext(path)[1].lower()
    if extension != ".csv":
        return [{key: path}]

    table = pd.read_csv(path, sep=sep)
    if subset_row is not None:
        # Positional selection of the requested rows only.
        table = table.iloc[read_subset_rows(subset_row)]
    return table.to_dict(orient="records")
def get_filename_from_str(string, n=100, has_signature=True, n_signature=8):
    """Derive a name from the first ``n`` characters of ``string``.

    Args:
        string: Source text the name is taken from.
        n: Maximum number of leading characters of ``string`` to keep.
        has_signature: When true, append ``_<signature>`` to the prefix.
        n_signature: Second argument passed to ``get_signature_of_string``
            (presumably the signature length -- confirm against that helper).

    Returns:
        The truncated prefix, optionally followed by an underscore and the
        value returned by ``get_signature_of_string(string, n_signature)``.
    """
    prefix = string[:n]
    if not has_signature:
        return prefix
    # The signature is computed from the full string, not the truncated prefix.
    suffix = get_signature_of_string(string, n_signature)
    return f"{prefix}_{suffix}"
def read_subset_rows(string: str) -> List:
    """Parse a row-selector string such as ``"1:2_3:4"`` into integer indices.

    Tokens are separated by ``_`` or ``,``. Each token is either a single
    integer or a ``start:end`` range that is inclusive on both ends, so ranges
    and single values may be mixed freely (e.g. ``"0:2,7_4"``).

    Args:
        string (str): Selector text, e.g. ``"1:3_7"`` or ``"0,2,4:6"``.

    Returns:
        List: Sorted list of the selected indices with duplicates removed.

    Raises:
        ValueError: If a token is empty or not an integer, or a range does not
            consist of exactly two bounds.
    """
    indices = set()
    for segment in string.split("_"):
        # Parse every comma-separated token on its own so a range and plain
        # values can share one segment.
        for token in segment.split(","):
            if ":" in token:
                # Closed interval: both start and end are included.
                start, end = (int(bound) for bound in token.split(":"))
                indices.update(range(start, end + 1))
            else:
                indices.add(int(token))
    return sorted(indices)
def fiss_tasks(tasks: List[Dict], task_fission_sep: str = "|") -> List[Dict]:
    """Expand tasks whose string values contain ``task_fission_sep``.

    Every string field containing the separator is split into alternatives,
    and one task is emitted per element of the Cartesian product of those
    alternatives. Tasks with no such field are passed through as the same
    object; expanded tasks are deep copies of the original.

    Args:
        tasks (List[Dict]): Input task dicts.
        task_fission_sep (str, optional): Separator marking alternatives
            inside a string value. Defaults to "|".

    Returns:
        List[Dict]: The expanded task list, in input order.
    """
    expanded = []
    for task in tasks:
        split_keys = [
            name
            for name, value in task.items()
            if isinstance(value, str) and task_fission_sep in value
        ]
        if not split_keys:
            expanded.append(task)
            continue

        alternatives = [task[name].split(task_fission_sep) for name in split_keys]
        for combo in product(*alternatives):
            variant = deepcopy(task)
            # Overwrite each splittable field with its value for this combination.
            variant.update(zip(split_keys, combo))
            expanded.append(variant)
    return expanded
|