import base64
import io
import logging
import os
from typing import Tuple, List, Optional, Dict, Any

import cv2
import diffusers
import numpy as np
import onnxruntime as ort
import torch
import torch.nn.functional as F
from controlnet_aux import OpenposeDetector
from diffusers.models import ControlNetModel
from diffusers.pipelines.controlnet.multicontrolnet import MultiControlNetModel
from diffusers.utils import load_image
from huggingface_hub import hf_hub_download
from PIL import Image
from pydantic import BaseModel
from torchvision.transforms import Compose
from transformers import CLIPProcessor, CLIPModel

from pipeline_stable_diffusion_xl_instantid_full import StableDiffusionXLInstantIDPipeline, draw_kps
from style_template import styles

# Run on the GPU whenever torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Half precision is only selected for CUDA; any other device uses float32.
dtype = torch.float16 if "cuda" in device else torch.float32

# Names of every style template exposed by style_template.styles.
STYLE_NAMES = list(styles)
# Style used by EndpointHandler.apply_style when the requested one is unknown.
DEFAULT_STYLE_NAME = "Spring Festival"

# Location of the LCM LoRA weights, relative to the working directory.
lcm_lora_path = "./checkpoints/pytorch_lora_weights.safetensors"

# Download the LCM LoRA weights at import time if they are not on disk yet.
if not os.path.exists(lcm_lora_path):
    hf_hub_download(
        repo_id="latent-consistency/lcm-lora-sdxl",
        filename="pytorch_lora_weights.safetensors",
        local_dir="./checkpoints",
    )

class GenerateImageRequest(BaseModel):
    """Schema of the payload that EndpointHandler.__call__ validates.

    Every field except ``pose_image_base64`` is required.
    """

    # Positive prompt text; substituted into the style template's "{prompt}" slot.
    inputs: str
    # Extra negative prompt text, appended to the style's own negative prompt.
    negative_prompt: str
    # Style template name; unknown names fall back to DEFAULT_STYLE_NAME.
    style: str
    # Passed to the pipeline as num_inference_steps.
    num_steps: int
    # Conditioning scale applied to the identity ControlNet.
    identitynet_strength_ratio: float
    # NOTE(review): not read anywhere in this file — confirm whether it should
    # set the IP-adapter scale.
    adapter_strength_ratio: float
    # Conditioning scale used when "pose" is listed in controlnet_selection.
    pose_strength: float
    # Conditioning scale used when "canny" is listed in controlnet_selection.
    canny_strength: float
    # NOTE(review): not read anywhere in this file; no depth ControlNet is loaded.
    depth_strength: float
    # Names of the extra ControlNets to enable; the handler maps "pose" and "canny".
    controlnet_selection: List[str]
    # Guidance scale for the diffusion pipeline.
    guidance_scale: float
    # Seed for the torch.Generator handed to the pipeline.
    seed: int
    # When true, the handler enables the LCM LoRA and switches to LCMScheduler.
    enable_LCM: bool
    # Forwarded to the pipeline call as enhance_face_region.
    enhance_face_region: bool
    # Base64-encoded image containing the reference face.
    face_image_base64: str
    # Optional base64-encoded image; when given it replaces the face image as
    # the source for the ControlNet condition images and the face mask.
    pose_image_base64: Optional[str] = None

