from pathlib import Path

from diffusers import AutoPipelineForText2Image
from transformers import CLIPVisionModelWithProjection
from diffusers.utils import load_image
from diffusers import LCMScheduler

stable_diffusion_id = "circulus/canvers-disney-v3.9.1"
ip_adapter_id = "h94/IP-Adapter"
ip_adapter_weight_name = "ip-adapter-full-face_sd15.bin"
lcm_lora_id = "latent-consistency/lcm-lora-sdv1-5"

models_dir = Path("on-canvers-disney-v3.9.1-ov-face")
int8_model_path = Path("on-canvers-disney-v3.9.1-ov-face-int8")

from optimum.intel import OVConfig, OVQuantizer, OVStableDiffusionPipeline, OVWeightQuantizationConfig
from optimum.intel.openvino.configuration import OVQuantizationMethod

load_original_pipeline = not all(
    [
        (models_dir / model_name).exists()
        for model_name in [
            "text_encoder.xml",
            "image_encoder.xml",
            "unet.xml",
            "vae_decoder.xml",
            "vae_encoder.xml",
        ]
    ]
)


def get_pipeline_components(
    stable_diffusion_id,
    ip_adapter_id,
    ip_adapter_weight_name,
    lcm_lora_id,
    ip_adapter_scale=0.65,
):
    image_encoder = CLIPVisionModelWithProjection.from_pretrained(ip_adapter_id, subfolder="models/image_encoder")
    print(image_encoder)
    pipeline = AutoPipelineForText2Image.from_pretrained(stable_diffusion_id, image_encoder=image_encoder)
    pipeline.load_lora_weights(lcm_lora_id)
    pipeline.fuse_lora()
    pipeline.load_ip_adapter(ip_adapter_id, subfolder="models", weight_name=ip_adapter_weight_name)
    pipeline.set_ip_adapter_scale(ip_adapter_scale)
    scheduler = LCMScheduler.from_pretrained(stable_diffusion_id, subfolder="scheduler")
    return (
        pipeline.tokenizer,
        pipeline.feature_extractor,
        scheduler,
        pipeline.text_encoder,
        pipeline.image_encoder,
        pipeline.unet,
        pipeline.vae,
    )


if load_original_pipeline:
    (
        tokenizer,
        feature_extractor,
        scheduler,
        text_encoder,
        image_encoder,
        unet,
        vae,
    ) = get_pipeline_components(stable_diffusion_id, ip_adapter_id, ip_adapter_weight_name, lcm_lora_id)
    scheduler.save_pretrained(models_dir / "scheduler")
else:
    tokenizer, feature_extractor, scheduler, text_encoder, image_encoder, unet, vae = (
        None,
        None,
        None,
        None,
        None,
        None,
        None,
    )

import openvino as ov
import torch
import gc


def cleanup_torchscript_cache():
    """
    Helper for removing cached model representation
    """
    torch._C._jit_clear_class_registry()
    torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore()
    torch.jit._state._clear_class_state()


IMAGE_ENCODER_PATH = models_dir / "image_encoder.xml"
UNET_PATH = models_dir / "unet.xml"
VAE_DECODER_PATH = models_dir / "vae_decoder.xml"
VAE_ENCODER_PATH = models_dir / "vae_encoder.xml"
TEXT_ENCODER_PATH = models_dir / "text_encoder.xml"

if not IMAGE_ENCODER_PATH.exists():
    with torch.no_grad():
        ov_model = ov.convert_model(
            image_encoder,
            example_input=torch.zeros((1, 3, 224, 224)),
            input=[-1, 3, 224, 224],
        )
    ov.save_model(ov_model, IMAGE_ENCODER_PATH)
    feature_extractor.save_pretrained(models_dir / "feature_extractor")
    del ov_model
    cleanup_torchscript_cache()

if not UNET_PATH.exists():
    inputs = {
        "sample": torch.randn((2, 4, 64, 64)),
        "timestep": torch.tensor(1),
        "encoder_hidden_states": torch.randn((2, 77, 768)),
        "added_cond_kwargs": {"image_embeds": torch.ones((2, 1280))},
    }

    print(unet)

    with torch.no_grad():
        ov_model = ov.convert_model(unet, example_input=inputs)

    ov_model.inputs[-1].get_node().set_element_type(ov.Type.f32)
    ov_model.validate_nodes_and_infer_types()
    ov.save_model(ov_model, UNET_PATH)
    del ov_model
    cleanup_torchscript_cache()

if not VAE_DECODER_PATH.exists():

    class VAEDecoderWrapper(torch.nn.Module):
        def __init__(self, vae):
            super().__init__()
            self.vae = vae

        def forward(self, latents):
            return self.vae.decode(latents)

    vae_decoder = VAEDecoderWrapper(vae)
    with torch.no_grad():
        ov_model = ov.convert_model(vae_decoder, example_input=torch.ones([1, 4, 64, 64]))
    ov.save_model(ov_model, VAE_DECODER_PATH)
    del ov_model
    cleanup_torchscript_cache()
    del vae_decoder

if not VAE_ENCODER_PATH.exists():

    class VAEEncoderWrapper(torch.nn.Module):
        def __init__(self, vae):
            super().__init__()
            self.vae = vae

        def forward(self, image):
            return self.vae.encode(x=image)["latent_dist"].sample()

    vae_encoder = VAEEncoderWrapper(vae)
    vae_encoder.eval()
    image = torch.zeros((1, 3, 512, 512))
    with torch.no_grad():
        ov_model = ov.convert_model(vae_encoder, example_input=image)
    ov.save_model(ov_model, VAE_ENCODER_PATH)
    del ov_model
    cleanup_torchscript_cache()

if not TEXT_ENCODER_PATH.exists():
    with torch.no_grad():
        ov_model = ov.convert_model(
            text_encoder,
            example_input=torch.ones([1, 77], dtype=torch.long),
            input=[
                (1, 77),
            ],
        )
    ov.save_model(ov_model, TEXT_ENCODER_PATH)
    del ov_model
    cleanup_torchscript_cache()
    tokenizer.save_pretrained(models_dir / "tokenizer")


import inspect
from typing import List, Optional, Union, Dict, Tuple

import numpy as np
import PIL
import cv2

from transformers import CLIPTokenizer, CLIPImageProcessor
from diffusers import DiffusionPipeline
from diffusers.pipelines.stable_diffusion.pipeline_output import (
    StableDiffusionPipelineOutput,
)
from diffusers.schedulers import DDIMScheduler, LMSDiscreteScheduler, PNDMScheduler
from resampler import Resampler


def scale_fit_to_window(dst_width: int, dst_height: int, image_width: int, image_height: int):
    """
    Preprocessing helper for calculating the resize dimensions that preserve the original aspect ratio
    and fit the image into a specific window size

    Parameters:
        dst_width (int): destination window width
        dst_height (int): destination window height
        image_width (int): source image width
        image_height (int): source image height
    Returns:
        result_width (int): calculated width for resize
        result_height (int): calculated height for resize
    """
    im_scale = min(dst_height / image_height, dst_width / image_width)
    return int(im_scale * image_width), int(im_scale * image_height)
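
# Quick sanity check (illustrative values, not from the original script): fitting a 640x480 image
# into a 512x512 window gives a scale of min(512/480, 512/640) = 0.8, so
# scale_fit_to_window(512, 512, 640, 480) returns (512, 384).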


def randn_tensor(
    shape: Union[Tuple, List],
    generator: Optional[Union[List["torch.Generator"], "torch.Generator"]] = None,
    dtype: Optional["torch.dtype"] = None,
):
    """A helper function to create random tensors on CPU with the desired `dtype`. When
    passing a list of generators, you can seed each batch element individually.
    """
    batch_size = shape[0]
    rand_device = torch.device("cpu")

    if isinstance(generator, list) and len(generator) == 1:
        generator = generator[0]

    if isinstance(generator, list):
        shape = (1,) + shape[1:]
        latents = [torch.randn(shape, generator=generator[i], device=rand_device, dtype=dtype) for i in range(batch_size)]
        latents = torch.cat(latents, dim=0)
    else:
        latents = torch.randn(shape, generator=generator, device=rand_device, dtype=dtype)

    return latents


def preprocess(image: PIL.Image.Image, height, width):
    """
    Image preprocessing function. Takes an image in PIL.Image format, resizes it to keep the aspect ratio and fit the model input window (512x512 by default),
    then converts it to np.ndarray and pads it with zeros on the right or bottom side (depending on the aspect ratio), after that
    converts the data to float32 and rescales values from [0, 255] to [-1, 1], and finally converts the data layout from NHWC to NCHW.
    The function returns the preprocessed input tensor and padding sizes, which can be used in postprocessing.

    Parameters:
        image (PIL.Image.Image): input image
    Returns:
        image (np.ndarray): preprocessed image tensor
        meta (Dict): dictionary with preprocessing metadata info
    """
    src_width, src_height = image.size
    dst_width, dst_height = scale_fit_to_window(width, height, src_width, src_height)
    image = np.array(image.resize((dst_width, dst_height), resample=PIL.Image.Resampling.LANCZOS))[None, :]
    print(image.shape)
    pad_width = width - dst_width
    pad_height = height - dst_height
    pad = ((0, 0), (0, pad_height), (0, pad_width), (0, 0))
    image = np.pad(image, pad, mode="constant")
    image = image.astype(np.float32) / 255.0

    image = 2.0 * image - 1.0
    image = image.transpose(0, 3, 1, 2)
    print(image.shape)
    return image, {"padding": pad, "src_width": src_width, "src_height": src_height}


class OVStableDiffusionPipeline(DiffusionPipeline):
    def __init__(
        self,
        vae_decoder: ov.Model,
        text_encoder: ov.Model,
        tokenizer: CLIPTokenizer,
        unet: ov.Model,
        scheduler: Union[DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler],
        image_encoder: ov.Model,
        feature_extractor: CLIPImageProcessor,
        vae_encoder: ov.Model,
    ):
        """
        Pipeline for text-to-image generation using Stable Diffusion and IP-Adapter with OpenVINO
        Parameters:
            vae_decoder (ov.Model):
                Variational Auto-Encoder (VAE) decoder to decode images from latent representations.
            text_encoder (ov.Model):
                Frozen text-encoder. Stable Diffusion uses the text portion of
                [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
                the clip-vit-large-patch14(https://huggingface.co/openai/clip-vit-large-patch14) variant.
            tokenizer (CLIPTokenizer):
                Tokenizer of class CLIPTokenizer(https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
            unet (ov.Model): Conditional U-Net architecture to denoise the encoded image latents.
            scheduler (SchedulerMixin):
                A scheduler to be used in combination with unet to denoise the encoded image latents
            image_encoder (ov.Model):
                IP-Adapter image encoder for embedding the input image as an image prompt for generation
            feature_extractor (CLIPImageProcessor):
                Image processor that prepares the IP-Adapter input image for the image encoder
            vae_encoder (ov.Model):
                Variational Auto-Encoder (VAE) encoder to encode input images into latent representations.
        """
        super().__init__()
        self.scheduler = scheduler
        self.vae_decoder = vae_decoder
        self.image_encoder = image_encoder
        self.text_encoder = text_encoder
        self.unet = unet
        self.height = 512
        self.width = 512
        self.vae_scale_factor = 8
        self.tokenizer = tokenizer
        self.vae_encoder = vae_encoder
        self.feature_extractor = feature_extractor
        self.register_to_config(unet=unet)

    def __call__(
        self,
        prompt: Union[str, List[str]],
        ip_adapter_image: PIL.Image.Image,
        image: PIL.Image.Image = None,
        num_inference_steps: Optional[int] = 4,
        negative_prompt: Union[str, List[str]] = None,
        guidance_scale: Optional[float] = 0.5,
        eta: Optional[float] = 0.0,
        output_type: Optional[str] = "pil",
        height: Optional[int] = None,
        width: Optional[int] = None,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.FloatTensor] = None,
        strength: float = 1.0,
        **kwargs,
    ):
        """
        Function invoked when calling the pipeline for generation.
        Parameters:
            prompt (str or List[str]):
                The prompt or prompts to guide the image generation.
            ip_adapter_image (PIL.Image.Image):
                Image used as the IP-Adapter image prompt.
            image (PIL.Image.Image, *optional*, None):
                Initial image for image-to-image generation.
            num_inference_steps (int, *optional*, defaults to 4):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            negative_prompt (str or List[str]):
                The negative prompt or prompts to guide the image generation.
            guidance_scale (float, *optional*, defaults to 0.5):
                Guidance scale as defined in Classifier-Free Diffusion Guidance(https://arxiv.org/abs/2207.12598).
                guidance_scale is defined as `w` of equation 2.
                Higher guidance scale encourages generating images that are closely linked to the text prompt,
                usually at the expense of lower image quality.
            eta (float, *optional*, defaults to 0.0):
                Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
                [DDIMScheduler], will be ignored for others.
            output_type (`str`, *optional*, defaults to "pil"):
                The output format of the generated image. Choose between
                [PIL](https://pillow.readthedocs.io/en/stable/): PIL.Image.Image or np.array.
            height (int, *optional*, 512):
                Generated image height
            width (int, *optional*, 512):
                Generated image width
            generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
                A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make
                generation deterministic.
            latents (`torch.FloatTensor`, *optional*):
                Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for image
                generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
                tensor is generated by sampling using the supplied random `generator`.
        Returns:
            StableDiffusionPipelineOutput with the generated image(s) (PIL.Image.Image or np.array) in the `images` field.
        """
        do_classifier_free_guidance = guidance_scale > 1.0

        text_embeddings = self._encode_prompt(
            prompt,
            do_classifier_free_guidance=do_classifier_free_guidance,
            negative_prompt=negative_prompt,
        )

        image_embeds, negative_image_embeds = self.encode_image(ip_adapter_image)
        if do_classifier_free_guidance:
            image_embeds = np.concatenate([negative_image_embeds, image_embeds])

        accepts_offset = "offset" in set(inspect.signature(self.scheduler.set_timesteps).parameters.keys())
        extra_set_kwargs = {}
        if accepts_offset:
            extra_set_kwargs["offset"] = 1

        self.scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs)

        timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength)
        latent_timestep = timesteps[:1]

        print(num_inference_steps, timesteps)

        latents, meta = self.prepare_latents(
            1,
            4,
            height or self.height,
            width or self.width,
            generator=generator,
            latents=latents,
            image=image,
            latent_timestep=latent_timestep,
        )

        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        for i, t in enumerate(self.progress_bar(timesteps)):
            latent_model_input = np.concatenate([latents] * 2) if do_classifier_free_guidance else latents
            latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

            noise_pred = self.unet([latent_model_input, t, text_embeddings, image_embeds])[0]

            if do_classifier_free_guidance:
                noise_pred_uncond, noise_pred_text = noise_pred[0], noise_pred[1]
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            latents = self.scheduler.step(
                torch.from_numpy(noise_pred),
                t,
                torch.from_numpy(latents),
                **extra_step_kwargs,
            )["prev_sample"].numpy()

        image = self.vae_decoder(latents * (1 / 0.18215))[0]

        image = self.postprocess_image(image, meta, output_type)
        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=False)

    def _encode_prompt(
        self,
        prompt: Union[str, List[str]],
        num_images_per_prompt: int = 1,
        do_classifier_free_guidance: bool = True,
        negative_prompt: Union[str, List[str]] = None,
    ):
        """
        Encodes the prompt into text encoder hidden states.

        Parameters:
            prompt (str or list(str)): prompt to be encoded
            num_images_per_prompt (int): number of images that should be generated per prompt
            do_classifier_free_guidance (bool): whether to use classifier free guidance or not
            negative_prompt (str or list(str)): negative prompt to be encoded.
        Returns:
            text_embeddings (np.ndarray): text encoder hidden states
        """
        batch_size = len(prompt) if isinstance(prompt, list) else 1

        text_inputs = self.tokenizer(
            prompt,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="np",
        )
        text_input_ids = text_inputs.input_ids

        text_embeddings = self.text_encoder(text_input_ids)[0]

        if num_images_per_prompt != 1:
            bs_embed, seq_len, _ = text_embeddings.shape
            text_embeddings = np.tile(text_embeddings, (1, num_images_per_prompt, 1))
            text_embeddings = np.reshape(text_embeddings, (bs_embed * num_images_per_prompt, seq_len, -1))

        if do_classifier_free_guidance:
            uncond_tokens: List[str]
            max_length = text_input_ids.shape[-1]
            if negative_prompt is None:
                uncond_tokens = [""] * batch_size
            elif isinstance(negative_prompt, str):
                uncond_tokens = [negative_prompt]
            else:
                uncond_tokens = negative_prompt
            uncond_input = self.tokenizer(
                uncond_tokens,
                padding="max_length",
                max_length=max_length,
                truncation=True,
                return_tensors="np",
            )

            uncond_embeddings = self.text_encoder(uncond_input.input_ids)[0]

            seq_len = uncond_embeddings.shape[1]
            uncond_embeddings = np.tile(uncond_embeddings, (1, num_images_per_prompt, 1))
            uncond_embeddings = np.reshape(uncond_embeddings, (batch_size * num_images_per_prompt, seq_len, -1))

            text_embeddings = np.concatenate([uncond_embeddings, text_embeddings])

        return text_embeddings

    def prepare_latents(
        self,
        batch_size,
        num_channels_latents,
        height,
        width,
        dtype=torch.float16,
        generator=None,
        latents=None,
        image=None,
        latent_timestep=None,
    ):
        shape = (
            batch_size,
            num_channels_latents,
            height // self.vae_scale_factor,
            width // self.vae_scale_factor,
        )
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        if latents is None:
            latents = randn_tensor(shape, generator=generator, dtype=dtype)

        if image is None:
            latents = latents * self.scheduler.init_noise_sigma
            return latents.numpy(), {}
        input_image, meta = preprocess(image, height, width)
        print(input_image.shape)
        image_latents = self.vae_encoder(input_image)[0]
        image_latents = image_latents * 0.18215
        latents = self.scheduler.add_noise(torch.from_numpy(image_latents), latents, latent_timestep).numpy()
        return latents, meta

    def postprocess_image(self, image: np.ndarray, meta: Dict, output_type: str = "pil"):
        """
        Postprocessing for decoded image. Takes the image generated by the VAE decoder, unpads it to the initial image size (if required),
        normalizes it and converts it to the [0, 255] pixel range. Optionally converts it from np.ndarray to PIL.Image format

        Parameters:
            image (np.ndarray):
                Generated image
            meta (Dict):
                Metadata obtained on the latents preparation step; can be empty
            output_type (str, *optional*, pil):
                Output format for result, can be pil or numpy
        Returns:
            image (List of np.ndarray or PIL.Image.Image):
                Post-processed images
        """
        if "padding" in meta:
            pad = meta["padding"]
            (_, end_h), (_, end_w) = pad[1:3]
            h, w = image.shape[2:]
            unpad_h = h - end_h
            unpad_w = w - end_w
            image = image[:, :, :unpad_h, :unpad_w]
        image = np.clip(image / 2 + 0.5, 0, 1)
        image = np.transpose(image, (0, 2, 3, 1))

        if output_type == "pil":
            image = self.numpy_to_pil(image)
            if "src_height" in meta:
                orig_height, orig_width = meta["src_height"], meta["src_width"]
                image = [img.resize((orig_width, orig_height), PIL.Image.Resampling.LANCZOS) for img in image]
        else:
            if "src_height" in meta:
                orig_height, orig_width = meta["src_height"], meta["src_width"]
                image = [cv2.resize(img, (orig_width, orig_height)) for img in image]

        return image

    def encode_image(self, image, num_images_per_prompt=1):
        if not isinstance(image, torch.Tensor):
            image = self.feature_extractor(image, return_tensors="pt").pixel_values

        image_embeds = self.image_encoder(image)[0]
        """
        print(1, image_embeds)
        image_proj_model = Resampler(
            dim=1024,
            depth=2,
            dim_head=64,
            heads=16,
            num_queries=8,
            embedding_dim=1280,
            output_dim=1280,
            ff_mult=2,
            max_seq_len=257,
            apply_pos_emb=True,
            num_latents_mean_pooled=4,
        )

        image_embeds = image_proj_model(image_embeds)
        print(2, image_embeds)
        """

        if num_images_per_prompt > 1:
            image_embeds = np.repeat(image_embeds, num_images_per_prompt, axis=0)

        uncond_image_embeds = np.zeros(image_embeds.shape)
        return image_embeds, uncond_image_embeds

    def get_timesteps(self, num_inference_steps: int, strength: float):
        """
        Helper function for getting scheduler timesteps for generation
        In case of image-to-image generation, it updates the number of steps according to strength

        Parameters:
            num_inference_steps (int):
                number of inference steps for generation
            strength (float):
                value between 0.0 and 1.0 that controls the amount of noise that is added to the input image.
                Values that approach 1.0 allow for lots of variations but will also produce images that are not semantically consistent with the input.
        """
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

        t_start = max(num_inference_steps - init_timestep, 0)
        timesteps = self.scheduler.timesteps[t_start:]

        return timesteps, num_inference_steps - t_start
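
    # Example (values assumed for illustration): with num_inference_steps=4 and strength=1.0,
    # init_timestep = 4 and t_start = 0, so all four scheduler timesteps are used; with
    # strength=0.5 only the last two timesteps would remain, as in typical image-to-image use.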


core = ov.Core()
device = "GPU"


from transformers import AutoTokenizer
from PIL import Image

ov_config = {"INFERENCE_PRECISION_HINT": "f16"}
vae_decoder = core.compile_model(VAE_DECODER_PATH, device, ov_config)
vae_encoder = core.compile_model(VAE_ENCODER_PATH, device, ov_config)
text_encoder = core.compile_model(TEXT_ENCODER_PATH, device)
image_encoder = core.compile_model(IMAGE_ENCODER_PATH, device)
unet = core.compile_model(UNET_PATH, device)

scheduler = LCMScheduler.from_pretrained(models_dir / "scheduler")
tokenizer = AutoTokenizer.from_pretrained(models_dir / "tokenizer")
feature_extractor = CLIPImageProcessor.from_pretrained(models_dir / "feature_extractor")

ov_pipe = OVStableDiffusionPipeline(
    vae_decoder,
    text_encoder,
    tokenizer,
    unet,
    scheduler,
    image_encoder,
    feature_extractor,
    vae_encoder,
)
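
# Example usage (a minimal, commented-out sketch): generate an image conditioned on a text prompt
# and a reference face photo passed as the IP-Adapter image prompt. The file name "face.png", the
# prompt and the seed are illustrative placeholders, not part of the original setup.
# face_image = load_image("face.png")
# result = ov_pipe(
#     "professional portrait photo, disney style",
#     ip_adapter_image=face_image,
#     num_inference_steps=4,
#     guidance_scale=1.0,
#     height=512,
#     width=512,
#     generator=torch.Generator().manual_seed(42),
# )
# result.images[0].save("result.png")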


"""
import datasets

DATASET_NAME = "jxie/coco_captions"

dataset = datasets.load_dataset("jxie/coco_captions", split="train", streaming=True).shuffle(seed=42)


def preprocess_fn(example):
    return {"prompt": example["caption"]}


NUM_SAMPLES = 200
dataset = dataset.take(NUM_SAMPLES)
calibration_dataset = dataset.map(lambda x: preprocess_fn(x), remove_columns=dataset.column_names)


int8_pipe = None

import nncf
import datasets
from tqdm import tqdm
from transformers import set_seed
from typing import Any, Dict, List

set_seed(1)


class CompiledModelDecorator(ov.CompiledModel):
    def __init__(self, compiled_model, prob: float, data_cache: List[Any] = None):
        super().__init__(compiled_model)
        self.data_cache = data_cache if data_cache else []
        self.prob = np.clip(prob, 0, 1)

    def __call__(self, *args, **kwargs):
        if np.random.rand() >= self.prob:
            self.data_cache.append(*args)
        return super().__call__(*args, **kwargs)


from diffusers.utils import load_image


def collect_calibration_data(pipeline: OVStableDiffusionPipeline, subset_size: int) -> List[Dict]:
    original_unet = pipeline.unet
    pipeline.unet = CompiledModelDecorator(original_unet, prob=0.3)
    # google-research-datasets/conceptual_captions
    dataset = datasets.load_dataset("jxie/coco_captions", split="train", streaming=True).shuffle(seed=42)
    pipeline.set_progress_bar_config(disable=True)
    # safety_checker = pipeline.safety_checker
    # pipeline.safety_checker = None

    # Run inference for data collection
    pbar = tqdm(total=subset_size)
    diff = 0
    for batch in dataset:
        prompt = batch["caption"]
        image = load_image(batch["image"])
        if len(prompt) > tokenizer.model_max_length:
            continue
        _ = pipeline(
            prompt,
            ip_adapter_image=image,
            num_inference_steps=4,
            guidance_scale=1,
            # guidance_scale=8.0,
            # lcm_origin_steps=50,
            output_type="pil",
            height=512,
            width=512,
        )
        collected_subset_size = len(pipeline.unet.data_cache)
        if collected_subset_size >= subset_size:
            pbar.update(subset_size - pbar.n)
            break
        pbar.update(collected_subset_size - diff)
        diff = collected_subset_size

    calibration_dataset = pipeline.unet.data_cache
    pipeline.set_progress_bar_config(disable=False)
    pipeline.unet = original_unet
    # pipeline.safety_checker = safety_checker
    return calibration_dataset


UNET_INT8_PATH = models_dir / "unet_int8.xml"

if not UNET_INT8_PATH.exists():
    subset_size = 200
    unet_calibration_data = collect_calibration_data(ov_pipe, subset_size=subset_size)


import nncf
from nncf.scopes import IgnoredScope

if UNET_INT8_PATH.exists():
    print("Loading quantized model")
    quantized_unet = core.read_model(UNET_INT8_PATH)
else:
    unet = core.read_model(UNET_PATH)
    quantized_unet = nncf.quantize(
        model=unet,
        subset_size=subset_size,
        calibration_dataset=nncf.Dataset(unet_calibration_data),
        model_type=nncf.ModelType.TRANSFORMER,
        advanced_parameters=nncf.AdvancedQuantizationParameters(
            disable_bias_correction=True
        ),
    )
    ov.save_model(quantized_unet, UNET_INT8_PATH)
"""