Spaces:
Sleeping
Sleeping
# Copyright 2021 The HuggingFace Team. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import json | |
import logging | |
import statistics | |
from os import mkdir | |
from os.path import exists, isdir | |
from os.path import join as pjoin | |
import matplotlib.pyplot as plt | |
import matplotlib.image as mpimg | |
import nltk | |
import numpy as np | |
import pandas as pd | |
import plotly | |
import plotly.express as px | |
import plotly.figure_factory as ff | |
import plotly.graph_objects as go | |
import pyarrow.feather as feather | |
import seaborn as sns | |
import torch | |
from datasets import load_from_disk | |
from nltk.corpus import stopwords | |
from sklearn.feature_extraction.text import CountVectorizer | |
from .dataset_utils import (CNT, DEDUP_TOT, EMBEDDING_FIELD, LENGTH_FIELD, | |
OUR_LABEL_FIELD, OUR_TEXT_FIELD, PROP, | |
TEXT_NAN_CNT, TOKENIZED_FIELD, TOT_OPEN_WORDS, | |
TOT_WORDS, TXT_LEN, VOCAB, WORD, extract_field, | |
load_truncated_dataset) | |
from .embeddings import Embeddings | |
from .npmi import nPMI | |
from .zipf import Zipf | |
# Render floats in DataFrame displays with thousands separators and 3 decimals.
pd.options.display.float_format = "{:,.3f}".format

# Module-level logger. Propagation is switched off, so records are only
# emitted through the two handlers attached below.
logs = logging.getLogger(__name__)
logs.setLevel(logging.WARNING)
logs.propagate = False

# Attach the handlers only once, even if this module is imported repeatedly.
if not logs.handlers:
    # FileHandler opens its target file immediately and raises when the
    # directory does not exist, so make sure the log directory is there first.
    if not isdir("./log_files"):
        mkdir("./log_files")

    # Logging info to log file
    file = logging.FileHandler("./log_files/dataset_statistics.log")
    fileformat = logging.Formatter("%(asctime)s:%(message)s")
    file.setLevel(logging.INFO)
    file.setFormatter(fileformat)

    # Logging debug messages to stream
    stream = logging.StreamHandler()
    streamformat = logging.Formatter("[data_measurements_tool] %(message)s")
    stream.setLevel(logging.WARNING)
    stream.setFormatter(streamformat)

    logs.addHandler(file)
    logs.addHandler(stream)
# TODO: Read this in depending on chosen language / expand beyond english
nltk.download("stopwords")

# Closed-class vocabulary, in this order:
#   1. NLTK's English stopword list,
#   2. extra short tokens (e.g. "wasn", "don", "ll") and a few informal
#      function words,
#   3. the numerals "0" through "20" as strings.
_CLOSED_CLASS = [
    *stopwords.words("english"),
    "t",
    "n",
    "ll",
    "d",
    "wasn",
    "weren",
    "won",
    "aren",
    "wouldn",
    "shouldn",
    "didn",
    "don",
    "hasn",
    "ain",
    "couldn",
    "doesn",
    "hadn",
    "haven",
    "isn",
    "mightn",
    "mustn",
    "needn",
    "shan",
    "would",
    "could",
    "dont",
    "u",
    *(str(num) for num in range(0, 21)),
]
_IDENTITY_TERMS = [ | |
"man", | |
"woman", | |
"non-binary", | |
"gay", | |
"lesbian", | |
"queer", | |
"trans", | |
"straight", | |
"cis", | |
"she", | |
"her", | |
"hers", | |
"he", | |
"him", | |
"his", | |
"they", | |
"them", | |
"their", | |
"theirs", | |
"himself", | |
"herself", | |
] | |
# treating inf values as NaN as well
# NOTE(review): recent pandas releases deprecate this option - confirm the
# pinned pandas version still accepts it.
pd.set_option("use_inf_as_na", True)

