"""Distill a PII token-classification teacher into a smaller DeBERTa-v2 student.

The script (run top to bottom, e.g. via ``streamlit run``):

1. loads the teacher model and its tokenizer,
2. builds a 4-layer / 8-head student initialised from ``microsoft/mdeberta-v3-base``,
3. tokenizes an English subset of ``ai4privacy/pii-masking-400k`` and aligns the
   per-word labels to the tokenizer output,
4. trains the student with a mix of KL distillation loss and hard-label
   cross-entropy, evaluating after every epoch,
5. pushes the student to the Hugging Face Hub.
"""
import copy
import os

import numpy as np
import streamlit as st
import torch
import torch.nn as nn
import torch.optim as optim
from datasets import load_dataset
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from torch.utils.data import DataLoader
from transformers import AutoModelForTokenClassification, AutoTokenizer, DataCollatorForTokenClassification
from transformers import DebertaV2Config, DebertaV2ForTokenClassification

os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Sequence length used for both the tokenizer padding and the label padding,
# so that input_ids and labels always have the same length.
MAX_LENGTH = 512
# Label id that torch's cross-entropy skips by default (ignore_index=-100).
IGNORE_INDEX = -100


def print_trainable_parameters(model):
    """Print the total and the trainable (requires_grad) parameter counts of *model*."""
    total_params = sum(p.numel() for p in model.parameters())
    tunable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f'total params: {total_params}. tunable params: {tunable_params}')


# ---------------------------------------------------------------------------
# Device selection
# ---------------------------------------------------------------------------
device = torch.device('cpu')
print(f"Is CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(torch.cuda.current_device())}")
    device = torch.device('cuda')

# ---------------------------------------------------------------------------
# Load models
# ---------------------------------------------------------------------------
st.write('Loading the pretrained model ...')
teacher_model_name = "iiiorg/piiranha-v1-detect-personal-information"
teacher_model = AutoModelForTokenClassification.from_pretrained(teacher_model_name)
tokenizer = AutoTokenizer.from_pretrained(teacher_model_name)
print(teacher_model)
print_trainable_parameters(teacher_model)

label2id = teacher_model.config.label2id
id2label = teacher_model.config.id2label
st.write("id2label: ", id2label)
st.write("label2id: ", label2id)
dimension = len(id2label)
st.write("dimension", dimension)

# Work on a copy: assigning the teacher's config object directly would make the
# edits below also change teacher_model.config.
student_model_config = copy.deepcopy(teacher_model.config)
student_model_config.num_attention_heads = 8
student_model_config.num_hidden_layers = 4
student_model = DebertaV2ForTokenClassification.from_pretrained(
    "microsoft/mdeberta-v3-base",
    config=student_model_config)
print(student_model)
print_trainable_parameters(student_model)

# Moving to the CPU device is a no-op, so no CUDA check is needed here.
teacher_model = teacher_model.to(device)
student_model = student_model.to(device)

# ---------------------------------------------------------------------------
# Load data
# ---------------------------------------------------------------------------
raw_dataset = load_dataset("ai4privacy/pii-masking-400k", split='train')
raw_dataset = raw_dataset.filter(lambda example: example["language"].startswith("en"))
# Keep every 11th example to shrink the training set.
raw_dataset = raw_dataset.filter(lambda example, idx: idx % 11 == 0, with_indices=True)
raw_dataset = raw_dataset.train_test_split(test_size=0.2)
print(raw_dataset)
print(raw_dataset.column_names)


def align_labels_with_tokens(labels, word_ids, max_length):
    """Map per-word string labels onto tokenizer positions as label ids.

    Args:
        labels: string labels of one example, indexed by word id.
        word_ids: for each token position, the index of the word it came from,
            or ``None`` for positions that belong to no word.
        max_length: length the returned list is right-padded to.

    Returns:
        A list of label ids. Positions with ``word_id is None`` and the padding
        get -100 so the cross-entropy loss ignores them.

    Raises:
        KeyError: if a label (after the B- -> I- rewrite) is not in ``label2id``.
    """
    # Every "B-" prefix is rewritten to "I-" *before* the id lookup (label2id
    # maps strings to ints, so the rewrite cannot be applied to its result).
    # NOTE(review): presumably the teacher's label set only has "I-" tags — confirm.
    aligned_label_ids = [
        IGNORE_INDEX if word_id is None
        else label2id[labels[word_id].replace("B-", "I-")]
        for word_id in word_ids
    ]
    # Pad to max length (a non-positive difference adds nothing).
    aligned_label_ids += [IGNORE_INDEX] * (max_length - len(aligned_label_ids))
    return aligned_label_ids


