import inspect
import os
from typing import Union

import PIL
import numpy as np
import torch
import tqdm
from accelerate import load_checkpoint_in_model
from diffusers import AutoencoderKL, DDIMScheduler, UNet2DConditionModel
from diffusers.pipelines.stable_diffusion.safety_checker import \
    StableDiffusionSafetyChecker
from diffusers.utils.torch_utils import randn_tensor
from huggingface_hub import snapshot_download
from transformers import CLIPImageProcessor

from model.attn_processor import SkipAttnProcessor
from model.utils import get_trainable_module, init_adapter
from utils import (compute_vae_encodings, numpy_to_pil, prepare_image,
                   prepare_mask_image, resize_and_crop, resize_and_padding)


class CatVTONPipeline:
    def __init__(
        self,
        base_ckpt,
        attn_ckpt,
        attn_ckpt_version="mix",
        weight_dtype=torch.float32,
        device='cuda',
        compile=False,
        skip_safety_check=False,
        use_tf32=True,
    ):
        self.device = device
        self.weight_dtype = weight_dtype
        self.skip_safety_check = skip_safety_check

        self.noise_scheduler = DDIMScheduler.from_pretrained(base_ckpt, subfolder="scheduler")
        self.vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse").to(device, dtype=weight_dtype)
        # Default both safety attributes to None so run_safety_checker degrades
        # gracefully if it is ever called with the check disabled.
        self.feature_extractor = None
        self.safety_checker = None
        if not skip_safety_check:
            self.feature_extractor = CLIPImageProcessor.from_pretrained(base_ckpt, subfolder="feature_extractor")
            self.safety_checker = StableDiffusionSafetyChecker.from_pretrained(base_ckpt, subfolder="safety_checker").to(device, dtype=weight_dtype)
        self.unet = UNet2DConditionModel.from_pretrained(base_ckpt, subfolder="unet").to(device, dtype=weight_dtype)
        # Replace the UNet's cross-attention with SkipAttnProcessor (no text
        # conditioning is used) and load the trained self-attention weights.
        init_adapter(self.unet, cross_attn_cls=SkipAttnProcessor)
        self.attn_modules = get_trainable_module(self.unet, "attention")
        self.auto_attn_ckpt_load(attn_ckpt, attn_ckpt_version)

        if compile:
            self.unet = torch.compile(self.unet)
            self.vae = torch.compile(self.vae, mode="reduce-overhead")

        # Allow TF32 matmuls for a speedup on Ampere and newer GPUs:
        # https://pytorch.org/docs/stable/notes/cuda.html#tensorfloat-32-tf32-on-ampere-devices
        if use_tf32:
            torch.set_float32_matmul_precision("high")
            torch.backends.cuda.matmul.allow_tf32 = True
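
    # A minimal usage sketch. Hedged: the checkpoint ids and image variables
    # below are illustrative assumptions, not part of this module.
    #
    #   pipeline = CatVTONPipeline(
    #       base_ckpt="runwayml/stable-diffusion-inpainting",
    #       attn_ckpt="zhengchong/CatVTON",
    #       attn_ckpt_version="mix",
    #       weight_dtype=torch.float16,
    #       device="cuda",
    #   )
    #   result = pipeline(person_image, garment_image, agnostic_mask)[0]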

    def auto_attn_ckpt_load(self, attn_ckpt, version):
        # Map the released checkpoint versions to their subfolders, then load
        # from a local path if one exists, otherwise from the Hub.
        sub_folder = {
            "mix": "mix-48k-1024",
            "vitonhd": "vitonhd-16k-512",
            "dresscode": "dresscode-16k-512",
        }[version]
        if os.path.exists(attn_ckpt):
            load_checkpoint_in_model(self.attn_modules, os.path.join(attn_ckpt, sub_folder, 'attention'))
        else:
            repo_path = snapshot_download(repo_id=attn_ckpt)
            print(f"Downloaded {attn_ckpt} to {repo_path}")
            load_checkpoint_in_model(self.attn_modules, os.path.join(repo_path, sub_folder, 'attention'))
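
    # Expected checkpoint layout under the local path or downloaded snapshot
    # (a sketch; exact weight file names depend on how the checkpoints were
    # saved, e.g. model.safetensors vs. pytorch_model.bin):
    #
    #   <attn_ckpt>/
    #     mix-48k-1024/attention/...
    #     vitonhd-16k-512/attention/...
    #     dresscode-16k-512/attention/...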

    def run_safety_checker(self, image):
        if self.safety_checker is None:
            has_nsfw_concept = None
        else:
            safety_checker_input = self.feature_extractor(image, return_tensors="pt").to(self.device)
            image, has_nsfw_concept = self.safety_checker(
                images=image, clip_input=safety_checker_input.pixel_values.to(self.weight_dtype)
            )
        return image, has_nsfw_concept

    def check_inputs(self, image, condition_image, mask, width, height):
        # Tensor inputs are assumed to be preprocessed already; only PIL
        # inputs are resized here.
        if isinstance(image, torch.Tensor) and isinstance(condition_image, torch.Tensor) and isinstance(mask, torch.Tensor):
            return image, condition_image, mask
        assert image.size == mask.size, "Image and mask must have the same size"
        image = resize_and_crop(image, (width, height))
        mask = resize_and_crop(mask, (width, height))
        condition_image = resize_and_padding(condition_image, (width, height))
        return image, condition_image, mask

    def prepare_extra_step_kwargs(self, generator, eta):
        # Prepare extra kwargs for the scheduler step, since not all schedulers
        # share the same signature. eta corresponds to η in the DDIM paper
        # (https://arxiv.org/abs/2010.02502), should be in [0, 1], and is
        # ignored by other schedulers.
        accepts_eta = "eta" in set(
            inspect.signature(self.noise_scheduler.step).parameters.keys()
        )
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # Check whether the scheduler accepts a generator.
        accepts_generator = "generator" in set(
            inspect.signature(self.noise_scheduler.step).parameters.keys()
        )
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        return extra_step_kwargs

    @torch.no_grad()
    def __call__(
        self,
        image: Union[PIL.Image.Image, torch.Tensor],
        condition_image: Union[PIL.Image.Image, torch.Tensor],
        mask: Union[PIL.Image.Image, torch.Tensor],
        num_inference_steps: int = 50,
        guidance_scale: float = 2.5,
        height: int = 1024,
        width: int = 768,
        generator=None,
        eta=1.0,
        **kwargs
    ):
        # Person and garment latents are concatenated along the height axis.
        concat_dim = -2

        # Preprocess inputs and move them to the target device/dtype.
        image, condition_image, mask = self.check_inputs(image, condition_image, mask, width, height)
        image = prepare_image(image).to(self.device, dtype=self.weight_dtype)
        condition_image = prepare_image(condition_image).to(self.device, dtype=self.weight_dtype)
        mask = prepare_mask_image(mask).to(self.device, dtype=self.weight_dtype)

        # Zero out the masked (to-be-inpainted) region of the person image.
        masked_image = image * (mask < 0.5)

        # Encode to VAE latent space and downsample the mask to latent resolution.
        masked_latent = compute_vae_encodings(masked_image, self.vae)
        condition_latent = compute_vae_encodings(condition_image, self.vae)
        mask_latent = torch.nn.functional.interpolate(mask, size=masked_latent.shape[-2:], mode="nearest")
        del image, mask, condition_image

        # Stack person and garment latents; the garment half of the mask is all
        # zeros so it is never treated as an inpainting region.
        masked_latent_concat = torch.cat([masked_latent, condition_latent], dim=concat_dim)
        mask_latent_concat = torch.cat([mask_latent, torch.zeros_like(mask_latent)], dim=concat_dim)
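
        # Shape sketch (an assumption for the default 1024x768 setting, given
        # the VAE's 8x spatial downsampling and 4 latent channels):
        #   masked_latent, condition_latent: (B, 4, 128, 96)
        #   masked_latent_concat:            (B, 4, 256, 96)  person on top, garment below
        #   mask_latent_concat:              (B, 1, 256, 96)  lower half all zeros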

        # Draw the initial noise over the concatenated latent shape.
        latents = randn_tensor(
            masked_latent_concat.shape,
            generator=generator,
            device=masked_latent_concat.device,
            dtype=self.weight_dtype,
        )

        # Set up the scheduler and scale the initial noise accordingly.
        self.noise_scheduler.set_timesteps(num_inference_steps, device=self.device)
        timesteps = self.noise_scheduler.timesteps
        latents = latents * self.noise_scheduler.init_noise_sigma

        # Classifier-free guidance: the unconditional branch replaces the
        # garment latent with zeros.
        if do_classifier_free_guidance := (guidance_scale > 1.0):
            masked_latent_concat = torch.cat(
                [
                    torch.cat([masked_latent, torch.zeros_like(condition_latent)], dim=concat_dim),
                    masked_latent_concat,
                ]
            )
            mask_latent_concat = torch.cat([mask_latent_concat] * 2)

        # Denoising loop
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)
        num_warmup_steps = len(timesteps) - num_inference_steps * self.noise_scheduler.order
        with tqdm.tqdm(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # Expand the latents for classifier-free guidance.
                non_inpainting_latent_model_input = (torch.cat([latents] * 2) if do_classifier_free_guidance else latents)
                non_inpainting_latent_model_input = self.noise_scheduler.scale_model_input(non_inpainting_latent_model_input, t)
                # Channel-wise concat of noisy latents, mask, and masked-image
                # latents, as expected by a stable-diffusion inpainting UNet
                # (4 + 1 + 4 = 9 input channels).
                inpainting_latent_model_input = torch.cat([non_inpainting_latent_model_input, mask_latent_concat, masked_latent_concat], dim=1)
                # Predict the noise residual (no text conditioning).
                noise_pred = self.unet(
                    inpainting_latent_model_input,
                    t.to(self.device),
                    encoder_hidden_states=None,
                    return_dict=False,
                )[0]
                # Perform guidance: uncond + scale * (cond - uncond).
                if do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + guidance_scale * (
                        noise_pred_text - noise_pred_uncond
                    )
                # Compute the previous noisy sample x_t -> x_{t-1}.
                latents = self.noise_scheduler.step(
                    noise_pred, t, latents, **extra_step_kwargs
                ).prev_sample
                if i == len(timesteps) - 1 or (
                    (i + 1) > num_warmup_steps
                    and (i + 1) % self.noise_scheduler.order == 0
                ):
                    progress_bar.update()

        # Keep only the person half of the concatenated latent, then decode.
        latents = latents.split(latents.shape[concat_dim] // 2, dim=concat_dim)[0]
        latents = 1 / self.vae.config.scaling_factor * latents
        image = self.vae.decode(latents.to(self.device, dtype=self.weight_dtype)).sample
        image = (image / 2 + 0.5).clamp(0, 1)

        # Convert (B, C, H, W) float tensor -> list of PIL images.
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        image = numpy_to_pil(image)

        # Replace flagged outputs with a placeholder image.
        if not self.skip_safety_check:
            current_script_directory = os.path.dirname(os.path.realpath(__file__))
            nsfw_image = os.path.join(os.path.dirname(current_script_directory), 'resource', 'img', 'NSFW.jpg')
            nsfw_image = PIL.Image.open(nsfw_image).resize(image[0].size)
            image_np = np.array(image)
            _, has_nsfw_concept = self.run_safety_checker(image=image_np)
            for i, not_safe in enumerate(has_nsfw_concept):
                if not_safe:
                    image[i] = nsfw_image
        return image


class CatVTONPix2PixPipeline(CatVTONPipeline):
    def auto_attn_ckpt_load(self, attn_ckpt, version):
        # Unlike the base pipeline, the version string is used directly as the
        # checkpoint subfolder.
        if os.path.exists(attn_ckpt):
            load_checkpoint_in_model(self.attn_modules, os.path.join(attn_ckpt, version, 'attention'))
        else:
            repo_path = snapshot_download(repo_id=attn_ckpt)
            print(f"Downloaded {attn_ckpt} to {repo_path}")
            load_checkpoint_in_model(self.attn_modules, os.path.join(repo_path, version, 'attention'))

    def check_inputs(self, image, condition_image, width, height):
        # Tensor inputs are assumed to be preprocessed already; only PIL
        # inputs are resized here.
        if isinstance(image, torch.Tensor) and isinstance(condition_image, torch.Tensor):
            return image, condition_image
        image = resize_and_crop(image, (width, height))
        condition_image = resize_and_padding(condition_image, (width, height))
        return image, condition_image

    @torch.no_grad()
    def __call__(
        self,
        image: Union[PIL.Image.Image, torch.Tensor],
        condition_image: Union[PIL.Image.Image, torch.Tensor],
        num_inference_steps: int = 50,
        guidance_scale: float = 2.5,
        height: int = 1024,
        width: int = 768,
        generator=None,
        eta=1.0,
        **kwargs
    ):
        # Person and garment latents are concatenated along the width axis.
        concat_dim = -1

        image, condition_image = self.check_inputs(image, condition_image, width, height)
        image = prepare_image(image).to(self.device, dtype=self.weight_dtype)
        condition_image = prepare_image(condition_image).to(self.device, dtype=self.weight_dtype)

        image_latent = compute_vae_encodings(image, self.vae)
        condition_latent = compute_vae_encodings(condition_image, self.vae)
        del image, condition_image

        condition_latent_concat = torch.cat([image_latent, condition_latent], dim=concat_dim)

        latents = randn_tensor(
            condition_latent_concat.shape,
            generator=generator,
            device=condition_latent_concat.device,
            dtype=self.weight_dtype,
        )

        self.noise_scheduler.set_timesteps(num_inference_steps, device=self.device)
        timesteps = self.noise_scheduler.timesteps
        latents = latents * self.noise_scheduler.init_noise_sigma

        # Classifier-free guidance: the unconditional branch replaces the
        # garment latent with zeros.
        if do_classifier_free_guidance := (guidance_scale > 1.0):
            condition_latent_concat = torch.cat(
                [
                    torch.cat([image_latent, torch.zeros_like(condition_latent)], dim=concat_dim),
                    condition_latent_concat,
                ]
            )

        # Denoising loop
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)
        num_warmup_steps = len(timesteps) - num_inference_steps * self.noise_scheduler.order
        with tqdm.tqdm(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # Expand the latents for classifier-free guidance.
                latent_model_input = (torch.cat([latents] * 2) if do_classifier_free_guidance else latents)
                latent_model_input = self.noise_scheduler.scale_model_input(latent_model_input, t)
                # Channel-wise concat of noisy latents and condition latents.
                p2p_latent_model_input = torch.cat([latent_model_input, condition_latent_concat], dim=1)
                noise_pred = self.unet(
                    p2p_latent_model_input,
                    t.to(self.device),
                    encoder_hidden_states=None,
                    return_dict=False,
                )[0]
                # Perform guidance: uncond + scale * (cond - uncond).
                if do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + guidance_scale * (
                        noise_pred_text - noise_pred_uncond
                    )
                # Compute the previous noisy sample x_t -> x_{t-1}.
                latents = self.noise_scheduler.step(
                    noise_pred, t, latents, **extra_step_kwargs
                ).prev_sample
                if i == len(timesteps) - 1 or (
                    (i + 1) > num_warmup_steps
                    and (i + 1) % self.noise_scheduler.order == 0
                ):
                    progress_bar.update()

        # Keep only the person half of the concatenated latent, then decode.
        latents = latents.split(latents.shape[concat_dim] // 2, dim=concat_dim)[0]
        latents = 1 / self.vae.config.scaling_factor * latents
        image = self.vae.decode(latents.to(self.device, dtype=self.weight_dtype)).sample
        image = (image / 2 + 0.5).clamp(0, 1)

        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        image = numpy_to_pil(image)

        # Replace flagged outputs with a placeholder image.
        if not self.skip_safety_check:
            current_script_directory = os.path.dirname(os.path.realpath(__file__))
            nsfw_image = os.path.join(os.path.dirname(current_script_directory), 'resource', 'img', 'NSFW.jpg')
            nsfw_image = PIL.Image.open(nsfw_image).resize(image[0].size)
            image_np = np.array(image)
            _, has_nsfw_concept = self.run_safety_checker(image=image_np)
            for i, not_safe in enumerate(has_nsfw_concept):
                if not_safe:
                    image[i] = nsfw_image
        return image
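

# A minimal usage sketch for the pix2pix variant. Hedged: the checkpoint ids
# below are illustrative assumptions; this variant takes no mask, and the base
# UNet's input channels must match the channel-wise latent concatenation
# (4 noisy + 4 condition = 8 channels, as in instruct-pix2pix-style UNets):
#
#   pipeline = CatVTONPix2PixPipeline(
#       base_ckpt="timbrooks/instruct-pix2pix",
#       attn_ckpt="zhengchong/CatVTON",
#       attn_ckpt_version="mix",
#       weight_dtype=torch.float16,
#   )
#   result = pipeline(person_image, garment_image)[0]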