# Number of rows kept in the sorted top-vocabulary table.
_TOP_N = 100
# Minimum number of occurrences for a word to be included in
# word-count-based calculations (currently just relevant to nPMI).
_MIN_VOCAB_COUNT = 10
_NUM_VOCAB_BATCHES = 2000
# Clustering settings (not referenced in this part of the file).
_TREE_DEPTH = 12
_TREE_MIN_NODES = 250
# as long as we're using sklearn - already pushing the resources
_MAX_CLUSTER_EXAMPLES = 5000
# Shared vectorizer; its tokenizer is what do_tokenization() uses.
_CVEC = CountVectorizer(lowercase=True, token_pattern="(?u)\\b\\w+\\b")
class DatasetStatisticsCacheClass: | |
def __init__(
    self,
    cache_dir,
    dset_name,
    dset_config,
    split_name,
    text_field,
    label_field,
    label_names,
    calculation=None,
    use_cache=False,
):
    """
    Records what is being analyzed, resets every computed result to an
    empty placeholder and derives the paths of all cache files.
    Nothing is loaded or written to disk here.
    Args:
        cache_dir: directory under which this dataset's cache directory lives
        dset_name: name of the Hugging Face dataset
        dset_config: name of the dataset config
        split_name: name of the split to analyze
        text_field: text field(s) to analyze; a str, or an iterable of str
            that is joined with "-" when building the cache path
        label_field: label field(s) to analyze
        label_names: names of the classes
        calculation: only used for standalone runs of a single measurement
        use_cache: use stored data if there; otherwise calculate afresh
    """
    # This is only used for standalone runs for each kind of measurement.
    self.calculation = calculation
    self.our_text_field = OUR_TEXT_FIELD
    self.our_length_field = LENGTH_FIELD
    self.our_label_field = OUR_LABEL_FIELD
    self.our_tokenized_field = TOKENIZED_FIELD
    self.our_embedding_field = EMBEDDING_FIELD
    self.cache_dir = cache_dir
    # Use stored data if there; otherwise calculate afresh
    self.use_cache = use_cache
    ### What are we analyzing?
    # name of the Hugging Face dataset
    self.dset_name = dset_name
    # name of the dataset config
    self.dset_config = dset_config
    # name of the split to analyze
    self.split_name = split_name
    # TODO: Should this be "feature" ?
    # which text fields are we analysing?
    self.text_field = text_field
    # which label fields are we analysing?
    self.label_field = label_field
    # what are the names of the classes?
    self.label_names = label_names
    ## Hugging Face dataset objects
    self.dset = None  # original dataset
    # HF dataset with all of the self.text_field instances in self.dset
    self.text_dset = None
    self.dset_peek = None
    # HF dataset with text embeddings in the same order as self.text_dset
    self.embeddings_dset = None
    # HF dataset with all of the self.label_field instances in self.dset
    self.label_dset = None
    ## Data frames
    # Tokenized text
    self.tokenized_df = None
    # save sentence length histogram in the class so it doesn't get re-computed
    self.length_df = None
    self.fig_tok_length = None
    # Cached image of the length figure and the length statistics dict.
    # Both are filled in by load_or_prepare_text_lengths(); they are defined
    # here so that reading them never raises AttributeError when that method
    # had nothing to load or compute (deployment mode without cache files).
    self.fig_tok_length_png = None
    self.length_stats_dict = None
    # Data Frame version of self.label_dset
    self.label_df = None
    # save label pie chart in the class so it doesn't get re-computed
    self.fig_labels = None
    # Vocabulary with word counts in the dataset
    self.vocab_counts_df = None
    # Vocabulary filtered to remove stopwords
    self.vocab_counts_filtered_df = None
    self.sorted_top_vocab_df = None
    ## General statistics and duplicates
    self.total_words = 0
    self.total_open_words = 0
    # Number of NaN values (NOT empty strings)
    self.text_nan_count = 0
    # Number of text items that appear more than once in the dataset
    self.dedup_total = 0
    # Duplicated text items along with their number of occurrences ("count")
    self.dup_counts_df = None
    self.avg_length = None
    self.std_length = None
    self.general_stats_dict = None
    self.num_uniq_lengths = 0
    # clustering text by embeddings
    # the hierarchical clustering tree is represented as a list of nodes,
    # the first is the root
    self.node_list = []
    # save tree figure in the class so it doesn't get re-computed
    self.fig_tree = None
    # keep Embeddings object around to explore clusters
    self.embeddings = None
    # nPMI
    # Holds a nPMIStatisticsCacheClass object
    self.npmi_stats = None
    # TODO: Have lowercase be an option for a user to set.
    self.to_lowercase = True
    # The minimum amount of times a word should occur to be included in
    # word-count-based calculations (currently just relevant to nPMI)
    self.min_vocab_count = _MIN_VOCAB_COUNT
    # zipf
    self.z = None
    self.zipf_fig = None
    self.cvec = _CVEC
    # File definitions
    # path to the directory used for caching
    if not isinstance(text_field, str):
        text_field = "-".join(text_field)
    # The label field is currently not part of the cache directory name.
    self.cache_path = pjoin(
        self.cache_dir,
        f"{dset_name}_{dset_config}_{split_name}_{text_field}",
    )
    # Cache files not needed for UI
    self.dset_fid = pjoin(self.cache_path, "base_dset")
    self.tokenized_df_fid = pjoin(self.cache_path, "tokenized_df.feather")
    self.label_dset_fid = pjoin(self.cache_path, "label_dset")
    # Needed for UI -- embeddings
    self.text_dset_fid = pjoin(self.cache_path, "text_dset")
    # Needed for UI
    self.dset_peek_json_fid = pjoin(self.cache_path, "dset_peek.json")
    ## Label cache files.
    # Needed for UI
    self.fig_labels_json_fid = pjoin(self.cache_path, "fig_labels.json")
    ## Length cache files
    # Needed for UI
    self.length_df_fid = pjoin(self.cache_path, "length_df.feather")
    # Needed for UI
    self.length_stats_json_fid = pjoin(self.cache_path, "length_stats.json")
    self.vocab_counts_df_fid = pjoin(self.cache_path, "vocab_counts.feather")
    # Needed for UI
    self.dup_counts_df_fid = pjoin(self.cache_path, "dup_counts_df.feather")
    # Needed for UI
    self.fig_tok_length_fid = pjoin(self.cache_path, "fig_tok_length.png")
    ## General text stats
    # Needed for UI
    self.general_stats_json_fid = pjoin(self.cache_path, "general_stats_dict.json")
    # Needed for UI
    self.sorted_top_vocab_df_fid = pjoin(
        self.cache_path, "sorted_top_vocab.feather"
    )
    ## Zipf cache files
    # Needed for UI
    self.zipf_fid = pjoin(self.cache_path, "zipf_basic_stats.json")
    # Needed for UI
    self.zipf_fig_fid = pjoin(self.cache_path, "zipf_fig.json")
    ## Embeddings cache files
    # Needed for UI
    self.node_list_fid = pjoin(self.cache_path, "node_list.th")
    # Needed for UI
    self.fig_tree_json_fid = pjoin(self.cache_path, "fig_tree.json")
    # Deployment flag; see set_deployment().
    self.live = False
def set_deployment(self, live=True): | |
""" | |
Function that we can hit when we deploy, so that cache files are not | |
written out/recalculated, but instead that part of the UI can be punted. | |
""" | |
self.live = live | |
def check_cache_dir(self): | |
""" | |
First function to call to create the cache directory. | |
If in deployment mode and cache directory does not already exist, | |
return False. | |
""" | |
if self.live: | |
return isdir(self.cache_path) | |
else: | |
if not isdir(self.cache_path): | |
logs.warning("Creating cache directory %s." % self.cache_path) | |
mkdir(self.cache_path) | |
return isdir(self.cache_path) | |
def get_base_dataset(self): | |
"""Gets a pointer to the truncated base dataset object.""" | |
if not self.dset: | |
self.dset = load_truncated_dataset( | |
self.dset_name, | |
self.dset_config, | |
self.split_name, | |
cache_name=self.dset_fid, | |
use_cache=True, | |
use_streaming=True, | |
) | |
def load_or_prepare_general_stats(self, save=True): | |
""" | |
Content for expander_general_stats widget. | |
Provides statistics for total words, total open words, | |
the sorted top vocab, the NaN count, and the duplicate count. | |
Args: | |
Returns: | |
""" | |
# General statistics | |
if ( | |
self.use_cache | |
and exists(self.general_stats_json_fid) | |
and exists(self.dup_counts_df_fid) | |
and exists(self.sorted_top_vocab_df_fid) | |
): | |
logs.info("Loading cached general stats") | |
self.load_general_stats() | |
else: | |
if not self.live: | |
logs.info("Preparing general stats") | |
self.prepare_general_stats() | |
if save: | |
write_df(self.sorted_top_vocab_df, self.sorted_top_vocab_df_fid) | |
write_df(self.dup_counts_df, self.dup_counts_df_fid) | |
write_json(self.general_stats_dict, self.general_stats_json_fid) | |
def load_or_prepare_text_lengths(self, save=True): | |
""" | |
The text length widget relies on this function, which provides | |
a figure of the text lengths, some text length statistics, and | |
a text length dataframe to peruse. | |
Args: | |
save: | |
Returns: | |
""" | |
# Text length figure | |
if self.use_cache and exists(self.fig_tok_length_fid): | |
self.fig_tok_length_png = mpimg.imread(self.fig_tok_length_fid) | |
else: | |
if not self.live: | |
self.prepare_fig_text_lengths() | |
if save: | |
self.fig_tok_length.savefig(self.fig_tok_length_fid) | |
# Text length dataframe | |
if self.use_cache and exists(self.length_df_fid): | |
self.length_df = feather.read_feather(self.length_df_fid) | |
else: | |
if not self.live: | |
self.prepare_length_df() | |
if save: | |
write_df(self.length_df, self.length_df_fid) | |
# Text length stats. | |
if self.use_cache and exists(self.length_stats_json_fid): | |
with open(self.length_stats_json_fid, "r") as f: | |
self.length_stats_dict = json.load(f) | |
self.avg_length = self.length_stats_dict["avg length"] | |
self.std_length = self.length_stats_dict["std length"] | |
self.num_uniq_lengths = self.length_stats_dict["num lengths"] | |
else: | |
if not self.live: | |
self.prepare_text_length_stats() | |
if save: | |
write_json(self.length_stats_dict, self.length_stats_json_fid) | |
def prepare_length_df(self): | |
if not self.live: | |
if self.tokenized_df is None: | |
self.tokenized_df = self.do_tokenization() | |
self.tokenized_df[LENGTH_FIELD] = self.tokenized_df[TOKENIZED_FIELD].apply( | |
len | |
) | |
self.length_df = self.tokenized_df[ | |
[LENGTH_FIELD, OUR_TEXT_FIELD] | |
].sort_values(by=[LENGTH_FIELD], ascending=True) | |
def prepare_text_length_stats(self): | |
if not self.live: | |
if ( | |
self.tokenized_df is None | |
or LENGTH_FIELD not in self.tokenized_df.columns | |
or self.length_df is None | |
): | |
self.prepare_length_df() | |
avg_length = sum(self.tokenized_df[LENGTH_FIELD]) / len( | |
self.tokenized_df[LENGTH_FIELD] | |
) | |
self.avg_length = round(avg_length, 1) | |
std_length = statistics.stdev(self.tokenized_df[LENGTH_FIELD]) | |
self.std_length = round(std_length, 1) | |
self.num_uniq_lengths = len(self.length_df["length"].unique()) | |
self.length_stats_dict = { | |
"avg length": self.avg_length, | |
"std length": self.std_length, | |
"num lengths": self.num_uniq_lengths, | |
} | |
def prepare_fig_text_lengths(self): | |
if not self.live: | |
if ( | |
self.tokenized_df is None | |
or LENGTH_FIELD not in self.tokenized_df.columns | |
): | |
self.prepare_length_df() | |
self.fig_tok_length = make_fig_lengths(self.tokenized_df, LENGTH_FIELD) | |
def load_or_prepare_embeddings(self):
    """
    Creates the Embeddings helper for this dataset, runs its hierarchical
    clustering and copies the resulting node list and tree figure onto
    this object.
    """
    embeddings = Embeddings(self, use_cache=self.use_cache)
    # Store the helper before clustering, so it is reachable from this
    # object while the clustering runs.
    self.embeddings = embeddings
    embeddings.make_hierarchical_clustering()
    self.node_list = embeddings.node_list
    self.fig_tree = embeddings.fig_tree
