import os
import copy

import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm

def flatten(v):
    """
    Flatten a list of lists/tuples
    """
    return [x for y in v for x in y]

def rescale(x):
    """
    Rescale a tensor to 0-1
    """
    return (x - x.min()) / (x.max() - x.min())

def find_max_epoch(path):
    """
    Find maximum epoch/iteration in path, formatted ${n_iter}.pkl
    E.g. 100000.pkl

    Parameters:
    path (str):     checkpoint path

    Returns:
    maximum iteration, -1 if there is no (valid) checkpoint
    """
    files = os.listdir(path)
    epoch = -1
    for f in files:
        if len(f) <= 4:
            continue
        if f[-4:] == '.pkl':
            try:
                epoch = max(epoch, int(f[:-4]))
            except ValueError:
                continue
    # print(path, epoch, flush=True)
    return epoch
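
# Illustrative usage sketch (not part of the original module): the checkpoint
# directory below is a hypothetical example of the ${n_iter}.pkl naming
# convention described in the docstring above.
def _example_find_max_epoch():
    ckpt_dir = './checkpoints'  # hypothetical directory holding e.g. 50000.pkl, 100000.pkl
    latest = find_max_epoch(ckpt_dir)
    if latest >= 0:
        print('resuming from iteration', latest)
    else:
        print('no valid checkpoint found')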

def print_size(net):
    """
    Print the number of parameters of a network
    """
    if net is not None and isinstance(net, torch.nn.Module):
        module_parameters = filter(lambda p: p.requires_grad, net.parameters())
        params = sum([np.prod(p.size()) for p in module_parameters])
        print("{} Parameters: {:.6f}M".format(
            net.__class__.__name__, params / 1e6), flush=True)

# Utilities for diffusion models

def std_normal(size):
    """
    Generate the standard Gaussian variable of a certain size
    """
    # the samples are combined with CUDA tensors throughout this module
    return torch.normal(0, 1, size=size).cuda()

def calc_noise_scale_embedding(noise_scales, noise_scale_embed_dim_in):
    """
    Embed a noise scale $t$ into a higher dimensional space
    E.g. the embedding vector in the 128-dimensional space is
    [sin(t * 10^(-0*4/63)), ... , sin(t * 10^(-63*4/63)), cos(t * 10^(-0*4/63)), ... , cos(t * 10^(-63*4/63))]

    Parameters:
    noise_scales (torch.long tensor, shape=(batchsize, 1)):
                                    noise scales for batch data
    noise_scale_embed_dim_in (int, default=128):
                                    dimensionality of the embedding space for discrete noise scales

    Returns:
    the embedding vectors (torch.tensor, shape=(batchsize, noise_scale_embed_dim_in))
    """
    assert noise_scale_embed_dim_in % 2 == 0

    half_dim = noise_scale_embed_dim_in // 2
    _embed = np.log(10000) / (half_dim - 1)
    _embed = torch.exp(torch.arange(half_dim) * -_embed)
    _embed = noise_scales * _embed
    noise_scale_embed = torch.cat((torch.sin(_embed),
                                   torch.cos(_embed)), 1)

    return noise_scale_embed
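
# Illustrative sketch (an assumption, not original code): the sinusoidal embedding
# maps a (batchsize, 1) column of noise scales to a (batchsize, noise_scale_embed_dim_in)
# matrix, with the first half sines and the second half cosines.
def _example_noise_scale_embedding():
    scales = torch.full((4, 1), 0.5)  # hypothetical batch of 4 noise scales
    emb = calc_noise_scale_embedding(scales, 128)
    assert emb.shape == (4, 128)
    return emb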

def calc_diffusion_hyperparams_given_beta(beta):
    """
    Compute diffusion process hyperparameters

    Parameters:
    beta (tensor): beta schedule

    Returns:
    a dictionary of diffusion hyperparameters including:
        T (int), beta/alpha/sigma (torch.tensor on cpu, shape=(T, ))
        These cpu tensors are changed to cuda tensors on each individual gpu
    """
    T = len(beta)
    alpha = 1 - beta
    sigma = beta + 0
    for t in range(1, T):
        alpha[t] *= alpha[t - 1]  # \alpha^2_t = \prod_{s=1}^t (1-\beta_s)
        sigma[t] *= (1 - alpha[t - 1]) / (1 - alpha[t])  # \sigma^2_t = \beta_t * (1-\alpha_{t-1}) / (1-\alpha_t)
    alpha = torch.sqrt(alpha)
    sigma = torch.sqrt(sigma)

    _dh = {}
    _dh["T"], _dh["beta"], _dh["alpha"], _dh["sigma"] = T, beta, alpha, sigma
    diffusion_hyperparams = _dh
    return diffusion_hyperparams

def calc_diffusion_hyperparams(T, beta_0, beta_T, tau, N, beta_N, alpha_N, rho):
    """
    Compute diffusion process hyperparameters

    Parameters:
    T (int):                        number of noise scales
    beta_0 and beta_T (float):      beta schedule start/end value,
                                    where any beta_t in the middle is linearly interpolated
    tau (int):                      step gap between the two noise scales sampled in phi_loss
    N (int):                        maximum number of reverse steps used during noise scheduling
    beta_N and alpha_N (float):     initial beta/alpha values for the noise-scheduling search
    rho (float):                    minimum beta; the scheduling search stops below this value

    Returns:
    a dictionary of diffusion hyperparameters including:
        T (int), beta/alpha/sigma (torch.tensor on cpu, shape=(T, )),
        and the scheduling parameters tau, N, betaN, alphaN, rho
        These cpu tensors are changed to cuda tensors on each individual gpu
    """
    beta = torch.linspace(beta_0, beta_T, T)
    alpha = 1 - beta
    sigma = beta + 0
    for t in range(1, T):
        alpha[t] *= alpha[t - 1]  # \alpha^2_t = \prod_{s=1}^t (1-\beta_s)
        sigma[t] *= (1 - alpha[t - 1]) / (1 - alpha[t])  # \sigma^2_t = \beta_t * (1-\alpha_{t-1}) / (1-\alpha_t)
    alpha = torch.sqrt(alpha)
    sigma = torch.sqrt(sigma)

    _dh = {}
    _dh["T"], _dh["beta"], _dh["alpha"], _dh["sigma"] = T, beta, alpha, sigma
    _dh["tau"], _dh["N"], _dh["betaN"], _dh["alphaN"], _dh["rho"] = tau, N, beta_N, alpha_N, rho
    diffusion_hyperparams = _dh
    return diffusion_hyperparams
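
# Illustrative sketch of building the hyperparameter dictionary. The concrete values
# (T=1000, beta in [1e-4, 0.02], tau, N, beta_N, alpha_N, rho) are hypothetical
# choices for demonstration, not values prescribed by this module.
def _example_diffusion_hyperparams():
    dh = calc_diffusion_hyperparams(
        T=1000, beta_0=1e-4, beta_T=0.02,
        tau=200, N=1000, beta_N=0.02, alpha_N=0.3, rho=1e-5)
    assert dh["T"] == 1000 and dh["alpha"].shape == (1000,)
    return dh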

def sampling_given_noise_schedule(
        net,
        size,
        diffusion_hyperparams,
        inference_noise_schedule,
        condition=None,
        ddim=False,
        return_sequence=False):
    """
    Perform the complete sampling step according to p(x_0|x_T) = \prod_{t=1}^T p_{\theta}(x_{t-1}|x_t)

    Parameters:
    net (torch network):            the wavenet model
    size (tuple):                   size of tensor to be generated,
                                    usually is (number of audios to generate, channels=1, length of audio)
    diffusion_hyperparams (dict):   dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
                                    note, the tensors need to be cuda tensors
    inference_noise_schedule (tensor): beta schedule used at inference time; may be shorter than the training schedule
    condition (torch.tensor):       ground truth mel spectrogram read from disk
                                    None if used for unconditional generation
    ddim (bool):                    if True, take deterministic DDIM-style steps instead of adding noise
    return_sequence (bool):         if True, return the whole sequence of intermediate samples

    Returns:
    the generated audio(s) in torch.tensor, shape=size
    """
    _dh = diffusion_hyperparams
    T, alpha = _dh["T"], _dh["alpha"]
    assert len(alpha) == T
    assert len(size) == 3

    N = len(inference_noise_schedule)
    beta_infer = inference_noise_schedule
    alpha_infer = 1 - beta_infer
    sigma_infer = beta_infer + 0
    for n in range(1, N):
        alpha_infer[n] *= alpha_infer[n - 1]
        sigma_infer[n] *= (1 - alpha_infer[n - 1]) / (1 - alpha_infer[n])
    alpha_infer = torch.sqrt(alpha_infer)
    sigma_infer = torch.sqrt(sigma_infer)

    # Mapping noise scales to time steps
    steps_infer = []
    for n in range(N):
        step = map_noise_scale_to_time_step(alpha_infer[n], alpha)
        if step >= 0:
            steps_infer.append(step)
    steps_infer = torch.FloatTensor(steps_infer)

    # N may change since alpha_infer can be out of the range of alpha
    N = len(steps_infer)

    x = std_normal(size)
    if return_sequence:
        x_ = copy.deepcopy(x)
        xs = [x_]
    with torch.no_grad():
        for n in tqdm(range(N - 1, -1, -1), desc='FastDiff sample time step', total=N):
            # keep the step index on the same device as x and condition
            diffusion_steps = (steps_infer[n] * torch.ones((size[0], 1))).cuda()
            epsilon_theta = net((x, condition, diffusion_steps,))
            if ddim:
                alpha_next = alpha_infer[n] / (1 - beta_infer[n]).sqrt()
                c1 = alpha_next / alpha_infer[n]
                c2 = -(1 - alpha_infer[n] ** 2.).sqrt() * c1
                c3 = (1 - alpha_next ** 2.).sqrt()
                x = c1 * x + c2 * epsilon_theta + c3 * epsilon_theta  # std_normal(size)
            else:
                x -= beta_infer[n] / torch.sqrt(1 - alpha_infer[n] ** 2.) * epsilon_theta
                x /= torch.sqrt(1 - beta_infer[n])
                if n > 0:
                    x = x + sigma_infer[n] * std_normal(size)
            if return_sequence:
                x_ = copy.deepcopy(x)
                xs.append(x_)
    if return_sequence:
        return xs
    return x
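
# Illustrative sampling sketch (assumptions: `model` is a trained FastDiff-style
# network on GPU, `mel` is a CUDA mel-spectrogram tensor, `dh` holds CUDA diffusion
# hyperparameters as the docstring above requires; the 4-step linspace schedule is
# a hypothetical choice, not one produced by this module).
def _example_sampling(model, mel, dh, audio_length):
    noise_schedule = torch.linspace(1e-4, 0.02, 4).cuda()
    audio = sampling_given_noise_schedule(
        model, (1, 1, audio_length), dh, noise_schedule,
        condition=mel, ddim=False, return_sequence=False)
    return audio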

def noise_scheduling(net, size, diffusion_hyperparams, condition=None, ddim=False):
    """
    Search for an inference noise schedule by running the reverse process and
    querying the network's noise predictor (net.noise_pred) at each step

    Parameters:
    net (torch network):            the wavenet model
    size (tuple):                   size of tensor to be generated,
                                    usually is (number of audios to generate, channels=1, length of audio)
    diffusion_hyperparams (dict):   dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
                                    note, the tensors need to be cuda tensors
    condition (torch.tensor):       ground truth mel spectrogram read from disk
                                    None if used for unconditional generation

    Returns:
    noise schedule:                 a list of noise scales in torch.tensor, length <= N
    """
    _dh = diffusion_hyperparams
    N, betaN, alphaN, rho, alpha = _dh["N"], _dh["betaN"], _dh["alphaN"], _dh["rho"], _dh["alpha"]
    print('begin noise scheduling, maximum number of reverse steps = %d' % (N))

    betas = []
    x = std_normal(size)
    with torch.no_grad():
        beta_cur = torch.ones(1, 1, 1).cuda() * betaN
        alpha_cur = torch.ones(1, 1, 1).cuda() * alphaN
        for n in range(N - 1, -1, -1):
            # print(n, beta_cur.squeeze().item(), alpha_cur.squeeze().item())
            step = map_noise_scale_to_time_step(alpha_cur.squeeze().item(), alpha)
            if step >= 0:
                betas.append(beta_cur.squeeze().item())
            diffusion_steps = (step * torch.ones((size[0], 1))).cuda()
            epsilon_theta = net((x, condition, diffusion_steps,))
            if ddim:
                alpha_nxt = alpha_cur / (1 - beta_cur).sqrt()
                c1 = alpha_nxt / alpha_cur
                c2 = -(1 - alpha_cur ** 2.).sqrt() * c1
                c3 = (1 - alpha_nxt ** 2.).sqrt()
                x = c1 * x + c2 * epsilon_theta + c3 * epsilon_theta  # std_normal(size)
            else:
                x -= beta_cur / torch.sqrt(1 - alpha_cur ** 2.) * epsilon_theta
                x /= torch.sqrt(1 - beta_cur)
            alpha_nxt, beta_nxt = alpha_cur, beta_cur
            alpha_cur = alpha_nxt / (1 - beta_nxt).sqrt()
            if alpha_cur > 1:
                break
            beta_cur = net.noise_pred(
                x.squeeze(1), (beta_nxt.view(-1, 1), (1 - alpha_cur ** 2.).view(-1, 1)))
            if beta_cur.squeeze().item() < rho:
                break
    return torch.FloatTensor(betas[::-1]).cuda()
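
# Illustrative sketch of the schedule-search pipeline (assumptions: `model` exposes
# the `noise_pred` head used above, `mel` and `dh` are CUDA tensors/hyperparameters,
# and the 1 x 1 x 22050 output size is a hypothetical one-second waveform).
def _example_noise_scheduling(model, mel, dh):
    size = (1, 1, 22050)
    learned_schedule = noise_scheduling(model, size, dh, condition=mel)
    return sampling_given_noise_schedule(model, size, dh, learned_schedule, condition=mel)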

def theta_timestep_loss(net, X, diffusion_hyperparams, reverse=False):
    """
    Compute the training loss for learning theta

    Parameters:
    net (torch network):            the wavenet model
    X (tuple, shape=(2,)):          training data in tuple form (mel_spectrograms, audios)
                                    mel_spectrograms: torch.tensor, shape is batchsize followed by each mel_spectrogram shape
                                    audios: torch.tensor, shape=(batchsize, 1, length of audio)
    diffusion_hyperparams (dict):   dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
                                    note, the tensors need to be cuda tensors

    Returns:
    theta loss
    """
    assert type(X) == tuple and len(X) == 2
    loss_fn = nn.MSELoss()

    _dh = diffusion_hyperparams
    T, alpha = _dh["T"], _dh["alpha"]

    mel_spectrogram, audio = X
    B, C, L = audio.shape  # B is batchsize, C=1, L is audio length
    ts = torch.randint(T, size=(B, 1, 1)).cuda()  # randomly sample a step from {0, ..., T-1}
    z = std_normal(audio.shape)
    delta = (1 - alpha[ts] ** 2.).sqrt()
    alpha_cur = alpha[ts]
    noisy_audio = alpha_cur * audio + delta * z  # compute x_t from q(x_t|x_0)
    epsilon_theta = net((noisy_audio, mel_spectrogram, ts.view(B, 1),))

    if reverse:
        x0 = (noisy_audio - delta * epsilon_theta) / alpha_cur
        return loss_fn(epsilon_theta, z), x0

    return loss_fn(epsilon_theta, z)
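
# Illustrative training-step sketch for the theta (denoiser) loss (assumptions:
# `model`, `optimizer`, and the CUDA batch of (mel, audio) come from an external
# training script that is not part of this module).
def _example_theta_step(model, optimizer, mel, audio, dh):
    optimizer.zero_grad()
    loss = theta_timestep_loss(model, (mel, audio), dh)
    loss.backward()
    optimizer.step()
    return loss.item()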

def phi_loss(net, X, diffusion_hyperparams):
    """
    Compute the training loss for learning phi

    Parameters:
    net (torch network):            the wavenet model
    X (tuple, shape=(2,)):          training data in tuple form (mel_spectrograms, audios)
                                    mel_spectrograms: torch.tensor, shape is batchsize followed by each mel_spectrogram shape
                                    audios: torch.tensor, shape=(batchsize, 1, length of audio)
    diffusion_hyperparams (dict):   dictionary of diffusion hyperparameters returned by calc_diffusion_hyperparams
                                    note, the tensors need to be cuda tensors

    Returns:
    phi loss
    """
    assert type(X) == tuple and len(X) == 2
    _dh = diffusion_hyperparams
    T, alpha, tau = _dh["T"], _dh["alpha"], _dh["tau"]

    mel_spectrogram, audio = X
    B, C, L = audio.shape  # B is batchsize, C=1, L is audio length
    ts = torch.randint(tau, T - tau, size=(B,)).cuda()  # randomly sample a step from {tau, ..., T-tau-1}
    alpha_cur = alpha.index_select(0, ts).view(B, 1, 1)
    alpha_nxt = alpha.index_select(0, ts + tau).view(B, 1, 1)
    beta_nxt = 1 - (alpha_nxt / alpha_cur) ** 2.
    delta = (1 - alpha_cur ** 2.).sqrt()
    z = std_normal(audio.shape)
    noisy_audio = alpha_cur * audio + delta * z  # compute x_t from q(x_t|x_0)
    epsilon_theta = net((noisy_audio, mel_spectrogram, ts.view(B, 1),))
    beta_est = net.noise_pred(noisy_audio.squeeze(1), (beta_nxt.view(B, 1), delta.view(B, 1) ** 2.))
    phi_loss = 1 / (2. * (delta ** 2. - beta_est)) * (
            delta * z - beta_est / delta * epsilon_theta) ** 2.
    phi_loss += torch.log(1e-8 + delta ** 2. / (beta_est + 1e-8)) / 4.
    phi_loss = (torch.mean(phi_loss, -1, keepdim=True) + beta_est / delta ** 2 / 2.).mean()

    return phi_loss
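
# Illustrative training-step sketch for the phi (schedule network) loss (assumptions:
# `model`, `optimizer`, and the CUDA batch of (mel, audio) come from an external
# training script; `model` must also expose the `noise_pred` head used inside phi_loss).
def _example_phi_step(model, optimizer, mel, audio, dh):
    optimizer.zero_grad()
    loss = phi_loss(model, (mel, audio), dh)
    loss.backward()
    optimizer.step()
    return loss.item()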

def compute_hyperparams_given_schedule(beta):
    """
    Compute diffusion process hyperparameters

    Parameters:
    beta (tensor): beta schedule

    Returns:
    a dictionary of diffusion hyperparameters including:
        T (int), beta/alpha/sigma (torch.tensor on cpu, shape=(T, ))
        These cpu tensors are changed to cuda tensors on each individual gpu
    """
    T = len(beta)
    alpha = 1 - beta
    sigma = beta + 0
    for t in range(1, T):
        alpha[t] *= alpha[t - 1]  # \alpha^2_t = \prod_{s=1}^t (1-\beta_s)
        sigma[t] *= (1 - alpha[t - 1]) / (1 - alpha[t])  # \sigma^2_t = \beta_t * (1-\alpha_{t-1}) / (1-\alpha_t)
    alpha = torch.sqrt(alpha)
    sigma = torch.sqrt(sigma)

    _dh = {}
    _dh["T"], _dh["beta"], _dh["alpha"], _dh["sigma"] = T, beta, alpha, sigma
    diffusion_hyperparams = _dh
    return diffusion_hyperparams

def map_noise_scale_to_time_step(alpha_infer, alpha):
    if alpha_infer < alpha[-1]:
        return len(alpha) - 1
    if alpha_infer > alpha[0]:
        return 0
    for t in range(len(alpha) - 1):
        if alpha[t + 1] <= alpha_infer <= alpha[t]:
            step_diff = alpha[t] - alpha_infer
            step_diff /= alpha[t] - alpha[t + 1]
            return t + step_diff.item()
    return -1
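
# Illustrative sketch (hypothetical schedule values): a noise scale that falls between
# two entries of a decreasing alpha schedule maps to a fractional time step by linear
# interpolation.
def _example_map_noise_scale():
    alpha = torch.tensor([0.9, 0.7, 0.5])
    step = map_noise_scale_to_time_step(0.8, alpha)  # roughly 0.5: halfway between alpha[0] and alpha[1]
    return step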

def calc_diffusion_step_embedding(diffusion_steps, diffusion_step_embed_dim_in):
    """
    Embed a diffusion step $t$ into a higher dimensional space
    E.g. the embedding vector in the 128-dimensional space is
    [sin(t * 10^(-0*4/63)), ... , sin(t * 10^(-63*4/63)), cos(t * 10^(-0*4/63)), ... , cos(t * 10^(-63*4/63))]

    Parameters:
    diffusion_steps (torch.long tensor, shape=(batchsize, 1)):
                                    diffusion steps for batch data
    diffusion_step_embed_dim_in (int, default=128):
                                    dimensionality of the embedding space for discrete diffusion steps

    Returns:
    the embedding vectors (torch.tensor, shape=(batchsize, diffusion_step_embed_dim_in))
    """
    assert diffusion_step_embed_dim_in % 2 == 0

    half_dim = diffusion_step_embed_dim_in // 2
    _embed = np.log(10000) / (half_dim - 1)
    _embed = torch.exp(torch.arange(half_dim) * -_embed)
    _embed = diffusion_steps * _embed
    diffusion_step_embed = torch.cat((torch.sin(_embed),
                                      torch.cos(_embed)), 1)

    return diffusion_step_embed