# Spaces: Sleeping  (status text captured from the hosting page; not part of the script)
import streamlit as st | |
from datasets import load_dataset | |
import numpy as np | |
import os | |
from sklearn.metrics import accuracy_score, precision_recall_fscore_support | |
import torch | |
import torch.nn as nn | |
import torch.optim as optim | |
from torch.utils.data import DataLoader | |
from transformers import AutoModelForTokenClassification, AutoTokenizer, DataCollatorForTokenClassification | |
from transformers import DebertaV2Config, DebertaV2ForTokenClassification | |
# Allocator setting for PyTorch's CUDA backend, placed in the process
# environment ahead of the first torch.cuda call made further down.
os.environ.update({"PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True"})
# print weights
def print_trainable_parameters(model):
    """Print and return the total and trainable parameter counts of ``model``.

    Args:
        model: Object whose ``parameters()`` yields tensors exposing
            ``numel()`` and ``requires_grad`` (e.g. a ``torch.nn.Module``).

    Returns:
        tuple[int, int]: ``(total_params, trainable_params)``. Callers that
        only want the printed line can ignore the return value.
    """
    total_params = 0
    trainable_params = 0
    # One pass over the parameters feeds both counters.
    for param in model.parameters():
        count = param.numel()
        total_params += count
        if param.requires_grad:
            trainable_params += count
    print(f'total params: {total_params}. tunable params: {trainable_params}')
    return total_params, trainable_params
# Pick the compute device: CUDA when PyTorch reports it, otherwise CPU.
cuda_ready = torch.cuda.is_available()
print(f"Is CUDA available: {cuda_ready}")
if cuda_ready:
    # Name the currently selected GPU in the log.
    print(f"CUDA device: {torch.cuda.get_device_name(torch.cuda.current_device())}")
device = torch.device('cuda' if cuda_ready else 'cpu')
# Load models
st.write('Loading the pretrained model ...')
teacher_model_name = "iiiorg/piiranha-v1-detect-personal-information"
teacher_model = AutoModelForTokenClassification.from_pretrained(teacher_model_name)
tokenizer = AutoTokenizer.from_pretrained(teacher_model_name)
print(teacher_model)
print_trainable_parameters(teacher_model)

# Label vocabulary is taken from the teacher so both models share class ids.
label2id = teacher_model.config.label2id
id2label = teacher_model.config.id2label
st.write("id2label: ", id2label)
st.write("label2id: ", label2id)
dimension = len(id2label)
st.write("dimension", dimension)

# Build the student's config as its OWN object. Assigning
# `teacher_model.config` and then setting attributes on it would rewrite the
# teacher's config in place (same object), leaving the teacher reporting the
# student's head/layer counts. Loading the teacher's config again gives an
# independent copy; the keyword arguments override the loaded values.
student_model_config = DebertaV2Config.from_pretrained(
    teacher_model_name,
    num_attention_heads=8,
    num_hidden_layers=4,
)
student_model = DebertaV2ForTokenClassification.from_pretrained(
    "microsoft/mdeberta-v3-base",
    config=student_model_config)
# NOTE(review): `ignore_mismatched_sizes=True` was previously tried here and
# left disabled; re-enable only if checkpoint/config shapes disagree.
print(student_model)
print_trainable_parameters(student_model)