# get vocab with word counts | |
def load_or_prepare_vocab(self, save=True):
    """
    Calculates the vocabulary count from the tokenized text.
    The resulting dataframes may be used in nPMI calculations, zipf, etc.
    The counts are read from the cache when caching is enabled and the
    cache file exists; otherwise they are computed from self.tokenized_df
    (tokenizing first if needed). The filtered vocabulary is always derived
    from the unfiltered one.
    :param save: write freshly computed dataframes to their cache files
    :return: None
    """
    from_cache = self.use_cache and exists(self.vocab_counts_df_fid)
    if from_cache:
        logs.info("Reading vocab from cache")
        self.load_vocab()
    else:
        logs.info("Calculating vocab afresh")
        if self.tokenized_df is None:
            self.tokenized_df = self.do_tokenization()
            if save:
                logs.info("Writing out.")
                write_df(self.tokenized_df, self.tokenized_df_fid)
        word_count_df = count_vocab_frequencies(self.tokenized_df)
        logs.info("Making dfs with proportion.")
        self.vocab_counts_df = calc_p_word(word_count_df)
    # Same filtering step for cached and freshly computed counts.
    self.vocab_counts_filtered_df = filter_vocab(self.vocab_counts_df)
    if save and not from_cache:
        logs.info("Writing out.")
        write_df(self.vocab_counts_df, self.vocab_counts_df_fid)
    for title, vocab_df in (
        ("unfiltered vocab", self.vocab_counts_df),
        ("filtered vocab", self.vocab_counts_filtered_df),
    ):
        logs.info(title)
        logs.info(vocab_df)
def load_vocab(self):
    """
    Reads the cached vocabulary counts from self.vocab_counts_df_fid into
    self.vocab_counts_df and normalizes its index.
    """
    with open(self.vocab_counts_df_fid, "rb") as vocab_file:
        self.vocab_counts_df = feather.read_feather(vocab_file)
    # Handling for changes in how the index is saved.
    normalized_df = self._set_idx_col_names(self.vocab_counts_df)
    self.vocab_counts_df = normalized_df
def load_or_prepare_text_duplicates(self, save=True): | |
if self.use_cache and exists(self.dup_counts_df_fid): | |
with open(self.dup_counts_df_fid, "rb") as f: | |
self.dup_counts_df = feather.read_feather(f) | |
elif self.dup_counts_df is None: | |
if not self.live: | |
self.prepare_text_duplicates() | |
if save: | |
write_df(self.dup_counts_df, self.dup_counts_df_fid) | |
else: | |
if not self.live: | |
# This happens when self.dup_counts_df is already defined; | |
# This happens when general_statistics were calculated first, | |
# since general statistics requires the number of duplicates | |
if save: | |
write_df(self.dup_counts_df, self.dup_counts_df_fid) | |
def load_general_stats(self):
    """
    Reads the cached general statistics dict and the cached sorted top
    vocabulary, then copies the individual statistics onto this object.
    Raises:
        KeyError: if the cached dict lacks one of the expected keys.
    """
    # Context manager so the JSON file handle is closed right after reading.
    with open(self.general_stats_json_fid, encoding="utf-8") as stats_file:
        self.general_stats_dict = json.load(stats_file)
    with open(self.sorted_top_vocab_df_fid, "rb") as f:
        self.sorted_top_vocab_df = feather.read_feather(f)
    self.text_nan_count = self.general_stats_dict[TEXT_NAN_CNT]
    self.dedup_total = self.general_stats_dict[DEDUP_TOT]
    self.total_words = self.general_stats_dict[TOT_WORDS]
    self.total_open_words = self.general_stats_dict[TOT_OPEN_WORDS]
