Spaces:
Sleeping
Sleeping
# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import pathlib | |
import tempfile | |
import numpy as np | |
from pytriton.decorators import TritonContext, batch | |
from pytriton.model_config.tensor import Tensor | |
from pytriton.model_config.triton_model_config import TensorSpec | |
from pytriton.models.manager import ModelManager | |
from pytriton.models.model import Model, ModelConfig | |
from pytriton.proxy.communication import TensorStore | |
from pytriton.proxy.types import Request | |
from pytriton.utils.workspace import Workspace | |
def test_get_model_config_return_model_config_when_minimal_required_data(tmp_path): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
workspace = Workspace(tmp_path / "workspace") | |
model = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(dtype=np.float32, shape=(-1,)), | |
Tensor(dtype=np.float32, shape=(-1,)), | |
], | |
outputs=[ | |
Tensor(dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
model_config = model._get_triton_model_config() | |
assert model_config.model_name == "simple" | |
assert model_config.model_version == 2 | |
assert model_config.batching is True | |
assert model_config.max_batch_size == 128 | |
assert model_config.inputs == [ | |
TensorSpec(name="INPUT_1", dtype=np.float32, shape=(-1,)), | |
TensorSpec(name="INPUT_2", dtype=np.float32, shape=(-1,)), | |
] | |
assert model_config.outputs == [ | |
TensorSpec(name="OUTPUT_1", dtype=np.int32, shape=(-1,)), | |
] | |
ipc_socket_path = workspace.path / "ipc_proxy_backend_simple" | |
assert model_config.backend_parameters == { | |
"shared-memory-socket": f"ipc://{ipc_socket_path.as_posix()}", | |
} | |
def test_get_model_config_return_model_config_when_custom_names(): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
with tempfile.TemporaryDirectory() as tempdir: | |
tempdir = pathlib.Path(tempdir) | |
workspace = Workspace(tempdir / "workspace") | |
model = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32().dtype, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32().dtype, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
model_config = model._get_triton_model_config() | |
assert model_config.model_name == "simple" | |
assert model_config.model_version == 2 | |
assert model_config.batching is True | |
assert model_config.max_batch_size == 128 | |
assert model_config.inputs == [ | |
TensorSpec(name="variable1", dtype=object, shape=(2, 1)), | |
TensorSpec(name="variable2", dtype=np.float32, shape=(2, 1)), | |
] | |
assert model_config.outputs == [ | |
TensorSpec(name="factorials", dtype=np.int32, shape=(-1,)), | |
] | |
def test_generate_model_create_model_store(): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
with tempfile.TemporaryDirectory() as tempdir: | |
tempdir = pathlib.Path(tempdir) | |
workspace = Workspace(tempdir / "workspace") | |
model = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
with tempfile.TemporaryDirectory() as tempdir: | |
model_repository = pathlib.Path(tempdir) / "model_repository" | |
model_repository.mkdir() | |
model.generate_model(model_repository) | |
assert (model_repository / "simple").is_dir() | |
assert (model_repository / "simple" / "config.pbtxt").is_file() | |
assert (model_repository / "simple" / "2").is_dir() | |
assert (model_repository / "simple" / "2" / "model.py").is_file() | |
def test_generate_models_with_same_names_and_different_versions_create_model_store(): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
with tempfile.TemporaryDirectory() as tempdir: | |
tempdir = pathlib.Path(tempdir) | |
workspace = Workspace(tempdir / "workspace") | |
model1 = Model( | |
model_name="simple", | |
model_version=1, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
model2 = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
with tempfile.TemporaryDirectory() as tempdir: | |
model_repository = pathlib.Path(tempdir) / "model_repository" | |
model_repository.mkdir() | |
model1.generate_model(model_repository) | |
model2.generate_model(model_repository) | |
assert (model_repository / "simple").is_dir() | |
assert (model_repository / "simple" / "config.pbtxt").is_file() | |
assert (model_repository / "simple" / "1").is_dir() | |
assert (model_repository / "simple" / "1" / "model.py").is_file() | |
assert (model_repository / "simple" / "2").is_dir() | |
assert (model_repository / "simple" / "2" / "model.py").is_file() | |
def test_setup_create_proxy_backend_connection(tmp_path): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
workspace = Workspace(tmp_path / "workspace") | |
tensor_store = TensorStore(workspace.path / "data_store.sock") | |
model = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
try: | |
tensor_store.start() | |
model.setup() | |
assert len(model._inference_handlers) == 1 | |
finally: | |
model.clean() | |
tensor_store.close() | |
def test_setup_can_be_called_multiple_times(tmp_path): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
workspace = Workspace(tmp_path / "workspace") | |
tensor_store = TensorStore(workspace.path / "data_store.sock") | |
model = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
try: | |
tensor_store.start() | |
model.setup() | |
assert len(model._inference_handlers) == 1 | |
python_backend1 = model._inference_handlers[0] | |
assert python_backend1 is not None | |
model.setup() | |
assert len(model._inference_handlers) == 1 | |
python_backend2 = model._inference_handlers[0] | |
assert python_backend2 is not None | |
assert python_backend1 == python_backend2 | |
finally: | |
model.clean() | |
tensor_store.close() | |
def test_clean_remove_proxy_backend_connection(tmp_path): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
workspace = Workspace(tmp_path / "workspace") | |
tensor_store = TensorStore(workspace.path / "data_store.sock") | |
model = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
try: | |
tensor_store.start() | |
model.setup() | |
finally: | |
model.clean() | |
tensor_store.close() | |
assert len(model._inference_handlers) == 0 | |
def test_clean_can_be_called_multiple_times(tmp_path): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
workspace = Workspace(tmp_path / "workspace") | |
tensor_store = TensorStore(workspace.path / "data_store.sock") | |
model = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
try: | |
tensor_store.start() | |
model.setup() | |
model.clean() | |
model.clean() | |
assert len(model._inference_handlers) == 0 | |
finally: | |
tensor_store.close() | |
def test_is_alive_return_false_when_model_not_setup(tmp_path): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
with tempfile.TemporaryDirectory() as tempdir: | |
tempdir = pathlib.Path(tempdir) | |
workspace = Workspace(tempdir / "workspace") | |
model = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
assert not model.is_alive() | |
def test_is_alive_return_true_when_model_is_setup(tmp_path): | |
def infer_func(inputs): | |
return inputs | |
triton_context = TritonContext() | |
workspace = Workspace(tmp_path / "workspace") | |
tensor_store = TensorStore(workspace.path / "data_store.sock") | |
model = Model( | |
model_name="simple", | |
model_version=2, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable1", dtype=object, shape=(2, 1)), | |
Tensor(name="variable2", dtype=np.float32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="factorials", dtype=np.int32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
try: | |
tensor_store.start() | |
model.setup() | |
assert model.is_alive() | |
assert len(model._inference_handlers) == 1 | |
finally: | |
model.clean() | |
tensor_store.close() | |
def test_triton_context_injection(tmp_path): | |
class Multimodel: | |
def infer1(self, variable1): | |
return [variable1] | |
def infer2(self, variable2): | |
return [variable2] | |
m = Multimodel() | |
def infer_func(variable3): | |
return [variable3] | |
triton_context = TritonContext() | |
workspace = Workspace(tmp_path / "workspace") | |
tensor_store = TensorStore(workspace.path / "data_store.sock") | |
tensor_store.start() | |
model1 = Model( | |
model_name="simple1", | |
model_version=1, | |
inference_fn=m.infer1, | |
inputs=[ | |
Tensor(name="variable1", dtype=np.int32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="out1", dtype=np.int32, shape=(2, 1)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
model2 = Model( | |
model_name="simple2", | |
model_version=1, | |
inference_fn=m.infer2, | |
inputs=[ | |
Tensor(name="variable2", dtype=np.int32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="out2", dtype=np.int32, shape=(2, 1)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
model3 = Model( | |
model_name="simple3", | |
model_version=1, | |
inference_fn=infer_func, | |
inputs=[ | |
Tensor(name="variable3", dtype=np.int32, shape=(2, 1)), | |
], | |
outputs=[ | |
Tensor(name="out3", dtype=np.int32, shape=(2, 1)), | |
], | |
config=ModelConfig(max_batch_size=128, batching=True), | |
workspace=workspace, | |
triton_context=triton_context, | |
strict=False, | |
) | |
manager = ModelManager("") | |
try: | |
manager.add_model(model1) | |
model1.setup() | |
manager.add_model(model2) | |
model2.setup() | |
manager.add_model(model3) | |
model3.setup() | |
input_requests1 = [Request({"variable1": np.array([[7, 5], [8, 6]])}, {})] | |
input_requests2 = [Request({"variable2": np.array([[1, 2], [1, 2], [11, 12]])}, {})] | |
input_requests3 = [Request({"variable3": np.array([[1, 2]])}, {})] | |
def assert_inputs_properly_mapped_to_outputs(expected_out_name, outputs, input_request_arr): | |
assert len(outputs) == 1 | |
assert expected_out_name in outputs[0] | |
assert outputs[0][expected_out_name].shape == input_request_arr.shape | |
assert np.array_equal(outputs[0][expected_out_name], input_request_arr) | |
outputs1 = m.infer1(input_requests1) | |
assert_inputs_properly_mapped_to_outputs("out1", outputs1, input_requests1[0]["variable1"]) | |
outputs2 = m.infer2(input_requests2) | |
assert_inputs_properly_mapped_to_outputs("out2", outputs2, input_requests2[0]["variable2"]) | |
outputs3 = infer_func(input_requests3) | |
assert_inputs_properly_mapped_to_outputs("out3", outputs3, input_requests3[0]["variable3"]) | |
outputs1 = m.infer1(input_requests1) | |
assert_inputs_properly_mapped_to_outputs("out1", outputs1, input_requests1[0]["variable1"]) | |
outputs3 = infer_func(input_requests3) | |
assert_inputs_properly_mapped_to_outputs("out3", outputs3, input_requests3[0]["variable3"]) | |
finally: | |
manager.clean() | |
tensor_store.close() | |