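"""Streamlit app that visualizes word-level neural acoustic distances.

Two recordings of the same spoken word are embedded with a wav2vec 2.0 model,
aligned with dynamic time warping (DTW), and the per-frame alignment costs are
plotted over time. See https://doi.org/10.1016/j.wocn.2022.101137 for details.
"""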

import os.path
from typing import Optional

import matplotlib.pyplot as plt
import numpy as np
import soundfile as sf
import streamlit as st
import torch
import transformers
from dtw import dtw
from scipy import signal
from transformers import AutoConfig
from transformers.models.wav2vec2 import Wav2Vec2Model
from datetime import datetime
from random import randrange
import os
import psutil
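

# Embed a playable audio widget in the Streamlit page for a local WAV file.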
def play_audio(filename):
    with open(filename, "rb") as audio_file:
        audio_bytes = audio_file.read()
    st.audio(audio_bytes, format="audio/wav")


def aligner(x, y):
    return dtw(x, y, keep_internals=True)
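

# Average the local DTW cost of every y-frame aligned to each x-frame, and count
# how many y-frames map to each x-frame (used for the bullet sizes in the plot).
# Assumes the dtw-python alignment object built with keep_internals=True exposes
# localCostMatrix, index1/index2 (the warping path) and N (the query length).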
def compute_costs(gcm):
    res = [[] for _ in range(gcm.N)]
    for i in range(gcm.index1.shape[0]):
        d = gcm.localCostMatrix[gcm.index1[i], gcm.index2[i]]
        res[gcm.index1[i]].append(d)
    n = [len(x) for x in res]
    res = [np.mean(x) for x in res]
    return res, n
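

# Load a wav2vec 2.0 encoder; when `layer` is given, the model is truncated to
# that many transformer blocks, so its last_hidden_state is the selected layer.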
# @st.cache(show_spinner=False, hash_funcs={torch.nn.parameter.Parameter: lambda _: None}, max_entries=1)
def load_wav2vec2_featurizer(model_id: str, layer: Optional[int] = None):
    transformers.logging.set_verbosity(transformers.logging.ERROR)

    model_kwargs = {}
    if layer is not None:
        model_kwargs["num_hidden_layers"] = int(layer) if layer > 0 else 0

    with st.spinner("Loading model..."):
        model = Wav2Vec2Model.from_pretrained(model_id, **model_kwargs)
        model.eval()
        if torch.cuda.is_available():
            model.cuda()
    # st.success("Done!")

    return model
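

# Featurize both recordings, align them with DTW, and return the normalized
# global distance plus per-frame costs/counts. The `layer` argument selects the
# representation: None returns all hidden states, >= 0 uses the (possibly
# truncated) encoder's last hidden state, -1 uses the feature-projection output,
# and any other negative value uses the raw CNN feature extractor output.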
# @st.cache(persist=True, show_spinner=False, max_entries=3)
def run(model_id, layer, filename_x, filename_y):
    model = load_wav2vec2_featurizer(model_id, layer)

    def _featurize(path):
        input_values, rate = sf.read(path, dtype=np.float32)
        if len(input_values.shape) == 2:
            input_values = input_values.mean(1)
        if rate != 16_000:
            new_length = int(input_values.shape[0] / rate * 16_000)
            input_values = signal.resample(input_values, new_length)

        input_values = torch.from_numpy(input_values).unsqueeze(0)
        if torch.cuda.is_available():
            input_values = input_values.cuda()

        if layer is None:
            hidden_states = model(input_values, output_hidden_states=True).hidden_states
            hidden_states = [s.squeeze(0).cpu().numpy() for s in hidden_states]
            return hidden_states

        if layer >= 0:
            hidden_state = model(input_values).last_hidden_state.squeeze(0).cpu().numpy()
        else:
            hidden_state = model.feature_extractor(input_values)
            hidden_state = hidden_state.transpose(1, 2)
            if layer == -1:
                hidden_state = model.feature_projection(hidden_state)
            hidden_state = hidden_state.squeeze(0).cpu().numpy()

        return hidden_state

    with st.spinner("Measuring distance..."):
        feats_x = _featurize(filename_x)
        feats_y = _featurize(filename_y)
        print('3. Features computed', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test

        gcm = aligner(feats_x, feats_y)
        print('4. Alignments computed', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test

        d = gcm.normalizedDistance
        print("Distance:", d)

        c, n = compute_costs(gcm)
        print('5. Costs computed', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test

    del model
    return d, c, n
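

# Build the Streamlit UI: model and layer selection, audio file selection, and
# the distance-per-frame plot.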
def main():
    st.title("Word-level Neural Acoustic Distance Visualizer")

    st.write(
        "This tool visualizes pronunciation differences between two recordings of the same word. "
        "The two recordings have to be WAV files containing a single spoken word.\n\n"
        "Choose any wav2vec 2.0 compatible model identifier on the "
        "[Hugging Face Model Hub](https://huggingface.co/models?filter=wav2vec2) "
        "and select the output layer you want to use.\n\n"
        "To upload your own recordings, select 'custom upload' in the audio file selection step. "
        "The first recording is shown on the x-axis of the plot and the second serves as the "
        "reference recording for computing distances. "
        "You should already see an example plot of two sample recordings.\n\n"
        "This visualization tool is part of "
        "[neural representations for modeling variation in speech](https://doi.org/10.1016/j.wocn.2022.101137). "
        "Please see our paper for further details.")
st.subheader("Model selection:") | |
model_id = st.selectbox("Select the wav2vec 2.0 model you want to use:", | |
("facebook/wav2vec2-large-960h", "facebook/wav2vec2-large", "facebook/wav2vec2-large-xlsr-53", | |
"facebook/wav2vec2-xls-r-300m", "other"), | |
index=0) | |
if model_id == "other": | |
model_id = st.text_input("Enter the wav2vec 2.0 model you want to use:", | |
value="facebook/wav2vec2-large-960h", | |
key="model") | |
print(f"\n### Start new run\n") # test | |
try: | |
cfg = AutoConfig.from_pretrained(model_id) | |
layer = st.number_input("Select the layer you want to use:", min_value=1, max_value=cfg.num_hidden_layers, value=10) | |
except OSError: | |
st.error( | |
"Please select a wav2vec 2.0 compatible model identifier on the [Hugging Face Model Hub](https://huggingface.co/models?filter=wav2vec2)." | |
) | |
layer = None | |
print('1. Model selected', datetime.now().strftime('%d-%m-%Y %H:%M:%S')) # test | |
st.subheader("Audio file selection:") | |
filename_x = st.selectbox("Filename (x-axis):", | |
("falling_huud_mobiel_201145.wav", "falling_hood_mobiel_203936.wav", "custom upload")) | |
if filename_x == "falling_huud_mobiel_201145.wav": | |
filename_x = "./examples/falling_huud_mobiel_201145.wav" | |
play_audio(filename_x) | |
if filename_x == "falling_hood_mobiel_203936.wav": | |
filename_x = "./examples/falling_hood_mobiel_203936.wav" | |
play_audio(filename_x) | |
filename_y = st.selectbox("Filename (y-axis):", | |
("falling_hood_mobiel_203936.wav", "falling_huud_mobiel_201145.wav", "custom upload")) | |
if filename_y == "falling_huud_mobiel_201145.wav": | |
filename_y = "./examples/falling_huud_mobiel_201145.wav" | |
play_audio(filename_y) | |
if filename_y == "falling_hood_mobiel_203936.wav": | |
filename_y = "./examples/falling_hood_mobiel_203936.wav" | |
play_audio(filename_y) | |
if filename_x == "custom upload": | |
filename_x = st.file_uploader("Choose a file (x-axis)", key="f_x") | |
if filename_y == "custom upload": | |
filename_y = st.file_uploader("Choose a file (y-axis)", key="f_y") | |
print('2. Files selected', datetime.now().strftime('%d-%m-%Y %H:%M:%S')) # test | |

    if filename_x is not None and filename_y is not None and layer is not None:
        print(f"\nX: {filename_x}\nY: {filename_y}")

        d, c, n = run(model_id, layer, filename_x, filename_y)
        # d_b, c_b, n_b = run(featurizer_b)

        fig, axes = plt.subplots(figsize=(4, 2.5))
        print('6. Plot init', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test

        window_size = 9
        rate = 20
        x = np.arange(0, len(c) * rate, rate)
        offset = (window_size - 1) // 2
        x_ = x[offset:-offset]

        # Target layer
        axes.plot(x, c, alpha=0.5, color="gray", linestyle="--")
        axes.scatter(x, c, np.array(n) * 10, color="gray")
        c_ = np.convolve(c, np.ones(window_size) / window_size, mode="valid")
        axes.plot(x_, c_)

        # Last layer
        # axes.plot(x, c_b, alpha=0.5, color="gray", linestyle="--")
        # axes.scatter(x, c_b, np.array(n_b) * 10, color="gray")
        # c_b_ = np.convolve(c_b, np.ones(window_size) / window_size, mode="valid")
        # axes.plot(x_, c_b_, linestyle="--")

        axes.set_xlabel("time (ms)")
        axes.set_ylabel("distance per frame")
        axes.hlines(y=d, xmin=0, xmax=np.max(x), linestyles="dashdot")
        plt.tight_layout(pad=0)

        plt.savefig("./output/plot.pdf")  # fixed name so the download button below serves the current plot
        st.pyplot(fig)


main()

print('7. Plot filled', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test
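
# Figure caption and a download button for the saved plot, rendered below it.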
if os.path.isfile("./output/plot.pdf"):
st.caption(" Visualization of neural acoustic distances\ | |
per frame (based on wav2vec 2.0) with the pronunciation of\ | |
the first filename on the x-axis and distances to the pronunciation\ | |
of second filename on the y-axis. The horizontal line represents\ | |
the global distance value (i.e. the average of all individual frames).\ | |
The blue continuous line represents the moving average distance based on 9 frames,\ | |
corresponding to 180ms. As a result of the moving average, the blue line does not cover the entire duration of\ | |
the sample. Larger bullet sizes indicate that multiple\ | |
frames in the pronunciation on the y-axis are aligned to a single frame in the pronunciation on the x-axis.") | |
with open("./output/plot.pdf", "rb") as file: | |
btn = st.download_button(label="Download plot", data=file, file_name="plot.pdf", mime="image/pdf") | |

print('8. End', datetime.now().strftime('%d-%m-%Y %H:%M:%S'))  # test
print(f"9. RAM used: {psutil.Process().memory_info().rss / (1024 * 1024):.2f} MB")  # test
for name in dir():
    if not name.startswith('_'):
        del globals()[name]

import gc
gc.collect()