Spaces:
Running
Running
File size: 7,709 Bytes
b128c76 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
import io
import os
import time
from http import HTTPStatus
import numpy as np
import ormsgpack
import soundfile as sf
import torch
from kui.asgi import HTTPException, HttpView, JSONResponse, StreamResponse, request
from loguru import logger
from tools.schema import (
ServeASRRequest,
ServeASRResponse,
ServeChatRequest,
ServeTTSRequest,
ServeVQGANDecodeRequest,
ServeVQGANDecodeResponse,
ServeVQGANEncodeRequest,
ServeVQGANEncodeResponse,
)
from tools.server.agent import get_response_generator
from tools.server.api_utils import (
buffer_to_async_generator,
get_content_type,
inference_async,
)
from tools.server.inference import inference_wrapper as inference
from tools.server.model_manager import ModelManager
from tools.server.model_utils import batch_asr, cached_vqgan_batch_encode, vqgan_decode
# Upper bound on `num_samples` accepted by the chat endpoint. Read once at
# import time from the NUM_SAMPLES environment variable; 1 when it is unset.
MAX_NUM_SAMPLES = int(os.environ.get("NUM_SAMPLES", "1"))
class HealthView(HttpView):
    """
    Health-check endpoint.

    A POST always answers with the JSON document ``{"status": "ok"}``.
    """

    @classmethod
    async def post(cls):
        status_body = {"status": "ok"}
        return JSONResponse(status_body)
class VQGANEncodeView(HttpView):
    """
    Encode the audio clips of a request into symbolic tokens.
    """

    @classmethod
    async def post(cls):
        """
        Build a ``ServeVQGANEncodeRequest`` from the request body, encode its
        ``audios`` with the application's decoder model and return the tokens
        as an ormsgpack-packed ``ServeVQGANEncodeResponse``.
        """
        # Parse the request body into the request schema
        body = await request.data()
        req = ServeVQGANEncodeRequest(**body)

        # The model manager is stored on the application state
        manager: ModelManager = request.app.state.model_manager
        vqgan = manager.decoder_model

        # Run the encoder and log how long it took
        start_time = time.time()
        encoded = cached_vqgan_batch_encode(vqgan, req.audios)
        logger.info(
            f"[EXEC] VQGAN encode time: {(time.time() - start_time) * 1000:.2f}ms"
        )

        # Each encoded item is converted to a plain list before packing
        token_lists = [item.tolist() for item in encoded]
        response = ServeVQGANEncodeResponse(tokens=token_lists)
        return ormsgpack.packb(response, option=ormsgpack.OPT_SERIALIZE_PYDANTIC)
class VQGANDecodeView(HttpView):
    """
    Decode symbolic tokens back into audio.
    """

    @classmethod
    async def post(cls):
        """
        Build a ``ServeVQGANDecodeRequest`` from the request body, decode its
        ``tokens`` with the application's decoder model and return the audio
        as an ormsgpack-packed ``ServeVQGANDecodeResponse``.
        """
        # Parse the request body into the request schema
        body = await request.data()
        req = ServeVQGANDecodeRequest(**body)

        # The model manager is stored on the application state
        manager: ModelManager = request.app.state.model_manager
        vqgan = manager.decoder_model

        # Every token sequence becomes its own integer tensor
        token_tensors = []
        for sequence in req.tokens:
            token_tensors.append(torch.tensor(sequence, dtype=torch.int))

        # Run the decoder and log how long it took
        start_time = time.time()
        decoded = vqgan_decode(vqgan, token_tensors)
        logger.info(
            f"[EXEC] VQGAN decode time: {(time.time() - start_time) * 1000:.2f}ms"
        )

        # Audio is shipped as the raw bytes of its float16 representation
        audio_blobs = [clip.astype(np.float16).tobytes() for clip in decoded]

        response = ServeVQGANDecodeResponse(audios=audio_blobs)
        return ormsgpack.packb(response, option=ormsgpack.OPT_SERIALIZE_PYDANTIC)
