File size: 4,931 Bytes
8c70653
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import glob
import os
import random
from multiprocessing import Manager
from typing import List, Tuple

import numpy as np
import torch
from torch.utils.data import Dataset


class WaveGradDataset(Dataset):
    """Dataset yielding (mel-spectrogram, audio) pairs for WaveGrad.

    Every entry of ``items`` is a wav file path. The waveform is loaded with
    ``ap.load_wav``, zero-padded, optionally cropped to a random segment of
    ``seq_len`` samples and converted to a mel-spectrogram on the fly with
    ``ap.melspectrogram``.

    Args:
        ap: Audio processor object providing ``load_wav(path)`` and
            ``melspectrogram(audio)``.
        items: Sequence of wav file paths.
        seq_len (int): Number of audio samples per returned segment. Must be a
            multiple of ``hop_len`` when ``return_segments`` is True. May be
            ``None`` when ``return_segments`` is False.
        hop_len (int): Hop length used to align the audio length.
        pad_short (int): Extra samples a clip must have on top of ``seq_len``;
            shorter clips are zero-padded up to ``seq_len + pad_short``.
        conv_pad (int): Per-side frame padding added into ``feat_frame_len``.
        is_training (bool): Noise augmentation is only applied when True.
        return_segments (bool): Return random fixed-size segments instead of
            full clips.
        use_noise_augment (bool): Add low-amplitude Gaussian noise to the
            audio segments.
        use_cache (bool): Keep the padded waveforms in a process-shared list.
        verbose (bool): Stored on the instance; not used by the code in this
            class.
    """

    def __init__(
        self,
        ap,
        items,
        seq_len,
        hop_len,
        pad_short,
        conv_pad=2,
        is_training=True,
        return_segments=True,
        use_noise_augment=False,
        use_cache=False,
        verbose=False,
    ):

        super().__init__()
        self.ap = ap
        self.item_list = items
        self.seq_len = seq_len if return_segments else None
        self.hop_len = hop_len
        self.pad_short = pad_short
        self.conv_pad = conv_pad
        self.is_training = is_training
        self.return_segments = return_segments
        self.use_cache = use_cache
        self.use_noise_augment = use_noise_augment
        self.verbose = verbose

        if return_segments:
            assert seq_len % hop_len == 0, " [!] seq_len has to be a multiple of hop_len."
        # The frame length is derived from the raw ``seq_len`` argument. It is
        # undefined when no segment length was given (full-clip mode), so do
        # not fail on ``None`` there.
        if seq_len is not None:
            self.feat_frame_len = seq_len // hop_len + (2 * conv_pad)
        else:
            self.feat_frame_len = None

        # cache acoustic features
        if use_cache:
            self.create_feature_cache()

    def create_feature_cache(self):
        """Create a ``multiprocessing.Manager`` list with one empty slot per item."""
        self.manager = Manager()
        self.cache = self.manager.list()
        self.cache += [None for _ in range(len(self.item_list))]

    @staticmethod
    def find_wav_files(path):
        """Return all ``*.wav`` files found recursively under ``path``."""
        return glob.glob(os.path.join(path, "**", "*.wav"), recursive=True)

    def __len__(self):
        return len(self.item_list)

    def __getitem__(self, idx):
        return self.load_item(idx)

    def load_test_samples(self, num_samples: int) -> List[Tuple]:
        """Return the first ``num_samples`` items as full (uncropped) clips.

        Args:
            num_samples (int): Number of samples to return.

        Returns:
            List[Tuple]: melspectrogram and audio of each sample.

        Raises:
            IndexError: If ``num_samples`` exceeds the number of items.

        Shapes:
            - melspectrogram (Tensor): :math:`[C, T]`
            - audio (Tensor): :math:`[T_audio]`
        """
        samples = []
        return_segments = self.return_segments
        self.return_segments = False
        try:
            for idx in range(num_samples):
                mel, audio = self.load_item(idx)
                samples.append([mel, audio])
        finally:
            # Always restore the segment mode, even when loading fails, so the
            # dataset is not left stuck in full-clip mode.
            self.return_segments = return_segments
        return samples

    def load_item(self, idx):
        """Load the (mel, audio) couple of item ``idx``.

        Args:
            idx (int): Index into the item list.

        Returns:
            Tuple[Tensor, Tensor]: float mel-spectrogram and float audio.
        """
        wavpath = self.item_list[idx]

        if self.use_cache and self.cache[idx] is not None:
            audio = self.cache[idx]
        else:
            # assumes ``load_wav`` returns a 1-D numpy array — TODO confirm
            audio = self.ap.load_wav(wavpath)

            if self.return_segments:
                # correct audio length wrt segment length
                min_len = self.seq_len + self.pad_short
                if audio.shape[-1] < min_len:
                    audio = np.pad(audio, (0, min_len - audio.shape[-1]), mode="constant", constant_values=0.0)
                assert audio.shape[-1] >= min_len, f"{audio.shape[-1]} vs {min_len}"

            # correct the audio length wrt hop length; this always appends
            # between 1 and hop_len zeros
            p = (audio.shape[-1] // self.hop_len + 1) * self.hop_len - audio.shape[-1]
            audio = np.pad(audio, (0, p), mode="constant", constant_values=0.0)

            if self.use_cache:
                self.cache[idx] = audio

        if self.return_segments:
            max_start = audio.shape[-1] - self.seq_len
            start = random.randint(0, max_start)
            audio = audio[start : start + self.seq_len]

        if self.use_noise_augment and self.is_training and self.return_segments:
            # ``audio`` is still a numpy array here, so ``torch.randn_like``
            # cannot be used; draw the noise from the torch RNG and convert it.
            # The 1/32768 scale is presumably one 16-bit quantisation step.
            noise = torch.randn(audio.shape).numpy()
            audio = audio + (1 / 32768) * noise

        mel = self.ap.melspectrogram(audio)
        mel = mel[..., :-1]  # ignore the padding

        audio = torch.from_numpy(audio).float()
        mel = torch.from_numpy(mel).float().squeeze(0)
        return (mel, audio)

    @staticmethod
    def collate_full_clips(batch):
        """Zero-pad a batch of (mel, audio) pairs to the longest item.

        This is used in tune_wavegrad.py.

        Args:
            batch: Non-empty sequence of ``(mel [C, T], audio [T_audio])`` tensors.

        Returns:
            Tuple[Tensor, Tensor]: ``mels`` of shape ``[B, C, T_max]`` and
            ``audios`` of shape ``[B, T_audio_max]``.
        """
        num_channels = batch[0][0].shape[0]
        max_mel_length = max(mel.shape[1] for mel, _ in batch)
        max_audio_length = max(audio.shape[0] for _, audio in batch)

        mels = torch.zeros([len(batch), num_channels, max_mel_length])
        audios = torch.zeros([len(batch), max_audio_length])

        for idx, (mel, audio) in enumerate(batch):
            mels[idx, :, : mel.shape[1]] = mel
            audios[idx, : audio.shape[0]] = audio

        return mels, audios