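# NOTE(review): the three lines originally here ("Spaces:", "Runtime error",
# "Runtime error") appear to be a hosting-page status banner captured together
# with the source; they are not part of the module.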
from data_extractor import load_data | |
from utils import extract_feature, AVAILABLE_EMOTIONS | |
from create_csv import write_emodb_csv, write_tess_ravdess_csv, write_custom_csv | |
from sklearn.metrics import accuracy_score, make_scorer, fbeta_score, mean_squared_error, mean_absolute_error | |
from sklearn.metrics import confusion_matrix | |
from sklearn.model_selection import GridSearchCV | |
import matplotlib.pyplot as pl | |
from time import time | |
from utils import get_best_estimators, get_audio_config | |
import numpy as np | |
import tqdm | |
import os | |
import random | |
import pandas as pd | |
class EmotionRecognizer:
    """A class for training, testing and predicting emotions based on
    speech's features that are extracted and fed into `sklearn` or `keras` model"""

    def __init__(self, model=None, **kwargs):
        """
        Params:
            model (sklearn model): the model used to detect emotions. If `model` is None, then
                self.determine_best_model() will be automatically called
            emotions (list): list of emotions to be used. Note that these emotions must be available in
                RAVDESS_TESS & EMODB Datasets, available nine emotions are the following:
                    'neutral', 'calm', 'happy', 'sad', 'angry', 'fear', 'disgust',
                    'ps' ( pleasant surprised ), 'boredom'.
                Default is ["sad", "neutral", "happy"].
            tess_ravdess (bool): whether to use TESS & RAVDESS Speech datasets, default is True
            emodb (bool): whether to use EMO-DB Speech dataset, default is True,
            custom_db (bool): whether to use custom Speech dataset that is located in `data/train-custom`
                and `data/test-custom`, default is True
            tess_ravdess_name (str): the name of the output CSV file for TESS&RAVDESS dataset,
                default is "tess_ravdess.csv"
            emodb_name (str): the name of the output CSV file for EMO-DB dataset, default is "emodb.csv"
            custom_db_name (str): the name of the output CSV file for the custom dataset,
                default is "custom.csv"
            features (list): list of speech features to use, default is ["mfcc", "chroma", "mel"]
                (i.e MFCC, Chroma and MEL spectrogram )
            classification (bool): whether to use classification or regression, default is True
            balance (bool): whether to balance the dataset ( both training and testing ), default is True
            override_csv (bool): whether to rewrite metadata CSV files that already exist, default is True
            verbose (bool/int): whether to print messages on certain tasks, default is 1
        Note that when `tess_ravdess`, `emodb` and `custom_db` are set to `False`, `tess_ravdess` will be
        set to True automatically.
        """
        # emotions
        self.emotions = kwargs.get("emotions", ["sad", "neutral", "happy"])
        # make sure that there are only available emotions
        self._verify_emotions()
        # audio config
        self.features = kwargs.get("features", ["mfcc", "chroma", "mel"])
        self.audio_config = get_audio_config(self.features)
        # datasets
        self.tess_ravdess = kwargs.get("tess_ravdess", True)
        self.emodb = kwargs.get("emodb", True)
        self.custom_db = kwargs.get("custom_db", True)
        if not self.tess_ravdess and not self.emodb and not self.custom_db:
            # at least one dataset is required, fall back to TESS & RAVDESS
            self.tess_ravdess = True
        self.classification = kwargs.get("classification", True)
        self.balance = kwargs.get("balance", True)
        self.override_csv = kwargs.get("override_csv", True)
        self.verbose = kwargs.get("verbose", 1)
        self.tess_ravdess_name = kwargs.get("tess_ravdess_name", "tess_ravdess.csv")
        self.emodb_name = kwargs.get("emodb_name", "emodb.csv")
        self.custom_db_name = kwargs.get("custom_db_name", "custom.csv")
        # set metadata path file names
        self._set_metadata_filenames()
        # write csv's anyway
        self.write_csv()
        # boolean attributes
        self.data_loaded = False
        self.model_trained = False
        # model
        # Compare against None explicitly: relying on the truthiness of an
        # estimator object is fragile (estimators that define `__len__` may be
        # falsy, or fail, when evaluated as a boolean).
        if model is None:
            self.determine_best_model()
        else:
            self.model = model

    def _set_metadata_filenames(self):
        """
        Protected method to get all CSV (metadata) filenames into two instance attributes:
        - `self.train_desc_files` for training CSVs
        - `self.test_desc_files` for testing CSVs
        """
        enabled_names = []
        if self.tess_ravdess:
            enabled_names.append(self.tess_ravdess_name)
        if self.emodb:
            enabled_names.append(self.emodb_name)
        if self.custom_db:
            enabled_names.append(self.custom_db_name)
        # set them to be object attributes
        self.train_desc_files = [f"train_{name}" for name in enabled_names]
        self.test_desc_files = [f"test_{name}" for name in enabled_names]

    def _verify_emotions(self):
        """
        This method makes sure that emotions passed in parameters are valid.
        """
        for emotion in self.emotions:
            assert emotion in AVAILABLE_EMOTIONS, "Emotion not recognized."

    def get_best_estimators(self):
        """Loads estimators from grid files and returns them"""
        return get_best_estimators(self.classification)

    def write_csv(self):
        """
        Write available CSV files in `self.train_desc_files` and `self.test_desc_files`
        determined by `self._set_metadata_filenames()` method.
        Existing train/test CSV pairs are kept untouched when `self.override_csv` is False.
        """
        for train_csv_file, test_csv_file in zip(self.train_desc_files, self.test_desc_files):
            # not safe approach
            both_exist = os.path.isfile(train_csv_file) and os.path.isfile(test_csv_file)
            if both_exist and not self.override_csv:
                # files already exist, just skip writing csv files
                continue
            # the dataset is identified by its CSV name being part of the file name
            if self.emodb_name in train_csv_file:
                write_emodb_csv(self.emotions, train_name=train_csv_file, test_name=test_csv_file,
                                verbose=self.verbose)
                if self.verbose:
                    print("[+] Writed EMO-DB CSV File")
            elif self.tess_ravdess_name in train_csv_file:
                write_tess_ravdess_csv(self.emotions, train_name=train_csv_file, test_name=test_csv_file,
                                       verbose=self.verbose)
                if self.verbose:
                    print("[+] Writed TESS & RAVDESS DB CSV File")
            elif self.custom_db_name in train_csv_file:
                write_custom_csv(emotions=self.emotions, train_name=train_csv_file, test_name=test_csv_file,
                                 verbose=self.verbose)
                if self.verbose:
                    print("[+] Writed Custom DB CSV File")

    def load_data(self):
        """
        Loads and extracts features from the audio files for the db's specified.
        Does nothing when the data has already been loaded.
        """
        if self.data_loaded:
            return
        result = load_data(self.train_desc_files, self.test_desc_files, self.audio_config, self.classification,
                           emotions=self.emotions, balance=self.balance)
        self.X_train = result['X_train']
        self.X_test = result['X_test']
        self.y_train = result['y_train']
        self.y_test = result['y_test']
        self.train_audio_paths = result['train_audio_paths']
        self.test_audio_paths = result['test_audio_paths']
        # the loader reports back the `balance` value it ended up with
        self.balance = result["balance"]
        if self.verbose:
            print("[+] Data loaded")
        self.data_loaded = True

    def train(self, verbose=1):
        """
        Train the model, if data isn't loaded, it 'll be loaded automatically.
        A model that is already marked as trained is not fitted again.
        """
        if not self.data_loaded:
            # if data isn't loaded yet, load it then
            self.load_data()
        if not self.model_trained:
            self.model.fit(X=self.X_train, y=self.y_train)
            self.model_trained = True
            if verbose:
                print("[+] Model trained")

    def predict(self, audio_path):
        """
        given an `audio_path`, this method extracts the features
        and predicts the emotion
        """
        feature = extract_feature(audio_path, **self.audio_config).reshape(1, -1)
        return self.model.predict(feature)[0]

    def predict_proba(self, audio_path):
        """
        Predicts the probability of each emotion.
        Returns a dict mapping each class of the model to its probability.
        Raises NotImplementedError for regression.
        """
        if not self.classification:
            raise NotImplementedError("Probability prediction doesn't make sense for regression")
        feature = extract_feature(audio_path, **self.audio_config).reshape(1, -1)
        proba = self.model.predict_proba(feature)[0]
        return dict(zip(self.model.classes_, proba))

    def grid_search(self, params, n_jobs=2, verbose=1):
        """
        Performs GridSearchCV on `params` passed on the `self.model`
        And returns the tuple: (best_estimator, best_params, best_score).
        `best_score` is the mean cross-validated accuracy for classification and the
        mean cross-validated mean absolute error for regression.
        If data isn't loaded yet, it'll be loaded.
        """
        if not self.data_loaded:
            self.load_data()
        if self.classification:
            scorer = make_scorer(accuracy_score)
        else:
            # an error metric must be minimized; without `greater_is_better=False`
            # the search would select the parameters with the HIGHEST error
            scorer = make_scorer(mean_absolute_error, greater_is_better=False)
        grid = GridSearchCV(estimator=self.model, param_grid=params, scoring=scorer,
                            n_jobs=n_jobs, verbose=verbose, cv=3)
        grid_result = grid.fit(self.X_train, self.y_train)
        best_score = grid_result.best_score_
        if not self.classification:
            # the scorer negates the error, report it back as a positive MAE
            best_score = -best_score
        return grid_result.best_estimator_, grid_result.best_params_, best_score

    def determine_best_model(self):
        """
        Loads best estimators and determine which is best for test data,
        and then set it to `self.model`.
        In case of regression, the metric used is MSE and accuracy for classification.
        Note that the execution of this method may take several minutes due
        to training all estimators (stored in `grid` folder) for determining the best possible one.
        """
        if not self.data_loaded:
            self.load_data()
        # loads estimators
        estimators = self.get_best_estimators()
        result = []
        if self.verbose:
            estimators = tqdm.tqdm(estimators)
        for estimator, params, cv_score in estimators:
            if self.verbose:
                estimators.set_description(f"Evaluating {estimator.__class__.__name__}")
            # forward the CSV names as well, so that the candidate reuses the
            # metadata files of this instance instead of the default-named ones
            detector = EmotionRecognizer(estimator, emotions=self.emotions, tess_ravdess=self.tess_ravdess,
                                         emodb=self.emodb, custom_db=self.custom_db,
                                         classification=self.classification, features=self.features,
                                         balance=self.balance, override_csv=False,
                                         tess_ravdess_name=self.tess_ravdess_name,
                                         emodb_name=self.emodb_name,
                                         custom_db_name=self.custom_db_name)
            # data already loaded
            detector.X_train = self.X_train
            detector.X_test = self.X_test
            detector.y_train = self.y_train
            detector.y_test = self.y_test
            detector.data_loaded = True
            # train the model
            detector.train(verbose=0)
            # get test score (accuracy or MSE, see `test_score`)
            score = detector.test_score()
            # append to result
            result.append((detector.model, score))
        # regression: best is the lower, not the higher
        # classification: best is higher, not the lower
        # (on ties the first evaluated estimator wins)
        pick_best = max if self.classification else min
        best_estimator, best_score = pick_best(result, key=lambda item: item[1])
        self.model = best_estimator
        self.model_trained = True
        if self.verbose:
            if self.classification:
                print(f"[+] Best model determined: {self.model.__class__.__name__} with {best_score*100:.3f}% test accuracy")
            else:
                # `test_score` computes the mean squared error for regression
                print(f"[+] Best model determined: {self.model.__class__.__name__} with {best_score:.5f} mean squared error")

    def test_score(self):
        """
        Calculates score on testing data
        if `self.classification` is True, the metric used is accuracy,
        Mean-Squared-Error is used otherwise (regression)
        """
        y_pred = self.model.predict(self.X_test)
        if self.classification:
            return accuracy_score(y_true=self.y_test, y_pred=y_pred)
        return mean_squared_error(y_true=self.y_test, y_pred=y_pred)

    def train_score(self):
        """
        Calculates accuracy score on training data
        if `self.classification` is True, the metric used is accuracy,
        Mean-Squared-Error is used otherwise (regression)
        """
        y_pred = self.model.predict(self.X_train)
        if self.classification:
            return accuracy_score(y_true=self.y_train, y_pred=y_pred)
        return mean_squared_error(y_true=self.y_train, y_pred=y_pred)

    def train_fbeta_score(self, beta):
        """Calculates the micro-averaged F-beta score on training data"""
        y_pred = self.model.predict(self.X_train)
        # `beta` is passed by keyword: it is keyword-only in recent scikit-learn
        return fbeta_score(self.y_train, y_pred, beta=beta, average='micro')

    def test_fbeta_score(self, beta):
        """Calculates the micro-averaged F-beta score on testing data"""
        y_pred = self.model.predict(self.X_test)
        # `beta` is passed by keyword: it is keyword-only in recent scikit-learn
        return fbeta_score(self.y_test, y_pred, beta=beta, average='micro')

    def confusion_matrix(self, percentage=True, labeled=True):
        """
        Computes confusion matrix to evaluate the test accuracy of the classification
        and returns it as numpy matrix or pandas dataframe (depends on params).
        params:
            percentage (bool): whether to use percentage instead of number of samples, default is True.
            labeled (bool): whether to label the columns and indexes in the dataframe.
        Raises NotImplementedError for regression.
        """
        if not self.classification:
            raise NotImplementedError("Confusion matrix works only when it is a classification problem")
        y_pred = self.model.predict(self.X_test)
        matrix = confusion_matrix(self.y_test, y_pred, labels=self.emotions).astype(np.float32)
        if percentage:
            row_totals = matrix.sum(axis=1, keepdims=True)
            # an emotion without any test sample has an all-zero row: keep it
            # at 0 instead of dividing by zero (which would yield NaN)
            matrix = np.divide(matrix, row_totals, out=np.zeros_like(matrix), where=row_totals != 0)
            # make it percentage
            matrix *= 100
        if labeled:
            matrix = pd.DataFrame(matrix, index=[f"true_{e}" for e in self.emotions],
                                  columns=[f"predicted_{e}" for e in self.emotions])
        return matrix

    def draw_confusion_matrix(self):
        """Calculates the confusion matrix and shows it"""
        matrix = self.confusion_matrix(percentage=False, labeled=False)
        #TODO: add labels, title, legends, etc.
        pl.imshow(matrix, cmap="binary")
        pl.show()

    def get_n_samples(self, emotion, partition):
        """Returns number data samples of the `emotion` class in a particular `partition`
        ('test' or 'train'). Returns None for any other `partition`.
        """
        if partition == "test":
            labels = self.y_test
        elif partition == "train":
            labels = self.y_train
        else:
            return None
        return sum(1 for label in labels if label == emotion)

    def get_samples_by_class(self):
        """
        Returns a dataframe that contains the number of training
        and testing samples for all emotions.
        Note that if data isn't loaded yet, it'll be loaded
        """
        if not self.data_loaded:
            self.load_data()
        train_samples = [self.get_n_samples(emotion, "train") for emotion in self.emotions]
        test_samples = [self.get_n_samples(emotion, "test") for emotion in self.emotions]
        total = [n_train + n_test for n_train, n_test in zip(train_samples, test_samples)]
        # last row holds the totals over all emotions
        train_samples.append(sum(train_samples))
        test_samples.append(sum(test_samples))
        total.append(sum(total))
        # `list(...)` so that emotions given as a tuple are supported as well
        return pd.DataFrame(data={"train": train_samples, "test": test_samples, "total": total},
                            index=list(self.emotions) + ["total"])

    def get_random_emotion(self, emotion, partition="train"):
        """
        Returns random `emotion` data sample index on `partition`.
        Raises TypeError when `partition` is neither 'train' nor 'test' and
        ValueError when the partition has no sample of `emotion`.
        """
        if partition == "train":
            labels = self.y_train
        elif partition == "test":
            # the labels of the partition that was asked for must be used here,
            # not the training ones
            labels = self.y_test
        else:
            raise TypeError("Unknown partition, only 'train' or 'test' is accepted")
        candidates = [index for index, label in enumerate(labels) if label == emotion]
        if not candidates:
            # retrying random picks would never terminate in this case
            raise ValueError(f"No '{emotion}' sample found in the '{partition}' partition")
        return random.choice(candidates)
def plot_histograms(classifiers=True, beta=0.5, n_classes=3, verbose=1):
    """
    Loads different estimators from `grid` folder and calculate some statistics to plot histograms.
    Every estimator is trained and evaluated on 1%, 10% and 100% of the data.
    Params:
        classifiers (bool): if `True`, this will plot classifiers, regressors otherwise.
        beta (float): beta value for calculating fbeta score for various estimators.
        n_classes (int): number of classes
        verbose (bool/int): whether to print progress messages, default is 1
    """
    # get the estimators from the performed grid search result
    estimators = get_best_estimators(classifiers)
    final_result = {}
    for estimator, params, cv_score in estimators:
        estimator_name = estimator.__class__.__name__
        final_result[estimator_name] = []
        # fractions of the data to use: 1%, 10% and all of it
        for sample_size in (0.01, 0.1, 1):
            # initialize the class; the recognizer must work in the same mode
            # (classification / regression) as the estimators that were loaded
            detector = EmotionRecognizer(estimator, classification=classifiers, verbose=0)
            # load the data
            detector.load_data()
            # calculate number of training and testing samples
            n_train_samples = int(len(detector.X_train) * sample_size)
            n_test_samples = int(len(detector.X_test) * sample_size)
            # set the data
            detector.X_train = detector.X_train[:n_train_samples]
            detector.X_test = detector.X_test[:n_test_samples]
            detector.y_train = detector.y_train[:n_train_samples]
            detector.y_test = detector.y_test[:n_test_samples]
            # calculate train time
            t_train = time()
            detector.train(verbose=verbose)
            t_train = time() - t_train
            # calculate test time
            t_test = time()
            test_accuracy = detector.test_score()
            t_test = time() - t_test
            # set the result to the dictionary
            # NOTE: 'acc_train' holds the score stored with the estimator
            # (`cv_score`), not a score computed on the subset used here
            result = {
                'train_time': t_train,
                'pred_time': t_test,
                'acc_train': cv_score,
                'acc_test': test_accuracy,
                'f_train': detector.train_fbeta_score(beta),
                'f_test': detector.test_fbeta_score(beta),
            }
            if verbose:
                print(f"[+] {estimator.__class__.__name__} with {sample_size*100}% ({n_train_samples}) data samples achieved {cv_score*100:.3f}% Validation Score in {t_train:.3f}s & {test_accuracy*100:.3f}% Test Score in {t_test:.3f}s")
            # append the dictionary to the list of results
            final_result[estimator_name].append(result)
        if verbose:
            print()
    visualize(final_result, n_classes=n_classes)
def visualize(results, n_classes):
    """
    Visualization code to display results of various learners.
    inputs:
        - results: a dictionary of lists of dictionaries that contain various results on the
          corresponding estimator (one dictionary per training set size: 1%, 10% and 100%)
        - n_classes: number of classes
    """
    n_estimators = len(results)
    # naive predictor
    accuracy = 1 / n_classes
    f1 = 1 / n_classes
    # Create figure
    fig, ax = pl.subplots(2, 4, figsize=(11, 7))
    # Constants
    bar_width = 0.4
    colors = [(random.random(), random.random(), random.random()) for _ in range(n_estimators)]
    metrics = ['train_time', 'acc_train', 'f_train', 'pred_time', 'acc_test', 'f_test']
    # width taken by one group of bars (one bar per estimator)
    group_width = bar_width * n_estimators
    # one panel per metric: first row for training, second row for testing
    panels = [ax[j // 3, j % 3] for j in range(len(metrics))]
    # Draw the bars: one group per training set size, one bar per estimator
    for k, learner in enumerate(results):
        for panel, metric in zip(panels, metrics):
            for i in range(3):
                panel.bar(i * group_width + k * bar_width, results[learner][i][metric],
                          width=bar_width, color=colors[k])
    # The x-axis setup does not depend on the bars, so it is done once per
    # panel instead of once per drawn bar
    for panel in panels:
        panel.set_xticks([group_width - 0.2, group_width * 2 - 0.2, group_width * 3 - 0.2])
        panel.set_xticklabels(["1%", "10%", "100%"])
        panel.set_xlabel("Training Set Size")
        panel.set_xlim((-0.2, group_width * 3))
    # Add unique y-labels
    ax[0, 0].set_ylabel("Time (in seconds)")
    ax[0, 1].set_ylabel("Accuracy Score")
    ax[0, 2].set_ylabel("F-score")
    ax[1, 0].set_ylabel("Time (in seconds)")
    ax[1, 1].set_ylabel("Accuracy Score")
    ax[1, 2].set_ylabel("F-score")
    # Add titles
    ax[0, 0].set_title("Model Training")
    ax[0, 1].set_title("Accuracy Score on Training Subset")
    ax[0, 2].set_title("F-score on Training Subset")
    ax[1, 0].set_title("Model Predicting")
    ax[1, 1].set_title("Accuracy Score on Testing Set")
    ax[1, 2].set_title("F-score on Testing Set")
    # Add horizontal lines for naive predictors
    ax[0, 1].axhline(y=accuracy, xmin=-0.1, xmax=3.0, linewidth=1, color='k', linestyle='dashed')
    ax[1, 1].axhline(y=accuracy, xmin=-0.1, xmax=3.0, linewidth=1, color='k', linestyle='dashed')
    ax[0, 2].axhline(y=f1, xmin=-0.1, xmax=3.0, linewidth=1, color='k', linestyle='dashed')
    ax[1, 2].axhline(y=f1, xmin=-0.1, xmax=3.0, linewidth=1, color='k', linestyle='dashed')
    # Set y-limits for score panels
    ax[0, 1].set_ylim((0, 1))
    ax[0, 2].set_ylim((0, 1))
    ax[1, 1].set_ylim((0, 1))
    ax[1, 2].set_ylim((0, 1))
    # Set additional plots invisibles
    ax[0, 3].set_visible(False)
    ax[1, 3].axis('off')
    # Create legend: zero-sized bars only provide one legend entry per estimator
    for i, learner in enumerate(results):
        pl.bar(0, 0, color=colors[i], label=learner)
    pl.legend()
    # Aesthetics
    pl.suptitle("Performance Metrics for Three Supervised Learning Models", fontsize=16, y=1.10)
    pl.tight_layout()
    pl.show()