class EndpointHandler:
    """Inference-endpoint handler that renders a stylised image of a face.

    Construction loads every model the handler needs (ONNX face detector,
    identity / pose / canny ControlNets, the SDXL InstantID pipeline with the
    LCM LoRA, and a CLIP model used as an NSFW filter).  Calling the instance
    with a request dict runs one generation and returns a JSON-serialisable
    dict holding either ``"generated_image_base64"`` or ``"error"``.

    The class relies on the module-level ``device``, ``dtype`` and
    ``lcm_lora_path`` values.
    """

    # (width, height) the ONNX face detector input is resized to.
    _DETECTOR_INPUT_SIZE = (320, 240)

    def __init__(self, model_dir=""):
        """Download missing checkpoints and load all models.

        Args:
            model_dir: Directory that holds ``checkpoints/`` and ``models/``.

        Raises:
            FileNotFoundError: If the ONNX face-detection model is missing.
        """
        checkpoint_dir = os.path.join(model_dir, "checkpoints")

        # The ControlNet files are fetched only for their on-disk side effect;
        # they are loaded from `controlnet_path` further down.
        for filename in (
            "ControlNetModel/config.json",
            "ControlNetModel/diffusion_pytorch_model.safetensors",
        ):
            hf_hub_download(repo_id="InstantX/InstantID", filename=filename, local_dir=checkpoint_dir)
        face_adapter = hf_hub_download(repo_id="InstantX/InstantID", filename="ip-adapter.bin", local_dir=checkpoint_dir)

        onnx_model_path = os.path.join(model_dir, "models", "version-RFB-320.onnx")
        if not os.path.exists(onnx_model_path):
            # Fail with a clear message instead of an opaque onnxruntime error.
            raise FileNotFoundError(
                f"Model path {onnx_model_path} does not exist. Please ensure the model is available."
            )
        self.ort_session = ort.InferenceSession(onnx_model_path)

        self.openpose = OpenposeDetector.from_pretrained("lllyasviel/ControlNet")

        controlnet_path = os.path.join(checkpoint_dir, "ControlNetModel")
        self.controlnet_identitynet = ControlNetModel.from_pretrained(controlnet_path, torch_dtype=dtype)

        self.controlnet_pose = ControlNetModel.from_pretrained(
            "thibaud/controlnet-openpose-sdxl-1.0", torch_dtype=dtype
        ).to(device)
        self.controlnet_canny = ControlNetModel.from_pretrained(
            "diffusers/controlnet-canny-sdxl-1.0", torch_dtype=dtype
        ).to(device)

        # Selection name -> ControlNet model, and -> function producing its
        # condition image from a PIL image.
        self.controlnet_map = {
            "pose": self.controlnet_pose,
            "canny": self.controlnet_canny,
        }
        self.controlnet_map_fn = {
            "pose": self.openpose,
            "canny": self.get_canny_image,
        }

        pretrained_model_name_or_path = "stablediffusionapi/protovision-xl-high-fidel"
        self.pipe = StableDiffusionXLInstantIDPipeline.from_pretrained(
            pretrained_model_name_or_path,
            controlnet=[self.controlnet_identitynet],
            torch_dtype=dtype,
            safety_checker=None,
            feature_extractor=None,
        ).to(device)

        self.pipe.scheduler = diffusers.EulerDiscreteScheduler.from_config(self.pipe.scheduler.config)

        # Load the LCM LoRA once; __call__ toggles it per request.
        self.pipe.load_lora_weights(lcm_lora_path)
        self.pipe.fuse_lora()
        self.pipe.disable_lora()

        # The pipeline already lives on `device`; the CUDA-specific helper is
        # only meaningful (and only safe) when a GPU is in use.
        if device == "cuda":
            self.pipe.cuda()
        self.pipe.load_ip_adapter_instantid(face_adapter)
        self.pipe.image_proj_model.to(device)
        self.pipe.unet.to(device)

        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)

    def get_canny_image(self, image, t1=100, t2=200):
        """Return the Canny edge map of ``image`` as a single-channel PIL image.

        Args:
            image: RGB PIL image (or anything ``np.array`` turns into an RGB array).
            t1: Lower hysteresis threshold passed to ``cv2.Canny``.
            t2: Upper hysteresis threshold passed to ``cv2.Canny``.
        """
        image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        edges = cv2.Canny(image, t1, t2)
        return Image.fromarray(edges, "L")

    def is_nsfw(self, image: Image.Image) -> bool:
        """Return True when CLIP rates ``image`` as "NSFW" with probability > 0.9.

        The image is scored against the two text labels "NSFW" and "SFW".
        """
        inputs = self.clip_processor(text=["NSFW", "SFW"], images=image, return_tensors="pt", padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        # Pure inference: avoid building an autograd graph.
        with torch.no_grad():
            outputs = self.clip_model(**inputs)
        probs = outputs.logits_per_image.softmax(dim=1)
        nsfw_prob = probs[0, 0].item()
        return nsfw_prob > 0.9

    def preprocess(self, image):
        """Turn a BGR image array into the face detector's input tensor.

        The image is converted to RGB, resized to the detector input size,
        normalised as ``(pixel - 127) / 128`` and returned as a float32 array
        laid out as (1, channels, height, width).
        """
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, self._DETECTOR_INPUT_SIZE)
        image_mean = np.array([127, 127, 127])
        image = (image - image_mean) / 128
        image = np.transpose(image, [2, 0, 1])
        image = np.expand_dims(image, axis=0)
        return image.astype(np.float32)

    def get_face_info(self, image, score_threshold=0.7):
        """Detect faces in a BGR image with the ONNX detector.

        Args:
            image: BGR image array of shape (height, width, channels).
            score_threshold: Minimum face score for a candidate to be kept.

        Returns:
            A list of ``{"bbox": [x1, y1, x2, y2], "score": float}`` dicts with
            the box expressed in pixel coordinates of ``image`` and clipped to
            its bounds.  No non-maximum suppression is applied, so several
            overlapping candidates may be returned for the same face.
        """
        src_height, src_width = image.shape[:2]
        input_name = self.ort_session.get_inputs()[0].name
        confidences, boxes = self.ort_session.run(None, {input_name: self.preprocess(image)})
        confidences = np.asarray(confidences)
        boxes = np.asarray(boxes)
        logging.getLogger(__name__).debug(
            "Confidences shape: %s, Boxes shape: %s", confidences.shape, boxes.shape
        )

        # Drop the leading batch axis so that each row is one candidate.
        # Assumes the last score column is the face probability (background
        # first), as in the detector's reference ONNX example — TODO confirm.
        scores = confidences.reshape(-1, confidences.shape[-1])[:, -1]
        corners = boxes.reshape(-1, 4)

        # Boxes are normalised corner coordinates; map them onto the source
        # image (not the detector input) because callers index that image.
        bounds = np.array([src_width, src_height, src_width, src_height], dtype=np.float32)
        corners = np.clip(corners * bounds, 0, bounds)

        keep = (
            (scores > score_threshold)
            & (corners[:, 2] > corners[:, 0])
            & (corners[:, 3] > corners[:, 1])
        )
        return [
            {"bbox": [float(v) for v in box], "score": float(score)}
            for box, score in zip(corners[keep], scores[keep])
        ]

    @staticmethod
    def _largest_face(face_info_list):
        """Return the detection whose bounding box covers the largest area."""
        def area(info):
            x1, y1, x2, y2 = info["bbox"]
            return (x2 - x1) * (y2 - y1)

        return max(face_info_list, key=area)

    @staticmethod
    def _face_mask(width, height, bbox):
        """Return a black RGB PIL image of the given size with ``bbox`` filled white."""
        mask = np.zeros([height, width, 3], dtype=np.uint8)
        x1, y1, x2, y2 = map(int, bbox)
        mask[y1:y2, x1:x2] = 255
        return Image.fromarray(mask)

    def __call__(self, data: Any) -> Dict[str, Any]:
        """Generate one image for a request.

        Args:
            data: Mapping with the fields of ``GenerateImageRequest``.

        Returns:
            ``{"generated_image_base64": <JPEG as base64 str>}`` on success, or
            ``{"error": <message>}`` when no face is found, an unsupported
            ControlNet is requested, or the result is classified as NSFW.
        """
        request = GenerateImageRequest(**data)

        # Reject unknown ControlNet names up front instead of failing with a
        # KeyError half-way through the request.
        unsupported = [name for name in request.controlnet_selection if name not in self.controlnet_map]
        if unsupported:
            return {
                "error": f"Unsupported controlnet selection: {unsupported}. "
                         f"Supported values: {sorted(self.controlnet_map)}."
            }

        if request.enable_LCM:
            self.pipe.enable_lora()
            self.pipe.scheduler = diffusers.LCMScheduler.from_config(self.pipe.scheduler.config)
            # With LCM the guidance scale is restricted to the range [0, 1].
            guidance_scale = min(max(request.guidance_scale, 0), 1)
        else:
            self.pipe.disable_lora()
            self.pipe.scheduler = diffusers.EulerDiscreteScheduler.from_config(self.pipe.scheduler.config)
            guidance_scale = request.guidance_scale

        inputs, negative_prompt = self.apply_style(request.style, request.inputs, request.negative_prompt)

        face_image = self.decode_base64_image(request.face_image_base64)
        pose_image = self.decode_base64_image(request.pose_image_base64) if request.pose_image_base64 else None

        face_image = self.resize_img(face_image, max_side=1024)
        width, height = face_image.size

        face_info_list = self.get_face_info(self.convert_from_image_to_cv2(face_image))
        if not face_info_list:
            return {"error": "No faces detected."}

        face_info = self._largest_face(face_info_list)
        # NOTE(review): draw_kps is handed a 4-value bounding box here; confirm
        # the helper accepts boxes rather than facial keypoints.
        face_kps = draw_kps(face_image, face_info["bbox"])
        img_controlnet = face_image

        if pose_image is not None:
            # A pose image overrides the face image as the source of the
            # keypoint image, the condition images and the face mask.
            pose_image = self.resize_img(pose_image, max_side=1024)
            img_controlnet = pose_image
            face_info_list = self.get_face_info(self.convert_from_image_to_cv2(pose_image))
            if not face_info_list:
                return {"error": "No faces detected in pose image."}

            face_info = self._largest_face(face_info_list)
            face_kps = draw_kps(pose_image, face_info["bbox"])
            width, height = face_kps.size

        control_mask = self._face_mask(width, height, face_info["bbox"])

        # NOTE(review): request.depth_strength and request.adapter_strength_ratio
        # are not used by this handler.
        controlnet_scales = {"pose": request.pose_strength, "canny": request.canny_strength}
        selection = request.controlnet_selection
        self.pipe.controlnet = MultiControlNetModel(
            [self.controlnet_identitynet] + [self.controlnet_map[s] for s in selection]
        )
        control_scales = [float(request.identitynet_strength_ratio)] + [controlnet_scales[s] for s in selection]
        control_images = [face_kps] + [
            self.controlnet_map_fn[s](img_controlnet).resize((width, height)) for s in selection
        ]

        generator = torch.Generator(device=device).manual_seed(request.seed)

        # NOTE(review): no face embedding is passed to the pipeline — confirm
        # the pipeline does not require one (e.g. an image_embeds argument).
        outputs = self.pipe(
            prompt=inputs,
            negative_prompt=negative_prompt,
            image=control_images,
            control_mask=control_mask,
            controlnet_conditioning_scale=control_scales,
            num_inference_steps=request.num_steps,
            guidance_scale=guidance_scale,
            height=height,
            width=width,
            generator=generator,
            enhance_face_region=request.enhance_face_region,
        )

        image = outputs.images[0]
        if self.is_nsfw(image):
            return {"error": "Generated image contains NSFW content and was discarded."}

        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
        return {"generated_image_base64": img_str}

    def decode_base64_image(self, image_string):
        """Decode a base64 string into an RGB PIL image.

        The image is converted to RGB so that RGBA, palette or grayscale
        uploads work with the RGB<->BGR conversions used by the handler.
        """
        buffer = io.BytesIO(base64.b64decode(image_string))
        return Image.open(buffer).convert("RGB")

    def convert_from_cv2_to_image(self, img: np.ndarray) -> Image.Image:
        """Convert a BGR array into an RGB PIL image."""
        return Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

    def convert_from_image_to_cv2(self, img: Image.Image) -> np.ndarray:
        """Convert an RGB PIL image into a BGR array."""
        return cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)

    def resize_img(self, input_image, max_side=1280, min_side=1024, size=None, pad_to_max_side=False, mode=Image.BILINEAR, base_pixel_number=64):
        """Resize a PIL image for the pipeline.

        Args:
            input_image: PIL image to resize.
            max_side: Target length of the longer side when ``size`` is None;
                also the side of the square canvas when padding.
            min_side: Intermediate target for the shorter side; it only feeds
                the rounding of the final dimensions.
            size: Explicit ``(width, height)``; skips the automatic sizing.
            pad_to_max_side: Centre the result on a white
                ``max_side`` x ``max_side`` canvas.
            mode: PIL resampling filter.
            base_pixel_number: Final dimensions are rounded down to a multiple
                of this value (automatic sizing only).

        Returns:
            The resized (and optionally padded) PIL image.
        """
        w, h = input_image.size
        if size is not None:
            w_resize_new, h_resize_new = size
        else:
            ratio = min_side / min(h, w)
            w, h = round(ratio * w), round(ratio * h)
            ratio = max_side / max(h, w)
            input_image = input_image.resize([round(ratio * w), round(ratio * h)], mode)
            # Snap both dimensions down to a multiple of base_pixel_number.
            w_resize_new = (round(ratio * w) // base_pixel_number) * base_pixel_number
            h_resize_new = (round(ratio * h) // base_pixel_number) * base_pixel_number
        input_image = input_image.resize([w_resize_new, h_resize_new], mode)

        if pad_to_max_side:
            res = np.ones([max_side, max_side, 3], dtype=np.uint8) * 255
            offset_x = (max_side - w_resize_new) // 2
            offset_y = (max_side - h_resize_new) // 2
            res[offset_y: offset_y + h_resize_new, offset_x: offset_x + w_resize_new] = np.array(input_image)
            input_image = Image.fromarray(res)
        return input_image

    def apply_style(self, style_name: str, positive: str, negative: str = "") -> Tuple[str, str]:
        """Merge a prompt pair with a style template.

        Unknown ``style_name`` values fall back to ``DEFAULT_STYLE_NAME``.

        Returns:
            ``(positive_prompt, negative_prompt)`` where the positive prompt is
            the template with "{prompt}" replaced by ``positive`` and the
            negative prompt is the template's negative text followed by
            ``negative``.
        """
        p, n = styles.get(style_name, styles[DEFAULT_STYLE_NAME])
        return p.replace("{prompt}", positive), n + " " + negative