ncoop57
Get minimum working openai server
bab8078
raw
history blame
4.9 kB
import json
import random
import string
import time
import os
import torch
import numpy as np
import tritonclient.grpc as client_util
from tokenizers import Tokenizer
from tritonclient.utils import np_to_triton_dtype, InferenceServerException
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
# Query numpy's float32/float64 limits once at import time.
# NOTE(review): presumably this pre-populates np.finfo before anything else
# touches it — confirm the reason before removing these two calls.
np.finfo(np.dtype("float32"))
np.finfo(np.dtype("float64"))

# Access token for the Hugging Face Hub; None when HUB_TOKEN is not set.
token = os.environ.get("HUB_TOKEN")

# Use the first CUDA device when torch reports one, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"

# Load the tokenizer and model once at import time and wrap them in a
# text-generation pipeline that the rest of the module shares.
tokenizer = AutoTokenizer.from_pretrained(
    "bigcode/christmas-models",
    use_auth_token=token,
)
model = AutoModelForCausalLM.from_pretrained(
    "bigcode/christmas-models",
    trust_remote_code=True,
    use_auth_token=token,
)
model = model.to(device)
pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    device=device,
)
class CodeGenProxy:
    """Callable that turns a completion request dict into an OpenAI-style reply.

    Text is produced by the module-level ``pipe`` text-generation pipeline.
    Calling an instance returns either one JSON string or, when the request
    has a truthy ``stream`` entry, a generator of ``data: ...`` event lines.
    """

    def __init__(self, host: str = 'triton', port: int = 8001, verbose: bool = False):
        """Load the tokenizer and create the Triton gRPC client.

        Args:
            host: Host name used to build the Triton client URL.
            port: Port used to build the Triton client URL.
            verbose: Passed through to ``InferenceServerClient``.
        """
        self.tokenizer = AutoTokenizer.from_pretrained("bigcode/christmas-models", use_auth_token=token)
        # None of the methods visible in this class read ``self.client``;
        # generation goes through the module-level ``pipe`` instead.
        self.client = client_util.InferenceServerClient(url=f'{host}:{port}', verbose=verbose)
        # NOTE(review): 50256 looks like a GPT-2 style end-of-text id —
        # confirm it is valid for this tokenizer before relying on it.
        self.PAD_CHAR = 50256
        # Max number of tokens the model can handle
        self.MAX_MODEL_LEN = 2048

    class TokensExceedsMaximum(Exception):
        """Error type for prompts that are too long (not raised in this class)."""

    @staticmethod
    def prepare_tensor(name: str, tensor_input):
        """Wrap a numpy array in a Triton ``InferInput`` called ``name``.

        The input's shape and Triton dtype are derived from ``tensor_input``
        and its data is copied in with ``set_data_from_numpy``.
        """
        t = client_util.InferInput(
            name, tensor_input.shape, np_to_triton_dtype(tensor_input.dtype))
        t.set_data_from_numpy(tensor_input)
        return t

    @staticmethod
    def trim_with_stopwords(output: str, stopwords: list) -> str:
        """Strip one trailing stop word from ``output``.

        Stop words are tried from longest to shortest and only the first one
        that ``output`` ends with is removed. Empty stop words are ignored.

        Args:
            output: Generated text.
            stopwords: Candidate suffixes to remove.

        Returns:
            ``output`` without the matched suffix, or unchanged if none match.
        """
        for stop in sorted(stopwords, key=len, reverse=True):
            # "" is a suffix of every string and ``output[:-0]`` is
            # ``output[:0]``, so an empty stop word would erase the whole
            # text; skip it.
            if stop and output.endswith(stop):
                return output[:-len(stop)]
        return output

    @staticmethod
    def to_word_list_format(word_dict, tokenizer):
        """Encode groups of words into one padded int32 array.

        For every group in ``word_dict`` the token ids of all its words are
        concatenated into one row and the cumulative end position of each
        word is recorded in a second row. Id rows are right-padded with 0 and
        offset rows with -1, both to the length of the longest id row (at
        least 1).

        Args:
            word_dict: Iterable of groups, each an iterable of strings.
            tokenizer: Object whose ``encode(word)`` returns a sequence of ids.

        Returns:
            An int32 array of shape ``(len(word_dict), 2, pad_to)`` where
            index 0 on the middle axis holds ids and index 1 holds offsets.

        Raises:
            ValueError: If ``word_dict`` is empty (``max`` of an empty sequence).
        """
        flat_ids = []
        offsets = []
        for word_dict_item in word_dict:
            item_flat_ids = []
            item_offsets = []

            for word in word_dict_item:
                ids = tokenizer.encode(word)

                # Words that encode to nothing contribute no entry.
                if len(ids) == 0:
                    continue

                item_flat_ids += ids
                item_offsets.append(len(ids))

                # Hack, can we do this better?
                # Also registers the pair 198, 198 as an extra entry.
                # NOTE(review): presumably 198 is the single-newline id, so
                # this covers "\n" + "\n" as an alternative encoding — confirm.
                if word == '\n\n':
                    item_flat_ids += [198, 198]
                    item_offsets.append(2)

            flat_ids.append(np.array(item_flat_ids))
            offsets.append(np.cumsum(np.array(item_offsets)))

        pad_to = max(1, max(len(ids) for ids in flat_ids))

        for i, (ids, offs) in enumerate(zip(flat_ids, offsets)):
            flat_ids[i] = np.pad(ids, (0, pad_to - len(ids)), constant_values=0)
            offsets[i] = np.pad(offs, (0, pad_to - len(offs)), constant_values=-1)

        # Stacked as (2, groups, pad_to), then reordered to (groups, 2, pad_to).
        return np.array([flat_ids, offsets], dtype="int32").transpose((1, 0, 2))

    def generate(self, data):
        """Run the pipeline on ``data['prompt']`` and build the reply parts.

        Only ``prompt`` is read from the request; ``n`` and ``model`` are
        currently ignored, so exactly one choice is produced and the reported
        model is always ``'codegen'``.

        Args:
            data: Request dict; must contain ``'prompt'``.

        Returns:
            ``(completion, choices)``: the completion envelope, with ``id``
            and ``choices`` still ``None``, and the list of choice dicts.

        Raises:
            KeyError: If ``data`` has no ``'prompt'`` entry.
        """
        prompt = data['prompt']

        # NOTE(review): the pipeline is called with its default output
        # settings, so 'generated_text' presumably still contains the prompt —
        # confirm this is what clients expect.
        text = pipe(prompt, do_sample=True, top_p=0.95, max_new_tokens=50)[0]['generated_text']

        choices = [
            {
                'text': text,
                'index': 0,
                'finish_reason': "stop",
                'logprobs': None,
            }
        ]

        completion = {
            'id': None,  # fill in
            'model': 'codegen',
            'object': 'text_completion',
            'created': int(time.time()),
            'choices': None,  # fill in
            # Placeholder numbers; they are not computed from the actual text.
            'usage': {
                'completion_tokens': int(50),
                'prompt_tokens': int(50),
                'total_tokens': int(100),
            }
        }
        return completion, choices

    @staticmethod
    def random_completion_id():
        """Return ``'cmpl-'`` followed by 29 random ASCII letters and digits."""
        return 'cmpl-' + ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(29))

    def streamed_response(self, completion, choices):
        """Yield the reply as ``data: ...`` event lines.

        One event is produced per choice, each with a fresh id and a
        single-element ``choices`` list, followed by a final
        ``data: [DONE]`` event. ``completion`` is updated in place.
        """
        for c in choices:
            completion['id'] = self.random_completion_id()
            completion['choices'] = [c]
            yield f'data: {json.dumps(completion)}\n\n'
        yield 'data: [DONE]\n\n'

    def non_streamed_response(self, completion, choices) -> str:
        """Return the whole reply as one JSON string.

        ``completion`` is updated in place with a fresh id and all choices.
        """
        completion['id'] = self.random_completion_id()
        completion['choices'] = choices
        return json.dumps(completion)

    def __call__(self, data: dict):
        """Handle one completion request.

        Args:
            data: Request dict; ``'prompt'`` is required and a truthy
                ``'stream'`` selects the streamed reply format.

        Returns:
            A generator of event strings when streaming, otherwise a JSON
            string.
        """
        st = time.time()
        try:
            completion, choices = self.generate(data)
        except InferenceServerException as E:
            # Best effort: log the error and answer with an empty completion.
            # Any other exception type propagates to the caller.
            print(E)
            completion = {}
            choices = []
        ed = time.time()
        print(f"Returned completion in {(ed - st) * 1000} ms")
        if data.get('stream', False):
            return self.streamed_response(completion, choices)
        else:
            return self.non_streamed_response(completion, choices)