# `device` is CPU unless CUDA was detected, so this is a no-op on CPU hosts.
teacher_model = teacher_model.to(device)
student_model = student_model.to(device)
# Load data: keep rows whose language code starts with "en", keep every 11th
# of those (positions 0, 11, 22, ...), then split 80/20 into train/test.
#raw_dataset = raw_dataset.select(range(2000))
raw_dataset = (
    load_dataset("ai4privacy/pii-masking-400k", split='train')
    .filter(lambda row: row["language"].startswith("en"))
    .filter(lambda _row, position: position % 11 == 0, with_indices=True)
    .train_test_split(test_size=0.2)
)
print(raw_dataset)
print(raw_dataset.column_names)
# inputs = tokenizer( | |
# raw_dataset['train'][0]['mbert_tokens'], | |
# truncation=True, | |
# is_split_into_words=True) | |
# print(inputs) | |
# print(inputs.tokens()) | |
# print(inputs.word_ids()) | |
# function to align labels with tokens
# --> tokens without a word (special/padding tokens): -100 label id
#     (ignored by cross entropy)
# --> every other token takes its word's label, with a leading 'B-'
#     rewritten to 'I-' before the id lookup
def align_labels_with_tokens(labels, word_ids, max_length, label_map=None):
    """Expand per-word string labels into per-token label ids.

    Args:
        labels: Sequence of label strings, indexed by word id.
        word_ids: One entry per token: the index of the word the token came
            from, or ``None`` for tokens that belong to no word.
        max_length: The result is right-padded with ``-100`` up to this
            length. A result that is already longer is returned unchanged.
        label_map: Optional mapping from label string to integer id. Defaults
            to the module-level ``label2id``.

    Returns:
        list[int]: One label id per token, ``-100`` where the token has no
        word or is padding.

    Raises:
        KeyError: If a (rewritten) label is missing from the mapping.
    """
    if label_map is None:
        label_map = label2id

    aligned_label_ids = []
    for word_id in word_ids:
        if word_id is None:
            aligned_label_ids.append(-100)
            continue
        label = labels[word_id]
        # The rewrite must happen on the label *string*; the looked-up id is
        # an int. Only a leading "B-" is touched so a "B-" occurring inside a
        # label name is left alone.
        # NOTE(review): presumably the label map carries only I-/O entries --
        # confirm against the teacher's config.
        if label.startswith("B-"):
            label = "I-" + label[2:]
        aligned_label_ids.append(label_map[label])

    # Pad to max length (a non-positive difference adds nothing).
    aligned_label_ids += [-100] * (max_length - len(aligned_label_ids))
    return aligned_label_ids
# create tokenize function
def tokenize_function(examples, max_length=512):
    """Tokenize a batch of pre-split examples and attach aligned label ids.

    Intended for ``Dataset.map(..., batched=True)``: each value in
    ``examples`` is a list with one entry per example.

    Args:
        examples: Batch dict providing ``'mbert_tokens'`` (one word list per
            example) and ``'mbert_token_classes'`` (one label list per
            example).
        max_length: Length every sequence is truncated/padded to. The same
            value is used for the label rows so inputs and labels line up.

    Returns:
        The tokenizer output with an added ``"labels"`` entry holding one
        list of label ids per example.
    """
    inputs = tokenizer(
        examples['mbert_tokens'],
        is_split_into_words=True,
        truncation=True,
        max_length=max_length,
        padding="max_length",
    )
    # word_ids() without an argument only describes the first example of the
    # batch, so ask for each example's own mapping by its batch index.
    inputs["labels"] = [
        align_labels_with_tokens(
            labels, inputs.word_ids(batch_index=example_index), max_length
        )
        for example_index, labels in enumerate(examples['mbert_token_classes'])
    ]
    return inputs
# tokenize training and validation datasets
tokenized_data = raw_dataset.map(
    tokenize_function,
    batched=True)
