|
from typing import Dict, List, Any |
|
import logger |
|
import spaces |
|
import gradio as gr |
|
import json |
|
import torch |
|
import wavio |
|
from tqdm import tqdm |
|
from huggingface_hub import snapshot_download |
|
from models import AudioDiffusion, DDPMScheduler |
|
from audioldm.audio.stft import TacotronSTFT |
|
from audioldm.variational_autoencoder import AutoencoderKL |
|
from pydub import AudioSegment |
|
from gradio import Markdown |
|
|
|
import torch |
|
|
|
from diffusers.models.unet_2d_condition import UNet2DConditionModel |
|
from diffusers import DiffusionPipeline,AudioPipelineOutput |
|
from transformers import CLIPTextModel, T5EncoderModel, AutoModel, T5Tokenizer, T5TokenizerFast |
|
from typing import Union |
|
from diffusers.utils.torch_utils import randn_tensor |
|
from tqdm import tqdm |
|
|
|
class Tango:
    """Text-to-audio wrapper around a Tango checkpoint from the Hugging Face Hub.

    Construction downloads the checkpoint snapshot, builds the VAE, STFT and
    diffusion modules from their JSON configs, loads their weights and switches
    all three modules to eval mode.
    """

    def __init__(self, name="declare-lab/tango2", device=None):
        """Download the checkpoint `name` and load it onto `device`.

        Args:
            name: Hub repository id handed to `snapshot_download`.
            device: Device the modules and their weights are placed on. When
                None, "cuda:0" is used if CUDA is available, otherwise "cpu".
        """
        # Resolve the default at call time so that importing this module never
        # depends on a global that may not exist.
        if device is None:
            device = "cuda:0" if torch.cuda.is_available() else "cpu"

        path = snapshot_download(repo_id=name)

        vae_config = self._read_json("{}/vae_config.json".format(path))
        stft_config = self._read_json("{}/stft_config.json".format(path))
        main_config = self._read_json("{}/main_config.json".format(path))

        self.vae = AutoencoderKL(**vae_config).to(device)
        self.stft = TacotronSTFT(**stft_config).to(device)
        self.model = AudioDiffusion(**main_config).to(device)

        vae_weights = torch.load("{}/pytorch_model_vae.bin".format(path), map_location=device)
        stft_weights = torch.load("{}/pytorch_model_stft.bin".format(path), map_location=device)
        main_weights = torch.load("{}/pytorch_model_main.bin".format(path), map_location=device)

        self.vae.load_state_dict(vae_weights)
        self.stft.load_state_dict(stft_weights)
        self.model.load_state_dict(main_weights)

        print ("Successfully loaded checkpoint from:", name)

        self.vae.eval()
        self.stft.eval()
        self.model.eval()

        self.scheduler = DDPMScheduler.from_pretrained(main_config["scheduler_name"], subfolder="scheduler")

    @staticmethod
    def _read_json(file_path):
        """Parse the JSON file at `file_path`, closing the handle afterwards."""
        with open(file_path, encoding="utf-8") as handle:
            return json.load(handle)

    def chunks(self, lst, n):
        """ Yield successive n-sized chunks from a list. """
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    def generate(self, prompt, steps=100, guidance=3, samples=1, disable_progress=True):
        """ Generate audio for a single prompt string.

        The prompt is run through the diffusion model, the resulting latents
        are decoded to a mel representation and then to a waveform. Only the
        first decoded waveform is returned.
        """
        with torch.no_grad():
            latents = self.model.inference([prompt], self.scheduler, steps, guidance, samples, disable_progress=disable_progress)
            mel = self.vae.decode_first_stage(latents)
            wave = self.vae.decode_to_waveform(mel)
        return wave[0]

    def generate_for_batch(self, prompts, steps=200, guidance=3, samples=1, batch_size=8, disable_progress=True):
        """ Generate audio for a list of prompt strings.

        Prompts are processed `batch_size` at a time. With `samples == 1` the
        result is a flat list of waveforms; otherwise the waveforms are grouped
        into consecutive lists of `samples` items.
        """
        outputs = []
        for k in tqdm(range(0, len(prompts), batch_size)):
            batch = prompts[k: k+batch_size]
            with torch.no_grad():
                latents = self.model.inference(batch, self.scheduler, steps, guidance, samples, disable_progress=disable_progress)
                mel = self.vae.decode_first_stage(latents)
                wave = self.vae.decode_to_waveform(mel)
                outputs.extend(wave)

        if samples == 1:
            return outputs
        return list(self.chunks(outputs, samples))
|
|
|
|
|
|
|
|
|
|
|
|
|
class EndpointHandler:
    """Request handler that owns a `Tango` model and parses incoming requests.

    NOTE(review): the `__init__(path)` / `__call__(data)` shape looks like a
    Hugging Face Inference Endpoints custom handler — confirm against the
    deployment.
    """

    def __init__(self, path=""):
        """Load the Tango model on CUDA.

        Args:
            path: Accepted but not used by this constructor.
        """
        self.model = Tango(device='cuda')

    @staticmethod
    def _pop_prompt(data):
        """Remove and return the prompt string stored under `inputs` or `prompt`."""
        # `inputs` takes precedence; a key only counts when its value is a str.
        for key in ("inputs", "prompt"):
            if isinstance(data.get(key), str):
                return data.pop(key)
        raise ValueError(
            "Provided input body must contain either the key `inputs` or `prompt` with the"
            " prompt to use for the audio generation, and it needs to be a non-empty string."
        )

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Parse the prompt and generation parameters out of a request body.

        Args:
            data: Request body. The prompt is taken from the string-valued key
                `inputs`, or failing that `prompt`. Optional settings are read
                from the `parameters` mapping. The keys that are consumed are
                removed from `data`.

        Raises:
            ValueError: If neither `inputs` nor `prompt` holds a string.

        NOTE(review): the visible body stops after seeding torch's RNG. No
        audio is generated and nothing is returned yet, although the
        annotation promises a list of dicts.
        """
        logger.info(f"Received incoming request with {data=}")

        # The prompt must be looked up before anything is popped from `data`,
        # otherwise the `inputs` key is gone by the time it is checked.
        prompt = self._pop_prompt(data)

        # Tolerate an explicit `"parameters": null` in the request body.
        parameters = data.pop("parameters", None) or {}

        # NOTE(review): the values below are read but not consumed in the
        # visible portion of this handler; `width`/`height` look like leftovers
        # from an image template — confirm before removing.
        num_inference_steps = parameters.get("num_inference_steps", 30)
        width = parameters.get("width", 1024)
        height = parameters.get("height", 768)
        guidance_scale = parameters.get("guidance_scale", 3.5)

        seed = parameters.get("seed", 0)
        generator = torch.manual_seed(seed)
|
|