def prepare_general_stats(self): | |
if not self.live: | |
if self.tokenized_df is None: | |
logs.warning("Tokenized dataset not yet loaded; doing so.") | |
self.load_or_prepare_tokenized_df() | |
if self.vocab_counts_df is None: | |
logs.warning("Vocab not yet loaded; doing so.") | |
self.load_or_prepare_vocab() | |
self.sorted_top_vocab_df = self.vocab_counts_filtered_df.sort_values( | |
"count", ascending=False | |
).head(_TOP_N) | |
self.total_words = len(self.vocab_counts_df) | |
self.total_open_words = len(self.vocab_counts_filtered_df) | |
self.text_nan_count = int(self.tokenized_df.isnull().sum().sum()) | |
self.prepare_text_duplicates() | |
self.dedup_total = sum(self.dup_counts_df[CNT]) | |
self.general_stats_dict = { | |
TOT_WORDS: self.total_words, | |
TOT_OPEN_WORDS: self.total_open_words, | |
TEXT_NAN_CNT: self.text_nan_count, | |
DEDUP_TOT: self.dedup_total, | |
} | |
def prepare_text_duplicates(self): | |
if not self.live: | |
if self.tokenized_df is None: | |
self.load_or_prepare_tokenized_df() | |
dup_df = self.tokenized_df[self.tokenized_df.duplicated([OUR_TEXT_FIELD])] | |
self.dup_counts_df = pd.DataFrame( | |
dup_df.pivot_table( | |
columns=[OUR_TEXT_FIELD], aggfunc="size" | |
).sort_values(ascending=False), | |
columns=[CNT], | |
) | |
self.dup_counts_df[OUR_TEXT_FIELD] = self.dup_counts_df.index.copy() | |
def load_or_prepare_dataset(self, save=True):
    """
    Prepares the HF dataset holding the extracted text, followed by the
    dataset peek.
    The tokenized dataframe is not prepared here (that step is currently
    disabled); it is built on demand by the methods that need it.
    Args:
        save: Store the calculated data to disk.
    Returns:
        None
    """
    steps = (
        ("Doing text dset.", self.load_or_prepare_text_dset),
        ("Doing dataset peek", self.load_or_prepare_dset_peek),
    )
    for message, run_step in steps:
        logs.info(message)
        run_step(save)
def load_or_prepare_dset_peek(self, save=True): | |
if self.use_cache and exists(self.dset_peek_json_fid): | |
with open(self.dset_peek_json_fid, "r") as f: | |
self.dset_peek = json.load(f)["dset peek"] | |
else: | |
if not self.live: | |
if self.dset is None: | |
self.get_base_dataset() | |
self.dset_peek = self.dset[:100] | |
if save: | |
write_json({"dset peek": self.dset_peek}, self.dset_peek_json_fid) | |
def load_or_prepare_tokenized_df(self, save=True): | |
if self.use_cache and exists(self.tokenized_df_fid): | |
self.tokenized_df = feather.read_feather(self.tokenized_df_fid) | |
else: | |
if not self.live: | |
# tokenize all text instances | |
self.tokenized_df = self.do_tokenization() | |
if save: | |
logs.warning("Saving tokenized dataset to disk") | |
# save tokenized text | |
write_df(self.tokenized_df, self.tokenized_df_fid) | |
def load_or_prepare_text_dset(self, save=True): | |
if self.use_cache and exists(self.text_dset_fid): | |
# load extracted text | |
self.text_dset = load_from_disk(self.text_dset_fid) | |
logs.warning("Loaded dataset from disk") | |
logs.info(self.text_dset) | |
# ...Or load it from the server and store it anew | |
else: | |
if not self.live: | |
self.prepare_text_dset() | |
if save: | |
# save extracted text instances | |
logs.warning("Saving dataset to disk") | |
self.text_dset.save_to_disk(self.text_dset_fid) | |
def prepare_text_dset(self): | |
if not self.live: | |
self.get_base_dataset() | |
# extract all text instances | |
self.text_dset = self.dset.map( | |
lambda examples: extract_field( | |
examples, self.text_field, OUR_TEXT_FIELD | |
), | |
batched=True, | |
remove_columns=list(self.dset.features), | |
) | |
##additon | |
self.text_dset = self.text_dset.filter(lambda ex: ex["text"] is not None) | |
def do_tokenization(self):
    """
    Tokenizes the dataset.
    Every text in self.text_dset (loaded or prepared first if missing) is
    lowercased and split with the tokenizer built from self.cvec. The
    original text column is kept around for display.
    :return: DataFrame with the text, its tokens (as a tuple) and the
        number of tokens per row.
    """
    if self.text_dset is None:
        self.load_or_prepare_text_dset()
    sent_tokenizer = self.cvec.build_tokenizer()

    def tokenize_batch(examples):
        # TODO: lowercase should be an option
        token_tuples = []
        token_counts = []
        for text in examples[OUR_TEXT_FIELD]:
            tokens = tuple(sent_tokenizer(text.lower()))
            token_tuples.append(tokens)
            token_counts.append(len(tokens))
        return {TOKENIZED_FIELD: token_tuples, LENGTH_FIELD: token_counts}

    tokenized_dset = self.text_dset.map(tokenize_batch, batched=True)
    return pd.DataFrame(tokenized_dset)
def set_label_field(self, label_field="label"): | |
""" | |
Setter for label_field. Used in the CLI when a user asks for information | |
about labels, but does not specify the field; | |
'label' is assumed as a default. | |
""" | |
self.label_field = label_field | |
def load_or_prepare_labels(self, save=True): | |
# TODO: This is in a transitory state for creating fig cache. | |
# Clean up to be caching and reading everything correctly. | |
""" | |
Extracts labels from the Dataset | |
:return: | |
""" | |
# extracted labels | |
if len(self.label_field) > 0: | |
if self.use_cache and exists(self.fig_labels_json_fid): | |
self.fig_labels = read_plotly(self.fig_labels_json_fid) | |
elif self.use_cache and exists(self.label_dset_fid): | |
# load extracted labels | |
self.label_dset = load_from_disk(self.label_dset_fid) | |
self.label_df = self.label_dset.to_pandas() | |
self.fig_labels = make_fig_labels( | |
self.label_df, self.label_names, OUR_LABEL_FIELD | |
) | |
if save: | |
write_plotly(self.fig_labels, self.fig_labels_json_fid) | |
else: | |
if not self.live: | |
self.prepare_labels() | |
if save: | |
# save extracted label instances | |
self.label_dset.save_to_disk(self.label_dset_fid) | |
write_plotly(self.fig_labels, self.fig_labels_json_fid) | |
def prepare_labels(self): | |
if not self.live: | |
self.get_base_dataset() | |
self.label_dset = self.dset.map( | |
lambda examples: extract_field( | |
examples, self.label_field, OUR_LABEL_FIELD | |
), | |
batched=True, | |
remove_columns=list(self.dset.features), | |
) | |
self.label_df = self.label_dset.to_pandas() | |
self.fig_labels = make_fig_labels( | |
self.label_df, self.label_names, OUR_LABEL_FIELD | |
) | |
def load_or_prepare_npmi(self):
    """
    Creates the nPMI statistics helper for this dataset, stores it in
    self.npmi_stats and lets it work out the available identity terms.
    """
    npmi_stats = nPMIStatisticsCacheClass(self, use_cache=self.use_cache)
    self.npmi_stats = npmi_stats
    npmi_stats.load_or_prepare_npmi_terms()