class ASRView(HttpView):
    """
    Perform automatic speech recognition on the audio.
    """

    @classmethod
    async def post(cls):
        """
        Transcribe the audio clips contained in the request body.

        Each entry of ``req.audios`` is read as a float16 buffer and converted
        to a float tensor before being handed to ``batch_asr`` together with
        the application's ASR model and lock.

        Returns:
            An ormsgpack-packed ``ServeASRResponse`` holding the transcriptions.

        Raises:
            HTTPException: 400 (Bad Request) when any clip has at least
                ``30 * req.sample_rate`` samples along its last axis.
        """
        # Decode the request
        payload = await request.data()
        req = ServeASRRequest(**payload)

        # Get the model from the app
        model_manager: ModelManager = request.app.state.model_manager
        asr_model = model_manager.asr_model
        lock = request.app.state.lock

        # Perform ASR
        start_time = time.time()
        audios = [np.frombuffer(audio, dtype=np.float16) for audio in req.audios]
        audios = [torch.from_numpy(audio).float() for audio in audios]

        # Reject over-long clips. The limit is computed once, and the loop
        # variable no longer reuses the name of the list it iterates over.
        max_samples = 30 * req.sample_rate
        if any(audio.shape[-1] >= max_samples for audio in audios):
            raise HTTPException(
                HTTPStatus.BAD_REQUEST,
                content="Audio length is too long",
            )

        transcriptions = batch_asr(
            asr_model, lock, audios=audios, sr=req.sample_rate, language=req.language
        )
        logger.info(f"[EXEC] ASR time: {(time.time() - start_time) * 1000:.2f}ms")

        # Return the response
        return ormsgpack.packb(
            ServeASRResponse(transcriptions=transcriptions),
            option=ormsgpack.OPT_SERIALIZE_PYDANTIC,
        )
class TTSView(HttpView):
    """
    Synthesize speech for the text carried by the request.
    """

    @classmethod
    async def post(cls):
        """
        Validate a ``ServeTTSRequest`` and answer with the generated audio as
        an attachment, either streamed or rendered into a single buffer.

        Raises:
            HTTPException: 400 (Bad Request) when the text exceeds the
                application's ``max_text_length`` (only enforced when that
                limit is positive), or when streaming is requested with a
                format other than ``"wav"``.
        """
        # Parse the request body into the request schema
        body = await request.data()
        req = ServeTTSRequest(**body)

        # Look up the inference engine and its output sample rate
        app_state = request.app.state
        manager: ModelManager = app_state.model_manager
        tts_engine = manager.tts_inference_engine
        output_rate = tts_engine.decoder_model.spec_transform.sample_rate

        # Enforce the text length limit
        if app_state.max_text_length > 0 and len(req.text) > app_state.max_text_length:
            raise HTTPException(
                HTTPStatus.BAD_REQUEST,
                content=f"Text is too long, max length is {app_state.max_text_length}",
            )

        # Streaming is restricted to WAV output
        if req.streaming and req.format != "wav":
            raise HTTPException(
                HTTPStatus.BAD_REQUEST,
                content="Streaming only supports WAV format",
            )

        def as_attachment(chunks):
            # Both delivery modes share the same headers and content type
            return StreamResponse(
                iterable=chunks,
                headers={
                    "Content-Disposition": f"attachment; filename=audio.{req.format}",
                },
                content_type=get_content_type(req.format),
            )

        if req.streaming:
            return as_attachment(inference_async(req, tts_engine))

        # Non-streaming: take the first item produced by the inference
        # generator and write it into an in-memory file of the asked format
        waveform = next(inference(req, tts_engine))
        rendered = io.BytesIO()
        sf.write(rendered, waveform, output_rate, format=req.format)
        return as_attachment(buffer_to_async_generator(rendered.getvalue()))
class ChatView(HttpView):
    """
    Run chatbot inference for the conversation carried by the request.
    """

    @classmethod
    async def post(cls):
        """
        Validate a ``ServeChatRequest`` and return the generated response.

        A request with ``streaming`` set to ``False`` gets one complete
        result, as JSON when the Content-Type header mentions
        ``application/json`` and ormsgpack-packed otherwise. Any other
        request gets a ``text/event-stream`` response.

        Raises:
            HTTPException: 400 (Bad Request) when ``num_samples`` is below 1
                or above ``MAX_NUM_SAMPLES``.
        """
        # Parse the request body into the request schema
        body = await request.data()
        req = ServeChatRequest(**body)

        # Reject sample counts outside the allowed range
        requested = req.num_samples
        if requested < 1 or requested > MAX_NUM_SAMPLES:
            raise HTTPException(
                HTTPStatus.BAD_REQUEST,
                content=f"Number of samples must be between 1 and {MAX_NUM_SAMPLES}",
            )

        # A missing Content-Type header is treated as JSON
        declared_type = request.headers.get("Content-Type", "application/json")
        json_mode = "application/json" in declared_type

        # Collect what the response generator needs from the application
        manager: ModelManager = request.app.state.model_manager
        queue = manager.llama_queue
        tok = manager.tokenizer
        cfg = manager.config
        target_device = request.app.state.device

        response_generator = get_response_generator(
            queue, tok, cfg, req, target_device, json_mode
        )

        # Anything other than an explicit `False` is served as a stream
        if req.streaming is not False:
            return StreamResponse(
                iterable=response_generator(), content_type="text/event-stream"
            )

        result = response_generator()
        if json_mode:
            return JSONResponse(result.model_dump())
        return ormsgpack.packb(result, option=ormsgpack.OPT_SERIALIZE_PYDANTIC)
|