Spaces:
Build error
Build error
import torch | |
import torch.nn as nn | |
from torch.utils.data import Dataset, DataLoader | |
LABEL_MAP = {'negative': 0, | |
'not included':0, | |
'0':0, | |
0:0, | |
'excluded':0, | |
'positive': 1, | |
'included':1, | |
'1':1, | |
1:1, | |
} | |
class SLR_DataSet(Dataset): | |
def __init__(self, | |
treat_text =None, | |
etailment_txt =None, | |
LABEL_MAP= None, | |
NA = None, | |
**args): | |
self.tokenizer = args.get('tokenizer') | |
self.data = args.get('data').reset_index() | |
self.max_seq_length = args.get("max_seq_length", 512) | |
self.INPUT_NAME = args.get("input", 'x') | |
self.LABEL_NAME = args.get("output", None) | |
self.treat_text = treat_text | |
self.etailment_txt = etailment_txt | |
self.LABEL_MAP=LABEL_MAP | |
self.NA=NA | |
if not self.INPUT_NAME in self.data.columns: | |
self.data[self.INPUT_NAME] = np.nan | |
# Tokenizing and processing text | |
def encode_text(self, example): | |
comment_text = example[self.INPUT_NAME] | |
if not isinstance(self.treat_text,type(None)): | |
comment_text = self.treat_text(comment_text) | |
if example[self.LABEL_NAME] is np.NaN and self.NA != None: | |
labels = self.NA | |
elif self.LABEL_NAME != None: | |
try: | |
labels = self.LABEL_MAP[example[self.LABEL_NAME]] | |
except: | |
labels = -1 | |
# raise TypeError(f"Label passed {example[self.LABEL_NAME]}, is not be in LABEL_MAP") | |
# print('Not handle LABEL_MAP') | |
else: | |
labels = None | |
if self.etailment_txt: | |
tensor_data = self.tokenize((comment_text, self.etailment_txt), labels ) | |
else: | |
tensor_data = self.tokenize((comment_text), labels) | |
return tensor_data | |
def tokenize(self, comment_text, labels): | |
encoding = self.tokenizer.encode_plus( | |
(comment_text), | |
add_special_tokens=True, | |
max_length=self.max_seq_length, | |
return_token_type_ids=True, | |
padding="max_length", | |
truncation=True, | |
return_attention_mask=True, | |
return_tensors='pt', | |
) | |
if labels != None: | |
return tuple((( | |
encoding["input_ids"].flatten(), | |
encoding["attention_mask"].flatten(), | |
encoding["token_type_ids"].flatten() | |
), | |
torch.tensor([torch.tensor(labels).to(int)]) | |
)) | |
else: | |
return tuple((( | |
encoding["input_ids"].flatten(), | |
encoding["attention_mask"].flatten(), | |
encoding["token_type_ids"].flatten() | |
), | |
torch.empty(0) | |
)) | |
def __len__(self): | |
return len(self.data) | |
# Returning data | |
def __getitem__(self, index: int): | |
# print(index) | |
data_row = self.data.iloc[index] | |
tensor_data = self.encode_text(data_row) | |
return tensor_data | |
from tqdm import tqdm | |
import gc | |
from IPython.display import clear_output | |
from collections import namedtuple | |
features = namedtuple('features', ['bert', 'feature_map']) | |
Output = namedtuple('Output', ['loss', 'features', 'logit']) | |
bert_tuple = namedtuple('bert',['hidden_states', 'attentions']) | |
class loop(): | |
def train_loop(self, model,device, optimizer, data_train_loader, scheduler = None, data_valid_loader = None, | |
epochs = 4, print_info = 1000000000, metrics = True, log = None, metrics_print = True): | |
# Start the model's parameters | |
table.reset() | |
model.to(device) | |
model.train() | |
# Task epochs (Inner epochs) | |
for epoch in range(0, epochs): | |
train_loss, _, out = self.batch_loop(data_train_loader, model, optimizer, device) | |
if scheduler is not None: | |
for sched in scheduler: | |
sched.step() | |
if (epoch % print_info == 0): | |
if metrics: | |
labels = self.map_batch(out[1]).to(int).squeeze() | |
logits = self.map_batch(out[0]).squeeze() | |
train_metrics, _ = plot(logits, labels, 0.9) | |
del labels, logits | |
train_metrics['Loss'] = torch.Tensor(train_loss).mean().item() | |
if not isinstance(log,type(None)): | |
log({"train_"+ x :y for x,y in train_metrics.items()}) | |
table(train_metrics, epoch, "Train") | |
else: | |
print("Loss: ", torch.Tensor(train_loss).mean().item()) | |
if data_valid_loader: | |
valid_loss, _, out = self.eval_loop(data_valid_loader, model, device=device) | |
if metrics: | |
global out2 | |
out2 = out | |
labels = self.map_batch(out[1]).to(int).squeeze() | |
logits = self.map_batch(out[0]).squeeze() | |
valid_metrics, _ = plot(logits, labels, 0.9) | |
valid_metrics['Loss'] = torch.Tensor(valid_loss).mean().item() | |
del labels, logits | |
if not isinstance(log,type(None)): | |
log({"valid_"+ x :y for x,y in train_metrics.items()}) | |
table(valid_metrics, epoch, "Valid") | |
if metrics_print: | |
print(table.data_frame().round(4)) | |
else: | |
print("Valid Loss: ", torch.Tensor(valid_loss).mean().item()) | |
return table.data_frame() | |
def batch_loop(self, loader, model, optimizer, device): | |
all_loss = [] | |
features_lst = [] | |
attention_lst = [] | |
logits = [] | |
outputs = [] | |
# Test's Batch loop | |
for inner_step, batch in enumerate(tqdm(loader, | |
desc="Train validation | ", | |
ncols=80)) : | |
input, output =batch | |
input = tuple(t.to(device) for t in input) | |
if isinstance(output, torch.Tensor): | |
output = output.to(device) | |
optimizer.zero_grad() | |
# Predictions | |
loss, feature, logit = model(input, output) | |
# compute grads | |
loss.backward() | |
# update parameters | |
optimizer.step() | |
input = tuple(t.to("cpu") for t in input) | |
if isinstance(output, torch.Tensor): | |
output = output.to("cpu") | |
if isinstance(loss, torch.Tensor): | |
all_loss.append(loss.to('cpu').detach().clone()) | |
if isinstance(logit, torch.Tensor): | |
logits.append(logit.to('cpu').detach().clone()) | |
if isinstance(output, torch.Tensor): | |
outputs.append(output.to('cpu').detach().clone()) | |
if len(feature.feature_map)!=0: | |
features_lst.append([x.to('cpu').detach().clone() for x in feature.feature_map]) | |
del batch, input, output, loss, feature, logit | |
# model.to('cpu') | |
gc.collect() | |
torch.cuda.empty_cache() | |
# del model, optimizer | |
return Output(all_loss, features(None,features_lst), (logits, outputs)) | |
def eval_loop(self, loader, model, device, attention= False, hidden_states=False): | |
all_loss = [] | |
features_lst = [] | |
attention_lst = [] | |
hidden_states_lst = [] | |
logits = [] | |
outputs = [] | |
model.eval() | |
with torch.no_grad(): | |
# Test's Batch loop | |
for inner_step, batch in enumerate(tqdm(loader, | |
desc="Test validation | ", | |
ncols=80)) : | |
input, output =batch | |
input = tuple(t.to(device) for t in input) | |
if output.numel()!=0: | |
# Predictions | |
loss, feature, logit = model(input, output.to(device), | |
attention= attention, hidden_states=hidden_states) | |
else: | |
# Predictions | |
loss, feature, logit = model(input, | |
attention= attention, hidden_states=hidden_states) | |
input = tuple(t.to("cpu") for t in input) | |
if isinstance(output, torch.Tensor): | |
output = output.to("cpu") | |
if isinstance(loss, torch.Tensor): | |
all_loss.append(loss.to('cpu').detach().clone()) | |
if isinstance(logit, torch.Tensor): | |
logits.append(logit.to('cpu').detach().clone()) | |
try: | |
if not isinstance(feature.bert.attentions, type(None)): | |
attention_lst.append([x.to('cpu').detach().clone() for x in feature.bert.attentions]) | |
except: | |
attention_lst = None | |
try: | |
if not isinstance(feature.bert.hidden_states, type(None)): | |
hidden_states_lst.append([x.to('cpu').detach().clone() for x in feature.bert.hidden_states]) | |
except: | |
hidden_states_lst = None | |
if isinstance(output, torch.Tensor): | |
outputs.append(output.to('cpu').detach().clone()) | |
if len(feature.feature_map)!=0: | |
features_lst.append([x.to('cpu').detach().clone() for x in feature.feature_map]) | |
del batch, input, output, loss, feature, logit | |
# model.to('cpu') | |
gc.collect() | |
torch.cuda.empty_cache() | |
# del model, optimizer | |
return Output(all_loss, features(bert_tuple(hidden_states_lst,attention_lst),features_lst), (logits, outputs)) | |
# Process predictions and map the feature_map in tsne | |
def map_batch(features): | |
features = torch.cat(features, dim =0) | |
# features = np.concatenate(np.array(features,dtype=object)).astype(np.float32) | |
# features = torch.tensor(features) | |
return features.detach().clone() | |
class table: | |
data = [] | |
index = [] | |
def __init__(self, data, epochs, name): | |
self.index.append((epochs, name)) | |
self.data.append(data) | |
def data_frame(cls): | |
clear_output() | |
index = pd.MultiIndex.from_tuples(cls.index, names=["Epochs", "Data"]) | |
data = pd.DataFrame(cls.data, index=index) | |
return data | |
def reset(cls): | |
cls.data = [] | |
cls.index = [] | |
from collections import namedtuple | |
# Declaring namedtuple() | |
# Pre-trained model | |
class Encoder(nn.Module): | |
def __init__(self, layers, freeze_bert, model): | |
super(Encoder, self).__init__() | |
# Dummy Parameter | |
self.dummy_param = nn.Parameter(torch.empty(0)) | |
# Pre-trained model | |
self.model = deepcopy(model) | |
# Freezing bert parameters | |
if freeze_bert: | |
for param in self.model.parameters(): | |
param.requires_grad = freeze_bert | |
# Selecting hidden layers of the pre-trained model | |
old_model_encoder = self.model.encoder.layer | |
new_model_encoder = nn.ModuleList() | |
for i in layers: | |
new_model_encoder.append(old_model_encoder[i]) | |
self.model.encoder.layer = new_model_encoder | |
# Feed forward | |
def forward(self, output_attentions=False,output_hidden_states=False, **x): | |
return self.model(output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=True, | |
**x) | |
# Complete model | |
class SLR_Classifier(nn.Module): | |
def __init__(self, **data): | |
super(SLR_Classifier, self).__init__() | |
# Dummy Parameter | |
self.dummy_param = nn.Parameter(torch.empty(0)) | |
# Loss function | |
# Binary Cross Entropy with logits reduced to mean | |
self.loss_fn = nn.BCEWithLogitsLoss(reduction = 'mean', | |
pos_weight=torch.FloatTensor([data.get("pos_weight", 2.5)])) | |
# Pre-trained model | |
self.Encoder = Encoder(layers = data.get("bert_layers", range(12)), | |
freeze_bert = data.get("freeze_bert", False), | |
model = data.get("model"), | |
) | |
# Feature Map Layer | |
self.feature_map = nn.Sequential( | |
# nn.LayerNorm(self.Encoder.model.config.hidden_size), | |
nn.BatchNorm1d(self.Encoder.model.config.hidden_size), | |
# nn.Dropout(data.get("drop", 0.5)), | |
nn.Linear(self.Encoder.model.config.hidden_size, 200), | |
nn.Dropout(data.get("drop", 0.5)), | |
) | |
# Classifier Layer | |
self.classifier = nn.Sequential( | |
# nn.LayerNorm(self.Encoder.model.config.hidden_size), | |
# nn.Dropout(data.get("drop", 0.5)), | |
# nn.BatchNorm1d(self.Encoder.model.config.hidden_size), | |
# nn.Dropout(data.get("drop", 0.5)), | |
nn.Tanh(), | |
nn.Linear(200, 1) | |
) | |
# Initializing layer parameters | |
nn.init.normal_(self.feature_map[1].weight, mean=0, std=0.00001) | |
nn.init.zeros_(self.feature_map[1].bias) | |
# Feed forward | |
def forward(self, input, output=None, attention= False, hidden_states=False): | |
# input, output = batch | |
input_ids, attention_mask, token_type_ids = input | |
predict = self.Encoder(output_attentions=attention, | |
output_hidden_states=hidden_states, | |
**{"input_ids":input_ids, | |
"attention_mask":attention_mask, | |
"token_type_ids":token_type_ids | |
}) | |
feature_maped = self.feature_map(predict['pooler_output']) | |
# print(feature_maped) | |
logit = self.classifier(feature_maped) | |
# predict = torch.sigmoid(logit) | |
if not isinstance(output, type(None)): | |
# Loss function | |
loss = self.loss_fn(logit.to(torch.float), output.to(torch.float)) | |
return Output(loss, features(predict, feature_maped), logit) | |
else: | |
return Output(None, features(predict, feature_maped), logit) | |
def fit(self, optimizer, data_train_loader, scheduler = None, data_valid_loader = None, | |
epochs = 4, print_info = 1000000000, metrics = True, log = None, metrics_print = True): | |
return loop.train_loop(self, | |
device = self.dummy_param.device, | |
optimizer=optimizer, | |
scheduler= scheduler, | |
data_train_loader=data_train_loader, | |
data_valid_loader= data_valid_loader, | |
epochs = epochs, | |
print_info = print_info, | |
metrics = metrics, | |
log= log, | |
metrics_print=metrics_print) | |
def evaluate(self, loader, attention= False, hidden_states=False): | |
# global feature | |
all_loss, feature, (logits, outputs) = loop.eval_loop(loader, self, self.dummy_param.device, | |
attention= attention, hidden_states=hidden_states) | |
logits = loop.map_batch(logits) | |
if len(outputs) != 0: | |
outputs = loop.map_batch(outputs) | |
return Output(np.mean(all_loss), feature, (logits, outputs)) |