ProDiff / utils /__init__.py
Rongjiehuang's picture
init
64e7f2f
raw
history blame
8.58 kB
import time
import sys
import types
import chardet
import numpy as np
import torch
import torch.distributed as dist
from utils.ckpt_utils import load_ckpt
def reduce_tensors(metrics):
    """Average every tensor in a (possibly nested) metrics dict across processes.

    Each ``torch.Tensor`` value is passed to ``dist.all_reduce`` (which writes
    its result into that same tensor) and then divided by
    ``dist.get_world_size()``. Values whose type is exactly ``dict`` are
    processed recursively; anything else is copied through unchanged.

    Args:
        metrics: dict mapping names to tensors, nested dicts, or other values.

    Returns:
        A new dict with the same keys holding the reduced values.
    """
    reduced = {}
    for name, value in metrics.items():
        if isinstance(value, torch.Tensor):
            # all_reduce works in place on the caller's tensor; the division
            # below produces the new tensor that ends up in the result.
            dist.all_reduce(value)
            value = value / dist.get_world_size()
        elif type(value) is dict:
            value = reduce_tensors(value)
        reduced[name] = value
    return reduced
def tensors_to_scalars(tensors):
    """Recursively replace tensors with Python scalars via ``Tensor.item()``.

    Args:
        tensors: a ``torch.Tensor``, a dict, a list, or any other object.

    Returns:
        ``tensors.item()`` for a tensor; a new dict / list with every value
        converted recursively for a dict / list; the input itself otherwise.
    """
    if isinstance(tensors, torch.Tensor):
        return tensors.item()
    if isinstance(tensors, dict):
        return {key: tensors_to_scalars(val) for key, val in tensors.items()}
    if isinstance(tensors, list):
        return [tensors_to_scalars(val) for val in tensors]
    # Anything else (numbers, strings, tuples, ...) is passed through as-is.
    return tensors
def tensors_to_np(tensors):
    """Convert tensors held in a dict, a list, or given directly to NumPy arrays.

    Inside a dict or list, each ``torch.Tensor`` element is converted with
    ``.cpu().numpy()``, each element whose type is exactly ``dict`` is
    converted recursively, and every other element (including nested lists)
    is kept unchanged.

    Args:
        tensors: a dict, a list, or a ``torch.Tensor``.

    Returns:
        A new dict or list with converted elements, or a NumPy array when a
        single tensor was given.

    Raises:
        Exception: if ``tensors`` is not a dict, list, or ``torch.Tensor``.
    """

    def _convert(item):
        # Element-level conversion shared by the dict and list branches.
        if isinstance(item, torch.Tensor):
            return item.cpu().numpy()
        if type(item) is dict:
            return tensors_to_np(item)
        return item

    if isinstance(tensors, dict):
        return {key: _convert(val) for key, val in tensors.items()}
    if isinstance(tensors, list):
        return [_convert(val) for val in tensors]
    if isinstance(tensors, torch.Tensor):
        return tensors.cpu().numpy()
    raise Exception(f'tensors_to_np does not support type {type(tensors)}.')
def move_to_cpu(tensors):
    """Return a copy of a (possibly nested) dict with every tensor moved to CPU.

    Args:
        tensors: dict whose values may be ``torch.Tensor`` objects, plain
            dicts (handled recursively), or arbitrary other objects.

    Returns:
        A new dict with the same keys; tensors are replaced by ``.cpu()``
        results and other values are kept as they are.
    """

    def _to_cpu(item):
        if isinstance(item, torch.Tensor):
            return item.cpu()
        if type(item) is dict:
            return move_to_cpu(item)
        return item

    return {key: _to_cpu(val) for key, val in tensors.items()}
def move_to_cuda(batch, gpu_id=0):
    """Recursively move ``batch`` to the CUDA device ``gpu_id``.

    Objects exposing a callable ``cuda`` attribute are moved with it; objects
    exposing a callable ``to`` attribute are moved with ``to``. Lists and
    dicts are updated in place and returned; tuples are rebuilt as a new
    plain tuple. Anything else is returned unchanged.

    Args:
        batch: object or nested list / tuple / dict of objects to move.
        gpu_id: index of the target CUDA device.

    Returns:
        The moved object (the same list / dict instance for those types).
    """
    # Base cases: the object knows how to move itself.
    cuda_fn = getattr(batch, 'cuda', None)
    if callable(cuda_fn):
        return cuda_fn(gpu_id, non_blocking=True)
    to_fn = getattr(batch, 'to', None)
    if callable(to_fn):
        return to_fn(torch.device('cuda', gpu_id), non_blocking=True)

    if isinstance(batch, list):
        for pos in range(len(batch)):
            batch[pos] = move_to_cuda(batch[pos], gpu_id)
        return batch
    if isinstance(batch, tuple):
        return tuple(move_to_cuda(item, gpu_id) for item in batch)
    if isinstance(batch, dict):
        # Only existing keys are reassigned, so the dict size never changes
        # while it is being iterated.
        for key in batch:
            batch[key] = move_to_cuda(batch[key], gpu_id)
        return batch
    return batch
