Spaces:
Running
on
Zero
Running
on
Zero
#!/usr/bin/env python3 | |
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
"""Server shows online learning model concept.""" | |
import argparse | |
import logging | |
import threading | |
from queue import Queue | |
from threading import Lock | |
import numpy as np | |
import torch # pytype: disable=import-error | |
import torch.nn.functional as functional # pytype: disable=import-error | |
import torch.optim as optim # pytype: disable=import-error | |
from torch.optim.lr_scheduler import StepLR # pytype: disable=import-error | |
from pytriton.decorators import batch | |
from pytriton.model_config import ModelConfig, Tensor | |
from pytriton.triton import Triton, TritonConfig | |
from model import Net # pytype: disable=import-error # isort:skip | |
LOGGER = logging.getLogger("examples.online_learning_mnist.server") | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(name)s: %(message)s") | |
class Trainer: | |
"""Trainer class for MNIST model. | |
It is used to train the model and to keep track of the training progress. | |
It defines the learning rate scheduler and optimizer. | |
It organizes the training process in epochs. | |
""" | |
def __init__(self, model, lr, gamma, epoch_size): | |
self.model = model | |
self.optimizer = optim.Adadelta(model.parameters(), lr=lr) | |
self.scheduler = StepLR(self.optimizer, step_size=1, gamma=gamma) | |
self.iter = 0 | |
self.epoch = 0 | |
self.epoch_size = epoch_size | |
def train_batch(self, data, target): | |
self.optimizer.zero_grad() | |
output = self.model(data) | |
loss = functional.nll_loss(output, target) | |
loss.backward() | |
self.optimizer.step() | |
self.iter += 1 | |
return loss.item() | |
def ready(self): | |
return self.iter >= self.epoch_size | |
def next_epoch(self): | |
self.iter = 0 | |
self.epoch += 1 | |
self.scheduler.step() | |
class OnlineLearning(threading.Thread): | |
"""Online learning class that implements two infer functions: train and infer. | |
Infer function is used in inference endpoint and train function is used in training endpoint. | |
Train function collects data and trains model in background thread. | |
Infer function uses trained model to make inference. | |
When trained model is ready, it is swapped with infer model. | |
""" | |
def __init__(self, device, lr, gamma, epoch_size, max_queue_size): | |
super().__init__() | |
self.device = device | |
self.trained_model = Net().to(self.device) | |
self.trained_model.train() | |
self.infer_model = Net().to(self.device) | |
self.infer_model.eval() | |
self.stopped = False | |
self.train_data_queue = Queue(maxsize=max_queue_size) | |
self.lock = Lock() | |
self.trainer = Trainer(self.trained_model, lr, gamma, epoch_size) | |
self.last_loss = 0.0 | |
def run(self) -> None: | |
while not self.stopped: | |
image, target = self.train_data_queue.get() | |
if self.stopped: | |
return | |
data_tensor = torch.from_numpy(image).to(self.device) | |
labels = target.reshape((target.shape[0],)) | |
labels_tensor = torch.from_numpy(labels).to(self.device) | |
self.last_loss = self.trainer.train_batch(data_tensor, labels_tensor) | |
if self.trainer.ready(): | |
self.replace_inference_model() | |
self.trainer.next_epoch() | |
def stop(self): | |
self.stopped = True | |
self.train_data_queue.put((None, None)) | |
self.join() | |
def replace_inference_model(self): | |
with self.lock: | |
self.infer_model.load_state_dict(self.trained_model.state_dict()) | |
def train(self, requests): | |
"""Train function is used in training endpoint.""" | |
# concatenate all requests into one batch. No need for padding due to fixed image dimensions | |
images = np.concatenate([request["image"] for request in requests], axis=0) | |
targets = np.concatenate([request["target"] for request in requests], axis=0) | |
self.train_data_queue.put((images, targets)) | |
return [{"last_loss": np.array([[self.last_loss]]).astype(np.float32)} for _ in requests] | |
def infer(self, image): | |
"""Infer function is used in inference endpoint.""" | |
data_tensor = torch.from_numpy(image).to(self.device) | |
with self.lock: | |
res = self.infer_model(data_tensor) | |
res = res.numpy(force=True) | |
return {"predictions": res} | |
def _parse_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"--verbose", | |
"-v", | |
action="store_true", | |
help="Enable verbose logging in debug mode.", | |
) | |
return parser.parse_args() | |
def main(): | |
args = _parse_args() | |
log_verbose = 1 if args.verbose else 0 | |
log_level = logging.DEBUG if args.verbose else logging.INFO | |
logging.basicConfig(level=log_level, format="%(asctime)s - %(levelname)s - %(name)s: %(message)s") | |
online_learning_model = OnlineLearning( | |
device=torch.device("cuda"), lr=1.0, gamma=0.7, epoch_size=134, max_queue_size=1000 | |
) | |
online_learning_model.start() | |
try: | |
with Triton(config=TritonConfig(log_verbose=log_verbose)) as triton: | |
LOGGER.info("Loading OnlineLearning model") | |
triton.bind( | |
model_name="MnistTrain", | |
infer_func=online_learning_model.train, | |
inputs=[ | |
# image for training | |
Tensor(name="image", dtype=np.float32, shape=(1, 28, 28)), | |
# target class corresponding to image (class index from 0 to 9) | |
Tensor(name="target", dtype=np.int64, shape=(1,)), | |
], | |
outputs=[ | |
# last loss value batch | |
Tensor(name="last_loss", dtype=np.float32, shape=(1,)), | |
], | |
config=ModelConfig(max_batch_size=64), | |
strict=True, | |
) | |
triton.bind( | |
model_name="MnistInfer", | |
infer_func=online_learning_model.infer, | |
inputs=[ | |
# image for classification | |
Tensor(name="image", dtype=np.float32, shape=(1, 28, 28)), | |
], | |
outputs=[ | |
# predictions taken from softmax layer | |
Tensor(name="predictions", dtype=np.float32, shape=(-1,)), | |
], | |
config=ModelConfig(max_batch_size=64), | |
strict=True, | |
) | |
LOGGER.info("Serving model") | |
triton.serve() | |
finally: | |
LOGGER.info("Stopping online learning model") | |
online_learning_model.stop() | |
if __name__ == "__main__": | |
main() | |