from data_extractor import load_data from utils import extract_feature, AVAILABLE_EMOTIONS from create_csv import write_emodb_csv, write_tess_ravdess_csv, write_custom_csv from sklearn.metrics import accuracy_score, make_scorer, fbeta_score, mean_squared_error, mean_absolute_error from sklearn.metrics import confusion_matrix from sklearn.model_selection import GridSearchCV import matplotlib.pyplot as pl from time import time from utils import get_best_estimators, get_audio_config import numpy as np import tqdm import os import random import pandas as pd class EmotionRecognizer: """A class for training, testing and predicting emotions based on speech's features that are extracted and fed into `sklearn` or `keras` model""" def __init__(self, model=None, **kwargs): """ Params: model (sklearn model): the model used to detect emotions. If `model` is None, then self.determine_best_model() will be automatically called emotions (list): list of emotions to be used. Note that these emotions must be available in RAVDESS_TESS & EMODB Datasets, available nine emotions are the following: 'neutral', 'calm', 'happy', 'sad', 'angry', 'fear', 'disgust', 'ps' ( pleasant surprised ), 'boredom'. Default is ["sad", "neutral", "happy"]. tess_ravdess (bool): whether to use TESS & RAVDESS Speech datasets, default is True emodb (bool): whether to use EMO-DB Speech dataset, default is True, custom_db (bool): whether to use custom Speech dataset that is located in `data/train-custom` and `data/test-custom`, default is True tess_ravdess_name (str): the name of the output CSV file for TESS&RAVDESS dataset, default is "tess_ravdess.csv" emodb_name (str): the name of the output CSV file for EMO-DB dataset, default is "emodb.csv" custom_db_name (str): the name of the output CSV file for the custom dataset, default is "custom.csv" features (list): list of speech features to use, default is ["mfcc", "chroma", "mel"] (i.e MFCC, Chroma and MEL spectrogram ) classification (bool): whether to use classification or regression, default is True balance (bool): whether to balance the dataset ( both training and testing ), default is True verbose (bool/int): whether to print messages on certain tasks, default is 1 Note that when `tess_ravdess`, `emodb` and `custom_db` are set to `False`, `tess_ravdess` will be set to True automatically. """ # emotions self.emotions = kwargs.get("emotions", ["sad", "neutral", "happy"]) # make sure that there are only available emotions self._verify_emotions() # audio config self.features = kwargs.get("features", ["mfcc", "chroma", "mel"]) self.audio_config = get_audio_config(self.features) # datasets self.tess_ravdess = kwargs.get("tess_ravdess", True) self.emodb = kwargs.get("emodb", True) self.custom_db = kwargs.get("custom_db", True) if not self.tess_ravdess and not self.emodb and not self.custom_db: self.tess_ravdess = True self.classification = kwargs.get("classification", True) self.balance = kwargs.get("balance", True) self.override_csv = kwargs.get("override_csv", True) self.verbose = kwargs.get("verbose", 1) self.tess_ravdess_name = kwargs.get("tess_ravdess_name", "tess_ravdess.csv") self.emodb_name = kwargs.get("emodb_name", "emodb.csv") self.custom_db_name = kwargs.get("custom_db_name", "custom.csv") self.verbose = kwargs.get("verbose", 1) # set metadata path file names self._set_metadata_filenames() # write csv's anyway self.write_csv() # boolean attributes self.data_loaded = False self.model_trained = False # model if not model: self.determine_best_model() else: self.model = model def _set_metadata_filenames(self): """ Protected method to get all CSV (metadata) filenames into two instance attributes: - `self.train_desc_files` for training CSVs - `self.test_desc_files` for testing CSVs """ train_desc_files, test_desc_files = [], [] if self.tess_ravdess: train_desc_files.append(f"train_{self.tess_ravdess_name}") test_desc_files.append(f"test_{self.tess_ravdess_name}") if self.emodb: train_desc_files.append(f"train_{self.emodb_name}") test_desc_files.append(f"test_{self.emodb_name}") if self.custom_db: train_desc_files.append(f"train_{self.custom_db_name}") test_desc_files.append(f"test_{self.custom_db_name}") # set them to be object attributes self.train_desc_files = train_desc_files self.test_desc_files = test_desc_files def _verify_emotions(self): """ This method makes sure that emotions passed in parameters are valid. """ for emotion in self.emotions: assert emotion in AVAILABLE_EMOTIONS, "Emotion not recognized." def get_best_estimators(self): """Loads estimators from grid files and returns them""" return get_best_estimators(self.classification) def write_csv(self): """ Write available CSV files in `self.train_desc_files` and `self.test_desc_files` determined by `self._set_metadata_filenames()` method. """ for train_csv_file, test_csv_file in zip(self.train_desc_files, self.test_desc_files): # not safe approach if os.path.isfile(train_csv_file) and os.path.isfile(test_csv_file): # file already exists, just skip writing csv files if not self.override_csv: continue if self.emodb_name in train_csv_file: write_emodb_csv(self.emotions, train_name=train_csv_file, test_name=test_csv_file, verbose=self.verbose) if self.verbose: print("[+] Writed EMO-DB CSV File") elif self.tess_ravdess_name in train_csv_file: write_tess_ravdess_csv(self.emotions, train_name=train_csv_file, test_name=test_csv_file, verbose=self.verbose) if self.verbose: print("[+] Writed TESS & RAVDESS DB CSV File") elif self.custom_db_name in train_csv_file: write_custom_csv(emotions=self.emotions, train_name=train_csv_file, test_name=test_csv_file, verbose=self.verbose) if self.verbose: print("[+] Writed Custom DB CSV File") def load_data(self): """ Loads and extracts features from the audio files for the db's specified """ if not self.data_loaded: result = load_data(self.train_desc_files, self.test_desc_files, self.audio_config, self.classification, emotions=self.emotions, balance=self.balance) self.X_train = result['X_train'] self.X_test = result['X_test'] self.y_train = result['y_train'] self.y_test = result['y_test'] self.train_audio_paths = result['train_audio_paths'] self.test_audio_paths = result['test_audio_paths'] self.balance = result["balance"] if self.verbose: print("[+] Data loaded") self.data_loaded = True def train(self, verbose=1): """ Train the model, if data isn't loaded, it 'll be loaded automatically """ if not self.data_loaded: # if data isn't loaded yet, load it then self.load_data() if not self.model_trained: self.model.fit(X=self.X_train, y=self.y_train) self.model_trained = True if verbose: print("[+] Model trained") def predict(self, audio_path): """ given an `audio_path`, this method extracts the features and predicts the emotion """ feature = extract_feature(audio_path, **self.audio_config).reshape(1, -1) return self.model.predict(feature)[0] def predict_proba(self, audio_path): """ Predicts the probability of each emotion. """ if self.classification: feature = extract_feature(audio_path, **self.audio_config).reshape(1, -1) proba = self.model.predict_proba(feature)[0] result = {} for emotion, prob in zip(self.model.classes_, proba): result[emotion] = prob return result else: raise NotImplementedError("Probability prediction doesn't make sense for regression") def grid_search(self, params, n_jobs=2, verbose=1): """ Performs GridSearchCV on `params` passed on the `self.model` And returns the tuple: (best_estimator, best_params, best_score). """ score = accuracy_score if self.classification else mean_absolute_error grid = GridSearchCV(estimator=self.model, param_grid=params, scoring=make_scorer(score), n_jobs=n_jobs, verbose=verbose, cv=3) grid_result = grid.fit(self.X_train, self.y_train) return grid_result.best_estimator_, grid_result.best_params_, grid_result.best_score_ def determine_best_model(self): """ Loads best estimators and determine which is best for test data, and then set it to `self.model`. In case of regression, the metric used is MSE and accuracy for classification. Note that the execution of this method may take several minutes due to training all estimators (stored in `grid` folder) for determining the best possible one. """ if not self.data_loaded: self.load_data() # loads estimators estimators = self.get_best_estimators() result = [] if self.verbose: estimators = tqdm.tqdm(estimators) for estimator, params, cv_score in estimators: if self.verbose: estimators.set_description(f"Evaluating {estimator.__class__.__name__}") detector = EmotionRecognizer(estimator, emotions=self.emotions, tess_ravdess=self.tess_ravdess, emodb=self.emodb, custom_db=self.custom_db, classification=self.classification, features=self.features, balance=self.balance, override_csv=False) # data already loaded detector.X_train = self.X_train detector.X_test = self.X_test detector.y_train = self.y_train detector.y_test = self.y_test detector.data_loaded = True # train the model detector.train(verbose=0) # get test accuracy accuracy = detector.test_score() # append to result result.append((detector.model, accuracy)) # sort the result # regression: best is the lower, not the higher # classification: best is higher, not the lower result = sorted(result, key=lambda item: item[1], reverse=self.classification) best_estimator = result[0][0] accuracy = result[0][1] self.model = best_estimator self.model_trained = True if self.verbose: if self.classification: print(f"[+] Best model determined: {self.model.__class__.__name__} with {accuracy*100:.3f}% test accuracy") else: print(f"[+] Best model determined: {self.model.__class__.__name__} with {accuracy:.5f} mean absolute error") def test_score(self): """ Calculates score on testing data if `self.classification` is True, the metric used is accuracy, Mean-Squared-Error is used otherwise (regression) """ y_pred = self.model.predict(self.X_test) if self.classification: return accuracy_score(y_true=self.y_test, y_pred=y_pred) else: return mean_squared_error(y_true=self.y_test, y_pred=y_pred) def train_score(self): """ Calculates accuracy score on training data if `self.classification` is True, the metric used is accuracy, Mean-Squared-Error is used otherwise (regression) """ y_pred = self.model.predict(self.X_train) if self.classification: return accuracy_score(y_true=self.y_train, y_pred=y_pred) else: return mean_squared_error(y_true=self.y_train, y_pred=y_pred) def train_fbeta_score(self, beta): y_pred = self.model.predict(self.X_train) return fbeta_score(self.y_train, y_pred, beta, average='micro') def test_fbeta_score(self, beta): y_pred = self.model.predict(self.X_test) return fbeta_score(self.y_test, y_pred, beta, average='micro') def confusion_matrix(self, percentage=True, labeled=True): """ Computes confusion matrix to evaluate the test accuracy of the classification and returns it as numpy matrix or pandas dataframe (depends on params). params: percentage (bool): whether to use percentage instead of number of samples, default is True. labeled (bool): whether to label the columns and indexes in the dataframe. """ if not self.classification: raise NotImplementedError("Confusion matrix works only when it is a classification problem") y_pred = self.model.predict(self.X_test) matrix = confusion_matrix(self.y_test, y_pred, labels=self.emotions).astype(np.float32) if percentage: for i in range(len(matrix)): matrix[i] = matrix[i] / np.sum(matrix[i]) # make it percentage matrix *= 100 if labeled: matrix = pd.DataFrame(matrix, index=[ f"true_{e}" for e in self.emotions ], columns=[ f"predicted_{e}" for e in self.emotions ]) return matrix def draw_confusion_matrix(self): """Calculates the confusion matrix and shows it""" matrix = self.confusion_matrix(percentage=False, labeled=False) #TODO: add labels, title, legends, etc. pl.imshow(matrix, cmap="binary") pl.show() def get_n_samples(self, emotion, partition): """Returns number data samples of the `emotion` class in a particular `partition` ('test' or 'train') """ if partition == "test": return len([y for y in self.y_test if y == emotion]) elif partition == "train": return len([y for y in self.y_train if y == emotion]) def get_samples_by_class(self): """ Returns a dataframe that contains the number of training and testing samples for all emotions. Note that if data isn't loaded yet, it'll be loaded """ if not self.data_loaded: self.load_data() train_samples = [] test_samples = [] total = [] for emotion in self.emotions: n_train = self.get_n_samples(emotion, "train") n_test = self.get_n_samples(emotion, "test") train_samples.append(n_train) test_samples.append(n_test) total.append(n_train + n_test) # get total total.append(sum(train_samples) + sum(test_samples)) train_samples.append(sum(train_samples)) test_samples.append(sum(test_samples)) return pd.DataFrame(data={"train": train_samples, "test": test_samples, "total": total}, index=self.emotions + ["total"]) def get_random_emotion(self, emotion, partition="train"): """ Returns random `emotion` data sample index on `partition`. """ if partition == "train": index = random.choice(list(range(len(self.y_train)))) while self.y_train[index] != emotion: index = random.choice(list(range(len(self.y_train)))) elif partition == "test": index = random.choice(list(range(len(self.y_test)))) while self.y_train[index] != emotion: index = random.choice(list(range(len(self.y_test)))) else: raise TypeError("Unknown partition, only 'train' or 'test' is accepted") return index def plot_histograms(classifiers=True, beta=0.5, n_classes=3, verbose=1): """ Loads different estimators from `grid` folder and calculate some statistics to plot histograms. Params: classifiers (bool): if `True`, this will plot classifiers, regressors otherwise. beta (float): beta value for calculating fbeta score for various estimators. n_classes (int): number of classes """ # get the estimators from the performed grid search result estimators = get_best_estimators(classifiers) final_result = {} for estimator, params, cv_score in estimators: final_result[estimator.__class__.__name__] = [] for i in range(3): result = {} # initialize the class detector = EmotionRecognizer(estimator, verbose=0) # load the data detector.load_data() if i == 0: # first get 1% of sample data sample_size = 0.01 elif i == 1: # second get 10% of sample data sample_size = 0.1 elif i == 2: # last get all the data sample_size = 1 # calculate number of training and testing samples n_train_samples = int(len(detector.X_train) * sample_size) n_test_samples = int(len(detector.X_test) * sample_size) # set the data detector.X_train = detector.X_train[:n_train_samples] detector.X_test = detector.X_test[:n_test_samples] detector.y_train = detector.y_train[:n_train_samples] detector.y_test = detector.y_test[:n_test_samples] # calculate train time t_train = time() detector.train() t_train = time() - t_train # calculate test time t_test = time() test_accuracy = detector.test_score() t_test = time() - t_test # set the result to the dictionary result['train_time'] = t_train result['pred_time'] = t_test result['acc_train'] = cv_score result['acc_test'] = test_accuracy result['f_train'] = detector.train_fbeta_score(beta) result['f_test'] = detector.test_fbeta_score(beta) if verbose: print(f"[+] {estimator.__class__.__name__} with {sample_size*100}% ({n_train_samples}) data samples achieved {cv_score*100:.3f}% Validation Score in {t_train:.3f}s & {test_accuracy*100:.3f}% Test Score in {t_test:.3f}s") # append the dictionary to the list of results final_result[estimator.__class__.__name__].append(result) if verbose: print() visualize(final_result, n_classes=n_classes) def visualize(results, n_classes): """ Visualization code to display results of various learners. inputs: - results: a dictionary of lists of dictionaries that contain various results on the corresponding estimator - n_classes: number of classes """ n_estimators = len(results) # naive predictor accuracy = 1 / n_classes f1 = 1 / n_classes # Create figure fig, ax = pl.subplots(2, 4, figsize = (11,7)) # Constants bar_width = 0.4 colors = [ (random.random(), random.random(), random.random()) for _ in range(n_estimators) ] # Super loop to plot four panels of data for k, learner in enumerate(results.keys()): for j, metric in enumerate(['train_time', 'acc_train', 'f_train', 'pred_time', 'acc_test', 'f_test']): for i in np.arange(3): x = bar_width * n_estimators # Creative plot code ax[j//3, j%3].bar(i*x+k*(bar_width), results[learner][i][metric], width = bar_width, color = colors[k]) ax[j//3, j%3].set_xticks([x-0.2, x*2-0.2, x*3-0.2]) ax[j//3, j%3].set_xticklabels(["1%", "10%", "100%"]) ax[j//3, j%3].set_xlabel("Training Set Size") ax[j//3, j%3].set_xlim((-0.2, x*3)) # Add unique y-labels ax[0, 0].set_ylabel("Time (in seconds)") ax[0, 1].set_ylabel("Accuracy Score") ax[0, 2].set_ylabel("F-score") ax[1, 0].set_ylabel("Time (in seconds)") ax[1, 1].set_ylabel("Accuracy Score") ax[1, 2].set_ylabel("F-score") # Add titles ax[0, 0].set_title("Model Training") ax[0, 1].set_title("Accuracy Score on Training Subset") ax[0, 2].set_title("F-score on Training Subset") ax[1, 0].set_title("Model Predicting") ax[1, 1].set_title("Accuracy Score on Testing Set") ax[1, 2].set_title("F-score on Testing Set") # Add horizontal lines for naive predictors ax[0, 1].axhline(y = accuracy, xmin = -0.1, xmax = 3.0, linewidth = 1, color = 'k', linestyle = 'dashed') ax[1, 1].axhline(y = accuracy, xmin = -0.1, xmax = 3.0, linewidth = 1, color = 'k', linestyle = 'dashed') ax[0, 2].axhline(y = f1, xmin = -0.1, xmax = 3.0, linewidth = 1, color = 'k', linestyle = 'dashed') ax[1, 2].axhline(y = f1, xmin = -0.1, xmax = 3.0, linewidth = 1, color = 'k', linestyle = 'dashed') # Set y-limits for score panels ax[0, 1].set_ylim((0, 1)) ax[0, 2].set_ylim((0, 1)) ax[1, 1].set_ylim((0, 1)) ax[1, 2].set_ylim((0, 1)) # Set additional plots invisibles ax[0, 3].set_visible(False) ax[1, 3].axis('off') # Create legend for i, learner in enumerate(results.keys()): pl.bar(0, 0, color=colors[i], label=learner) pl.legend() # Aesthetics pl.suptitle("Performance Metrics for Three Supervised Learning Models", fontsize = 16, y = 1.10) pl.tight_layout() pl.show()