class AvgrageMeter(object):
    """Keep a running weighted sum, count, and average of observed values."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Set the sum, the count, and the average back to zero."""
        self.avg = self.sum = self.cnt = 0

    def update(self, val, n=1):
        """Record ``val`` with weight ``n`` and refresh the average.

        Args:
            val: the observed value.
            n: how many observations ``val`` stands for.
        """
        self.sum += val * n
        self.cnt += n
        self.avg = self.sum / self.cnt
def collate_1d(values, pad_idx=0, left_pad=False, shift_right=False, max_len=None, shift_id=1):
    """Stack a list of 1d tensors into one padded 2d tensor.

    Args:
        values: list of 1d tensors; the output is created from ``values[0]``
            via ``.new``.
        pad_idx: value used to fill positions not covered by a sequence.
        left_pad: if True, each sequence is placed at the end of its row
            (padding on the left) instead of at the start.
        shift_right: if True, each row receives the sequence without its last
            element, shifted by one position, with ``shift_id`` written at
            position 0 of the destination slice.
        max_len: row width; defaults to the longest sequence in ``values``.
        shift_id: value written in front when ``shift_right`` is set.

    Returns:
        Tensor of shape ``(len(values), width)``.
    """
    if max_len is None:
        width = max(seq.size(0) for seq in values)
    else:
        width = max_len
    padded = values[0].new(len(values), width).fill_(pad_idx)

    for row_idx, seq in enumerate(values):
        seq_len = len(seq)
        row = padded[row_idx]
        target = row[width - seq_len:] if left_pad else row[:seq_len]
        # Fails when the sequence does not fit in the row (e.g. small max_len).
        assert target.numel() == seq.numel()
        if shift_right:
            target[1:] = seq[:-1]
            target[0] = shift_id
        else:
            target.copy_(seq)
    return padded
def collate_2d(values, pad_idx=0, left_pad=False, shift_right=False, max_len=None):
    """Stack a list of 2d tensors into one padded 3d tensor.

    Padding is applied along the first dimension of each input; the size of
    the last dimension is taken from ``values[0].shape[1]``.

    Args:
        values: list of 2d tensors; the output is created from ``values[0]``
            via ``.new``.
        pad_idx: value used to fill positions not covered by a sequence.
        left_pad: if True, each sequence is placed at the end of its slot
            (padding on the left) instead of at the start.
        shift_right: if True, each slot receives the sequence without its
            last row, shifted by one position; position 0 keeps ``pad_idx``.
        max_len: padded length; defaults to the longest input in ``values``.

    Returns:
        Tensor of shape ``(len(values), length, values[0].shape[1])``.
    """
    if max_len is None:
        length = max(seq.size(0) for seq in values)
    else:
        length = max_len
    feat_dim = values[0].shape[1]
    padded = values[0].new(len(values), length, feat_dim).fill_(pad_idx)

    for row_idx, seq in enumerate(values):
        seq_len = len(seq)
        slot = padded[row_idx]
        target = slot[length - seq_len:] if left_pad else slot[:seq_len]
        # Fails when the sequence does not fit in the slot (e.g. small max_len).
        assert target.numel() == seq.numel()
        if shift_right:
            target[1:] = seq[:-1]
        else:
            target.copy_(seq)
    return padded
