import os
import copy

import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm


def flatten(v):
    """
    Flatten a list of lists/tuples
    """
    return [x for y in v for x in y]


def rescale(x):
    """
    Rescale a tensor to 0-1
    """
    return (x - x.min()) / (x.max() - x.min())


def find_max_epoch(path):
    """
    Find maximum epoch/iteration in path, formatted ${n_iter}.pkl
    E.g. 100000.pkl

    Parameters:
    path (str): checkpoint path

    Returns:
    maximum iteration, -1 if there is no (valid) checkpoint
    """
    files = os.listdir(path)
    epoch = -1
    for f in files:
        if len(f) <= 4:
            continue
        if f[-4:] == '.pkl':
            try:
                epoch = max(epoch, int(f[:-4]))
            except ValueError:
                continue
    # print(path, epoch, flush=True)
    return epoch


def print_size(net):
    """
    Print the number of parameters of a network
    """
    if net is not None and isinstance(net, torch.nn.Module):
        module_parameters = filter(lambda p: p.requires_grad, net.parameters())
        params = sum([np.prod(p.size()) for p in module_parameters])
        print("{} Parameters: {:.6f}M".format(
            net.__class__.__name__, params / 1e6), flush=True)


# Utilities for diffusion models

def std_normal(size):
    """
    Generate the standard Gaussian variable of a certain size
    """
    # The rest of this module (noise scheduling, losses) combines this noise with
    # CUDA tensors, so the sample is created on the GPU.
    return torch.normal(0, 1, size=size).cuda()


def calc_noise_scale_embedding(noise_scales, noise_scale_embed_dim_in):
    """
    Embed a noise scale $t$ into a higher dimensional space
    E.g. the embedding vector in the 128-dimensional space is
    [sin(t * 10^(0*4/63)), ... , sin(t * 10^(63*4/63)), cos(t * 10^(0*4/63)), ... , cos(t * 10^(63*4/63))]

    Parameters:
    noise_scales (torch.long tensor, shape=(batchsize, 1)):
        noise scales for batch data
    noise_scale_embed_dim_in (int, default=128):
        dimensionality of the embedding space for discrete noise scales

    Returns:
    the embedding vectors (torch.tensor, shape=(batchsize, noise_scale_embed_dim_in))
    """
    assert noise_scale_embed_dim_in % 2 == 0

    half_dim = noise_scale_embed_dim_in // 2
    _embed = np.log(10000) / (half_dim - 1)
    _embed = torch.exp(torch.arange(half_dim) * -_embed)
    _embed = noise_scales * _embed
    noise_scale_embed = torch.cat((torch.sin(_embed), torch.cos(_embed)), 1)

    return noise_scale_embed


def calc_diffusion_hyperparams_given_beta(beta):
    """
    Compute diffusion process hyperparameters

    Parameters:
    beta (tensor): beta schedule

    Returns:
    a dictionary of diffusion hyperparameters including:
        T (int), beta/alpha/sigma (torch.tensor on cpu, shape=(T, ))
        These cpu tensors are changed to cuda tensors on each individual gpu
    """
    T = len(beta)
    alpha = 1 - beta
    sigma = beta + 0
    for t in range(1, T):
        alpha[t] *= alpha[t-1]  # \alpha^2_t = \prod_{s=1}^t (1-\beta_s)
        sigma[t] *= (1-alpha[t-1]) / (1-alpha[t])  # \sigma^2_t = \beta_t * (1-\alpha_{t-1}) / (1-\alpha_t)
    alpha = torch.sqrt(alpha)
    sigma = torch.sqrt(sigma)

    _dh = {}
    _dh["T"], _dh["beta"], _dh["alpha"], _dh["sigma"] = T, beta, alpha, sigma
    diffusion_hyperparams = _dh
    return diffusion_hyperparams


def calc_diffusion_hyperparams(T, beta_0, beta_T, tau, N, beta_N, alpha_N, rho):
    """
    Compute diffusion process hyperparameters

    Parameters:
    T (int): number of noise scales
    beta_0 and beta_T (float): beta schedule start/end value,
        where any beta_t in the middle is linearly interpolated
    tau, N, beta_N, alpha_N, rho: noise-scheduling hyperparameters; they are stored
        unchanged in the returned dictionary and consumed by noise_scheduling and phi_loss below

    Returns:
    a dictionary of diffusion hyperparameters including:
        T (int), beta/alpha/sigma (torch.tensor on cpu, shape=(T, ))
        These cpu tensors are changed to cuda tensors on each individual gpu
    """
    beta = torch.linspace(beta_0, beta_T, T)
    alpha = 1 - beta
    sigma = beta + 0
    for t in range(1, T):
        alpha[t] *= alpha[t-1]  # \alpha^2_t = \prod_{s=1}^t (1-\beta_s)
        sigma[t] *= (1-alpha[t-1]) / (1-alpha[t])  # \sigma^2_t = \beta_t * (1-\alpha_{t-1}) / (1-\alpha_t)
    alpha = torch.sqrt(alpha)
    sigma = torch.sqrt(sigma)

    _dh = {}
    _dh["T"], _dh["beta"], _dh["alpha"], _dh["sigma"] = T, beta, alpha, sigma
    _dh["tau"], _dh["N"], _dh["betaN"], _dh["alphaN"], _dh["rho"] = tau, N, beta_N, alpha_N, rho
    diffusion_hyperparams = _dh
    return diffusion_hyperparams
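# Usage sketch (illustrative only; the concrete values below are assumptions,
# not this repo's configuration): build the hyperparameter dictionary for a
# 1000-step linear beta schedule. The extra arguments (tau, N, beta_N, alpha_N,
# rho) are simply passed through to the dictionary for the scheduling utilities.
#
#   dh = calc_diffusion_hyperparams(T=1000, beta_0=1e-4, beta_T=0.02,
#                                   tau=1, N=1000, beta_N=0.02,
#                                   alpha_N=0.05, rho=1e-5)
#   dh["beta"].shape, dh["alpha"].shape, dh["sigma"].shape  # each is (1000,)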
def sampling_given_noise_schedule(
        net,
        size,
        diffusion_hyperparams,
        inference_noise_schedule,
        condition=None,
        ddim=False,
        return_sequence=False):
    r"""
    Perform the complete sampling step according to
    p(x_0|x_T) = \prod_{t=1}^T p_{\theta}(x_{t-1}|x_t)

    Parameters:
    net (torch network): the wavenet models
    size (tuple): size of tensor to be generated,
        usually is (number of audios to generate, channels=1, length of audio)
    diffusion_hyperparams (dict): dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
        note, the tensors need to be cuda tensors
    inference_noise_schedule (torch.tensor): beta schedule used at inference time, length N <= T
    condition (torch.tensor): ground truth mel spectrogram read from disk
        None if used for unconditional generation

    Returns:
    the generated audio(s) in torch.tensor, shape=size
    """
    _dh = diffusion_hyperparams
    T, alpha = _dh["T"], _dh["alpha"]
    assert len(alpha) == T
    assert len(size) == 3

    N = len(inference_noise_schedule)
    beta_infer = inference_noise_schedule
    alpha_infer = 1 - beta_infer
    sigma_infer = beta_infer + 0
    for n in range(1, N):
        alpha_infer[n] *= alpha_infer[n - 1]
        sigma_infer[n] *= (1 - alpha_infer[n - 1]) / (1 - alpha_infer[n])
    alpha_infer = torch.sqrt(alpha_infer)
    sigma_infer = torch.sqrt(sigma_infer)

    # Mapping noise scales to time steps
    steps_infer = []
    for n in range(N):
        step = map_noise_scale_to_time_step(alpha_infer[n], alpha)
        if step >= 0:
            steps_infer.append(step)
    steps_infer = torch.FloatTensor(steps_infer)

    # N may change since alpha_infer can be out of the range of alpha
    N = len(steps_infer)

    x = std_normal(size)
    if return_sequence:
        x_ = copy.deepcopy(x)
        xs = [x_]
    with torch.no_grad():
        for n in tqdm(range(N - 1, -1, -1), desc='FastDiff sample time step', total=N):
            diffusion_steps = (steps_infer[n] * torch.ones((size[0], 1)))
            epsilon_theta = net((x, condition, diffusion_steps,))
            if ddim:
                alpha_next = alpha_infer[n] / (1 - beta_infer[n]).sqrt()
                c1 = alpha_next / alpha_infer[n]
                c2 = -(1 - alpha_infer[n] ** 2.).sqrt() * c1
                c3 = (1 - alpha_next ** 2.).sqrt()
                x = c1 * x + c2 * epsilon_theta + c3 * epsilon_theta  # std_normal(size)
            else:
                x -= beta_infer[n] / torch.sqrt(1 - alpha_infer[n] ** 2.) * epsilon_theta
                x /= torch.sqrt(1 - beta_infer[n])
                if n > 0:
                    x = x + sigma_infer[n] * std_normal(size)
            if return_sequence:
                x_ = copy.deepcopy(x)
                xs.append(x_)
    if return_sequence:
        return xs
    return x
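# Inference usage sketch (hedged: `net` stands for a trained FastDiff-style model,
# `mel` for a conditioning mel spectrogram already on the GPU, and the linspace
# schedule is only a placeholder, not a tuned inference schedule):
#
#   schedule = torch.linspace(1e-4, 0.05, steps=6).cuda()
#   audio = sampling_given_noise_schedule(net, (1, 1, 64000), dh,
#                                         schedule, condition=mel, ddim=False)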
def noise_scheduling(net, size, diffusion_hyperparams, condition=None, ddim=False):
    """
    Search for a short inference noise schedule by running the reverse process
    from x_T with the network's learned noise predictor

    Parameters:
    net (torch network): the wavenet models
    size (tuple): size of tensor to be generated,
        usually is (number of audios to generate, channels=1, length of audio)
    diffusion_hyperparams (dict): dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
        note, the tensors need to be cuda tensors
    condition (torch.tensor): ground truth mel spectrogram read from disk
        None if used for unconditional generation

    Returns:
    noise schedule: a list of noise scales in torch.tensor, length <= N
    """
    _dh = diffusion_hyperparams
    N, betaN, alphaN, rho, alpha = _dh["N"], _dh["betaN"], _dh["alphaN"], _dh["rho"], _dh["alpha"]
    print('begin noise scheduling, maximum number of reverse steps = %d' % (N))

    betas = []
    x = std_normal(size)
    with torch.no_grad():
        beta_cur = torch.ones(1, 1, 1).cuda() * betaN
        alpha_cur = torch.ones(1, 1, 1).cuda() * alphaN
        for n in range(N - 1, -1, -1):
            # print(n, beta_cur.squeeze().item(), alpha_cur.squeeze().item())
            step = map_noise_scale_to_time_step(alpha_cur.squeeze().item(), alpha)
            if step >= 0:
                betas.append(beta_cur.squeeze().item())
            diffusion_steps = (step * torch.ones((size[0], 1))).cuda()
            epsilon_theta = net((x, condition, diffusion_steps,))
            if ddim:
                alpha_nxt = alpha_cur / (1 - beta_cur).sqrt()
                c1 = alpha_nxt / alpha_cur
                c2 = -(1 - alpha_cur ** 2.).sqrt() * c1
                c3 = (1 - alpha_nxt ** 2.).sqrt()
                x = c1 * x + c2 * epsilon_theta + c3 * epsilon_theta  # std_normal(size)
            else:
                x -= beta_cur / torch.sqrt(1 - alpha_cur ** 2.) * epsilon_theta
                x /= torch.sqrt(1 - beta_cur)
            alpha_nxt, beta_nxt = alpha_cur, beta_cur
            alpha_cur = alpha_nxt / (1 - beta_nxt).sqrt()
            if alpha_cur > 1:
                break
            beta_cur = net.noise_pred(
                x.squeeze(1),
                (beta_nxt.view(-1, 1), (1 - alpha_cur ** 2.).view(-1, 1)))
            if beta_cur.squeeze().item() < rho:
                break
    return torch.FloatTensor(betas[::-1]).cuda()
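# Sketch of how a searched schedule would be fed back into sampling (hedged:
# assumes the same trained `net`, which must expose the `noise_pred` head used above,
# and a conditioning `mel` tensor on the GPU):
#
#   learned_schedule = noise_scheduling(net, (1, 1, 16000), dh, condition=mel)
#   audio = sampling_given_noise_schedule(net, (1, 1, 16000), dh,
#                                         learned_schedule, condition=mel)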
def theta_timestep_loss(net, X, diffusion_hyperparams, reverse=False):
    """
    Compute the training loss for learning theta

    Parameters:
    net (torch network): the wavenet models
    X (tuple, shape=(2,)): training data in tuple form (mel_spectrograms, audios)
        mel_spectrograms: torch.tensor, shape is batchsize followed by each mel_spectrogram shape
        audios: torch.tensor, shape=(batchsize, 1, length of audio)
    diffusion_hyperparams (dict): dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
        note, the tensors need to be cuda tensors

    Returns:
    theta loss
    """
    assert type(X) == tuple and len(X) == 2

    loss_fn = nn.MSELoss()

    _dh = diffusion_hyperparams
    T, alpha = _dh["T"], _dh["alpha"]

    mel_spectrogram, audio = X
    B, C, L = audio.shape  # B is batchsize, C=1, L is audio length
    ts = torch.randint(T, size=(B, 1, 1)).cuda()  # randomly sample diffusion steps from 0 ~ T-1
    z = std_normal(audio.shape)
    delta = (1 - alpha[ts] ** 2.).sqrt()
    alpha_cur = alpha[ts]
    noisy_audio = alpha_cur * audio + delta * z  # compute x_t from q(x_t|x_0)
    epsilon_theta = net((noisy_audio, mel_spectrogram, ts.view(B, 1),))

    if reverse:
        x0 = (noisy_audio - delta * epsilon_theta) / alpha_cur
        return loss_fn(epsilon_theta, z), x0

    return loss_fn(epsilon_theta, z)


def phi_loss(net, X, diffusion_hyperparams):
    """
    Compute the training loss for learning phi

    Parameters:
    net (torch network): the wavenet models
    X (tuple, shape=(2,)): training data in tuple form (mel_spectrograms, audios)
        mel_spectrograms: torch.tensor, shape is batchsize followed by each mel_spectrogram shape
        audios: torch.tensor, shape=(batchsize, 1, length of audio)
    diffusion_hyperparams (dict): dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
        note, the tensors need to be cuda tensors

    Returns:
    phi loss
    """
    assert type(X) == tuple and len(X) == 2

    _dh = diffusion_hyperparams
    T, alpha, tau = _dh["T"], _dh["alpha"], _dh["tau"]

    mel_spectrogram, audio = X
    B, C, L = audio.shape  # B is batchsize, C=1, L is audio length
    ts = torch.randint(tau, T - tau, size=(B,)).cuda()  # randomly sample diffusion steps from tau ~ T-tau-1
    alpha_cur = alpha.index_select(0, ts).view(B, 1, 1)
    alpha_nxt = alpha.index_select(0, ts + tau).view(B, 1, 1)
    beta_nxt = 1 - (alpha_nxt / alpha_cur) ** 2.
    delta = (1 - alpha_cur ** 2.).sqrt()
    z = std_normal(audio.shape)
    noisy_audio = alpha_cur * audio + delta * z  # compute x_t from q(x_t|x_0)

    epsilon_theta = net((noisy_audio, mel_spectrogram, ts.view(B, 1),))
    beta_est = net.noise_pred(noisy_audio.squeeze(1),
                              (beta_nxt.view(B, 1), delta.view(B, 1) ** 2.))

    phi_loss = 1 / (2. * (delta ** 2. - beta_est)) * (
        delta * z - beta_est / delta * epsilon_theta) ** 2.
    phi_loss += torch.log(1e-8 + delta ** 2. / (beta_est + 1e-8)) / 4.
    phi_loss = (torch.mean(phi_loss, -1, keepdim=True) + beta_est / delta ** 2 / 2.).mean()

    return phi_loss
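# Minimal training-step sketch for the two losses (hedged: `net`, `mel`, `audio`
# and the optimizer settings are placeholders; the actual training loop lives in
# the repo's trainer, not in this utility module):
#
#   optimizer = torch.optim.Adam(net.parameters(), lr=2e-4)
#   loss = theta_timestep_loss(net, (mel, audio), dh)   # learn epsilon_theta
#   # loss = phi_loss(net, (mel, audio), dh)            # learn the noise predictor
#   optimizer.zero_grad()
#   loss.backward()
#   optimizer.step()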
def compute_hyperparams_given_schedule(beta):
    """
    Compute diffusion process hyperparameters

    Parameters:
    beta (tensor): beta schedule

    Returns:
    a dictionary of diffusion hyperparameters including:
        T (int), beta/alpha/sigma (torch.tensor on cpu, shape=(T, ))
        These cpu tensors are changed to cuda tensors on each individual gpu
    """
    T = len(beta)
    alpha = 1 - beta
    sigma = beta + 0
    for t in range(1, T):
        alpha[t] *= alpha[t - 1]  # \alpha^2_t = \prod_{s=1}^t (1-\beta_s)
        sigma[t] *= (1 - alpha[t - 1]) / (1 - alpha[t])  # \sigma^2_t = \beta_t * (1-\alpha_{t-1}) / (1-\alpha_t)
    alpha = torch.sqrt(alpha)
    sigma = torch.sqrt(sigma)

    _dh = {}
    _dh["T"], _dh["beta"], _dh["alpha"], _dh["sigma"] = T, beta, alpha, sigma
    diffusion_hyperparams = _dh
    return diffusion_hyperparams


def map_noise_scale_to_time_step(alpha_infer, alpha):
    """
    Map a continuous noise scale alpha_infer onto the (possibly fractional)
    training time step whose alpha values bracket it; returns -1 if no match is found
    """
    if alpha_infer < alpha[-1]:
        return len(alpha) - 1
    if alpha_infer > alpha[0]:
        return 0
    for t in range(len(alpha) - 1):
        if alpha[t+1] <= alpha_infer <= alpha[t]:
            step_diff = alpha[t] - alpha_infer
            step_diff /= alpha[t] - alpha[t+1]
            return t + step_diff.item()
    return -1


def calc_diffusion_step_embedding(diffusion_steps, diffusion_step_embed_dim_in):
    """
    Embed a diffusion step $t$ into a higher dimensional space
    E.g. the embedding vector in the 128-dimensional space is
    [sin(t * 10^(0*4/63)), ... , sin(t * 10^(63*4/63)), cos(t * 10^(0*4/63)), ... , cos(t * 10^(63*4/63))]

    Parameters:
    diffusion_steps (torch.long tensor, shape=(batchsize, 1)):
        diffusion steps for batch data
    diffusion_step_embed_dim_in (int, default=128):
        dimensionality of the embedding space for discrete diffusion steps

    Returns:
    the embedding vectors (torch.tensor, shape=(batchsize, diffusion_step_embed_dim_in))
    """
    assert diffusion_step_embed_dim_in % 2 == 0

    half_dim = diffusion_step_embed_dim_in // 2
    _embed = np.log(10000) / (half_dim - 1)
    _embed = torch.exp(torch.arange(half_dim) * -_embed)
    _embed = diffusion_steps * _embed
    diffusion_step_embed = torch.cat((torch.sin(_embed), torch.cos(_embed)), 1)

    return diffusion_step_embed
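if __name__ == "__main__":
    # CPU-only smoke test (added sketch, not one of the repo's entry points).
    # It exercises only the helpers that need neither a CUDA device nor a
    # trained network; the 50-step schedule below is illustrative.
    beta = torch.linspace(1e-4, 0.02, 50)
    dh = compute_hyperparams_given_schedule(beta)
    print("T =", dh["T"], "| alpha:", dh["alpha"][0].item(), "->", dh["alpha"][-1].item())

    # A noise scale taken from the middle of the schedule maps to a fractional step.
    print("mapped step:", map_noise_scale_to_time_step(dh["alpha"][25].item(), dh["alpha"]))

    emb = calc_diffusion_step_embedding(torch.arange(4).view(4, 1), 128)
    print("embedding shape:", tuple(emb.shape))  # expected (4, 128)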