# Expose only the model inputs and labels, as torch tensors.
tokenized_data.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
# data collator
# `truncation` is a tokenizer-call option, not a field of this collator, and
# passing it raises TypeError; truncation is already applied during
# tokenization above.
data_collator = DataCollatorForTokenClassification(
    tokenizer, padding=True, max_length=512
)
# Show the label rows of the first two training examples in the app.
st.write(tokenized_data["train"][:2]["labels"])
# Function to evaluate model performance
def evaluate_model(model, dataloader, device):
    """Score ``model`` on every batch of ``dataloader``.

    Only positions whose label differs from -100 are scored. The model is
    switched to eval mode and is left in that mode on return.

    Args:
        model: Callable as ``model(input_ids, attention_mask=...)`` and
            returning an object with a ``logits`` attribute.
        dataloader: Iterable of batches with ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'`` tensors.
        device: Device the batch tensors are moved to before the forward pass.

    Returns:
        tuple: ``(accuracy, precision, recall, f1)``; the last three are
        micro-averaged.
    """
    model.eval()
    predicted_ids = []
    reference_ids = []

    with torch.no_grad():
        for batch in dataloader:
            token_ids = batch['input_ids'].to(device)
            token_mask = batch['attention_mask'].to(device)
            gold = batch['labels'].to(device)

            # Highest-scoring class per position.
            guesses = model(token_ids, attention_mask=token_mask).logits.argmax(dim=-1)

            # Drop positions labelled -100 before collecting.
            scored = gold.ne(-100)
            predicted_ids.extend(guesses[scored].cpu().numpy())
            reference_ids.extend(gold[scored].cpu().numpy())

    # Convert to numpy arrays for metrics calculation
    predicted_ids = np.array(predicted_ids)
    reference_ids = np.array(reference_ids)

    accuracy = accuracy_score(reference_ids, predicted_ids)
    precision, recall, f1, _ = precision_recall_fscore_support(
        reference_ids, predicted_ids, average='micro'
    )
    return accuracy, precision, recall, f1
# Function to compute distillation and hard-label loss
def distillation_loss(student_logits, teacher_logits, true_labels, temperature, alpha):
    """Blend a soft-target KL term with hard-label cross entropy.

    Both terms are averaged over the positions whose label is not ``-100``,
    so ignored positions contribute to neither term and the two terms share
    the same per-token scale.

    Args:
        student_logits: Student scores, class axis last.
        teacher_logits: Teacher scores, same shape as ``student_logits``.
        true_labels: Integer class ids shaped like the logits without the
            class axis; ``-100`` marks positions to ignore.
        temperature: Softening temperature applied to both logit sets for the
            KL term; the KL term is rescaled by ``temperature ** 2``.
        alpha: Weight of the KL term; the cross-entropy term gets
            ``1 - alpha``.

    Returns:
        Scalar tensor ``alpha * kl + (1 - alpha) * cross_entropy``. If every
        position is ignored, a zero that is still connected to
        ``student_logits`` is returned instead of NaN.
    """
    num_classes = student_logits.size(-1)
    flat_labels = true_labels.reshape(-1)
    valid = flat_labels != -100

    if not bool(valid.any()):
        # Nothing to learn from; a mean over zero tokens would be NaN.
        return student_logits.sum() * 0.0

    # Flatten to (tokens, classes) and keep only the labelled tokens.
    student_valid = student_logits.reshape(-1, num_classes)[valid]
    teacher_valid = teacher_logits.reshape(-1, num_classes)[valid]
    labels_valid = flat_labels[valid]

    # Compute soft targets from teacher logits
    soft_targets = nn.functional.softmax(teacher_valid / temperature, dim=-1)
    student_soft = nn.functional.log_softmax(student_valid / temperature, dim=-1)

    # KL Divergence loss for distillation. With a (tokens, classes) input,
    # 'batchmean' divides by the number of labelled tokens.
    distill_loss = nn.functional.kl_div(
        student_soft, soft_targets, reduction='batchmean'
    ) * (temperature ** 2)

    # Cross-entropy loss for hard labels
    hard_loss = nn.functional.cross_entropy(student_valid, labels_valid)

    # Combine losses
    return alpha * distill_loss + (1.0 - alpha) * hard_loss
