File size: 4,605 Bytes
a0d91d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import os
import copy
from typing import Optional, Union

import numpy as np
from torch.utils.data import Dataset

from .pipelines import Compose
from .builder import DATASETS
from mogen.core.evaluation import build_evaluator


@DATASETS.register_module()
class BaseMotionDataset(Dataset):
    """Base motion dataset.

    Motion names are read, one per line, from the annotation file and each
    motion is loaded eagerly from ``<motion_dir>/<name>.npy`` at
    construction time.

    Args:
        data_prefix (str): the prefix of data path.
        pipeline (list): a list of dict, where each element represents
            a operation defined in `mogen.datasets.pipelines`.
        dataset_name (str): the name of dataset. It is used as a path
            component below ``<data_prefix>/datasets`` and is attached to
            every sample under the ``'dataset_name'`` key. Although the
            signature default is None, a string is required because the
            paths are built with ``os.path.join``.
        fixed_length (int | None, optional): when set (and not in test
            mode), the reported dataset length; indexes wrap around the
            loaded samples. Default: None.
        ann_file (str): annotation file name, resolved as
            ``<data_prefix>/datasets/<dataset_name>/<ann_file>``. A string
            is required for the same reason as ``dataset_name``.
        motion_dir (str): motion directory name, resolved as
            ``<data_prefix>/datasets/<dataset_name>/<motion_dir>``. A
            string is required for the same reason as ``dataset_name``.
        eval_cfg (dict | None, optional): evaluation config. It is deep
            copied and must be a dict when ``test_mode`` is True; the keys
            ``'replication_times'`` and ``'metrics'`` are read, and
            ``'shuffle_indexes'`` is optional. Default: None.
        test_mode (bool): in train mode or test mode. Default: False.
    """

    def __init__(self,
                 data_prefix: str,
                 pipeline: list,
                 dataset_name: Optional[str] = None,
                 fixed_length: Optional[int] = None,
                 ann_file: Optional[str] = None,
                 motion_dir: Optional[str] = None,
                 eval_cfg: Optional[dict] = None,
                 test_mode: Optional[bool] = False):
        super().__init__()

        self.data_prefix = data_prefix
        self.pipeline = Compose(pipeline)
        self.dataset_name = dataset_name
        self.fixed_length = fixed_length
        # Both locations live under <data_prefix>/datasets/<dataset_name>.
        dataset_root = os.path.join(data_prefix, 'datasets', dataset_name)
        self.ann_file = os.path.join(dataset_root, ann_file)
        self.motion_dir = os.path.join(dataset_root, motion_dir)
        # Copy so that later changes to the caller's dict do not leak in.
        self.eval_cfg = copy.deepcopy(eval_cfg)
        self.test_mode = test_mode

        self.load_annotations()
        if self.test_mode:
            self.prepare_evaluation()

    def load_anno(self, name):
        """Load the motion stored at ``<motion_dir>/<name>.npy``.

        Args:
            name (str): file name of the motion without the ``.npy`` suffix.

        Returns:
            dict: a dict holding the loaded array under the ``'motion'`` key.
        """
        motion_path = os.path.join(self.motion_dir, name + '.npy')
        motion_data = np.load(motion_path)
        return {'motion': motion_data}

    def load_annotations(self):
        """Load annotations from ``ann_file`` to ``data_infos``.

        Every non-blank line of the annotation file is stripped and passed
        to ``load_anno``; the returned dicts are collected in file order.
        """
        self.data_infos = []
        # The context manager closes the file even if loading a motion fails.
        with open(self.ann_file, 'r') as ann:
            for line in ann:
                name = line.strip()
                if not name:
                    # Blank (e.g. trailing) lines do not name a motion.
                    continue
                self.data_infos.append(self.load_anno(name))

    def prepare_data(self, idx: int):
        """Prepare raw data for the ``idx``-th data.

        A deep copy of ``data_infos[idx]`` is extended with the
        ``'dataset_name'`` and ``'sample_idx'`` keys and passed through the
        pipeline, so the cached annotation itself is never modified.

        Args:
            idx (int): index into ``data_infos``.

        Returns:
            The output of the pipeline for this sample.
        """
        results = copy.deepcopy(self.data_infos[idx])
        results['dataset_name'] = self.dataset_name
        results['sample_idx'] = idx
        return self.pipeline(results)

    def __len__(self):
        """Return the length of current dataset.

        In test mode this is the number of evaluation indexes; otherwise it
        is ``fixed_length`` when set, else the number of loaded samples.
        """
        if self.test_mode:
            return len(self.eval_indexes)
        if self.fixed_length is not None:
            return self.fixed_length
        return len(self.data_infos)

    def __getitem__(self, idx: int):
        """Prepare data for the ``idx``-th data.

        In test mode ``idx`` is first mapped through ``eval_indexes``.
        Otherwise, when ``fixed_length`` is set, ``idx`` wraps around the
        number of loaded samples.
        """
        if self.test_mode:
            idx = self.eval_indexes[idx]
        elif self.fixed_length is not None:
            idx = idx % len(self.data_infos)
        return self.prepare_data(idx)

    def prepare_evaluation(self):
        """Build the evaluators and the flat list of evaluation indexes.

        One index array covering all samples is created per replication
        (shuffled when ``eval_cfg['shuffle_indexes']`` is truthy). Each
        metric config is handed to ``build_evaluator`` together with the
        current index arrays; the index arrays it returns replace the
        current ones. Finally the arrays are concatenated into
        ``self.eval_indexes``.

        Raises:
            TypeError: if ``eval_cfg`` is None.
        """
        if self.eval_cfg is None:
            # Same exception type as subscripting None, but with a message
            # that points at the actual misconfiguration.
            raise TypeError(
                'eval_cfg must be a dict when test_mode is True, got None')

        self.evaluators = []
        self.eval_indexes = []
        num_samples = len(self.data_infos)
        shuffle = self.eval_cfg.get('shuffle_indexes', False)
        for _ in range(self.eval_cfg['replication_times']):
            eval_indexes = np.arange(num_samples)
            if shuffle:
                np.random.shuffle(eval_indexes)
            self.eval_indexes.append(eval_indexes)
        for metric in self.eval_cfg['metrics']:
            evaluator, self.eval_indexes = build_evaluator(
                metric, self.eval_cfg, num_samples, self.eval_indexes)
            self.evaluators.append(evaluator)

        self.eval_indexes = np.concatenate(self.eval_indexes)

    def evaluate(self, results, work_dir, logger=None):
        """Run every evaluator on ``results`` and merge their metrics.

        Args:
            results: non-empty sequence of per-sample outputs. The first
                element's ``'motion'`` entry must expose a ``device``
                attribute (presumably a torch tensor -- confirm with the
                caller).
            work_dir: not used by this implementation.
            logger: optional object with an ``info`` method; when given,
                the merged metrics are logged through it. Default: None.

        Returns:
            dict: the metrics of all evaluators merged into one dict; on a
            key clash the later evaluator wins.
        """
        metrics = {}
        # Evaluators are moved to wherever the results live before running.
        device = results[0]['motion'].device
        for evaluator in self.evaluators:
            evaluator.to_device(device)
            metrics.update(evaluator.evaluate(results))
        if logger is not None:
            logger.info(metrics)
        return metrics