import os
import numpy as np
import torch
import torch.nn as nn
import copy
from tqdm import tqdm
def flatten(v):
"""
Flatten a list of lists/tuples
"""
return [x for y in v for x in y]
def rescale(x):
"""
Rescale a tensor to 0-1
"""
return (x - x.min()) / (x.max() - x.min())
def find_max_epoch(path):
"""
Find maximum epoch/iteration in path, formatted ${n_iter}.pkl
E.g. 100000.pkl
Parameters:
path (str): checkpoint path
Returns:
maximum iteration, -1 if there is no (valid) checkpoint
"""
files = os.listdir(path)
epoch = -1
for f in files:
if len(f) <= 4:
continue
if f[-4:] == '.pkl':
try:
epoch = max(epoch, int(f[:-4]))
            except ValueError:  # skip files whose stem is not an integer
continue
#print(path, epoch, flush=True)
return epoch
def print_size(net):
"""
Print the number of parameters of a network
"""
if net is not None and isinstance(net, torch.nn.Module):
module_parameters = filter(lambda p: p.requires_grad, net.parameters())
params = sum([np.prod(p.size()) for p in module_parameters])
print("{} Parameters: {:.6f}M".format(
net.__class__.__name__, params / 1e6), flush=True)
# Utilities for diffusion models
def std_normal(size):
"""
Generate the standard Gaussian variable of a certain size
"""
    return torch.normal(0, 1, size=size).cuda()  # draw on the GPU; the callers below mix this with cuda tensors
def calc_noise_scale_embedding(noise_scales, noise_scale_embed_dim_in):
"""
Embed a noise scale $t$ into a higher dimensional space
E.g. the embedding vector in the 128-dimensional space is
    [sin(t / 10^(0*4/63)), ... , sin(t / 10^(63*4/63)), cos(t / 10^(0*4/63)), ... , cos(t / 10^(63*4/63))]
Parameters:
noise_scales (torch.long tensor, shape=(batchsize, 1)):
noise scales for batch data
noise_scale_embed_dim_in (int, default=128):
dimensionality of the embedding space for discrete noise scales
Returns:
the embedding vectors (torch.tensor, shape=(batchsize, noise_scale_embed_dim_in)):
"""
assert noise_scale_embed_dim_in % 2 == 0
half_dim = noise_scale_embed_dim_in // 2
_embed = np.log(10000) / (half_dim - 1)
_embed = torch.exp(torch.arange(half_dim) * -_embed)
_embed = noise_scales * _embed
noise_scale_embed = torch.cat((torch.sin(_embed),
torch.cos(_embed)), 1)
return noise_scale_embed
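
# A minimal usage sketch for the embedding above (hedged; the batch of noise scales is
# illustrative, not taken from any config):
#
#   scales = torch.tensor([[0.5], [0.9]])            # shape (batchsize, 1)
#   emb = calc_noise_scale_embedding(scales, 128)    # shape (2, 128)
#   # emb[:, :64] holds sin(t / 10^(4*i/63)) for i = 0..63, emb[:, 64:] the matching cosines.
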
def calc_diffusion_hyperparams_given_beta(beta):
"""
Compute diffusion process hyperparameters
Parameters:
beta (tensor): beta schedule
Returns:
a dictionary of diffusion hyperparameters including:
T (int), beta/alpha/sigma (torch.tensor on cpu, shape=(T, ))
These cpu tensors are changed to cuda tensors on each individual gpu
"""
T = len(beta)
alpha = 1 - beta
    sigma = beta + 0  # start from a copy of beta; turned into sigma_t in the loop below
for t in range(1, T):
alpha[t] *= alpha[t-1] # \alpha^2_t = \prod_{s=1}^t (1-\beta_s)
sigma[t] *= (1-alpha[t-1]) / (1-alpha[t]) # \sigma^2_t = \beta_t * (1-\alpha_{t-1}) / (1-\alpha_t)
alpha = torch.sqrt(alpha)
sigma = torch.sqrt(sigma)
_dh = {}
_dh["T"], _dh["beta"], _dh["alpha"], _dh["sigma"] = T, beta, alpha, sigma
diffusion_hyperparams = _dh
return diffusion_hyperparams
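
# A worked sketch of the recursion above (illustrative numbers, not a real schedule):
#
#   beta = torch.tensor([0.1, 0.2, 0.3])
#   dh = calc_diffusion_hyperparams_given_beta(beta)
#   # dh["alpha"][t] = sqrt(prod_{s<=t} (1 - beta_s)), e.g. dh["alpha"][1] = sqrt(0.9 * 0.8) ~ 0.8485
#   # dh["sigma"][t] = sqrt(beta_t * (1 - alpha^2_{t-1}) / (1 - alpha^2_t))
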
def calc_diffusion_hyperparams(T, beta_0, beta_T, tau, N, beta_N, alpha_N, rho):
"""
Compute diffusion process hyperparameters
Parameters:
T (int): number of noise scales
    beta_0 and beta_T (float): beta schedule start/end value,
                               where any beta_t in the middle is linearly interpolated
    tau (int): step gap between the two noise scales compared when training the schedule predictor (phi loss)
    N (int): maximum number of reverse steps allowed during noise scheduling
    beta_N and alpha_N (float): noise scale and alpha value used to initialise noise scheduling
    rho (float): threshold on the predicted noise scale below which noise scheduling stops
    Returns:
    a dictionary of diffusion hyperparameters including:
    T (int), beta/alpha/sigma (torch.tensor on cpu, shape=(T, )),
    plus the scheduling values tau/N/betaN/alphaN/rho passed through unchanged
    These cpu tensors are changed to cuda tensors on each individual gpu
"""
beta = torch.linspace(beta_0, beta_T, T)
alpha = 1 - beta
sigma = beta + 0
for t in range(1, T):
alpha[t] *= alpha[t-1] # \alpha^2_t = \prod_{s=1}^t (1-\beta_s)
sigma[t] *= (1-alpha[t-1]) / (1-alpha[t]) # \sigma^2_t = \beta_t * (1-\alpha_{t-1}) / (1-\alpha_t)
alpha = torch.sqrt(alpha)
sigma = torch.sqrt(sigma)
_dh = {}
_dh["T"], _dh["beta"], _dh["alpha"], _dh["sigma"] = T, beta, alpha, sigma
_dh["tau"], _dh["N"], _dh["betaN"], _dh["alphaN"], _dh["rho"] = tau, N, beta_N, alpha_N, rho
diffusion_hyperparams = _dh
return diffusion_hyperparams
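
# A usage sketch (hedged; the values below are illustrative placeholders, not the
# configuration shipped with FastDiff):
#
#   dh = calc_diffusion_hyperparams(T=1000, beta_0=1e-6, beta_T=0.01,
#                                   tau=200, N=1000, beta_N=0.05, alpha_N=0.3, rho=1e-9)
#   # dh["beta"] is the linear schedule; dh["alpha"] and dh["sigma"] are derived tensors of shape (T,)
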
def sampling_given_noise_schedule(
net,
size,
diffusion_hyperparams,
inference_noise_schedule,
condition=None,
ddim=False,
return_sequence=False):
"""
Perform the complete sampling step according to p(x_0|x_T) = \prod_{t=1}^T p_{\theta}(x_{t-1}|x_t)
Parameters:
net (torch network): the wavenet models
size (tuple): size of tensor to be generated,
usually is (number of audios to generate, channels=1, length of audio)
diffusion_hyperparams (dict): dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
note, the tensors need to be cuda tensors
condition (torch.tensor): ground truth mel spectrogram read from disk
None if used for unconditional generation
Returns:
the generated audio(s) in torch.tensor, shape=size
"""
_dh = diffusion_hyperparams
T, alpha = _dh["T"], _dh["alpha"]
assert len(alpha) == T
assert len(size) == 3
N = len(inference_noise_schedule)
beta_infer = inference_noise_schedule
alpha_infer = 1 - beta_infer
sigma_infer = beta_infer + 0
for n in range(1, N):
alpha_infer[n] *= alpha_infer[n - 1]
sigma_infer[n] *= (1 - alpha_infer[n - 1]) / (1 - alpha_infer[n])
alpha_infer = torch.sqrt(alpha_infer)
sigma_infer = torch.sqrt(sigma_infer)
# Mapping noise scales to time steps
steps_infer = []
for n in range(N):
step = map_noise_scale_to_time_step(alpha_infer[n], alpha)
if step >= 0:
steps_infer.append(step)
steps_infer = torch.FloatTensor(steps_infer)
# N may change since alpha_infer can be out of the range of alpha
N = len(steps_infer)
x = std_normal(size)
if return_sequence:
x_ = copy.deepcopy(x)
xs = [x_]
with torch.no_grad():
for n in tqdm(range(N - 1, -1, -1), desc='FastDiff sample time step', total=N):
            diffusion_steps = (steps_infer[n] * torch.ones((size[0], 1))).cuda()  # match the device of x and condition
epsilon_theta = net((x, condition, diffusion_steps,))
if ddim:
alpha_next = alpha_infer[n] / (1 - beta_infer[n]).sqrt()
c1 = alpha_next / alpha_infer[n]
c2 = -(1 - alpha_infer[n] ** 2.).sqrt() * c1
c3 = (1 - alpha_next ** 2.).sqrt()
x = c1 * x + c2 * epsilon_theta + c3 * epsilon_theta # std_normal(size)
else:
x -= beta_infer[n] / torch.sqrt(1 - alpha_infer[n] ** 2.) * epsilon_theta
x /= torch.sqrt(1 - beta_infer[n])
if n > 0:
x = x + sigma_infer[n] * std_normal(size)
if return_sequence:
x_ = copy.deepcopy(x)
xs.append(x_)
if return_sequence:
return xs
return x
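
# A sampling sketch (hedged; `model`, `mel` and the 4-step schedule are illustrative
# placeholders, not the released FastDiff checkpoints or schedules):
#
#   dh = calc_diffusion_hyperparams(T=1000, beta_0=1e-6, beta_T=0.01,
#                                   tau=200, N=1000, beta_N=0.05, alpha_N=0.3, rho=1e-9)
#   schedule = torch.FloatTensor([1e-4, 1e-3, 1e-2, 5e-2])   # a short beta schedule (illustrative values)
#   audio = sampling_given_noise_schedule(model, (1, 1, 64000), dh, schedule,
#                                         condition=mel, ddim=False)   # -> tensor of shape (1, 1, 64000)
#   # mel is assumed to already be a cuda tensor shaped the way the model's conditioner expects.
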
def noise_scheduling(net, size, diffusion_hyperparams, condition=None, ddim=False):
"""
    Search for a short inference noise schedule by running the reverse process while querying
    the learned schedule predictor (net.noise_pred) at every step
Parameters:
net (torch network): the wavenet models
size (tuple): size of tensor to be generated,
usually is (number of audios to generate, channels=1, length of audio)
diffusion_hyperparams (dict): dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
note, the tensors need to be cuda tensors
condition (torch.tensor): ground truth mel spectrogram read from disk
None if used for unconditional generation
Returns:
noise schedule: a list of noise scales in torch.tensor, length <= N
"""
_dh = diffusion_hyperparams
N, betaN, alphaN, rho, alpha = _dh["N"], _dh["betaN"], _dh["alphaN"], _dh["rho"], _dh["alpha"]
print('begin noise scheduling, maximum number of reverse steps = %d' % (N))
betas = []
x = std_normal(size)
with torch.no_grad():
beta_cur = torch.ones(1, 1, 1).cuda() * betaN
alpha_cur = torch.ones(1, 1, 1).cuda() * alphaN
for n in range(N - 1, -1, -1):
# print(n, beta_cur.squeeze().item(), alpha_cur.squeeze().item())
step = map_noise_scale_to_time_step(alpha_cur.squeeze().item(), alpha)
if step >= 0:
betas.append(beta_cur.squeeze().item())
diffusion_steps = (step * torch.ones((size[0], 1))).cuda()
epsilon_theta = net((x, condition, diffusion_steps,))
if ddim:
alpha_nxt = alpha_cur / (1 - beta_cur).sqrt()
c1 = alpha_nxt / alpha_cur
c2 = -(1 - alpha_cur ** 2.).sqrt() * c1
c3 = (1 - alpha_nxt ** 2.).sqrt()
x = c1 * x + c2 * epsilon_theta + c3 * epsilon_theta # std_normal(size)
else:
x -= beta_cur / torch.sqrt(1 - alpha_cur ** 2.) * epsilon_theta
x /= torch.sqrt(1 - beta_cur)
alpha_nxt, beta_nxt = alpha_cur, beta_cur
alpha_cur = alpha_nxt / (1 - beta_nxt).sqrt()
if alpha_cur > 1:
break
beta_cur = net.noise_pred(
x.squeeze(1), (beta_nxt.view(-1, 1), (1 - alpha_cur ** 2.).view(-1, 1)))
if beta_cur.squeeze().item() < rho:
break
return torch.FloatTensor(betas[::-1]).cuda()
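
# A scheduling sketch (hedged; `model` and `mel` are illustrative placeholders): run the
# search once on a reference utterance, then reuse the returned schedule for sampling:
#
#   schedule = noise_scheduling(model, (1, 1, 64000), dh, condition=mel)
#   audio = sampling_given_noise_schedule(model, (1, 1, 64000), dh, schedule, condition=mel)
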
def theta_timestep_loss(net, X, diffusion_hyperparams, reverse=False):
"""
Compute the training loss for learning theta
Parameters:
net (torch network): the wavenet models
X (tuple, shape=(2,)): training data in tuple form (mel_spectrograms, audios)
mel_spectrograms: torch.tensor, shape is batchsize followed by each mel_spectrogram shape
audios: torch.tensor, shape=(batchsize, 1, length of audio)
diffusion_hyperparams (dict): dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
note, the tensors need to be cuda tensors
Returns:
theta loss
"""
assert type(X) == tuple and len(X) == 2
loss_fn = nn.MSELoss()
_dh = diffusion_hyperparams
T, alpha = _dh["T"], _dh["alpha"]
mel_spectrogram, audio = X
B, C, L = audio.shape # B is batchsize, C=1, L is audio length
    ts = torch.randint(T, size=(B, 1, 1)).cuda()  # randomly sample a step index in {0, ..., T-1} per example
z = std_normal(audio.shape)
delta = (1 - alpha[ts] ** 2.).sqrt()
alpha_cur = alpha[ts]
noisy_audio = alpha_cur * audio + delta * z # compute x_t from q(x_t|x_0)
epsilon_theta = net((noisy_audio, mel_spectrogram, ts.view(B, 1),))
if reverse:
x0 = (noisy_audio - delta * epsilon_theta) / alpha_cur
return loss_fn(epsilon_theta, z), x0
return loss_fn(epsilon_theta, z)
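
# A training sketch for the score network (hedged; `model`, `loader` and `optimizer`
# are illustrative placeholders):
#
#   for mel, audio in loader:                      # audio shape (B, 1, L); mel per the model's conditioner
#       loss = theta_timestep_loss(model, (mel.cuda(), audio.cuda()), dh)
#       optimizer.zero_grad()
#       loss.backward()
#       optimizer.step()
#   # dh["alpha"] is assumed to already be on the GPU, as noted in the docstring above.
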
def phi_loss(net, X, diffusion_hyperparams):
"""
Compute the training loss for learning phi
Parameters:
net (torch network): the wavenet models
X (tuple, shape=(2,)): training data in tuple form (mel_spectrograms, audios)
mel_spectrograms: torch.tensor, shape is batchsize followed by each mel_spectrogram shape
audios: torch.tensor, shape=(batchsize, 1, length of audio)
diffusion_hyperparams (dict): dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
note, the tensors need to be cuda tensors
Returns:
phi loss
"""
assert type(X) == tuple and len(X) == 2
_dh = diffusion_hyperparams
T, alpha, tau = _dh["T"], _dh["alpha"], _dh["tau"]
mel_spectrogram, audio = X
B, C, L = audio.shape # B is batchsize, C=1, L is audio length
    ts = torch.randint(tau, T - tau, size=(B,)).cuda()  # sample a step index in {tau, ..., T-tau-1} per example
alpha_cur = alpha.index_select(0, ts).view(B, 1, 1)
alpha_nxt = alpha.index_select(0, ts + tau).view(B, 1, 1)
beta_nxt = 1 - (alpha_nxt / alpha_cur) ** 2.
delta = (1 - alpha_cur ** 2.).sqrt()
z = std_normal(audio.shape)
noisy_audio = alpha_cur * audio + delta * z # compute x_t from q(x_t|x_0)
epsilon_theta = net((noisy_audio, mel_spectrogram, ts.view(B, 1),))
beta_est = net.noise_pred(noisy_audio.squeeze(1), (beta_nxt.view(B, 1), delta.view(B, 1) ** 2.))
phi_loss = 1 / (2. * (delta ** 2. - beta_est)) * (
delta * z - beta_est / delta * epsilon_theta) ** 2.
phi_loss += torch.log(1e-8 + delta ** 2. / (beta_est + 1e-8)) / 4.
phi_loss = (torch.mean(phi_loss, -1, keepdim=True) + beta_est / delta ** 2 / 2.).mean()
return phi_loss
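
# A training sketch for the schedule predictor (hedged; `model`, `loader` and `optimizer`
# are illustrative placeholders, and dh must come from calc_diffusion_hyperparams so that
# dh["tau"] exists):
#
#   for mel, audio in loader:
#       loss = phi_loss(model, (mel.cuda(), audio.cuda()), dh)
#       optimizer.zero_grad()
#       loss.backward()
#       optimizer.step()
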
def compute_hyperparams_given_schedule(beta):
"""
Compute diffusion process hyperparameters
Parameters:
beta (tensor): beta schedule
Returns:
a dictionary of diffusion hyperparameters including:
T (int), beta/alpha/sigma (torch.tensor on cpu, shape=(T, ))
These cpu tensors are changed to cuda tensors on each individual gpu
"""
T = len(beta)
alpha = 1 - beta
sigma = beta + 0
for t in range(1, T):
alpha[t] *= alpha[t - 1] # \alpha^2_t = \prod_{s=1}^t (1-\beta_s)
sigma[t] *= (1 - alpha[t - 1]) / (1 - alpha[t]) # \sigma^2_t = \beta_t * (1-\alpha_{t-1}) / (1-\alpha_t)
alpha = torch.sqrt(alpha)
sigma = torch.sqrt(sigma)
_dh = {}
_dh["T"], _dh["beta"], _dh["alpha"], _dh["sigma"] = T, beta, alpha, sigma
diffusion_hyperparams = _dh
return diffusion_hyperparams
def map_noise_scale_to_time_step(alpha_infer, alpha):
if alpha_infer < alpha[-1]:
return len(alpha) - 1
if alpha_infer > alpha[0]:
return 0
for t in range(len(alpha) - 1):
if alpha[t+1] <= alpha_infer <= alpha[t]:
step_diff = alpha[t] - alpha_infer
step_diff /= alpha[t] - alpha[t+1]
return t + step_diff.item()
return -1
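
# Example of the interpolation above: with alpha = torch.tensor([0.9, 0.7, 0.5]), an inference
# scale of 0.8 lies halfway between alpha[0] and alpha[1], so the function returns roughly 0.5,
# i.e. a fractional time step that the step embedding can still consume.
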
def calc_diffusion_step_embedding(diffusion_steps, diffusion_step_embed_dim_in):
"""
Embed a diffusion step $t$ into a higher dimensional space
E.g. the embedding vector in the 128-dimensional space is
    [sin(t / 10^(0*4/63)), ... , sin(t / 10^(63*4/63)), cos(t / 10^(0*4/63)), ... , cos(t / 10^(63*4/63))]
Parameters:
diffusion_steps (torch.long tensor, shape=(batchsize, 1)):
diffusion steps for batch data
diffusion_step_embed_dim_in (int, default=128):
dimensionality of the embedding space for discrete diffusion steps
Returns:
the embedding vectors (torch.tensor, shape=(batchsize, diffusion_step_embed_dim_in)):
"""
assert diffusion_step_embed_dim_in % 2 == 0
half_dim = diffusion_step_embed_dim_in // 2
_embed = np.log(10000) / (half_dim - 1)
_embed = torch.exp(torch.arange(half_dim) * -_embed)
_embed = diffusion_steps * _embed
diffusion_step_embed = torch.cat((torch.sin(_embed),
torch.cos(_embed)), 1)
return diffusion_step_embed