from random import shuffle import streamlit as st from datasets import load_dataset import numpy as np import os from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader from transformers import AutoModelForTokenClassification, AutoTokenizer, DataCollatorForTokenClassification from transformers import DebertaV2Config, DebertaV2ForTokenClassification os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" # print weights def print_trainable_parameters(model): pytorch_total_params = sum(p.numel() for p in model.parameters()) torch_total_params = sum(p.numel() for p in model.parameters() if p.requires_grad) print(f'total params: {pytorch_total_params}. tunable params: {torch_total_params}') device = torch.device('cpu') print(f"Is CUDA available: {torch.cuda.is_available()}") # True if torch.cuda.is_available(): print(f"CUDA device: {torch.cuda.get_device_name(torch.cuda.current_device())}") device = torch.device('cuda') # Load models st.write('Loading the pretrained model ...') teacher_model_name = "iiiorg/piiranha-v1-detect-personal-information" teacher_model = AutoModelForTokenClassification.from_pretrained(teacher_model_name) tokenizer = AutoTokenizer.from_pretrained(teacher_model_name) print(teacher_model) print_trainable_parameters(teacher_model) label2id = teacher_model.config.label2id id2label = teacher_model.config.id2label st.write("id2label: ", id2label) st.write("label2id: ", label2id) dimension = len(id2label) st.write("dimension", dimension) student_model_config = teacher_model.config student_model_config.num_attention_heads = 8 student_model_config.num_hidden_layers = 4 student_model = DebertaV2ForTokenClassification.from_pretrained( "microsoft/mdeberta-v3-base", config=student_model_config) # ignore_mismatched_sizes=True) print(student_model) print_trainable_parameters(student_model) if torch.cuda.is_available(): teacher_model = teacher_model.to(device) student_model = student_model.to(device) # Load data. raw_dataset = load_dataset("ai4privacy/pii-masking-400k", split='train') raw_dataset = raw_dataset.filter(lambda example: example["language"].startswith("en")) #raw_dataset = raw_dataset.select(range(2000)) raw_dataset = raw_dataset.filter(lambda example, idx: idx % 11 == 0, with_indices=True) raw_dataset = raw_dataset.train_test_split(test_size=0.2) print(raw_dataset) print(raw_dataset.column_names) # inputs = tokenizer( # raw_dataset['train'][0]['mbert_tokens'], # truncation=True, # is_split_into_words=True) # print(inputs) # print(inputs.tokens()) # print(inputs.word_ids()) # function to align labels with tokens # --> special tokens: -100 label id (ignored by cross entropy), # --> if tokens are inside a word, replace 'B-' with 'I-' def align_labels_with_tokens(label, word_ids): aligned_label_ids = [] previous_word_idx = None for word_idx in word_ids: # Set the special tokens to -100. if word_idx is None: aligned_label_ids.append(-100) elif word_idx != previous_word_idx: # Only label the first token of a given word. if label[word_idx].startswith("B-"): label[word_idx] = label[word_idx].replace("B-", "I-") aligned_label_ids.append(label2id[label[word_idx]]) else: aligned_label_ids.append(-100) previous_word_idx = word_idx return aligned_label_ids # create tokenize function def tokenize_function(examples): # tokenize and truncate text. The examples argument would have already stripped # the train or test label. new_labels = [] inputs = tokenizer( examples['mbert_tokens'], is_split_into_words=True, padding=True, truncation=True, max_length=512) for i, label in enumerate(examples['mbert_token_classes']): word_ids = inputs.word_ids(batch_index=i) new_labels.append(align_labels_with_tokens(label, word_ids)) print("Printing partial input with tokenized output") print(inputs.tokens()[:1000]) print(inputs.word_ids()[:1000]) print(new_labels[0]) inputs["labels"] = new_labels return inputs # tokenize training and validation datasets tokenized_data = raw_dataset.map( tokenize_function, batched=True) tokenized_data.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels']) # data collator data_collator = DataCollatorForTokenClassification(tokenizer) st.write(tokenized_data["train"][:2]["labels"]) # Function to evaluate model performance def evaluate_model(model, dataloader, device): model.eval() # Set model to evaluation mode all_preds = [] all_labels = [] sample_count = 0 num_samples=10 # Disable gradient calculations with torch.no_grad(): for batch in dataloader: input_ids = batch['input_ids'].to(device) current_batch_size = input_ids.size(0) attention_mask = batch['attention_mask'].to(device) labels = batch['labels'].to(device) # Forward pass to get logits outputs = model(input_ids, attention_mask=attention_mask) logits = outputs.logits # Get predictions preds = torch.argmax(logits, dim=-1) # Process each sequence in the batch for i in range(current_batch_size): valid_mask = (labels[i] != -100) & (attention_mask[i] != 0) valid_preds = preds[i][valid_mask].flatten() valid_labels = labels[i][valid_mask].flatten() if sample_count < num_samples: print(f"Sample {sample_count + 1}:") print(f"Tokens: {tokenizer.convert_ids_to_tokens(input_ids[i])}") print(f"True Labels: {[id2label[label.item()] for label in valid_labels]}") print(f"Predicted Labels: {[id2label[pred.item()] for pred in valid_preds]}") print("-" * 50) sample_count += 1 all_preds.extend(valid_preds.tolist()) all_labels.extend(valid_labels.tolist()) # Calculate evaluation metrics print("evaluate_model sizes") print(len(all_preds)) print(len(all_labels)) all_preds = np.asarray(all_preds, dtype=np.float32) all_labels = np.asarray(all_labels, dtype=np.float32) report = classification_report(all_labels, all_preds, target_names=id2label.values(), zero_division=0) accuracy = accuracy_score(all_labels, all_preds) precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='micro') return report, accuracy, precision, recall, f1 # Function to compute distillation and hard-label loss def distillation_loss(student_logits, teacher_logits, true_labels, temperature, alpha): # print("Distillation loss sizes") # print(teacher_logits.size()) # print(student_logits.size()) # print(true_labels.size()) # Compute soft targets from teacher logits soft_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1) student_soft = nn.functional.log_softmax(student_logits / temperature, dim=-1) # KL Divergence loss for distillation distill_loss = nn.functional.kl_div(student_soft, soft_targets, reduction='batchmean') * (temperature ** 2) # Cross-entropy loss for hard labels student_logit_reshape = torch.transpose(student_logits, 1, 2) # transpose to match the labels dimension hard_loss = nn.CrossEntropyLoss()(student_logit_reshape, true_labels) # Combine losses loss = alpha * distill_loss + (1.0 - alpha) * hard_loss return loss # hyperparameters batch_size = 32 lr = 1e-4 num_epochs = 10 temperature = 2.0 alpha = 0.5 # define optimizer optimizer = optim.Adam(student_model.parameters(), lr=lr) # create training data loader dataloader = DataLoader(tokenized_data['train'], batch_size=batch_size, collate_fn=data_collator) # create testing data loader test_dataloader = DataLoader(tokenized_data['test'], batch_size=batch_size, collate_fn=data_collator) untrained_student_report, untrained_student_accuracy, untrained_student_precision, untrained_student_recall, untrained_student_f1 = evaluate_model(student_model, test_dataloader, device) print(f"Untrained Student (test) - Report:") print(untrained_student_report) print(f"Accuracy: {untrained_student_accuracy:.4f}, Precision: {untrained_student_precision:.4f}, Recall: {untrained_student_recall:.4f}, F1 Score: {untrained_student_f1:.4f}") # put student model in train mode student_model.train() # train model for epoch in range(num_epochs): for batch in dataloader: # Prepare inputs input_ids = batch['input_ids'].to(device) attention_mask = batch['attention_mask'].to(device) labels = batch['labels'].to(device) # Disable gradient calculation for teacher model with torch.no_grad(): teacher_outputs = teacher_model(input_ids, attention_mask=attention_mask) teacher_logits = teacher_outputs.logits # Forward pass through the student model student_outputs = student_model(input_ids, attention_mask=attention_mask) student_logits = student_outputs.logits # Compute the distillation loss loss = distillation_loss(student_logits, teacher_logits, labels, temperature, alpha) # Backpropagation optimizer.zero_grad() loss.backward() optimizer.step() print(f"Epoch {epoch + 1} completed with loss: {loss.item()}") test_dataloader = DataLoader(tokenized_data['test'], batch_size=batch_size, collate_fn=data_collator, shuffle=True) # Evaluate the teacher model teacher_report, teacher_accuracy, teacher_precision, teacher_recall, teacher_f1 = evaluate_model(teacher_model, test_dataloader, device) print(f"Teacher (test) - Report:") print(teacher_report) print(f"Accuracy: {teacher_accuracy:.4f}, Precision: {teacher_precision:.4f}, Recall: {teacher_recall:.4f}, F1 Score: {teacher_f1:.4f}") print("\n") # Evaluate the student model student_report, student_accuracy, student_precision, student_recall, student_f1 = evaluate_model(student_model, test_dataloader, device) print(f"Student (test) - Report:") print(student_report) print(f"Accuracy: {student_accuracy:.4f}, Precision: {student_precision:.4f}, Recall: {student_recall:.4f}, F1 Score: {student_f1:.4f}") print("\n") # put student model back into train mode student_model.train() st.write('Pushing model to huggingface') # Push model to huggingface hf_name = 'CarolXia' # your hf username or org name mode_name = "pii-kd-deberta-v2" model_id = hf_name + "/" + mode_name student_model.push_to_hub(model_id, token=st.secrets["HUGGINGFACE_TOKEN"])