# hyperparameters
batch_size = 32
lr = 1e-4
num_epochs = 30
temperature = 2.0  # softening temperature for the distillation term
alpha = 0.5  # weight of the distillation term vs. the hard-label term
# define optimizer (only the student is optimized)
optimizer = optim.Adam(student_model.parameters(), lr=lr)
# create training data loader; reshuffle each epoch so the student does not
# see the batches in the same fixed order on every pass
dataloader = DataLoader(
    tokenized_data['train'],
    batch_size=batch_size,
    shuffle=True,
    collate_fn=data_collator,
)
# create testing data loader (order kept fixed)
test_dataloader = DataLoader(tokenized_data['test'], batch_size=batch_size, collate_fn=data_collator)
# TEMPORARY - for testing: teacher score on the test split before training
teacher_accuracy, teacher_precision, teacher_recall, teacher_f1 = evaluate_model(teacher_model, test_dataloader, device)
print(f"Teacher (test) - Accuracy: {teacher_accuracy:.4f}, Precision: {teacher_precision:.4f}, Recall: {teacher_recall:.4f}, F1 Score: {teacher_f1:.4f}")
# The teacher is never updated in this loop, so its test metrics cannot change
# between epochs: score it once here and reuse the report line every epoch.
teacher_accuracy, teacher_precision, teacher_recall, teacher_f1 = evaluate_model(teacher_model, test_dataloader, device)
teacher_report = f"Teacher (test) - Accuracy: {teacher_accuracy:.4f}, Precision: {teacher_precision:.4f}, Recall: {teacher_recall:.4f}, F1 Score: {teacher_f1:.4f}"
# put student model in train mode
student_model.train()
# train model
for epoch in range(num_epochs):
    epoch_loss_total = 0.0
    batches_seen = 0
    for batch in dataloader:
        # Prepare inputs
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        # Disable gradient calculation for teacher model
        with torch.no_grad():
            teacher_outputs = teacher_model(input_ids, attention_mask=attention_mask)
            teacher_logits = teacher_outputs.logits
        # Forward pass through the student model
        student_outputs = student_model(input_ids, attention_mask=attention_mask)
        student_logits = student_outputs.logits
        # Compute the distillation loss
        loss = distillation_loss(student_logits, teacher_logits, labels, temperature, alpha)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Accumulate the detached loss; converted to a float once per epoch.
        epoch_loss_total = epoch_loss_total + loss.detach()
        batches_seen += 1
    # Report the mean loss over the epoch rather than whichever batch came
    # last; an empty loader yields NaN instead of an undefined `loss`.
    epoch_loss = float(epoch_loss_total) / batches_seen if batches_seen else float('nan')
    print(f"Epoch {epoch + 1} completed with loss: {epoch_loss}")
    # Teacher metrics (computed once above)
    print(teacher_report)
    # Evaluate the student model
    student_accuracy, student_precision, student_recall, student_f1 = evaluate_model(student_model, test_dataloader, device)
    print(f"Student (test) - Accuracy: {student_accuracy:.4f}, Precision: {student_precision:.4f}, Recall: {student_recall:.4f}, F1 Score: {student_f1:.4f}")
    print("\n")
    # put student model back into train mode (evaluation switched it to eval)
    student_model.train()
#Compare the models
# Final side-by-side scoring of both models. The loader is built from the
# 'test' split again, this time with batches of 8.
validation_dataloader = DataLoader(tokenized_data['test'], batch_size=8, collate_fn=data_collator)

# Teacher first ...
teacher_scores = evaluate_model(teacher_model, validation_dataloader, device)
teacher_accuracy, teacher_precision, teacher_recall, teacher_f1 = teacher_scores
print(f"Teacher (validation) - Accuracy: {teacher_accuracy:.4f}, Precision: {teacher_precision:.4f}, Recall: {teacher_recall:.4f}, F1 Score: {teacher_f1:.4f}")

# ... then the distilled student on the same batches.
student_scores = evaluate_model(student_model, validation_dataloader, device)
student_accuracy, student_precision, student_recall, student_f1 = student_scores
print(f"Student (validation) - Accuracy: {student_accuracy:.4f}, Precision: {student_precision:.4f}, Recall: {student_recall:.4f}, F1 Score: {student_f1:.4f}")
st.write('Pushing model to huggingface')
# Upload the student to the Hub under "<owner>/<model name>".
hf_name = 'CarolXia'  # your hf username or org name
mode_name = "pii-kd-deberta-v2"
model_id = f"{hf_name}/{mode_name}"
# The access token is read from the Streamlit secrets store.
hub_token = st.secrets["HUGGINGFACE_TOKEN"]
student_model.push_to_hub(model_id, token=hub_token)