def load_or_prepare_zipf(self, save=True):
    """
    Provides the Zipf object self.z and the figure self.zipf_fig.
    With caching enabled and cached Zipf statistics present, the statistics
    are loaded and the figure is read from its cache file if there, else
    rebuilt. Otherwise both are computed from self.vocab_counts_df.
    Args:
        save: write whatever was freshly built to the cache files
    Returns:
        None
    """
    # TODO: Current UI only uses the fig, meaning the self.z here is irrelevant
    # when only reading from cache. Either the UI should use it, or it should
    # be removed when reading in cache
    if self.use_cache and exists(self.zipf_fid):
        # TODO: Read zipf data so that the vocab is there.
        with open(self.zipf_fid, "r") as zipf_file:
            zipf_dict = json.load(zipf_file)
        self.z = Zipf()
        self.z.load(zipf_dict)
        if exists(self.zipf_fig_fid):
            self.zipf_fig = read_plotly(self.zipf_fig_fid)
        else:
            self.zipf_fig = make_zipf_fig(self.vocab_counts_df, self.z)
            if save:
                write_plotly(self.zipf_fig, self.zipf_fig_fid)
        return
    self.z = Zipf(self.vocab_counts_df)
    self.zipf_fig = make_zipf_fig(self.vocab_counts_df, self.z)
    if save:
        write_zipf_data(self.z, self.zipf_fid)
        write_plotly(self.zipf_fig, self.zipf_fig_fid)
def _set_idx_col_names(self, input_vocab_df):
    """
    Makes sure a vocabulary dataframe is indexed by VOCAB.
    When the index is not already named VOCAB but a VOCAB column exists,
    that column becomes the index and is also kept as a regular column.
    Otherwise the dataframe is returned unchanged.
    :param input_vocab_df: vocabulary dataframe, e.g. as read from cache
    :return: the dataframe with the normalized index
    """
    already_indexed = input_vocab_df.index.name == VOCAB
    if already_indexed or VOCAB not in input_vocab_df.columns:
        return input_vocab_df
    reindexed_df = input_vocab_df.set_index([VOCAB])
    reindexed_df[VOCAB] = reindexed_df.index
    return reindexed_df
class nPMIStatisticsCacheClass: | |
"""Class to interface between the app and the nPMI class
by calling the nPMI class with the user's selections."""
def __init__(self, dataset_stats, use_cache=False):
    """
    Sets up the nPMI cache locations and default term lists for a dataset.
    The "pmi_files" cache directory is created when it does not exist yet.
    :param dataset_stats: the dataset statistics object to compute nPMI
        for; its live flag, cache_path and min_vocab_count are read here
    :param use_cache: read stored results when they are available
    """
    self.dstats = dataset_stats
    self.live = dataset_stats.live
    self.use_cache = use_cache
    self.min_vocab_count = self.dstats.min_vocab_count
    # TODO: Let users specify
    self.open_class_only = True

    # Cache locations
    self.pmi_cache_path = pjoin(self.dstats.cache_path, "pmi_files")
    self.npmi_terms_fid = pjoin(self.dstats.cache_path, "npmi_terms.json")
    if not isdir(self.pmi_cache_path):
        logs.warning("Creating pmi cache directory %s." % self.pmi_cache_path)
        # We need to preprocess everything.
        mkdir(self.pmi_cache_path)

    self.joint_npmi_df_dict = {}
    self.subgroup_files = {}
    # TODO: Users ideally can type in whatever words they want.
    self.termlist = _IDENTITY_TERMS
    # termlist terms that are available more than _MIN_VOCAB_COUNT times
    self.available_terms = _IDENTITY_TERMS
    logs.info(self.termlist)
def load_or_prepare_npmi_terms(self): | |
""" | |
Figures out what identity terms the user can select, based on whether | |
they occur more than self.min_vocab_count times | |
:return: Identity terms occurring at least self.min_vocab_count times. | |
""" | |
# TODO: Add the user's ability to select subgroups. | |
# TODO: Make min_vocab_count here value selectable by the user. | |
if ( | |
self.use_cache | |
and exists(self.npmi_terms_fid) | |
and json.load(open(self.npmi_terms_fid))["available terms"] != [] | |
): | |
available_terms = json.load(open(self.npmi_terms_fid))["available terms"] | |
else: | |
true_false = [ | |
term in self.dstats.vocab_counts_df.index for term in self.termlist | |
] | |
word_list_tmp = [x for x, y in zip(self.termlist, true_false) if y] | |
true_false_counts = [ | |
self.dstats.vocab_counts_df.loc[word, CNT] >= self.min_vocab_count | |
for word in word_list_tmp | |
] | |
available_terms = [ | |
word for word, y in zip(word_list_tmp, true_false_counts) if y | |
] | |
logs.info(available_terms) | |
with open(self.npmi_terms_fid, "w+") as f: | |
json.dump({"available terms": available_terms}, f) | |
self.available_terms = available_terms | |
return available_terms | |
def load_or_prepare_joint_npmi(self, subgroup_pair):
    """
    Run on-the fly, while the app is already open,
    as it depends on the subgroup terms that the user chooses.
    Cached joint results are used when caching is enabled and present;
    otherwise they are computed and written out, except in deployment mode
    where an empty dataframe is returned.
    :param subgroup_pair: the two selected subgroup terms
    :return: dataframe with the nPMI bias plus each subgroup's nPMI and
        count (empty in deployment mode without cached results)
    """
    # Canonical ordering for subgroup_list
    subgroup_pair = sorted(subgroup_pair)
    subgroup1 = subgroup_pair[0]
    subgroup2 = subgroup_pair[1]
    subgroups_str = "-".join(subgroup_pair)
    if not isdir(self.pmi_cache_path):
        logs.warning("Creating cache")
        # We need to preprocess everything.
        # This should eventually all go into a prepare_dataset CLI
        mkdir(self.pmi_cache_path)
    joint_npmi_fid = pjoin(self.pmi_cache_path, subgroups_str + "_npmi.csv")
    # Defines the filenames for the cache files from the selected subgroups.
    subgroup_files = define_subgroup_files(subgroup_pair, self.pmi_cache_path)
    # Get as much precomputed data as we can.
    if self.use_cache and exists(joint_npmi_fid):
        # When everything is already computed for the selected subgroups.
        logs.info("Loading cached joint npmi")
        cached_df = self.load_joint_npmi_df(joint_npmi_fid)
        # Column order: bias, both nPMI columns, then both count columns.
        npmi_display_cols = ["npmi-bias"] + [
            name + suffix
            for suffix in ("-npmi", "-count")
            for name in (subgroup1, subgroup2)
        ]
        joint_npmi_df = cached_df[npmi_display_cols]
    elif self.live:
        joint_npmi_df = pd.DataFrame()
    else:
        # When maybe some things have been computed for the selected subgroups.
        logs.info("Preparing new joint npmi")
        joint_npmi_df, subgroup_dict = self.prepare_joint_npmi_df(
            subgroup_pair, subgroup_files
        )
        # Cache new results
        logs.info("Writing out.")
        for subgroup in subgroup_pair:
            write_subgroup_npmi_data(subgroup, subgroup_dict, subgroup_files)
        with open(joint_npmi_fid, "w+") as joint_file:
            joint_npmi_df.to_csv(joint_file)
    logs.info("The joint npmi df is")
    logs.info(joint_npmi_df)
    return joint_npmi_df
