File size: 4,403 Bytes
b289b78 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
from transformers import (
WhisperForConditionalGeneration,
WhisperProcessor,
WhisperConfig,
)
import torch
import ffmpeg
import torch
import torch.nn.functional as F
import numpy as np
import os
# load_audio and pad_or_trim functions
SAMPLE_RATE = 16000
CHUNK_LENGTH = 30 # 30-second chunks
N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE # 480000 samples in a 30-second chunk
# audio = whisper.load_audio('test.wav')
def load_audio(file: str, sr: int = SAMPLE_RATE, start_time: int = 0, dtype=np.float16):
"""
Load an audio file into a numpy array at the specified sampling rate.
"""
try:
# This launches a subprocess to decode audio while down-mixing and resampling as necessary.
# Requires the ffmpeg CLI and `ffmpeg-python` package to be installed.
out, _ = (
ffmpeg.input(file, ss=start_time, threads=0)
.output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr)
.run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
)
except ffmpeg.Error as e:
raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
# return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0
return np.frombuffer(out, np.int16).flatten().astype(dtype) / 32768.0
# audio = whisper.pad_or_trim(audio)
def pad_or_trim(array, length: int = N_SAMPLES, *, axis: int = -1):
"""
Pad or trim the audio array to N_SAMPLES, as expected by the encoder.
"""
if torch.is_tensor(array):
if array.shape[axis] > length:
array = array.index_select(
dim=axis, index=torch.arange(length, device=array.device)
)
if array.shape[axis] < length:
pad_widths = [(0, 0)] * array.ndim
pad_widths[axis] = (0, length - array.shape[axis])
array = F.pad(array, [pad for sizes in pad_widths[::-1] for pad in sizes])
else:
if array.shape[axis] > length:
array = array.take(indices=range(length), axis=axis)
if array.shape[axis] < length:
pad_widths = [(0, 0)] * array.ndim
pad_widths[axis] = (0, length - array.shape[axis])
array = np.pad(array, pad_widths)
return array
class Model:
def __init__(
self,
model_name_or_path: str,
cuda_visible_device: str = "0",
device: str = "cuda", # torch.device("cuda" if torch.cuda.is_available() else "cpu")
):
os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_device
self.DEVICE = device
self.processor = WhisperProcessor.from_pretrained(model_name_or_path)
self.tokenizer = self.processor.tokenizer
self.config = WhisperConfig.from_pretrained(model_name_or_path)
self.model = WhisperForConditionalGeneration(
config=self.config
).from_pretrained(
pretrained_model_name_or_path=model_name_or_path,
torch_dtype=self.config.torch_dtype,
# device_map=DEVICE, # 'balanced', 'balanced_low_0', 'sequential', 'cuda', 'cpu'
low_cpu_mem_usage=True,
)
# Move model to GPU
if self.model.device.type != self.DEVICE:
print(f"Moving model to {self.DEVICE}")
self.model = self.model.to(self.DEVICE)
self.model.eval()
else:
print(f"Model is already on {self.DEVICE}")
self.model.eval()
print("dtype of model acc to config: ", self.config.torch_dtype)
print("dtype of loaded model: ", self.model.dtype)
def transcribe(
self, audio, language: str = "english", skip_special_tokens: bool = True
) -> str:
input_features = (
self.processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt")
.input_features.half()
.to(self.DEVICE)
)
with torch.no_grad():
predicted_ids = self.model.generate(
input_features,
num_beams=1,
language=language,
task="transcribe",
use_cache=True,
is_multilingual=True,
return_timestamps=True,
)
transcription = self.tokenizer.batch_decode(
predicted_ids, skip_special_tokens=skip_special_tokens
)[0]
return transcription.strip()
|