import inspect
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import cv2
import numpy as np
import PIL
import torch
from PIL import Image

import openvino as ov
from diffusers import DiffusionPipeline, LCMScheduler
from diffusers.pipelines.stable_diffusion.pipeline_output import (
    StableDiffusionPipelineOutput,
)
from diffusers.schedulers import DDIMScheduler, LMSDiscreteScheduler, PNDMScheduler
from diffusers.utils import load_image
from transformers import AutoTokenizer, CLIPImageProcessor, CLIPTokenizer

from resampler import Resampler  # only used by the commented-out Resampler experiment in encode_image


def scale_fit_to_window(dst_width: int, dst_height: int, image_width: int, image_height: int):
    """
    Preprocessing helper function for calculating the image size for resize while preserving the original aspect ratio
    and fitting the image to the specific window size.

    Parameters:
        dst_width (int): destination window width
        dst_height (int): destination window height
        image_width (int): source image width
        image_height (int): source image height
    Returns:
        result_width (int): calculated width for resize
        result_height (int): calculated height for resize
    """
    im_scale = min(dst_height / image_height, dst_width / image_width)
    return int(im_scale * image_width), int(im_scale * image_height)
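# Illustrative example (values assumed): fitting a 640x480 image into a 512x512 window
# keeps the aspect ratio, so scale_fit_to_window(512, 512, 640, 480) returns (512, 384).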


def randn_tensor(
    shape: Union[Tuple, List],
    generator: Optional[Union[List["torch.Generator"], "torch.Generator"]] = None,
    dtype: Optional["torch.dtype"] = None,
):
    """A helper function to create random tensors on CPU with the desired `dtype`. When
    passing a list of generators, you can seed each batch element individually.
    """
    batch_size = shape[0]
    rand_device = torch.device("cpu")

    if isinstance(generator, list) and len(generator) == 1:
        generator = generator[0]

    if isinstance(generator, list):
        shape = (1,) + shape[1:]
        latents = [torch.randn(shape, generator=generator[i], device=rand_device, dtype=dtype) for i in range(batch_size)]
        latents = torch.cat(latents, dim=0)
    else:
        latents = torch.randn(shape, generator=generator, device=rand_device, dtype=dtype)

    return latents
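# Illustrative example (shapes and seeds assumed): a list of generators seeds each
# batch element independently, e.g.
#   gens = [torch.Generator().manual_seed(s) for s in (0, 1)]
#   latents = randn_tensor((2, 4, 64, 64), generator=gens, dtype=torch.float32)
# yields a (2, 4, 64, 64) tensor whose two samples are individually reproducible.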


def preprocess(image: PIL.Image.Image, height, width):
    """
    Image preprocessing function. Takes an image in PIL.Image format, resizes it to keep the aspect ratio and fit it
    into the model input window defined by height and width, converts it to np.ndarray and pads it with zeros on the
    right or bottom side (depending on the aspect ratio), then casts the data to float32, rescales values from
    [0, 255] to [-1, 1] and finally converts the data layout from NHWC to NCHW.
    The function returns the preprocessed input tensor and preprocessing metadata, which can be used in postprocessing.

    Parameters:
        image (PIL.Image.Image): input image
    Returns:
        image (np.ndarray): preprocessed image tensor
        meta (Dict): dictionary with preprocessing metadata info
    """
    src_width, src_height = image.size
    dst_width, dst_height = scale_fit_to_window(width, height, src_width, src_height)
    image = np.array(image.resize((dst_width, dst_height), resample=PIL.Image.Resampling.LANCZOS))[None, :]
    pad_width = width - dst_width
    pad_height = height - dst_height
    pad = ((0, 0), (0, pad_height), (0, pad_width), (0, 0))
    image = np.pad(image, pad, mode="constant")
    image = image.astype(np.float32) / 255.0
    image = 2.0 * image - 1.0
    image = image.transpose(0, 3, 1, 2)
    return image, {"padding": pad, "src_width": src_width, "src_height": src_height}


class OVStableDiffusionPipeline(DiffusionPipeline):
    def __init__(
        self,
        vae_decoder: ov.Model,
        text_encoder: ov.Model,
        tokenizer: CLIPTokenizer,
        unet: ov.Model,
        scheduler: Union[DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler],
        image_encoder: ov.Model,
        feature_extractor: CLIPImageProcessor,
        vae_encoder: ov.Model,
    ):
        """
        Pipeline for text-to-image generation using Stable Diffusion and IP-Adapter with OpenVINO.

        Parameters:
            vae_decoder (ov.Model):
                Variational Auto-Encoder (VAE) model to decode images from latent representations.
            text_encoder (ov.Model):
                Frozen text encoder. Stable Diffusion uses the text portion of
                [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
                the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.
            tokenizer (CLIPTokenizer):
                Tokenizer of class [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
            unet (ov.Model):
                Conditional U-Net architecture to denoise the encoded image latents.
            scheduler (SchedulerMixin):
                A scheduler to be used in combination with the U-Net to denoise the encoded image latents.
            image_encoder (ov.Model):
                IP-Adapter image encoder for embedding the input image as an image prompt for generation.
            feature_extractor (CLIPImageProcessor):
                Image processor that prepares the IP-Adapter input image for the image encoder.
            vae_encoder (ov.Model):
                VAE encoder to encode an initial image into latents for image-to-image generation.
        """
        super().__init__()
        self.scheduler = scheduler
        self.vae_decoder = vae_decoder
        self.image_encoder = image_encoder
        self.text_encoder = text_encoder
        self.unet = unet
        self.height = 512
        self.width = 512
        self.vae_scale_factor = 8
        self.tokenizer = tokenizer
        self.vae_encoder = vae_encoder
        self.feature_extractor = feature_extractor

    def __call__(
        self,
        prompt: Union[str, List[str]],
        ip_adapter_image: PIL.Image.Image,
        image: PIL.Image.Image = None,
        num_inference_steps: Optional[int] = 4,
        negative_prompt: Union[str, List[str]] = None,
        guidance_scale: Optional[float] = 0.5,
        eta: Optional[float] = 0.0,
        output_type: Optional[str] = "pil",
        height: Optional[int] = None,
        width: Optional[int] = None,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.FloatTensor] = None,
        strength: float = 1.0,
        **kwargs,
    ):
        """
        Function invoked when calling the pipeline for generation.

        Parameters:
            prompt (str or List[str]):
                The prompt or prompts to guide the image generation.
            ip_adapter_image (PIL.Image.Image):
                Image prompt encoded by the IP-Adapter image encoder.
            image (PIL.Image.Image, *optional*, defaults to None):
                Initial image for image-to-image generation.
            num_inference_steps (int, *optional*, defaults to 4):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            negative_prompt (str or List[str]):
                The negative prompt or prompts to guide the image generation.
            guidance_scale (float, *optional*, defaults to 0.5):
                Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
                guidance_scale is defined as `w` of equation 2.
                Higher guidance scale encourages generating images that are closely linked to the text prompt,
                usually at the expense of lower image quality.
            eta (float, *optional*, defaults to 0.0):
                Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
                [DDIMScheduler], will be ignored for others.
            output_type (`str`, *optional*, defaults to "pil"):
                The output format of the generated image. Choose between
                [PIL](https://pillow.readthedocs.io/en/stable/): PIL.Image.Image or np.array.
            height (int, *optional*):
                Generated image height; the pipeline default of 512 is used when None.
            width (int, *optional*):
                Generated image width; the pipeline default of 512 is used when None.
            generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
                A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make
                generation deterministic.
            latents (`torch.FloatTensor`, *optional*):
                Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for image
                generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
                tensor is generated by sampling using the supplied random `generator`.
            strength (float, *optional*, defaults to 1.0):
                Amount of noise added to the initial image in image-to-image generation; ignored when `image` is None.
        Returns:
            StableDiffusionPipelineOutput:
                The generated image(s) in the `images` field.
        """
        do_classifier_free_guidance = guidance_scale > 1.0

        text_embeddings = self._encode_prompt(
            prompt,
            do_classifier_free_guidance=do_classifier_free_guidance,
            negative_prompt=negative_prompt,
        )

        image_embeds, negative_image_embeds = self.encode_image(ip_adapter_image)
        if do_classifier_free_guidance:
            image_embeds = np.concatenate([negative_image_embeds, image_embeds])

        # Some schedulers accept an `offset` argument for set_timesteps; pass it only when supported.
        accepts_offset = "offset" in set(inspect.signature(self.scheduler.set_timesteps).parameters.keys())
        extra_set_kwargs = {}
        if accepts_offset:
            extra_set_kwargs["offset"] = 1

        self.scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs)

        timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength)
        latent_timestep = timesteps[:1]

        latents, meta = self.prepare_latents(
            1,
            4,
            height or self.height,
            width or self.width,
            generator=generator,
            latents=latents,
            image=image,
            latent_timestep=latent_timestep,
        )

        # eta (η) is only used by DDIM-like schedulers; pass it only when the step signature accepts it.
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        for i, t in enumerate(self.progress_bar(timesteps)):
            # Duplicate latents for the unconditional/conditional passes when using classifier-free guidance.
            latent_model_input = np.concatenate([latents] * 2) if do_classifier_free_guidance else latents
            latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

            # Predict the noise residual; the UNet takes latents, timestep, text embeddings and image embeddings.
            noise_pred = self.unet([latent_model_input, t, text_embeddings, image_embeds])[0]

            if do_classifier_free_guidance:
                noise_pred_uncond, noise_pred_text = noise_pred[0], noise_pred[1]
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            # Compute the previous noisy sample x_t -> x_t-1.
            latents = self.scheduler.step(
                torch.from_numpy(noise_pred),
                t,
                torch.from_numpy(latents),
                **extra_step_kwargs,
            )["prev_sample"].numpy()

        # Scale and decode the image latents with the VAE decoder.
        image = self.vae_decoder(latents * (1 / 0.18215))[0]

        image = self.postprocess_image(image, meta, output_type)
        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=False)

    def _encode_prompt(
        self,
        prompt: Union[str, List[str]],
        num_images_per_prompt: int = 1,
        do_classifier_free_guidance: bool = True,
        negative_prompt: Union[str, List[str]] = None,
    ):
        """
        Encodes the prompt into text encoder hidden states.

        Parameters:
            prompt (str or list(str)): prompt to be encoded
            num_images_per_prompt (int): number of images that should be generated per prompt
            do_classifier_free_guidance (bool): whether to use classifier free guidance or not
            negative_prompt (str or list(str)): negative prompt to be encoded.
        Returns:
            text_embeddings (np.ndarray): text encoder hidden states
        """
        batch_size = len(prompt) if isinstance(prompt, list) else 1

        # Tokenize the prompt(s) and get the text embeddings.
        text_inputs = self.tokenizer(
            prompt,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="np",
        )
        text_input_ids = text_inputs.input_ids

        text_embeddings = self.text_encoder(text_input_ids)[0]

        # Duplicate text embeddings for each generation per prompt.
        if num_images_per_prompt != 1:
            bs_embed, seq_len, _ = text_embeddings.shape
            text_embeddings = np.tile(text_embeddings, (1, num_images_per_prompt, 1))
            text_embeddings = np.reshape(text_embeddings, (bs_embed * num_images_per_prompt, seq_len, -1))

        # Get unconditional embeddings for classifier-free guidance.
        if do_classifier_free_guidance:
            uncond_tokens: List[str]
            max_length = text_input_ids.shape[-1]
            if negative_prompt is None:
                uncond_tokens = [""] * batch_size
            elif isinstance(negative_prompt, str):
                uncond_tokens = [negative_prompt]
            else:
                uncond_tokens = negative_prompt
            uncond_input = self.tokenizer(
                uncond_tokens,
                padding="max_length",
                max_length=max_length,
                truncation=True,
                return_tensors="np",
            )

            uncond_embeddings = self.text_encoder(uncond_input.input_ids)[0]

            # Duplicate unconditional embeddings for each generation per prompt.
            seq_len = uncond_embeddings.shape[1]
            uncond_embeddings = np.tile(uncond_embeddings, (1, num_images_per_prompt, 1))
            uncond_embeddings = np.reshape(uncond_embeddings, (batch_size * num_images_per_prompt, seq_len, -1))

            # The unconditional and prompt embeddings are concatenated into a single batch
            # so the UNet handles both passes in one forward call.
            text_embeddings = np.concatenate([uncond_embeddings, text_embeddings])

        return text_embeddings
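    # Illustrative note (shapes assumed for a CLIP ViT-L/14 text encoder with a 77-token
    # context): a single prompt with classifier-free guidance yields text_embeddings of
    # shape (2, 77, 768), the unconditional embedding stacked above the prompt embedding.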

    def prepare_latents(
        self,
        batch_size,
        num_channels_latents,
        height,
        width,
        dtype=torch.float16,
        generator=None,
        latents=None,
        image=None,
        latent_timestep=None,
    ):
        shape = (
            batch_size,
            num_channels_latents,
            height // self.vae_scale_factor,
            width // self.vae_scale_factor,
        )
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        if latents is None:
            latents = randn_tensor(shape, generator=generator, dtype=dtype)

        # Text-to-image: start from pure noise scaled by the scheduler's initial sigma.
        if image is None:
            latents = latents * self.scheduler.init_noise_sigma
            return latents.numpy(), {}

        # Image-to-image: encode the initial image and add noise for the first timestep.
        input_image, meta = preprocess(image, height, width)
        image_latents = self.vae_encoder(input_image)[0]
        image_latents = image_latents * 0.18215
        latents = self.scheduler.add_noise(torch.from_numpy(image_latents), latents, latent_timestep).numpy()
        return latents, meta

    def postprocess_image(self, image: np.ndarray, meta: Dict, output_type: str = "pil"):
        """
        Postprocessing for a decoded image. Takes the image generated by the VAE decoder, unpads it to the initial
        image size (if required), normalizes it to the [0, 255] pixel range and, optionally, converts it from
        np.ndarray to PIL.Image format.

        Parameters:
            image (np.ndarray):
                Generated image
            meta (Dict):
                Metadata obtained on the latents preparation step; can be empty
            output_type (str, *optional*, pil):
                Output format for the result, can be pil or numpy
        Returns:
            image (List of np.ndarray or PIL.Image.Image):
                Post-processed images
        """
        if "padding" in meta:
            pad = meta["padding"]
            (_, end_h), (_, end_w) = pad[1:3]
            h, w = image.shape[2:]
            unpad_h = h - end_h
            unpad_w = w - end_w
            image = image[:, :, :unpad_h, :unpad_w]
        image = np.clip(image / 2 + 0.5, 0, 1)
        image = np.transpose(image, (0, 2, 3, 1))

        if output_type == "pil":
            image = self.numpy_to_pil(image)
            if "src_height" in meta:
                orig_height, orig_width = meta["src_height"], meta["src_width"]
                image = [img.resize((orig_width, orig_height), PIL.Image.Resampling.LANCZOS) for img in image]
        else:
            if "src_height" in meta:
                orig_height, orig_width = meta["src_height"], meta["src_width"]
                image = [cv2.resize(img, (orig_width, orig_height)) for img in image]

        return image

    def encode_image(self, image, num_images_per_prompt=1):
        if not isinstance(image, torch.Tensor):
            image = self.feature_extractor(image, return_tensors="pt").pixel_values

        image_embeds = self.image_encoder(image)[0]
        # Alternative path kept for reference: project the image embeddings through a Resampler.
        """
        print(1, image_embeds)
        image_proj_model = Resampler(
            dim=1024,
            depth=2,
            dim_head=64,
            heads=16,
            num_queries=8,
            embedding_dim=1280,
            output_dim=1280,
            ff_mult=2,
            max_seq_len=257,
            apply_pos_emb=True,
            num_latents_mean_pooled=4,
        )

        image_embeds = image_proj_model(image_embeds)
        print(2, image_embeds)
        """

        if num_images_per_prompt > 1:
            # image_embeds is a NumPy array here, so use np.repeat rather than torch's repeat_interleave.
            image_embeds = np.repeat(image_embeds, num_images_per_prompt, axis=0)

        uncond_image_embeds = np.zeros(image_embeds.shape)
        return image_embeds, uncond_image_embeds

    def get_timesteps(self, num_inference_steps: int, strength: float):
        """
        Helper function for getting scheduler timesteps for generation.
        In case of image-to-image generation, it updates the number of steps according to strength.

        Parameters:
            num_inference_steps (int):
                number of inference steps for generation
            strength (float):
                value between 0.0 and 1.0 that controls the amount of noise added to the input image.
                Values closer to 1.0 allow for more variation but also produce images that are less semantically consistent with the input.
        """
        # Keep only the final `strength` fraction of the schedule.
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

        t_start = max(num_inference_steps - init_timestep, 0)
        timesteps = self.scheduler.timesteps[t_start:]

        return timesteps, num_inference_steps - t_start
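    # Illustrative example (values assumed): with num_inference_steps=4 and strength=0.5,
    # init_timestep = 2 and t_start = 2, so only the last two scheduler timesteps are used
    # and the method returns (self.scheduler.timesteps[2:], 2).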


core = ov.Core()
device = "CPU"

models_dir = Path("on-canvers-disney-v3.9.1-ov-face")
IMAGE_ENCODER_PATH = models_dir / "image_encoder.xml"
UNET_PATH = models_dir / "unet.xml"
VAE_DECODER_PATH = models_dir / "vae_decoder.xml"
VAE_ENCODER_PATH = models_dir / "vae_encoder.xml"
TEXT_ENCODER_PATH = models_dir / "text_encoder.xml"

# Compile the converted OpenVINO models for the selected device.
ov_config = {}
vae_decoder = core.compile_model(VAE_DECODER_PATH, device, ov_config)
vae_encoder = core.compile_model(VAE_ENCODER_PATH, device, ov_config)
text_encoder = core.compile_model(TEXT_ENCODER_PATH, device)
image_encoder = core.compile_model(IMAGE_ENCODER_PATH, device)
unet = core.compile_model(UNET_PATH, device)

scheduler = LCMScheduler.from_pretrained(models_dir / "scheduler")
tokenizer = AutoTokenizer.from_pretrained(models_dir / "tokenizer")
feature_extractor = CLIPImageProcessor.from_pretrained(models_dir / "feature_extractor")

ov_pipe = OVStableDiffusionPipeline(
    vae_decoder,
    text_encoder,
    tokenizer,
    unet,
    scheduler,
    image_encoder,
    feature_extractor,
    vae_encoder,
)

generator = torch.Generator(device="cpu").manual_seed(576)

ip_image = load_image("./input.jpg")

image = Image.open("ai_face.png").convert("RGB")
image = image.resize((512, 512))

result = ov_pipe(
    prompt="best quality, high quality, beautiful korean woman is wearing glasses",
    ip_adapter_image=image,
    height=512,
    width=512,
    guidance_scale=1,
    generator=generator,
    num_inference_steps=4,
).images[0]

result.save("test7.png")
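
# Hypothetical image-to-image variant (illustrative sketch, not executed here): the
# reference photo loaded into `ip_image` could be passed as `image` together with a
# `strength` below 1.0 so denoising starts from that photo instead of pure noise.
# result_i2i = ov_pipe(
#     prompt="best quality, high quality, beautiful korean woman is wearing glasses",
#     ip_adapter_image=image,
#     image=ip_image,
#     strength=0.6,
#     height=512,
#     width=512,
#     guidance_scale=1,
#     generator=generator,
#     num_inference_steps=4,
# ).images[0]
# result_i2i.save("test7_img2img.png")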