def _is_batch_full(batch, num_tokens, max_tokens, max_sentences):
if len(batch) == 0:
return 0
if len(batch) == max_sentences:
return 1
if num_tokens > max_tokens:
return 1
return 0
def batch_by_size(
        indices, num_tokens_fn, max_tokens=None, max_sentences=None,
        required_batch_size_multiple=1, distributed=False
):
    """
    Return mini-batches of indices bucketed by size. Batches may contain
    sequences of different lengths.

    The token cost of a candidate batch is computed as
    ``(items in batch + 1) * longest item seen for that batch``.

    Args:
        indices (List[int]): ordered list of dataset indices; a generator is
            materialized into an int64 NumPy array first
        num_tokens_fn (callable): function that returns the number of tokens at
            a given index
        max_tokens (int, optional): max number of tokens in each batch
            (default: None, meaning ``sys.maxsize``).
        max_sentences (int, optional): max number of sentences in each
            batch (default: None, meaning ``sys.maxsize``).
        required_batch_size_multiple (int, optional): require batch size to
            be a multiple of N (default: 1).
        distributed (bool, optional): accepted but not used by this function.

    Returns:
        List of batches, each a list of indices.
    """
    if max_tokens is None:
        max_tokens = sys.maxsize
    if max_sentences is None:
        max_sentences = sys.maxsize
    multiple = required_batch_size_multiple

    if isinstance(indices, types.GeneratorType):
        indices = np.fromiter(indices, dtype=np.int64, count=-1)

    longest = 0          # longest item among those tracked in pending_lens
    pending_lens = []    # lengths of the open batch plus the current item
    current = []         # the batch being filled
    finished = []        # completed batches

    for idx in indices:
        item_len = num_tokens_fn(idx)
        pending_lens.append(item_len)
        longest = max(longest, item_len)
        assert longest <= max_tokens, (
            "sentence at index {} of size {} exceeds max_tokens "
            "limit of {}!".format(idx, longest, max_tokens)
        )
        padded_tokens = (len(current) + 1) * longest

        if _is_batch_full(current, padded_tokens, max_tokens, max_sentences):
            # Emit the largest prefix whose size is a multiple of `multiple`;
            # if the batch is smaller than one multiple, emit all of it.
            whole = multiple * (len(current) // multiple)
            cut = max(whole, len(current) % multiple)
            finished.append(current[:cut])
            current = current[cut:]
            pending_lens = pending_lens[cut:]
            longest = max(pending_lens) if pending_lens else 0
        current.append(idx)

    if current:
        finished.append(current)
    return finished
def unpack_dict_to_list(samples):
    """Split a batched dict into a list with one dict per batch element.

    The batch size is read from ``samples['outputs'].size(0)``. For each
    position ``i`` a dict is built holding ``value[i]`` for every entry of
    ``samples``; entries that cannot be indexed at ``i`` are left out of that
    element's dict.

    Args:
        samples: dict of batched values; must contain an ``'outputs'`` entry
            that provides ``.size(0)``.

    Returns:
        List of ``bsz`` dicts.
    """
    bsz = samples.get('outputs').size(0)
    unpacked = []
    for i in range(bsz):
        item = {}
        for key, value in samples.items():
            try:
                item[key] = value[i]
            except Exception:
                # Best effort: skip values that are not indexable at i
                # (None, scalars, too-short sequences, ...). Catching only
                # Exception lets KeyboardInterrupt / SystemExit propagate.
                continue
        unpacked.append(item)
    return unpacked
def remove_padding(x, padding_idx=0):
    """Drop padded positions from a 1d or 2d array.

    For 1d input ([T]) the elements equal to ``padding_idx`` are removed.
    For 2d input ([T, H]) the rows whose sum of absolute values equals
    ``padding_idx`` are removed.

    Args:
        x: array with 1 or 2 dimensions, or None.
        padding_idx: value identifying padding.

    Returns:
        The filtered array, or None when ``x`` is None.
    """
    if x is None:
        return None
    rank = len(x.shape)
    assert rank in [1, 2]
    if rank == 1:  # [T]
        keep = x != padding_idx
    else:  # [T, H]
        keep = np.abs(x).sum(-1) != padding_idx
    return x[keep]
class Timer:
    """Context manager that accumulates elapsed time per timer name.

    Totals are kept in the class-level ``timer_map`` dict, so all ``Timer``
    instances created with the same name add to the same total. Timing and
    printing only happen when ``enable`` is true.
    """
    # name -> accumulated seconds, shared by every Timer instance
    timer_map = {}

    def __init__(self, name, enable=False):
        Timer.timer_map.setdefault(name, 0)
        self.name = name
        self.enable = enable

    def __enter__(self):
        if not self.enable:
            return
        # Synchronize before reading the clock when CUDA is available.
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        self.t = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self.enable:
            return
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        Timer.timer_map[self.name] += time.time() - self.t
        print(f'[Timer] {self.name}: {Timer.timer_map[self.name]}')
def print_arch(model, model_name='model'):
    """Print the model, then report its trainable parameter count.

    Args:
        model: object to print; it is also passed on to ``num_params``.
        model_name: label used in the printed lines.
    """
    header = f"| {model_name} Arch: "
    print(header, model)
    num_params(model, model_name=model_name)
def num_params(model, print_out=True, model_name="model"):
    """Count the trainable parameters of ``model`` in millions.

    Only parameters with ``requires_grad`` set are counted.

    Args:
        model: object exposing ``parameters()``.
        print_out: if True, print the count with three decimals.
        model_name: label used in the printed line.

    Returns:
        Number of trainable parameter elements divided by 1,000,000.
    """
    parameters = sum(
        np.prod(p.size()) for p in model.parameters() if p.requires_grad
    ) / 1_000_000
    if print_out:
        # Format everything in a single f-string: applying '%' to an f-string
        # that already contains model_name breaks when the name holds a '%'.
        print(f'| {model_name} Trainable Parameters: {parameters:.3f}M')
    return parameters
def get_encoding(file):
    """Detect the text encoding of a file with ``chardet``.

    The whole file is read as bytes and passed to ``chardet.detect``. A
    detected ``'GB2312'`` is reported as ``'GB18030'`` instead.

    Args:
        file: path of the file to inspect.

    Returns:
        The value of the ``'encoding'`` entry of the detection result, with
        the GB2312 substitution applied.
    """
    with open(file, 'rb') as f:
        raw = f.read()
    detected = chardet.detect(raw)['encoding']
    return 'GB18030' if detected == 'GB2312' else detected