def load_joint_npmi_df(self, joint_npmi_fid):
    """
    Reads in a saved dataframe with all of the paired results.
    :param joint_npmi_fid: path of the cached CSV file
    :return: paired results with a normalized index; rows containing
        missing values are dropped
    """
    with open(joint_npmi_fid, "rb") as csv_file:
        raw_df = pd.read_csv(csv_file)
    indexed_df = self._set_idx_cols_from_cache(raw_df)
    return indexed_df.dropna()
def prepare_joint_npmi_df(self, subgroup_pair, subgroup_files):
    """
    Computes the npmi bias based on the given subgroups.
    Handles cases where some of the selected subgroups have cached nPMI
    computations but others don't, computing everything afresh if there
    are no cached files.
    :param subgroup_pair: the selected subgroup terms
    :param subgroup_files: maps each subgroup to its cache file names
    :return: Dataframe with nPMI for the words and the nPMI bias between
        the words (rows with missing values dropped), and the dict of
        per-subgroup results.
    """
    subgroup_dict = {}
    # When npmi is computed for some (but not all) of subgroup_list
    for subgroup in subgroup_pair:
        logs.info("Load or failing...")
        # When subgroup npmi has been computed in a prior session.
        cached_results = self.load_or_fail_cached_npmi_scores(
            subgroup, subgroup_files[subgroup]
        )
        if not cached_results:
            # Nothing cached for this subgroup; do_npmi() will compute it.
            continue
        # FYI: subgroup_cooc_df, subgroup_pmi_df, subgroup_npmi_df = cached_results
        # Holds the previous sessions' data for use in this session.
        subgroup_dict[subgroup] = cached_results
    logs.info("Calculating for subgroup list")
    joint_npmi_df, subgroup_dict = self.do_npmi(subgroup_pair, subgroup_dict)
    return joint_npmi_df.dropna(), subgroup_dict
# TODO: Update pairwise assumption | |
def do_npmi(self, subgroup_pair, subgroup_dict):
    """
    Calculates nPMI for given identity terms and the nPMI bias between.
    :param subgroup_pair: List of identity terms to calculate the bias for
    :param subgroup_dict: results already available per subgroup; missing
        subgroups are computed and added to this dict
    :return: Subset of data for the UI
    :return: Selected identity term's co-occurrence counts with
             other words, pmi per word, and nPMI per word.
    """
    logs.info("Initializing npmi class")
    npmi_obj = self.set_npmi_obj()
    # Canonical ordering used
    subgroup_pair = tuple(sorted(subgroup_pair))
    # Calculating nPMI statistics
    # TODO: Should we set idx and column names similarly to how we set them for cached files?
    for subgroup in subgroup_pair:
        if subgroup in subgroup_dict:
            # The subgroup data is already computed.
            continue
        logs.info("Calculating statistics for %s" % subgroup)
        # Store the nPMI information for the current subgroups
        # as a (co-occurrence, pmi, npmi) triple.
        metrics = npmi_obj.calc_metrics(subgroup)
        vocab_cooc_df, pmi_df, npmi_df = metrics
        subgroup_dict[subgroup] = (vocab_cooc_df, pmi_df, npmi_df)
    # Pair the subgroups together, indexed by all words that
    # co-occur between them.
    logs.info("Computing pairwise npmi bias")
    paired_results = npmi_obj.calc_paired_metrics(subgroup_pair, subgroup_dict)
    ui_results = make_npmi_fig(paired_results, subgroup_pair)
    return ui_results, subgroup_dict
def set_npmi_obj(self):
    """
    Initializes the nPMI class with the dataset's vocabulary counts and
    tokenized sentences.
    :return: the new nPMI object
    """
    vocab_counts_df = self.dstats.vocab_counts_df
    tokenized_df = self.dstats.tokenized_df
    return nPMI(vocab_counts_df, tokenized_df)
def load_or_fail_cached_npmi_scores(self, subgroup, subgroup_fids):
    """
    Reads the cached scores for one identity term from its cache files.
    :param subgroup: string of the selected identity term
    :param subgroup_fids: (npmi file, pmi file, co-occurrence file) paths
    :return: (co-occurrence df, pmi df, npmi df) with standardized index and
        column names, or False if any of the three files does not exist
    """
    # TODO: Ordering of npmi, pmi, vocab triple should be consistent
    npmi_path, pmi_path, cooc_path = subgroup_fids
    # A partial cache is treated the same as no cache at all.
    if not (exists(npmi_path) and exists(pmi_path) and exists(cooc_path)):
        return False
    logs.info("Reading in pmi data....")
    with open(cooc_path, "rb") as cooc_file:
        cooc_df = pd.read_csv(cooc_file)
    logs.info("pmi")
    with open(pmi_path, "rb") as pmi_file:
        pmi_df = pd.read_csv(pmi_file)
    logs.info("npmi")
    with open(npmi_path, "rb") as npmi_file:
        npmi_df = pd.read_csv(npmi_file)
    # Give the freshly read frames the same index/column names that the
    # rest of the code expects.
    cooc_df = self._set_idx_cols_from_cache(cooc_df, subgroup, "count")
    pmi_df = self._set_idx_cols_from_cache(pmi_df, subgroup, "pmi")
    npmi_df = self._set_idx_cols_from_cache(npmi_df, subgroup, "npmi")
    return cooc_df, pmi_df, npmi_df
def _set_idx_cols_from_cache(self, csv_df, subgroup=None, calc_str=None):
    """
    Helps make sure all of the read-in files can be accessed within code
    via standardized indices and column names.
    :param csv_df: DataFrame as read from a cached csv file
    :param subgroup: Identity term used to name the column (optional)
    :param calc_str: Name of the statistic, e.g. "count", "pmi" or "npmi",
        used to name the column (optional)
    :return: DataFrame indexed by word (index name WORD) when one of the
        known word columns is present; its columns are renamed to a single
        "<subgroup>-<calc_str>", "<subgroup>" or "<calc_str>" label depending
        on which of the two names are given
    """
    # The csv saves the index as an "Unnamed: 0" column, so that is checked
    # first; the first matching candidate becomes the index.
    for index_column in ("Unnamed: 0", WORD, VOCAB):
        if index_column in csv_df.columns:
            csv_df = csv_df.set_index(index_column)
            csv_df.index.name = WORD
            break
    if subgroup and calc_str:
        column_label = subgroup + "-" + calc_str
    else:
        column_label = subgroup or calc_str
    # With neither name given the columns are left untouched.
    if column_label:
        csv_df.columns = [column_label]
    return csv_df
def get_available_terms(self):
    """
    Convenience accessor for the nPMI terms.
    :return: Whatever self.load_or_prepare_npmi_terms() returns
    """
    available_terms = self.load_or_prepare_npmi_terms()
    return available_terms
def dummy(doc):
    """
    Pass-through used in place of a tokenizer/preprocessor when the text
    has already been tokenized.
    :param doc: Any object
    :return: The same object, unchanged
    """
    return doc
