File size: 4,403 Bytes
b289b78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from transformers import (
    WhisperForConditionalGeneration,
    WhisperProcessor,
    WhisperConfig,
)
import torch
import ffmpeg
import torch
import torch.nn.functional as F
import numpy as np
import os

# load_audio and pad_or_trim functions
SAMPLE_RATE = 16000
CHUNK_LENGTH = 30  # 30-second chunks
N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE  # 480000 samples in a 30-second chunk


# audio = whisper.load_audio('test.wav')
def load_audio(file: str, sr: int = SAMPLE_RATE, start_time: int = 0, dtype=np.float16):
    """
    Load an audio file into a numpy array at the specified sampling rate.
    """
    try:
        # This launches a subprocess to decode audio while down-mixing and resampling as necessary.
        # Requires the ffmpeg CLI and `ffmpeg-python` package to be installed.
        out, _ = (
            ffmpeg.input(file, ss=start_time, threads=0)
            .output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr)
            .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    # return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0
    return np.frombuffer(out, np.int16).flatten().astype(dtype) / 32768.0


# audio = whisper.pad_or_trim(audio)
def pad_or_trim(array, length: int = N_SAMPLES, *, axis: int = -1):
    """
    Pad or trim the audio array to N_SAMPLES, as expected by the encoder.
    """
    if torch.is_tensor(array):
        if array.shape[axis] > length:
            array = array.index_select(
                dim=axis, index=torch.arange(length, device=array.device)
            )

        if array.shape[axis] < length:
            pad_widths = [(0, 0)] * array.ndim
            pad_widths[axis] = (0, length - array.shape[axis])
            array = F.pad(array, [pad for sizes in pad_widths[::-1] for pad in sizes])
    else:
        if array.shape[axis] > length:
            array = array.take(indices=range(length), axis=axis)

        if array.shape[axis] < length:
            pad_widths = [(0, 0)] * array.ndim
            pad_widths[axis] = (0, length - array.shape[axis])
            array = np.pad(array, pad_widths)

    return array


class Model:
    def __init__(
        self,
        model_name_or_path: str,
        cuda_visible_device: str = "0",
        device: str = "cuda",  # torch.device("cuda" if torch.cuda.is_available() else "cpu")
    ):
        os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_device
        self.DEVICE = device

        self.processor = WhisperProcessor.from_pretrained(model_name_or_path)
        self.tokenizer = self.processor.tokenizer

        self.config = WhisperConfig.from_pretrained(model_name_or_path)

        self.model = WhisperForConditionalGeneration(
            config=self.config
        ).from_pretrained(
            pretrained_model_name_or_path=model_name_or_path,
            torch_dtype=self.config.torch_dtype,
            # device_map=DEVICE,      # 'balanced', 'balanced_low_0', 'sequential', 'cuda', 'cpu'
            low_cpu_mem_usage=True,
        )

        # Move model to GPU
        if self.model.device.type != self.DEVICE:
            print(f"Moving model to {self.DEVICE}")
            self.model = self.model.to(self.DEVICE)
            self.model.eval()

        else:
            print(f"Model is already on {self.DEVICE}")
            self.model.eval()

        print("dtype of model acc to config: ", self.config.torch_dtype)
        print("dtype of loaded model: ", self.model.dtype)

    def transcribe(
        self, audio, language: str = "english", skip_special_tokens: bool = True
    ) -> str:
        input_features = (
            self.processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt")
            .input_features.half()
            .to(self.DEVICE)
        )
        with torch.no_grad():
            predicted_ids = self.model.generate(
                input_features,
                num_beams=1,
                language=language,
                task="transcribe",
                use_cache=True,
                is_multilingual=True,
                return_timestamps=True,
            )

        transcription = self.tokenizer.batch_decode(
            predicted_ids, skip_special_tokens=skip_special_tokens
        )[0]
        return transcription.strip()