import logging logging.basicConfig(level='ERROR') import numpy as np from tqdm import tqdm import json from collections import defaultdict import matplotlib.pyplot as plt from sklearn.metrics import auc, roc_curve import matplotlib import random from ipdb import set_trace as bp import time matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 # plot data def sweep(score, x): """ Compute a ROC curve and then return the FPR, TPR, AUC, and ACC. """ fpr, tpr, _ = roc_curve(x, -score) acc = np.max(1-(fpr+(1-tpr))/2) return fpr, tpr, auc(fpr, tpr), acc def do_plot(prediction, answers, sweep_fn=sweep, metric='auc', legend="", output_dir=None): """ Generate the ROC curves by using ntest models as test models and the rest to train. """ fpr, tpr, auc, acc = sweep_fn(np.array(prediction), np.array(answers, dtype=bool)) low = tpr[np.where(fpr<.05)[0][-1]] # bp() print('Attack %s AUC %.4f, Accuracy %.4f, TPR@5%%FPR of %.4f\n'%(legend, auc,acc, low)) metric_text = '' if metric == 'auc': metric_text = 'auc=%.3f'%auc elif metric == 'acc': metric_text = 'acc=%.3f'%acc plt.plot(fpr, tpr, label=legend+metric_text) return legend, auc,acc, low def fig_fpr_tpr(all_output, output_dir): print("output_dir", output_dir) answers = [] metric2predictions = defaultdict(list) for ex in all_output: answers.append(ex["label"]) for metric in ex["pred"].keys(): if ("raw" in metric) and ("clf" not in metric): continue metric2predictions[metric].append(ex["pred"][metric]) plt.figure(figsize=(4,3)) with open(f"{output_dir}/auc.txt", "w") as f: for metric, predictions in metric2predictions.items(): legend, auc, acc, low = do_plot(predictions, answers, legend=metric, metric='auc', output_dir=output_dir) f.write('%s AUC %.4f, Accuracy %.4f, TPR@0.1%%FPR of %.4f\n'%(legend, auc, acc, low)) plt.semilogx() plt.semilogy() plt.xlim(1e-5,1) plt.ylim(1e-5,1) plt.xlabel("False Positive Rate") plt.ylabel("True Positive Rate") plt.plot([0, 1], [0, 1], ls='--', color='gray') plt.subplots_adjust(bottom=.18, left=.18, top=.96, right=.96) plt.legend(fontsize=8) plt.savefig(f"{output_dir}/auc.png") def load_jsonl(input_path): with open(input_path, 'r') as f: data = [json.loads(line) for line in tqdm(f)] random.seed(0) random.shuffle(data) return data def dump_jsonl(data, path): with open(path, 'w') as f: for line in tqdm(data): f.write(json.dumps(line) + "\n") def read_jsonl(path): with open(path, 'r') as f: return [json.loads(line) for line in tqdm(f)] def convert_huggingface_data_to_list_dic(dataset): all_data = [] for i in range(len(dataset)): ex = dataset[i] all_data.append(ex) return all_data def process_truthful_qa(data): new_data = [] for ex in data: new_ex = {} label = ex["mc2_targets"]["labels"].index(1) output = ex["mc2_targets"]["choices"][label] # We change to mc2 instead of mc1 as it's those that open llm lead uses. (check about) new_ex["output"] = output new_ex["input"] = ex["question"] + " " + output new_data.append(new_ex) return new_data def process_mmlu(data): new_data = [] for ex in data: new_ex = {} label = ex["choices"][ex["answer"]] output = label new_ex["output"] = output new_ex["input"] = ex["question"] + " " + output new_data.append(new_ex) return new_data def process_arc(data): new_data = [] choice2label = {"A": 0, "B": 1, "C": 2, "D": 3} for ex in data: new_ex = {} # bp() # print(ex["answerKey"]) if ex["answerKey"] not in choice2label: continue label = choice2label[ex["answerKey"]] output = ex["choices"]["text"][label] new_ex["output"] = output new_ex["input"] = ex["question"] + " " + output new_data.append(new_ex) return new_data def process_gsm8k(data): new_data = [] for ex in data: new_ex = {} output = ex["answer"].split('####')[0].strip() new_ex["output"] = output new_ex["input"] = ex["question"] + " " + output new_data.append(new_ex) return new_data def process_winogrande(data): ''' new_data = [] for ex in data: new_ex = {} label = int(ex["answer"]) output = ex[f"option{label}"] new_ex["output"] = output new_ex["input"] = ex["sentence"] + " " + output new_data.append(new_ex) return new_data ''' new_data = [] for doc in data: new_doc = {} # Convert the answer to a numeric index answer_to_num = {"1": 0, "2": 1} label_idx = answer_to_num[doc["answer"]] # Generate options and select the correct one based on label_idx options = [doc["option1"], doc["option2"]] output = options[label_idx] # Build the new sentence by inserting the selected option idx = doc["sentence"].index("_") input_sentence = doc["sentence"][:idx] + output + doc["sentence"][idx+1:] # Assigning the processed values to the new_doc new_doc["output"] = output new_doc["input"] = input_sentence # Append the processed document to new_data new_data.append(new_doc) return new_data # I'm not sure if that's the correct format for winogrande given how the dataset works. def process_hellaswag(data): new_data = [] for ex in data: new_ex = {} label = int(ex["label"]) # For some reason label is in str and not int? output = ex["endings"][label] new_ex["output"] = output new_ex["input"] = ex["ctx"] + " " + output new_data.append(new_ex) return new_data