# meta-demo-app / utils.py
from copy import deepcopy

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
# Map raw label values (strings and ints) to the binary classes 0/1
LABEL_MAP = {'negative': 0,
             'not included': 0,
             '0': 0,
             0: 0,
             'excluded': 0,
             'positive': 1,
             'included': 1,
             '1': 1,
             1: 1,
             }
# Dataset that tokenizes SLR text examples and maps their labels
class SLR_DataSet(Dataset):
    def __init__(self,
                 treat_text=None,
                 etailment_txt=None,
                 LABEL_MAP=None,
                 NA=None,
                 **args):
        self.tokenizer = args.get('tokenizer')
        self.data = args.get('data').reset_index()
        self.max_seq_length = args.get("max_seq_length", 512)
        self.INPUT_NAME = args.get("input", 'x')
        self.LABEL_NAME = args.get("output", None)
        self.treat_text = treat_text
        self.etailment_txt = etailment_txt
        self.LABEL_MAP = LABEL_MAP
        self.NA = NA

        # Create the input column if it is missing from the data
        if self.INPUT_NAME not in self.data.columns:
            self.data[self.INPUT_NAME] = np.nan

    # Tokenizing and processing text
    def encode_text(self, example):
        comment_text = example[self.INPUT_NAME]
        if self.treat_text is not None:
            comment_text = self.treat_text(comment_text)

        # Resolve the label: no output column -> None, missing value -> self.NA,
        # known value -> LABEL_MAP entry, unknown value -> -1
        if self.LABEL_NAME is None:
            labels = None
        elif pd.isna(example[self.LABEL_NAME]) and self.NA is not None:
            labels = self.NA
        else:
            try:
                labels = self.LABEL_MAP[example[self.LABEL_NAME]]
            except KeyError:
                labels = -1
                # raise TypeError(f"Label {example[self.LABEL_NAME]} is not in LABEL_MAP")

        # Tokenize as a text pair when an entailment text is provided
        if self.etailment_txt:
            tensor_data = self.tokenize((comment_text, self.etailment_txt), labels)
        else:
            tensor_data = self.tokenize(comment_text, labels)
        return tensor_data

    def tokenize(self, comment_text, labels):
        encoding = self.tokenizer.encode_plus(
            comment_text,
            add_special_tokens=True,
            max_length=self.max_seq_length,
            return_token_type_ids=True,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        if labels is not None:
            return ((encoding["input_ids"].flatten(),
                     encoding["attention_mask"].flatten(),
                     encoding["token_type_ids"].flatten()),
                    torch.tensor([torch.tensor(labels).to(int)]))
        else:
            return ((encoding["input_ids"].flatten(),
                     encoding["attention_mask"].flatten(),
                     encoding["token_type_ids"].flatten()),
                    torch.empty(0))

    def __len__(self):
        return len(self.data)

    # Returning data
    def __getitem__(self, index: int):
        data_row = self.data.iloc[index]
        tensor_data = self.encode_text(data_row)
        return tensor_data
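
# A minimal usage sketch (illustrative only): building a DataLoader from
# SLR_DataSet, assuming a Hugging Face tokenizer and a pandas DataFrame with
# "text" and "label" columns. The checkpoint, column names, and batch size
# below are assumptions, not part of this module.
#
#   from transformers import AutoTokenizer
#
#   df = pd.DataFrame({"text": ["relevant study abstract", "off-topic abstract"],
#                      "label": ["included", "excluded"]})
#   tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
#   dataset = SLR_DataSet(data=df,
#                         tokenizer=tokenizer,
#                         input="text",
#                         output="label",
#                         LABEL_MAP=LABEL_MAP,
#                         max_seq_length=128)
#   loader = DataLoader(dataset, batch_size=2)
#   (input_ids, attention_mask, token_type_ids), labels = next(iter(loader))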
from tqdm import tqdm
import gc
from IPython.display import clear_output
from collections import namedtuple
features = namedtuple('features', ['bert', 'feature_map'])
Output = namedtuple('Output', ['loss', 'features', 'logit'])
bert_tuple = namedtuple('bert',['hidden_states', 'attentions'])
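
# Descriptive note: SLR_Classifier.forward (below) returns
#   Output(loss, features(bert_output, feature_map), logit)
# while the loops reuse Output with lists per field; loop.eval_loop repackages
# the collected BERT internals as
#   features(bert_tuple(hidden_states, attentions), feature_map_list).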
class loop:
    @classmethod
    def train_loop(cls, model, device, optimizer, data_train_loader, scheduler=None, data_valid_loader=None,
                   epochs=4, print_info=1000000000, metrics=True, log=None, metrics_print=True):
        # Reset the metrics table and prepare the model
        table.reset()
        model.to(device)
        model.train()

        # Task epochs (inner epochs)
        for epoch in range(0, epochs):
            train_loss, _, out = cls.batch_loop(data_train_loader, model, optimizer, device)

            if scheduler is not None:
                for sched in scheduler:
                    sched.step()

            if epoch % print_info == 0:
                if metrics:
                    labels = cls.map_batch(out[1]).to(int).squeeze()
                    logits = cls.map_batch(out[0]).squeeze()
                    # `plot` is assumed to be provided elsewhere in the project
                    # and to return (metrics_dict, figure)
                    train_metrics, _ = plot(logits, labels, 0.9)
                    del labels, logits

                    train_metrics['Loss'] = torch.Tensor(train_loss).mean().item()
                    if log is not None:
                        log({"train_" + x: y for x, y in train_metrics.items()})
                    table(train_metrics, epoch, "Train")
                else:
                    print("Loss: ", torch.Tensor(train_loss).mean().item())

                if data_valid_loader:
                    valid_loss, _, out = cls.eval_loop(data_valid_loader, model, device=device)

                    if metrics:
                        labels = cls.map_batch(out[1]).to(int).squeeze()
                        logits = cls.map_batch(out[0]).squeeze()
                        valid_metrics, _ = plot(logits, labels, 0.9)
                        valid_metrics['Loss'] = torch.Tensor(valid_loss).mean().item()
                        del labels, logits

                        if log is not None:
                            log({"valid_" + x: y for x, y in valid_metrics.items()})
                        table(valid_metrics, epoch, "Valid")

                        if metrics_print:
                            print(table.data_frame().round(4))
                    else:
                        print("Valid Loss: ", torch.Tensor(valid_loss).mean().item())

        return table.data_frame()

    @classmethod
    def batch_loop(cls, loader, model, optimizer, device):
        all_loss = []
        features_lst = []
        logits = []
        outputs = []

        # Training batch loop
        for inner_step, batch in enumerate(tqdm(loader,
                                                desc="Train validation | ",
                                                ncols=80)):
            input, output = batch
            input = tuple(t.to(device) for t in input)
            if isinstance(output, torch.Tensor):
                output = output.to(device)

            optimizer.zero_grad()

            # Predictions
            loss, feature, logit = model(input, output)

            # Compute gradients and update parameters
            loss.backward()
            optimizer.step()

            # Move everything back to the CPU and keep detached copies
            input = tuple(t.to("cpu") for t in input)
            if isinstance(output, torch.Tensor):
                output = output.to("cpu")
            if isinstance(loss, torch.Tensor):
                all_loss.append(loss.to('cpu').detach().clone())
            if isinstance(logit, torch.Tensor):
                logits.append(logit.to('cpu').detach().clone())
            if isinstance(output, torch.Tensor):
                outputs.append(output.to('cpu').detach().clone())
            if len(feature.feature_map) != 0:
                features_lst.append([x.to('cpu').detach().clone() for x in feature.feature_map])

            del batch, input, output, loss, feature, logit

        gc.collect()
        torch.cuda.empty_cache()

        return Output(all_loss, features(None, features_lst), (logits, outputs))

    @classmethod
    def eval_loop(cls, loader, model, device, attention=False, hidden_states=False):
        all_loss = []
        features_lst = []
        attention_lst = []
        hidden_states_lst = []
        logits = []
        outputs = []

        model.eval()
        with torch.no_grad():
            # Evaluation batch loop
            for inner_step, batch in enumerate(tqdm(loader,
                                                    desc="Test validation | ",
                                                    ncols=80)):
                input, output = batch
                input = tuple(t.to(device) for t in input)

                # Predictions (pass the labels only when they are present)
                if output.numel() != 0:
                    loss, feature, logit = model(input, output.to(device),
                                                 attention=attention, hidden_states=hidden_states)
                else:
                    loss, feature, logit = model(input,
                                                 attention=attention, hidden_states=hidden_states)

                # Move everything back to the CPU and keep detached copies
                input = tuple(t.to("cpu") for t in input)
                if isinstance(output, torch.Tensor):
                    output = output.to("cpu")
                if isinstance(loss, torch.Tensor):
                    all_loss.append(loss.to('cpu').detach().clone())
                if isinstance(logit, torch.Tensor):
                    logits.append(logit.to('cpu').detach().clone())

                # Collect attentions / hidden states when the encoder returned them
                try:
                    if feature.bert.attentions is not None:
                        attention_lst.append([x.to('cpu').detach().clone() for x in feature.bert.attentions])
                except AttributeError:
                    attention_lst = None
                try:
                    if feature.bert.hidden_states is not None:
                        hidden_states_lst.append([x.to('cpu').detach().clone() for x in feature.bert.hidden_states])
                except AttributeError:
                    hidden_states_lst = None

                if isinstance(output, torch.Tensor):
                    outputs.append(output.to('cpu').detach().clone())
                if len(feature.feature_map) != 0:
                    features_lst.append([x.to('cpu').detach().clone() for x in feature.feature_map])

                del batch, input, output, loss, feature, logit

        gc.collect()
        torch.cuda.empty_cache()

        return Output(all_loss, features(bert_tuple(hidden_states_lst, attention_lst), features_lst), (logits, outputs))

    # Concatenate a list of per-batch tensors into a single tensor
    @staticmethod
    def map_batch(features):
        features = torch.cat(features, dim=0)
        return features.detach().clone()
# Lightweight metrics table: each call appends one row to class-level storage,
# and data_frame() renders the accumulated rows with an (Epochs, Data) MultiIndex
class table:
    data = []
    index = []

    @torch.no_grad()
    def __init__(self, data, epochs, name):
        self.index.append((epochs, name))
        self.data.append(data)

    @classmethod
    @torch.no_grad()
    def data_frame(cls):
        # Clear the notebook cell output so the table is redrawn in place
        clear_output()
        index = pd.MultiIndex.from_tuples(cls.index, names=["Epochs", "Data"])
        data = pd.DataFrame(cls.data, index=index)
        return data

    @classmethod
    @torch.no_grad()
    def reset(cls):
        cls.data = []
        cls.index = []
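
# Usage note (illustrative; the metric values below are made up): calling the
# class like a function appends one row to the shared storage, e.g.
#
#   table.reset()
#   table({"F1": 0.81, "Loss": 0.42}, 0, "Train")
#   table({"F1": 0.78, "Loss": 0.47}, 0, "Valid")
#   df = table.data_frame()   # MultiIndex (Epochs, Data) x metric columns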
# Pre-trained model
class Encoder(nn.Module):
    def __init__(self, layers, freeze_bert, model):
        super(Encoder, self).__init__()
        # Dummy parameter used to track the module's device
        self.dummy_param = nn.Parameter(torch.empty(0))

        # Pre-trained model
        self.model = deepcopy(model)

        # Freezing bert parameters
        if freeze_bert:
            for param in self.model.parameters():
                param.requires_grad = False

        # Selecting hidden layers of the pre-trained model
        old_model_encoder = self.model.encoder.layer
        new_model_encoder = nn.ModuleList()

        for i in layers:
            new_model_encoder.append(old_model_encoder[i])

        self.model.encoder.layer = new_model_encoder

    # Feed forward
    def forward(self, output_attentions=False, output_hidden_states=False, **x):
        return self.model(output_attentions=output_attentions,
                          output_hidden_states=output_hidden_states,
                          return_dict=True,
                          **x)
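
# A minimal sketch (assumptions flagged): wrapping a Hugging Face BERT-style
# model (one exposing `.encoder.layer`) so that only its last four transformer
# layers are kept and its weights are frozen. The checkpoint name is an
# assumption; the input tensors can come from the SLR_DataSet sketch above.
#
#   from transformers import AutoModel
#
#   bert = AutoModel.from_pretrained("bert-base-uncased")
#   encoder = Encoder(layers=range(8, 12), freeze_bert=True, model=bert)
#   out = encoder(input_ids=input_ids, attention_mask=attention_mask,
#                 token_type_ids=token_type_ids)
#   pooled = out["pooler_output"]   # (batch_size, hidden_size)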
# Complete model
class SLR_Classifier(nn.Module):
def __init__(self, **data):
super(SLR_Classifier, self).__init__()
# Dummy Parameter
self.dummy_param = nn.Parameter(torch.empty(0))
# Loss function
# Binary Cross Entropy with logits reduced to mean
self.loss_fn = nn.BCEWithLogitsLoss(reduction = 'mean',
pos_weight=torch.FloatTensor([data.get("pos_weight", 2.5)]))
# Pre-trained model
self.Encoder = Encoder(layers = data.get("bert_layers", range(12)),
freeze_bert = data.get("freeze_bert", False),
model = data.get("model"),
)
# Feature Map Layer
self.feature_map = nn.Sequential(
# nn.LayerNorm(self.Encoder.model.config.hidden_size),
nn.BatchNorm1d(self.Encoder.model.config.hidden_size),
# nn.Dropout(data.get("drop", 0.5)),
nn.Linear(self.Encoder.model.config.hidden_size, 200),
nn.Dropout(data.get("drop", 0.5)),
)
# Classifier Layer
self.classifier = nn.Sequential(
# nn.LayerNorm(self.Encoder.model.config.hidden_size),
# nn.Dropout(data.get("drop", 0.5)),
# nn.BatchNorm1d(self.Encoder.model.config.hidden_size),
# nn.Dropout(data.get("drop", 0.5)),
nn.Tanh(),
nn.Linear(200, 1)
)
# Initializing layer parameters
nn.init.normal_(self.feature_map[1].weight, mean=0, std=0.00001)
nn.init.zeros_(self.feature_map[1].bias)
# Feed forward
def forward(self, input, output=None, attention= False, hidden_states=False):
# input, output = batch
input_ids, attention_mask, token_type_ids = input
predict = self.Encoder(output_attentions=attention,
output_hidden_states=hidden_states,
**{"input_ids":input_ids,
"attention_mask":attention_mask,
"token_type_ids":token_type_ids
})
feature_maped = self.feature_map(predict['pooler_output'])
# print(feature_maped)
logit = self.classifier(feature_maped)
# predict = torch.sigmoid(logit)
if not isinstance(output, type(None)):
# Loss function
loss = self.loss_fn(logit.to(torch.float), output.to(torch.float))
return Output(loss, features(predict, feature_maped), logit)
else:
return Output(None, features(predict, feature_maped), logit)
def fit(self, optimizer, data_train_loader, scheduler = None, data_valid_loader = None,
epochs = 4, print_info = 1000000000, metrics = True, log = None, metrics_print = True):
return loop.train_loop(self,
device = self.dummy_param.device,
optimizer=optimizer,
scheduler= scheduler,
data_train_loader=data_train_loader,
data_valid_loader= data_valid_loader,
epochs = epochs,
print_info = print_info,
metrics = metrics,
log= log,
metrics_print=metrics_print)
def evaluate(self, loader, attention= False, hidden_states=False):
# global feature
all_loss, feature, (logits, outputs) = loop.eval_loop(loader, self, self.dummy_param.device,
attention= attention, hidden_states=hidden_states)
logits = loop.map_batch(logits)
if len(outputs) != 0:
outputs = loop.map_batch(outputs)
return Output(np.mean(all_loss), feature, (logits, outputs))
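
# End-to-end sketch (hedged, illustrative only): building the classifier from a
# pre-trained BERT encoder and running one labelled batch through it. The
# checkpoint name, hyperparameters, and `df` (the labelled DataFrame from the
# SLR_DataSet sketch above) are assumptions, not part of this module. Note that
# `SLR_Classifier.fit` with metrics enabled also relies on a
# `plot(logits, labels, threshold)` helper expected to be provided elsewhere in
# the project.
#
#   from transformers import AutoModel, AutoTokenizer
#
#   bert = AutoModel.from_pretrained("bert-base-uncased")
#   model = SLR_Classifier(model=bert, bert_layers=range(12), pos_weight=2.5)
#
#   tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
#   dataset = SLR_DataSet(data=df, tokenizer=tokenizer, input="text",
#                         output="label", LABEL_MAP=LABEL_MAP)
#   loader = DataLoader(dataset, batch_size=8, shuffle=True)
#
#   batch_input, batch_labels = next(iter(loader))
#   loss, feats, logit = model(batch_input, batch_labels)
#
#   optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
#   history = model.fit(optimizer, data_train_loader=loader, epochs=2)
#   results = model.evaluate(loader)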