"""
CUDA: 12.1
cuDNN Version: 8.9.2.26_1.0-1_amd64
TensorFlow Version: 2.12.0
Torch Version: 2.1.0.dev20230606+cu121
Transformers Version: 4.30.2

BENCHMARK:
    - RAM: 2.8 GB
    - VRAM: 1812 MB
    - test.wav: 23 s
    - GPU (3060) -> 1.1s (TensorCore is used for fp16 inference)
    - GPU (1660S) -> 3.3s
    - CPU -> torch.float16 not supported on CPU (Ryzen 5 3600)
    - Punctuation: True
"""

import os
from typing import Union

import ffmpeg
import numpy as np
import torch
import torch.nn.functional as F
from transformers import (
    WhisperConfig,
    WhisperForConditionalGeneration,
    WhisperProcessor,
)

# Audio is decoded and fed to the processor at this sampling rate (Hz).
SAMPLE_RATE = 16000
# Length, in seconds, of the single window handed to the model per call.
CHUNK_LENGTH = 30
# Number of samples in one window: 30 s * 16000 Hz = 480000.
N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE


class Model:
    """Whisper speech-to-text wrapper built on Hugging Face ``transformers``.

    Loads a Whisper checkpoint together with its processor, places the model
    on the requested device in eval mode, and offers helpers to decode an
    audio file through ffmpeg and to transcribe one window of audio.

    Attributes set by ``__init__``:
        DEVICE: Device string the model and the input features are moved to.
        processor: ``WhisperProcessor`` loaded from the checkpoint.
        tokenizer: Tokenizer taken from ``processor``.
        config: ``WhisperConfig`` loaded from the checkpoint.
        model: ``WhisperForConditionalGeneration`` instance in eval mode.
    """

    def __init__(
        self,
        model_name_or_path: str,
        cuda_visible_device: str = "0",
        device: str = 'cuda',
    ):
        """Load processor, config and model, then move the model to ``device``.

        Args:
            model_name_or_path: Checkpoint identifier or local path passed to
                the ``from_pretrained`` loaders.
            cuda_visible_device: Value written to the ``CUDA_VISIBLE_DEVICES``
                environment variable.
            device: Device string the model is moved to.
        """
        # NOTE(review): this only restricts GPU visibility if CUDA has not
        # been initialised yet in this process -- confirm for your caller.
        os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_device
        self.DEVICE = device

        self.processor = WhisperProcessor.from_pretrained(model_name_or_path)
        self.tokenizer = self.processor.tokenizer

        self.config = WhisperConfig.from_pretrained(model_name_or_path)

        # from_pretrained is a classmethod; calling it on the class avoids
        # first building (and then discarding) a randomly initialised model.
        self.model = WhisperForConditionalGeneration.from_pretrained(
            pretrained_model_name_or_path=model_name_or_path,
            torch_dtype=self.config.torch_dtype,
            low_cpu_mem_usage=True,
        )

        if self.model.device.type != self.DEVICE:
            print(f'Moving model to {self.DEVICE}')
            self.model = self.model.to(self.DEVICE)
        else:
            print(f'Model is already on {self.DEVICE}')
        # Inference only: eval mode is needed whichever branch ran above.
        self.model.eval()

        print('dtype of model acc to config: ', self.config.torch_dtype)
        print('dtype of loaded model: ', self.model.dtype)

    def load_audio(
        self,
        file: str,
        sr: int = SAMPLE_RATE,
        start_time: int = 0,
        dtype=np.float16,
    ) -> np.ndarray:
        """Decode an audio file to a mono waveform using the ffmpeg CLI.

        Args:
            file: Path of the audio file handed to ffmpeg.
            sr: Sampling rate ffmpeg resamples to.
            start_time: Seek position passed to ffmpeg as ``ss``.
            dtype: NumPy dtype the 16-bit samples are converted to before
                scaling.

        Returns:
            1-D array of samples divided by 32768.0, i.e. scaled from the
            int16 range to roughly [-1.0, 1.0).

        Raises:
            RuntimeError: If ffmpeg fails; the message carries ffmpeg's
                stderr output.
        """
        try:
            # Ask ffmpeg for raw signed 16-bit little-endian mono PCM on
            # stdout so it can be read straight into a NumPy buffer.
            out, _ = (
                ffmpeg.input(file, ss=start_time, threads=0)
                .output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr)
                .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

        # frombuffer already yields a 1-D array; astype makes the copy.
        return np.frombuffer(out, np.int16).astype(dtype) / 32768.0

    def _pad_or_trim(
        self,
        array: Union[np.ndarray, torch.Tensor],
        length: int = N_SAMPLES,
        *,
        axis: int = -1,
    ) -> Union[np.ndarray, torch.Tensor]:
        """Pad or trim the audio array to N_SAMPLES, as expected by the encoder.

        Args:
            array: NumPy array or torch tensor holding the samples.
            length: Target size along ``axis``.
            axis: Axis that is trimmed or zero-padded at its end.

        Returns:
            An array/tensor of the same kind as the input whose size along
            ``axis`` equals ``length``.
        """
        if torch.is_tensor(array):
            if array.shape[axis] > length:
                array = array.index_select(
                    dim=axis, index=torch.arange(length, device=array.device)
                )

            if array.shape[axis] < length:
                pad_widths = [(0, 0)] * array.ndim
                pad_widths[axis] = (0, length - array.shape[axis])
                # F.pad takes a flat list of (before, after) pairs ordered
                # from the last dimension backwards, hence the reversal.
                array = F.pad(array, [pad for sizes in pad_widths[::-1] for pad in sizes])
        else:
            if array.shape[axis] > length:
                array = array.take(indices=range(length), axis=axis)

            if array.shape[axis] < length:
                pad_widths = [(0, 0)] * array.ndim
                pad_widths[axis] = (0, length - array.shape[axis])
                array = np.pad(array, pad_widths)

        return array

    def transcribe(self, audio: np.ndarray, language: str = "english") -> str:
        """Transcribe a single window of audio to text.

        The waveform is first padded or trimmed to ``N_SAMPLES`` samples, so
        anything beyond ``CHUNK_LENGTH`` seconds is dropped.

        Args:
            audio: Waveform sampled at ``SAMPLE_RATE``.
            language: Language passed to ``generate``.

        Returns:
            The decoded transcription of the first (only) batch item, with
            special tokens skipped and surrounding whitespace stripped.
        """
        audio = self._pad_or_trim(audio)
        features = self.processor(
            audio, sampling_rate=SAMPLE_RATE, return_tensors="pt"
        ).input_features
        # Cast to the dtype the model was actually loaded in instead of
        # forcing fp16, so checkpoints loaded in another dtype also work.
        input_features = features.to(device=self.DEVICE, dtype=self.model.dtype)

        with torch.no_grad():
            predicted_ids = self.model.generate(
                input_features,
                num_beams=1,
                language=language,
                task="transcribe",
                use_cache=True,
                is_multilingual=True,
                return_timestamps=True,
            )

        transcription = self.tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)[0]
        return transcription.strip()