import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import torch
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
from transformers import (
    DataCollatorForSeq2Seq, AutoTokenizer, AutoModelForSeq2SeqLM,
    Seq2SeqTrainingArguments, Trainer, Seq2SeqTrainer
)
class T5Generator:
    def __init__(self, model_checkpoint):
        self.tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_checkpoint)
        self.data_collator = DataCollatorForSeq2Seq(self.tokenizer)
        # Use runtime availability checks rather than is_built(), which only
        # reports whether PyTorch was compiled with that backend.
        self.device = 'cuda' if torch.cuda.is_available() else ('mps' if torch.backends.mps.is_available() else 'cpu')
    def tokenize_function_inputs(self, sample):
        """
        Tokenize a dataset sample: the input text and the target labels.
        """
        model_inputs = self.tokenizer(sample['text'], max_length=512, truncation=True)
        labels = self.tokenizer(sample["labels"], max_length=64, truncation=True)
        model_inputs["labels"] = labels["input_ids"]
        return model_inputs
    def train(self, tokenized_datasets, **kwargs):
        """
        Train the generative model.
        """
        # Set training arguments
        args = Seq2SeqTrainingArguments(
            **kwargs
        )

        # Define trainer object
        trainer = Seq2SeqTrainer(
            self.model,
            args,
            train_dataset=tokenized_datasets["train"],
            eval_dataset=tokenized_datasets["validation"] if tokenized_datasets.get("validation") is not None else None,
            tokenizer=self.tokenizer,
            data_collator=self.data_collator,
        )
        print("Trainer device:", trainer.args.device)

        # Fine-tune the model
        torch.cuda.empty_cache()
        print('\nModel training started ....')
        trainer.train()

        # Save best model
        trainer.save_model()
        return trainer
    def get_labels(self, tokenized_dataset, batch_size=4, max_length=128, sample_set='train'):
        """
        Get the predictions from the trained model.
        """
        def collate_fn(batch):
            input_ids = [torch.tensor(example['input_ids']) for example in batch]
            input_ids = pad_sequence(input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id)
            return input_ids

        dataloader = DataLoader(tokenized_dataset[sample_set], batch_size=batch_size, collate_fn=collate_fn)
        predicted_output = []
        self.model.to(self.device)
        print('Model loaded to: ', self.device)

        for batch in tqdm(dataloader):
            batch = batch.to(self.device)
            output_ids = self.model.generate(batch, max_length=max_length)
            output_texts = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)
            for output_text in output_texts:
                predicted_output.append(output_text)
        return predicted_output
    def get_metrics(self, y_true, y_pred, is_triplet_extraction=False):
        """
        Compute precision, recall and F1 with lenient (substring) matching between
        predicted and ground-truth terms. The fourth return value keeps the shape
        consistent with T5Classifier.get_metrics; accuracy is not computed here.
        """
        total_pred = 0
        total_gt = 0
        tp = 0
        if not is_triplet_extraction:
            for gt, pred in zip(y_true, y_pred):
                gt_list = gt.split(', ')
                pred_list = pred.split(', ')
                total_pred += len(pred_list)
                total_gt += len(gt_list)
                # Count a true positive when a prediction and a ground-truth term
                # contain one another (partial match).
                for gt_val in gt_list:
                    for pred_val in pred_list:
                        if pred_val in gt_val or gt_val in pred_val:
                            tp += 1
                            break
        else:
            for gt, pred in zip(y_true, y_pred):
                gt_list = gt.split(', ')
                pred_list = pred.split(', ')
                total_pred += len(pred_list)
                total_gt += len(gt_list)
                for gt_val in gt_list:
                    # Each triplet is encoded as "aspect:opinion:sentiment";
                    # skip malformed entries instead of raising.
                    gt_parts = gt_val.split(':')
                    if len(gt_parts) < 3:
                        continue
                    gt_asp, gt_op, gt_sent = gt_parts[0], gt_parts[1], gt_parts[2]
                    for pred_val in pred_list:
                        pred_parts = pred_val.split(':')
                        if len(pred_parts) < 3:
                            continue
                        pr_asp, pr_op, pr_sent = pred_parts[0], pred_parts[1], pred_parts[2]
                        if pr_asp in gt_asp and pr_op in gt_op and gt_sent == pr_sent:
                            tp += 1

        # Guard against empty predictions/ground truth to avoid division by zero.
        p = tp / total_pred if total_pred else 0.0
        r = tp / total_gt if total_gt else 0.0
        f1 = 2 * p * r / (p + r) if (p + r) else 0.0
        return p, r, f1, None
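
    # Illustrative note (hypothetical values, not from the original file): with
    #   y_true = ["battery life:long:positive"] and y_pred = ["battery life:long:positive"],
    # get_metrics(y_true, y_pred, is_triplet_extraction=True) returns (1.0, 1.0, 1.0, None),
    # since each triplet is encoded as "aspect:opinion:sentiment" and terms are matched leniently.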

class T5Classifier:
    def __init__(self, model_checkpoint):
        self.tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, force_download=True)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_checkpoint, force_download=True)
        self.data_collator = DataCollatorForSeq2Seq(self.tokenizer)
        # torch.has_cuda / torch.has_mps are deprecated; use the availability checks instead.
        self.device = 'cuda' if torch.cuda.is_available() else ('mps' if torch.backends.mps.is_available() else 'cpu')
    def tokenize_function_inputs(self, sample):
        """
        Tokenize a dataset sample: the input text and the target labels.
        """
        sample['input_ids'] = self.tokenizer(sample["text"], max_length=512, truncation=True).input_ids
        sample['labels'] = self.tokenizer(sample["labels"], max_length=64, truncation=True).input_ids
        return sample
    def train(self, tokenized_datasets, **kwargs):
        """
        Train the classification model.
        """
        # Set training arguments
        args = Seq2SeqTrainingArguments(
            **kwargs
        )

        # Define trainer object
        trainer = Trainer(
            self.model,
            args,
            train_dataset=tokenized_datasets["train"],
            eval_dataset=tokenized_datasets["validation"] if tokenized_datasets.get("validation") is not None else None,
            tokenizer=self.tokenizer,
            data_collator=self.data_collator
        )
        print("Trainer device:", trainer.args.device)

        # Fine-tune the model
        torch.cuda.empty_cache()
        print('\nModel training started ....')
        trainer.train()

        # Save best model
        trainer.save_model()
        return trainer
    def get_labels(self, tokenized_dataset, batch_size=4, sample_set='train'):
        """
        Get the predictions from the trained model.
        """
        def collate_fn(batch):
            input_ids = [torch.tensor(example['input_ids']) for example in batch]
            input_ids = pad_sequence(input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id)
            return input_ids

        dataloader = DataLoader(tokenized_dataset[sample_set], batch_size=batch_size, collate_fn=collate_fn)
        predicted_output = []
        self.model.to(self.device)
        print('Model loaded to: ', self.device)

        for batch in tqdm(dataloader):
            batch = batch.to(self.device)
            output_ids = self.model.generate(batch)
            output_texts = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)
            for output_text in output_texts:
                predicted_output.append(output_text)
        return predicted_output
    def get_metrics(self, y_true, y_pred):
        return (precision_score(y_true, y_pred, average='macro'),
                recall_score(y_true, y_pred, average='macro'),
                f1_score(y_true, y_pred, average='macro'),
                accuracy_score(y_true, y_pred))
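

if __name__ == "__main__":
    # --- Minimal usage sketch (illustrative only) ---
    # The checkpoint name, CSV file paths, column names ("text"/"labels") and the
    # training arguments below are assumptions for demonstration; they are not
    # prescribed anywhere in this module.
    from datasets import load_dataset

    dataset = load_dataset("csv", data_files={"train": "train.csv", "validation": "val.csv"})

    generator = T5Generator("t5-base")
    tokenized = dataset.map(generator.tokenize_function_inputs)

    # Any Seq2SeqTrainingArguments can be forwarded as keyword arguments.
    generator.train(
        tokenized,
        output_dir="./t5-generator",  # hypothetical output directory
        per_device_train_batch_size=8,
        num_train_epochs=3,
    )

    # Generate predictions for the validation split and score them against the
    # original (untokenized) string labels.
    predictions = generator.get_labels(tokenized, batch_size=4, sample_set="validation")
    references = dataset["validation"]["labels"]
    p, r, f1, _ = generator.get_metrics(references, predictions)
    print(f"precision={p:.3f} recall={r:.3f} f1={f1:.3f}")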