Spaces:
Runtime error
Runtime error
File size: 4,782 Bytes
3119dd6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
import queue
import wave
from io import BytesIO
from pathlib import Path
import wget
import ffmpeg
import numpy as np
import webrtcvad
from stt import Metadata
from stt import Model, version
def normalize_audio_input(audio):
    """Re-encode raw audio bytes as a mono, 16 kHz, 16-bit PCM WAV using ffmpeg.

    The input is piped to ffmpeg on stdin and the converted WAV is read back
    from stdout, so no temporary files are written by this function.

    :param audio: encoded audio content passed to ffmpeg on stdin.
    :return: the bytes ffmpeg wrote to stdout (the converted WAV).
    :raises Exception: carrying ffmpeg's stderr output when it is non-empty.
    """
    pipeline = ffmpeg.input('pipe:0')
    pipeline = pipeline.output(
        'pipe:1',
        f='WAV',
        acodec='pcm_s16le',
        ac=1,
        ar='16k',
        # Only real errors are logged, so anything on stderr signals a failure.
        loglevel='error',
        hide_banner=None,
    )
    wav_bytes, stderr_bytes = pipeline.run(input=audio, capture_stdout=True, capture_stderr=True)

    if not stderr_bytes:
        return wav_bytes
    raise Exception(stderr_bytes)
class Frame:
    """A single slice ("frame") of audio together with its timing information."""

    def __init__(self, frame_bytes, timestamp, duration):
        # bytes: the audio payload of this frame
        # timestamp: start position of the frame, as supplied by the caller
        # duration: length of the frame, as supplied by the caller
        self.bytes, self.timestamp, self.duration = frame_bytes, timestamp, duration
class SpeechToTextEngine:
    """Speech-to-text transcription and hot-word management on top of an ``stt`` model."""

    FORMAT = 8
    SAMPLE_RATE = 16000
    CHANNELS = 1
    BLOCKS_PER_SECOND = 50

    # Remote locations of the acoustic model and of its default external scorer.
    _MODEL_URL = "https://huggingface.co/mbazaNLP/kinyarwanda-coqui-stt-model/resolve/main/kinyarwanda.tflite"
    _SCORER_URL = "https://huggingface.co/mbazaNLP/kinyarwanda-coqui-stt-model/resolve/main/kinyarwanda.scorer"

    def __init__(self, scorer='kinyarwanda.scorer') -> None:
        """Fetch the model files if needed and initialise the model and the VAD.

        Both files are stored next to this module and are only downloaded when
        they are not already present. The scorer has always been loaded from
        the module directory, so the downloads are placed there as well
        instead of in the current working directory.

        :param scorer: scorer file name, resolved relative to this module's
            directory (an absolute path is used as-is).
        """
        base_dir = Path(__file__).parents[0].absolute()
        model_path = self._fetch(self._MODEL_URL, base_dir.joinpath('kinyarwanda.tflite'))
        self._fetch(self._SCORER_URL, base_dir.joinpath('kinyarwanda.scorer'))

        self.model = Model(model_path.as_posix())
        self.model.enableExternalScorer(scorer_path=base_dir.joinpath(scorer).as_posix())
        self.vad = webrtcvad.Vad(mode=3)
        self.sample_rate = self.SAMPLE_RATE
        self.buffer_queue = queue.Queue()

    @staticmethod
    def _fetch(url, destination):
        """Download *url* to the path *destination* unless it already exists; return *destination*."""
        if not destination.exists():
            wget.download(url, out=destination.as_posix())
        return destination

    def _load_samples(self, audio):
        """Normalise *audio* and return its PCM frames as an ``int16`` NumPy array."""
        wav_bytes = normalize_audio_input(audio)
        with wave.open(BytesIO(wav_bytes), 'rb') as wav:
            return np.frombuffer(wav.readframes(wav.getnframes()), np.int16)

    def run(self, audio) -> str:
        """Normalise the audio, transcribe it with the model and return the result of ``model.stt``."""
        return self.model.stt(audio_buffer=self._load_samples(audio))

    def run_with_metadata(self, audio) -> "Metadata":
        """Normalise the audio and return the result of ``model.sttWithMetadata`` for it."""
        return self.model.sttWithMetadata(audio_buffer=self._load_samples(audio))

    def add_hot_words(self, data) -> list:
        """Register hot-words with the model and return the names that were added.

        :param data: mapping of hot-word -> boost; every boost is converted
            with ``float``.
        :return: the lower-cased hot-words that were registered, or an empty
            list if the model raised ``RuntimeError``.
        """
        all_hot_words = []
        try:
            print('----------------------------------------------------')
            for hot_word in data:
                word = hot_word.lower()
                boost = float(data.get(hot_word))
                # Register the same lower-cased spelling that is reported back to
                # the caller, so the returned names can later be erased.
                self.model.addHotWord(word, boost)
                print(f"`{word}` hot-word with boost `{boost}` was added.")
                all_hot_words.append(word)
            return all_hot_words
        except RuntimeError:
            return []

    def erase_hot_word(self, hot_words) -> None:
        """Erase each of the given hot-words from the model.

        Processing stops silently at the first word for which the model raises
        ``RuntimeError``.
        """
        try:
            for hot_word in hot_words:
                self.model.eraseHotWord(hot_word)
                print(f"`{hot_word}` hot-word is erased.")
            print('----------------------------------------------------')
        except RuntimeError:
            return

    def clear_hot_words(self) -> str:
        """Remove every hot-word from the model and return a status message."""
        try:
            self.model.clearHotWords()
            return "All hot-words were erased."
        except RuntimeError:
            return "No more hot-words are left."

    def deep_stream(self):
        """Return a new stream created by the model (``model.createStream``)."""
        return self.model.createStream()

    def frame_generator(self, audio, sample_rate=16000, frame_duration_ms=30):
        """Yield consecutive fixed-size ``Frame`` objects cut from *audio*.

        The frame size is computed as two bytes per sample. Only full frames
        are produced; trailing data shorter than one frame is dropped. Audio
        whose length is an exact multiple of the frame size yields its final
        frame too.

        :param audio: sliceable PCM data supporting ``len``.
        :param sample_rate: samples per second.
        :param frame_duration_ms: requested frame length in milliseconds.
        :raises ValueError: if the computed frame size is not positive (raised
            when iteration starts).
        """
        frame_size = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
        if frame_size <= 0:
            raise ValueError("sample_rate and frame_duration_ms must yield a positive frame size")

        duration = (float(frame_size) / sample_rate) / 2.0
        timestamp = 0.0
        # The range stops at the last offset where a complete frame still fits.
        for offset in range(0, len(audio) - frame_size + 1, frame_size):
            yield Frame(audio[offset:offset + frame_size], timestamp, duration)
            timestamp += duration