def tokenize_function(examples):
    """Tokenize a batch of pre-split examples and attach aligned label ids.

    Args:
        examples: batch with 'mbert_tokens' (list of token lists) and
            'mbert_token_classes' (list of label lists).

    Returns:
        The tokenizer output with an extra "labels" entry, one list of
        MAX_LENGTH label ids per example.
    """
    inputs = tokenizer(
        examples['mbert_tokens'],
        is_split_into_words=True,
        truncation=True,
        max_length=MAX_LENGTH,
        padding="max_length"
    )
    # word_ids() without an argument only describes the first item of the
    # batch, so each example must ask for its own batch index.
    inputs["labels"] = [
        align_labels_with_tokens(labels, inputs.word_ids(batch_index=i), MAX_LENGTH)
        for i, labels in enumerate(examples['mbert_token_classes'])
    ]
    return inputs


# tokenize training and validation datasets
tokenized_data = raw_dataset.map(
    tokenize_function,
    batched=True)
tokenized_data.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])

# data collator (it takes no `truncation` argument; truncation already
# happened in tokenize_function)
data_collator = DataCollatorForTokenClassification(
    tokenizer,
    padding=True,
    max_length=MAX_LENGTH
)
st.write(tokenized_data["train"][:2]["labels"])


def evaluate_model(model, dataloader, device):
    """Evaluate a token-classification model on *dataloader*.

    Puts *model* in eval mode (callers must switch back to train mode
    themselves) and scores only positions whose label is not -100.

    Args:
        model: model returning an object with a ``logits`` attribute.
        dataloader: yields dicts with 'input_ids', 'attention_mask', 'labels'.
        device: device the batch tensors are moved to.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``; the last three are
        micro-averaged.
    """
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            logits = model(input_ids, attention_mask=attention_mask).logits
            preds = torch.argmax(logits, dim=-1)

            # Mask out ignored positions (-100 in labels)
            mask = labels != IGNORE_INDEX
            all_preds.extend(preds[mask].cpu().tolist())
            all_labels.extend(labels[mask].cpu().tolist())

    # Convert to numpy arrays for metrics calculation
    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average='micro'
    )
    return accuracy, precision, recall, f1


def distillation_loss(student_logits, teacher_logits, true_labels, temperature, alpha):
    """Combine the soft-target distillation loss with the hard-label loss.

    Only positions whose label is not -100 contribute, so padding and other
    ignored positions influence neither term and both terms are per-token means.

    Args:
        student_logits: student outputs; the last dimension is the class dimension.
        teacher_logits: teacher outputs, same shape as ``student_logits``.
        true_labels: label ids with -100 at ignored positions; shape equals
            the logits' shape without the class dimension.
        temperature: softmax temperature for the soft targets.
        alpha: weight of the distillation term; ``1 - alpha`` weights the
            hard-label cross-entropy.

    Returns:
        Scalar loss tensor. If no position is labelled, a zero that is still
        attached to the student's graph (so ``backward()`` works).
    """
    valid = true_labels != IGNORE_INDEX
    if not valid.any():
        return student_logits.sum() * 0.0

    # Boolean indexing flattens the leading dimensions: (num_valid_tokens, num_classes)
    student_valid = student_logits[valid]
    teacher_valid = teacher_logits[valid]
    labels_valid = true_labels[valid]

    # Compute soft targets from teacher logits
    soft_targets = nn.functional.softmax(teacher_valid / temperature, dim=-1)
    student_soft = nn.functional.log_softmax(student_valid / temperature, dim=-1)

    # KL divergence for distillation; 'batchmean' divides by the number of rows,
    # which here is the number of valid tokens. The T^2 factor rescales the
    # term after the logits were divided by T.
    distill_loss = nn.functional.kl_div(student_soft, soft_targets, reduction='batchmean') * (temperature ** 2)

    # Cross-entropy loss for hard labels
    hard_loss = nn.functional.cross_entropy(student_valid, labels_valid)

    # Combine losses
    return alpha * distill_loss + (1.0 - alpha) * hard_loss