def count_vocab_frequencies(tokenized_df):
    """
    Counts the occurrences of every word in the tokenized text.
    :param tokenized_df: pandas DataFrame whose TOKENIZED_FIELD column holds
        the already-tokenized text of each instance
    :return: DataFrame indexed by word (index name WORD) with a single CNT
        column holding the total count of that word over all rows.
    """
    # The text is already tokenized, so tokenization and preprocessing are
    # both replaced by the pass-through function.
    cvec = CountVectorizer(
        tokenizer=dummy,
        preprocessor=dummy,
    )
    # We do this to calculate per-word statistics
    # Fast calculation of single word counts
    logs.info(
        "Fitting dummy tokenization to make matrix using the previous tokenization"
    )
    document_matrix = cvec.fit_transform(tokenized_df[TOKENIZED_FIELD])
    # Sum the sparse matrix in row batches so that only one batch at a time
    # has to be converted to a dense array.
    batches = np.linspace(0, tokenized_df.shape[0], _NUM_VOCAB_BATCHES).astype(int)
    tf = []
    for i, (start, stop) in enumerate(zip(batches[:-1], batches[1:])):
        logs.info("%s of %s vocab batches", i, len(batches))
        tf.append(np.sum(document_matrix[start:stop].toarray(), axis=0))
    # get_feature_names() was removed in scikit-learn 1.2; prefer its
    # replacement and only fall back on older versions.
    if hasattr(cvec, "get_feature_names_out"):
        feature_names = cvec.get_feature_names_out()
    else:
        feature_names = cvec.get_feature_names()
    word_count_df = pd.DataFrame(
        [np.sum(tf, axis=0)], columns=feature_names
    ).transpose()
    # Now organize everything into the dataframes
    word_count_df.columns = [CNT]
    word_count_df.index.name = WORD
    return word_count_df
def calc_p_word(word_count_df):
    """
    Adds each word's proportion of the total count and ranks words by count.
    Note that the PROP column is added to the input DataFrame itself.
    :param word_count_df: DataFrame indexed by word with a CNT column
    :return: DataFrame sorted by CNT (descending) with the PROP column and a
        VOCAB column that repeats the index
    """
    total_count = float(sum(word_count_df[CNT]))
    # p(word)
    word_count_df[PROP] = word_count_df[CNT] / total_count
    ranked_df = word_count_df.sort_values(by=CNT, ascending=False)
    vocab_counts_df = pd.DataFrame(ranked_df)
    vocab_counts_df[VOCAB] = vocab_counts_df.index
    return vocab_counts_df
def filter_vocab(vocab_counts_df):
    """
    Drops the rows listed in _CLOSED_CLASS and recomputes the proportions
    over the remaining words.
    :param vocab_counts_df: DataFrame indexed by word with a CNT column
    :return: New DataFrame without the _CLOSED_CLASS rows, whose PROP column
        is CNT divided by the total remaining CNT
    """
    # TODO: Add warnings (which words are missing) to log file?
    # errors="ignore" skips listed words that are not in the index.
    open_class_df = vocab_counts_df.drop(_CLOSED_CLASS, errors="ignore")
    open_class_counts = open_class_df[CNT]
    open_class_total = float(sum(open_class_df[CNT]))
    open_class_df[PROP] = open_class_counts / open_class_total
    return open_class_df
## Figures ## | |
def write_plotly(fig, fid):
    """
    Saves a plotly figure to a json file.
    :param fig: plotly figure
    :param fid: Path of the output file
    """
    # to_json gives a JSON string, which write_json then stores as a JSON
    # string value.
    fig_json = plotly.io.to_json(fig)
    write_json(fig_json, fid)
def read_plotly(fid):
    """
    Loads a plotly figure from a json file.
    :param fid: Path of a file holding a JSON-encoded string of the figure's
        plotly JSON
    :return: The figure built by plotly.io.from_json
    """
    # Use a context manager so the file handle is closed once it is read.
    with open(fid, encoding="utf-8") as f:
        fig_json = json.load(f)
    fig = plotly.io.from_json(fig_json)
    return fig
def make_fig_lengths(tokenized_df, length_field):
    """
    Plots the distribution of the values in one column.
    :param tokenized_df: DataFrame holding the column to plot
    :param length_field: Name of that column
    :return: matplotlib figure with a 100-bin histogram (with KDE curve) and
        a rug plot drawn on the same axes
    """
    lengths = tokenized_df[length_field]
    figure, axes = plt.subplots(figsize=(15, 6), dpi=150)
    sns.histplot(data=lengths, kde=True, bins=100, ax=axes)
    sns.rugplot(data=lengths, ax=axes)
    return figure
def make_fig_labels(label_df, label_names, label_field):
    """
    Makes a pie chart of how many rows carry each label.
    :param label_df: DataFrame holding the label column
    :param label_names: Names shown for the pie slices
        NOTE(review): presumably expected to be in the same order as the
        unique values of the label column -- confirm against the caller
    :param label_field: Name of the label column
    :return: plotly express pie figure
    """
    label_column = label_df[label_field]
    # One row count per distinct label, in order of first appearance.
    label_sums = [
        int((label_column == label).sum()) for label in label_column.unique()
    ]
    return px.pie(label_df, values=label_sums, names=label_names)
def make_zipf_fig_ranked_word_list(vocab_df, unique_counts, unique_ranks):
    """
    Builds the hover text for each rank: the words that share its count.
    :param vocab_df: DataFrame indexed by word with a CNT column
    :param unique_counts: Counts, paired position by position with
        unique_ranks
    :param unique_ranks: Rank of each count in unique_counts
    :return: List with one comma-joined string of words per rank, ordered by
        ascending rank
    """
    ranked_words = {}
    for count, rank in zip(unique_counts, unique_ranks):
        # The previous chained assignment of a "rank" column wrote to a
        # temporary copy and never changed vocab_df, so it was dropped;
        # vocab_df is still left unmodified.
        words_with_count = vocab_df[vocab_df[CNT] == count].index
        # Use the hovertext kw argument for hover text
        ranked_words[rank] = ",".join(words_with_count.astype(str))
    ranked_words_list = [ranked_words[rank] for rank in sorted(ranked_words)]
    return ranked_words_list
def make_npmi_fig(paired_results, subgroup_pair):
    """
    Collects the paired nPMI results that are shown in the UI.
    :param paired_results: Mapping with "npmi" and "count" entries holding
        "<subgroup>-npmi" / "<subgroup>-count" columns, and optionally an
        "npmi-bias" entry
    :param subgroup_pair: The two identity terms (they may be the same term)
    :return: DataFrame with the bias (when available) followed by the npmi
        and count columns of each distinct term; sorted by ascending
        "npmi-bias" when that column is available
    """
    subgroup1, subgroup2 = subgroup_pair
    UI_results = pd.DataFrame()
    has_bias = "npmi-bias" in paired_results
    if has_bias:
        UI_results["npmi-bias"] = paired_results["npmi-bias"].astype(float)
    # dict.fromkeys keeps the order and collapses a pair of identical terms,
    # so their columns are only added once.
    for subgroup in dict.fromkeys((subgroup1, subgroup2)):
        npmi_col = subgroup + "-npmi"
        count_col = subgroup + "-count"
        UI_results[npmi_col] = paired_results["npmi"][npmi_col].astype(float)
        UI_results[count_col] = paired_results["count"][count_col].astype(int)
    # Sorting by a column that was never added would raise a KeyError.
    if not has_bias:
        return UI_results
    return UI_results.sort_values(by="npmi-bias", ascending=True)
