File size: 16,019 Bytes
a600684
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
import os
import multiprocessing

# Pre-import PyTorch, if available.
# This fixes "OSError: [WinError 127] The specified procedure could not be found".
try:
    import torch
except ModuleNotFoundError:
    pass

# I'm sure this is not strictly correct, but let's keep this crutch for now.
try:
    import rwkv_cpp_shared_library
except ModuleNotFoundError:
    from . import rwkv_cpp_shared_library

from typing import TypeVar, Optional, Tuple, List

# A value of this type is either a numpy's ndarray or a PyTorch's Tensor.
NumpyArrayOrPyTorchTensor: TypeVar = TypeVar('NumpyArrayOrPyTorchTensor')

class RWKVModel:
    """
    An RWKV model managed by rwkv.cpp library.
    """

    def __init__(
            self,
            shared_library: rwkv_cpp_shared_library.RWKVSharedLibrary,
            model_path: str,
            thread_count: int = max(1, multiprocessing.cpu_count() // 2),
            gpu_layer_count: int = 0,
            **kwargs
    ) -> None:
        """
        Loads the model and prepares it for inference.
        In case of any error, this method will throw an exception.

        Parameters
        ----------
        shared_library : RWKVSharedLibrary
            rwkv.cpp shared library.
        model_path : str
            Path to RWKV model file in ggml format.
        thread_count : int
            Thread count to use. If not set, defaults to CPU count / 2.
        gpu_layer_count : int
            Count of layers to offload onto the GPU, must be >= 0.
            See documentation of `gpu_offload_layers` for details about layer offloading.
        """

        if 'gpu_layers_count' in kwargs:
            gpu_layer_count = kwargs['gpu_layers_count']

        if not os.path.isfile(model_path):
            raise ValueError(f'{model_path} is not a file')
        
        if not (thread_count > 0):
            raise ValueError('Thread count must be > 0')  

        if not (gpu_layer_count >= 0):
            raise ValueError('GPU layer count must be >= 0')

        self._library: rwkv_cpp_shared_library.RWKVSharedLibrary = shared_library

        self._ctx: rwkv_cpp_shared_library.RWKVContext = self._library.rwkv_init_from_file(model_path, thread_count)

        if gpu_layer_count > 0:
            self.gpu_offload_layers(gpu_layer_count)

        self._state_buffer_element_count: int = self._library.rwkv_get_state_buffer_element_count(self._ctx)
        self._logits_buffer_element_count: int = self._library.rwkv_get_logits_buffer_element_count(self._ctx)

        self._valid: bool = True

    def gpu_offload_layers(self, layer_count: int) -> bool:
        """
        Offloads specified count of model layers onto the GPU. Offloaded layers are evaluated using cuBLAS or CLBlast.
        For the purposes of this function, model head (unembedding matrix) is treated as an additional layer:
        - pass `model.n_layer` to offload all layers except model head
        - pass `model.n_layer + 1` to offload all layers, including model head

        Returns true if at least one layer was offloaded.
        If rwkv.cpp was compiled without cuBLAS and CLBlast support, this function is a no-op and always returns false.

        Parameters
        ----------
        layer_count : int
            Count of layers to offload onto the GPU, must be >= 0.
        """

        if not (layer_count >= 0):
            raise ValueError('Layer count must be >= 0')

        return self._library.rwkv_gpu_offload_layers(self._ctx, layer_count)

    @property
    def n_vocab(self) -> int:
        return self._library.rwkv_get_n_vocab(self._ctx)

    @property
    def n_embed(self) -> int:
        return self._library.rwkv_get_n_embed(self._ctx)

    @property
    def n_layer(self) -> int:
        return self._library.rwkv_get_n_layer(self._ctx)

    def eval(
            self,
            token: int,
            state_in: Optional[NumpyArrayOrPyTorchTensor],
            state_out: Optional[NumpyArrayOrPyTorchTensor] = None,
            logits_out: Optional[NumpyArrayOrPyTorchTensor] = None,
            use_numpy: bool = False
    ) -> Tuple[NumpyArrayOrPyTorchTensor, NumpyArrayOrPyTorchTensor]:
        """
        Evaluates the model for a single token.
        In case of any error, this method will throw an exception.

        Parameters
        ----------
        token : int
            Index of next token to be seen by the model. Must be in range 0 <= token < n_vocab.
        state_in : Optional[NumpyArrayOrTorchTensor]
            State from previous call of this method. If this is a first pass, set it to None.
        state_out : Optional[NumpyArrayOrTorchTensor]
            Optional output tensor for state. If provided, must be of type float32, contiguous and of shape (state_buffer_element_count).
        logits_out : Optional[NumpyArrayOrTorchTensor]
            Optional output tensor for logits. If provided, must be of type float32, contiguous and of shape (logits_buffer_element_count).
        use_numpy : bool
            If set to True, numpy's ndarrays will be created instead of PyTorch's Tensors.
            This parameter is ignored if any tensor parameter is not None; in such case,
            type of returned tensors will match the type of received tensors.

        Returns
        -------
        logits, state
            Logits vector of shape (n_vocab); state for the next step.
        """

        if not self._valid:
            raise ValueError('Model was freed')

        use_numpy = self._detect_numpy_usage([state_in, state_out, logits_out], use_numpy)

        if state_in is not None:
            self._validate_tensor(state_in, 'state_in', self._state_buffer_element_count)

            state_in_ptr = self._get_data_ptr(state_in)
        else:
            state_in_ptr = 0

        if state_out is not None:
            self._validate_tensor(state_out, 'state_out', self._state_buffer_element_count)
        else:
            state_out = self._zeros_float32(self._state_buffer_element_count, use_numpy)

        if logits_out is not None:
            self._validate_tensor(logits_out, 'logits_out', self._logits_buffer_element_count)
        else:
            logits_out = self._zeros_float32(self._logits_buffer_element_count, use_numpy)

        self._library.rwkv_eval(
            self._ctx,
            token,
            state_in_ptr,
            self._get_data_ptr(state_out),
            self._get_data_ptr(logits_out)
        )

        return logits_out, state_out
    
    def eval_sequence(
            self,
            tokens: List[int],
            state_in: Optional[NumpyArrayOrPyTorchTensor],
            state_out: Optional[NumpyArrayOrPyTorchTensor] = None,
            logits_out: Optional[NumpyArrayOrPyTorchTensor] = None,
            use_numpy: bool = False
    ) -> Tuple[NumpyArrayOrPyTorchTensor, NumpyArrayOrPyTorchTensor]:
        """
        Evaluates the model for a sequence of tokens.

        NOTE ON GGML NODE LIMIT

        ggml has a hard-coded limit on max amount of nodes in a computation graph. The sequence graph is built in a way that quickly exceedes
        this limit when using large models and/or large sequence lengths.
        Fortunately, rwkv.cpp's fork of ggml has increased limit which was tested to work for sequence lengths up to 64 for 14B models.

        If you get `GGML_ASSERT: ...\\ggml.c:16941: cgraph->n_nodes < GGML_MAX_NODES`, this means you've exceeded the limit.
        To get rid of the assertion failure, reduce the model size and/or sequence length.

        In case of any error, this method will throw an exception.

        Parameters
        ----------
        tokens : List[int]
            Indices of the next tokens to be seen by the model. Must be in range 0 <= token < n_vocab.
        state_in : Optional[NumpyArrayOrTorchTensor]
            State from previous call of this method. If this is a first pass, set it to None.
        state_out : Optional[NumpyArrayOrTorchTensor]
            Optional output tensor for state. If provided, must be of type float32, contiguous and of shape (state_buffer_element_count).
        logits_out : Optional[NumpyArrayOrTorchTensor]
            Optional output tensor for logits. If provided, must be of type float32, contiguous and of shape (logits_buffer_element_count).
        use_numpy : bool
            If set to True, numpy's ndarrays will be created instead of PyTorch's Tensors.
            This parameter is ignored if any tensor parameter is not None; in such case,
            type of returned tensors will match the type of received tensors.

        Returns
        -------
        logits, state
            Logits vector of shape (n_vocab); state for the next step.
        """

        if not self._valid:
            raise ValueError('Model was freed')

        use_numpy = self._detect_numpy_usage([state_in, state_out, logits_out], use_numpy)

        if state_in is not None:
            self._validate_tensor(state_in, 'state_in', self._state_buffer_element_count)

            state_in_ptr = self._get_data_ptr(state_in)
        else:
            state_in_ptr = 0

        if state_out is not None:
            self._validate_tensor(state_out, 'state_out', self._state_buffer_element_count)
        else:
            state_out = self._zeros_float32(self._state_buffer_element_count, use_numpy)

        if logits_out is not None:
            self._validate_tensor(logits_out, 'logits_out', self._logits_buffer_element_count)
        else:
            logits_out = self._zeros_float32(self._logits_buffer_element_count, use_numpy)

        self._library.rwkv_eval_sequence(
            self._ctx,
            tokens,
            state_in_ptr,
            self._get_data_ptr(state_out),
            self._get_data_ptr(logits_out)
        )

        return logits_out, state_out

    def eval_sequence_in_chunks(
            self,
            tokens: List[int],
            state_in: Optional[NumpyArrayOrPyTorchTensor],
            state_out: Optional[NumpyArrayOrPyTorchTensor] = None,
            logits_out: Optional[NumpyArrayOrPyTorchTensor] = None,
            chunk_size: int = 16,
            use_numpy: bool = False
    ) -> Tuple[NumpyArrayOrPyTorchTensor, NumpyArrayOrPyTorchTensor]:
        """
        Evaluates the model for a sequence of tokens using `eval_sequence`, splitting a potentially long sequence into fixed-length chunks.
        This function is useful for processing complete prompts and user input in chat & role-playing use-cases.
        It is recommended to use this function instead of `eval_sequence` to avoid mistakes and get maximum performance.

        Chunking allows processing sequences of thousands of tokens, while not reaching the ggml's node limit and not consuming too much memory.
        A reasonable and recommended value of chunk size is 16. If you want maximum performance, try different chunk sizes in range [2..64]
        and choose one that works the best in your use case.

        In case of any error, this method will throw an exception.

        Parameters
        ----------
        tokens : List[int]
            Indices of the next tokens to be seen by the model. Must be in range 0 <= token < n_vocab.
        chunk_size : int
            Size of each chunk in tokens, must be positive.
        state_in : Optional[NumpyArrayOrTorchTensor]
            State from previous call of this method. If this is a first pass, set it to None.
        state_out : Optional[NumpyArrayOrTorchTensor]
            Optional output tensor for state. If provided, must be of type float32, contiguous and of shape (state_buffer_element_count).
        logits_out : Optional[NumpyArrayOrTorchTensor]
            Optional output tensor for logits. If provided, must be of type float32, contiguous and of shape (logits_buffer_element_count).
        use_numpy : bool
            If set to True, numpy's ndarrays will be created instead of PyTorch's Tensors.
            This parameter is ignored if any tensor parameter is not None; in such case,
            type of returned tensors will match the type of received tensors.

        Returns
        -------
        logits, state
            Logits vector of shape (n_vocab); state for the next step.
        """

        if not self._valid:
            raise ValueError('Model was freed')

        use_numpy = self._detect_numpy_usage([state_in, state_out, logits_out], use_numpy)

        if state_in is not None:
            self._validate_tensor(state_in, 'state_in', self._state_buffer_element_count)

            state_in_ptr = self._get_data_ptr(state_in)
        else:
            state_in_ptr = 0

        if state_out is not None:
            self._validate_tensor(state_out, 'state_out', self._state_buffer_element_count)
        else:
            state_out = self._zeros_float32(self._state_buffer_element_count, use_numpy)

        if logits_out is not None:
            self._validate_tensor(logits_out, 'logits_out', self._logits_buffer_element_count)
        else:
            logits_out = self._zeros_float32(self._logits_buffer_element_count, use_numpy)

        self._library.rwkv_eval_sequence_in_chunks(
            self._ctx,
            tokens,
            chunk_size,
            state_in_ptr,
            self._get_data_ptr(state_out),
            self._get_data_ptr(logits_out)
        )

        return logits_out, state_out

    def free(self) -> None:
        """
        Frees all allocated resources.
        In case of any error, this method will throw an exception.
        The object must not be used anymore after calling this method.
        """

        if not self._valid:
            raise ValueError('Already freed')

        self._valid = False

        self._library.rwkv_free(self._ctx)

    def __del__(self) -> None:
        # Free the context on GC in case user forgot to call free() explicitly.
        if hasattr(self, '_valid') and self._valid:
            self.free()

    def _is_pytorch_tensor(self, tensor: NumpyArrayOrPyTorchTensor) -> bool:
        return hasattr(tensor, '__module__') and tensor.__module__ == 'torch'

    def _detect_numpy_usage(self, tensors: List[Optional[NumpyArrayOrPyTorchTensor]], use_numpy_by_default: bool) -> bool:
        for tensor in tensors:
            if tensor is not None:
                return False if self._is_pytorch_tensor(tensor) else True

        return use_numpy_by_default

    def _validate_tensor(self, tensor: NumpyArrayOrPyTorchTensor, name: str, size: int) -> None:
        if self._is_pytorch_tensor(tensor):
            tensor: torch.Tensor = tensor
            
            if tensor.device != torch.device('cpu'):
                raise ValueError(f'{name} is not on CPU')
            if tensor.dtype != torch.float32:
                raise ValueError(f'{name} is not of type float32')
            if tensor.shape != (size,):
                raise ValueError(f'{name} has invalid shape {tensor.shape}, expected ({size})')
            if not tensor.is_contiguous():
                raise ValueError(f'{name} is not contiguous')
        else:
            import numpy as np
            tensor: np.ndarray = tensor

            if tensor.dtype != np.float32:
                raise ValueError(f'{name} is not of type float32')
            if tensor.shape != (size,):
                raise ValueError(f'{name} has invalid shape {tensor.shape}, expected ({size})')
            if not tensor.data.contiguous:
                raise ValueError(f'{name} is not contiguous')

    def _get_data_ptr(self, tensor: NumpyArrayOrPyTorchTensor):
        if self._is_pytorch_tensor(tensor):
            return tensor.data_ptr()
        else:
            return tensor.ctypes.data

    def _zeros_float32(self, element_count: int, use_numpy: bool) -> NumpyArrayOrPyTorchTensor:
        if use_numpy:
            import numpy as np
            return np.zeros(element_count, dtype=np.float32)
        else:
            return torch.zeros(element_count, dtype=torch.float32, device='cpu')