# ---------------------------------------------------------------------------
# Training setup
# ---------------------------------------------------------------------------
# hyperparameters
batch_size = 32
lr = 1e-4
num_epochs = 30
temperature = 2.0
alpha = 0.5

# define optimizer
optimizer = optim.Adam(student_model.parameters(), lr=lr)

# create training data loader (reshuffled every epoch)
dataloader = DataLoader(tokenized_data['train'], batch_size=batch_size, shuffle=True, collate_fn=data_collator)
# create testing data loader
test_dataloader = DataLoader(tokenized_data['test'], batch_size=batch_size, collate_fn=data_collator)

# The teacher is never updated (it is not in the optimizer and is only run under
# no_grad), so its test metrics are computed once and re-printed every epoch.
teacher_accuracy, teacher_precision, teacher_recall, teacher_f1 = evaluate_model(teacher_model, test_dataloader, device)
teacher_test_report = f"Teacher (test) - Accuracy: {teacher_accuracy:.4f}, Precision: {teacher_precision:.4f}, Recall: {teacher_recall:.4f}, F1 Score: {teacher_f1:.4f}"
print(teacher_test_report)

# put student model in train mode
student_model.train()

# ---------------------------------------------------------------------------
# Train model
# ---------------------------------------------------------------------------
for epoch in range(num_epochs):
    for batch in dataloader:
        # Prepare inputs
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # Disable gradient calculation for teacher model
        with torch.no_grad():
            teacher_outputs = teacher_model(input_ids, attention_mask=attention_mask)
            teacher_logits = teacher_outputs.logits

        # Forward pass through the student model
        student_outputs = student_model(input_ids, attention_mask=attention_mask)
        student_logits = student_outputs.logits

        # Compute the distillation loss
        loss = distillation_loss(student_logits, teacher_logits, labels, temperature, alpha)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Reports the loss of the last batch of the epoch.
    print(f"Epoch {epoch + 1} completed with loss: {loss.item()}")

    # Teacher metrics (cached, see above)
    print(teacher_test_report)

    # Evaluate the student model
    student_accuracy, student_precision, student_recall, student_f1 = evaluate_model(student_model, test_dataloader, device)
    print(f"Student (test) - Accuracy: {student_accuracy:.4f}, Precision: {student_precision:.4f}, Recall: {student_recall:.4f}, F1 Score: {student_f1:.4f}")
    print("\n")

    # evaluate_model switched the student to eval mode; put it back into train mode
    student_model.train()

# ---------------------------------------------------------------------------
# Compare the models
# ---------------------------------------------------------------------------
# NOTE: this loader reads the same 'test' split as test_dataloader, only with
# a smaller batch size.
validation_dataloader = DataLoader(tokenized_data['test'], batch_size=8, collate_fn=data_collator)

# Evaluate the teacher model
teacher_accuracy, teacher_precision, teacher_recall, teacher_f1 = evaluate_model(teacher_model, validation_dataloader, device)
print(f"Teacher (validation) - Accuracy: {teacher_accuracy:.4f}, Precision: {teacher_precision:.4f}, Recall: {teacher_recall:.4f}, F1 Score: {teacher_f1:.4f}")

# Evaluate the student model
student_accuracy, student_precision, student_recall, student_f1 = evaluate_model(student_model, validation_dataloader, device)
print(f"Student (validation) - Accuracy: {student_accuracy:.4f}, Precision: {student_precision:.4f}, Recall: {student_recall:.4f}, F1 Score: {student_f1:.4f}")

# ---------------------------------------------------------------------------
# Push model to huggingface
# ---------------------------------------------------------------------------
st.write('Pushing model to huggingface')
hf_name = 'CarolXia'  # your hf username or org name
mode_name = "pii-kd-deberta-v2"
model_id = hf_name + "/" + mode_name
student_model.push_to_hub(model_id, token=st.secrets["HUGGINGFACE_TOKEN"])