def make_zipf_fig(vocab_counts_df, z):
    """
    Plots the observed word counts per rank together with the counts
    predicted by the fitted Zipf distribution.
    :param vocab_counts_df: DataFrame indexed by word with a CNT column
    :param z: Fitted Zipf object providing calc_zipf_counts, uniq_counts,
        uniq_ranks and get_xmin
    :return: plotly figure with a bar trace (observed) and a line trace
        (predicted)
    """
    zipf_counts = z.calc_zipf_counts(vocab_counts_df)
    ranked_words_list = make_zipf_fig_ranked_word_list(
        vocab_counts_df, z.uniq_counts, z.uniq_ranks
    )
    zmin = z.get_xmin()
    logs.info("zipf counts is")
    logs.info(zipf_counts)
    observed_trace = go.Bar(
        x=z.uniq_ranks,
        y=z.uniq_counts,
        hovertext=ranked_words_list,
        name="Word Rank Frequency",
    )
    # The x axis initially shows the range 0-100.
    fig = go.Figure(
        data=[observed_trace],
        layout=go.Layout(xaxis=dict(range=[0, 100])),
    )
    # The predicted line is drawn from position zmin up to the number of
    # unique ranks.
    num_ranks = len(z.uniq_ranks)
    predicted_trace = go.Scatter(
        x=z.uniq_ranks[zmin:num_ranks],
        y=zipf_counts[zmin:num_ranks],
        hovertext=ranked_words_list[zmin:num_ranks],
        line=go.scatter.Line(color="crimson", width=3),
        name="Zipf Predicted Frequency",
    )
    fig.add_trace(predicted_trace)
    # Customize aspect
    # fig.update_traces(marker_color='limegreen',
    # marker_line_width=1.5, opacity=0.6)
    fig.update_layout(
        title_text="Word Counts, Observed and Predicted by Zipf",
        xaxis_title="Word Rank",
        yaxis_title="Frequency",
        legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.10),
    )
    return fig
## Input/Output ##
def define_subgroup_files(subgroup_list, pmi_cache_path):
    """
    Sets the file ids for the input identity terms
    :param subgroup_list: List of identity terms
    :param pmi_cache_path: Directory that holds the cache files
    :return: Dict mapping each identity term to its
        (npmi file, pmi file, co-occurrence file) paths
    """
    # TODO: Should the pmi, npmi, and count just be one file?
    # The order of the suffixes fixes the order of each returned triple.
    file_suffixes = ("_npmi.csv", "_pmi.csv", "_vocab_cooc.csv")
    return {
        subgroup: tuple(
            pjoin(pmi_cache_path, subgroup + suffix) for suffix in file_suffixes
        )
        for subgroup in subgroup_list
    }
## Input/Output ## | |
def intersect_dfs(df_dict):
    """
    Inner-joins the DataFrames of a dict on their index, so that only the
    index entries shared by all of them remain.
    :param df_dict: Dict of DataFrames; None values are skipped
    :return: Copy of the joined DataFrame
    NOTE(review): every frame is joined once for each *other* frame, so
    frames are joined repeatedly and overlapping column names pick up the
    "1"/"2" suffixes. With fewer than two non-None frames nothing is joined
    and the final copy() fails on None. Kept as is, since callers may depend
    on the resulting column names.
    """
    frames = [df for df in df_dict.values() if df is not None]
    joined = None
    for left_pos, left in enumerate(frames):
        for right_pos, right in enumerate(frames):
            if left_pos == right_pos:
                continue
            # Only the very first join starts from a frame of the dict;
            # every later one extends the running result.
            base = left if joined is None else joined
            joined = base.join(right, how="inner", lsuffix="1", rsuffix="2")
    return joined.copy()
def write_df(df, df_fid):
    """
    Saves a DataFrame in the feather format.
    :param df: DataFrame to save
    :param df_fid: Path of the output file
    """
    feather.write_feather(df, df_fid)
def write_json(json_dict, json_fid):
    """
    Saves a json-serializable object to a UTF-8 encoded file.
    :param json_dict: Object to serialize
    :param json_fid: Path of the output file (overwritten if it exists)
    """
    with open(json_fid, "w", encoding="utf-8") as json_file:
        json.dump(json_dict, json_file)
def write_subgroup_npmi_data(subgroup, subgroup_dict, subgroup_files):
    """
    Saves the calculated nPMI statistics to their output files.
    Includes the npmi scores for each identity term, the pmi scores, and the
    co-occurrence counts of the identity term with all the other words
    :param subgroup: Identity term
    :param subgroup_dict: Maps an identity term to its
        (co-occurrence counts df, pmi df, npmi df) triple
    :param subgroup_files: Maps an identity term to its
        (npmi file, pmi file, co-occurrence file) paths
    :return:
    """
    # Note that the two triples are ordered differently.
    subgroup_npmi_fid, subgroup_pmi_fid, subgroup_cooc_fid = subgroup_files[subgroup]
    subgroup_cooc_df, subgroup_pmi_df, subgroup_npmi_df = subgroup_dict[subgroup]
    outputs = (
        (subgroup_npmi_df, subgroup_npmi_fid),
        (subgroup_pmi_df, subgroup_pmi_fid),
        (subgroup_cooc_df, subgroup_cooc_fid),
    )
    for df, fid in outputs:
        # Write by path with an explicit encoding: a text handle opened
        # without one uses the platform default, which can garble or reject
        # non-ASCII words, and pandas then also takes care of the newlines.
        df.to_csv(fid, encoding="utf-8")
def write_zipf_data(z, zipf_fid):
    """
    Saves the values of a fitted Zipf object to a json file.
    :param z: Object providing xmin, xmax, alpha, distance, ks_test.pvalue,
        uniq_counts and uniq_ranks
    :param zipf_fid: Path of the output file (overwritten if it exists)
    """
    # Everything is converted to plain int/float before it is serialized.
    zipf_dict = {
        "xmin": int(z.xmin),
        "xmax": int(z.xmax),
        "alpha": float(z.alpha),
        "ks_distance": float(z.distance),
        "p-value": float(z.ks_test.pvalue),
        "uniq_counts": [int(count) for count in z.uniq_counts],
        "uniq_ranks": [int(rank) for rank in z.uniq_ranks],
    }
    with open(zipf_fid, "w+", encoding="utf-8") as zipf_file:
        json.dump(zipf_dict, zipf_file)