import cv2 import torch import numpy as np import PIL from PIL import Image from typing import Tuple, List, Optional from pydantic import BaseModel import diffusers from diffusers.utils import load_image from diffusers.models import ControlNetModel from diffusers.pipelines.controlnet.multicontrolnet import MultiControlNetModel from style_template import styles from pipeline_stable_diffusion_xl_instantid_full import StableDiffusionXLInstantIDPipeline, draw_kps from controlnet_aux import OpenposeDetector import torch.nn.functional as F from torchvision.transforms import Compose import os from huggingface_hub import hf_hub_download import base64 import io import json from transformers import CLIPProcessor, CLIPModel import onnxruntime as ort # global variable device = "cuda" if torch.cuda.is_available() else "cpu" dtype = torch.float16 if str(device).__contains__("cuda") else torch.float32 STYLE_NAMES = list(styles.keys()) DEFAULT_STYLE_NAME = "Spring Festival" # Download LCM-LoRA model if not already downloaded lcm_lora_path = "./checkpoints/pytorch_lora_weights.safetensors" if not os.path.exists(lcm_lora_path): hf_hub_download(repo_id="latent-consistency/lcm-lora-sdxl", filename="pytorch_lora_weights.safetensors", local_dir="./checkpoints") class GenerateImageRequest(BaseModel): inputs: str negative_prompt: str style: str num_steps: int identitynet_strength_ratio: float adapter_strength_ratio: float pose_strength: float canny_strength: float depth_strength: float controlnet_selection: List[str] guidance_scale: float seed: int enable_LCM: bool enhance_face_region: bool face_image_base64: str pose_image_base64: Optional[str] = None class EndpointHandler: def __init__(self, model_dir): # Ensure the necessary files are downloaded controlnet_config = hf_hub_download(repo_id="InstantX/InstantID", filename="ControlNetModel/config.json", local_dir=os.path.join(model_dir, "checkpoints")) controlnet_model = hf_hub_download(repo_id="InstantX/InstantID", filename="ControlNetModel/diffusion_pytorch_model.safetensors", local_dir=os.path.join(model_dir, "checkpoints")) face_adapter = hf_hub_download(repo_id="InstantX/InstantID", filename="ip-adapter.bin", local_dir=os.path.join(model_dir, "checkpoints")) # Load the ONNX model onnx_model_path = os.path.join(model_dir, "models", "version-RFB-320.onnx") if not os.path.exists(onnx_model_path): print(f"Model path {onnx_model_path} does not exist. Please ensure the model is available.") self.ort_session = ort.InferenceSession(onnx_model_path) openpose = OpenposeDetector.from_pretrained("lllyasviel/ControlNet") # Path to InstantID models controlnet_path = os.path.join(model_dir, "checkpoints", "ControlNetModel") # Load pipeline face ControlNetModel self.controlnet_identitynet = ControlNetModel.from_pretrained( controlnet_path, torch_dtype=dtype ) # controlnet-pose controlnet_pose_model = "thibaud/controlnet-openpose-sdxl-1.0" controlnet_canny_model = "diffusers/controlnet-canny-sdxl-1.0" controlnet_pose = ControlNetModel.from_pretrained( controlnet_pose_model, torch_dtype=dtype ).to(device) controlnet_canny = ControlNetModel.from_pretrained( controlnet_canny_model, torch_dtype=dtype ).to(device) def get_canny_image(image, t1=100, t2=200): image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) edges = cv2.Canny(image, t1, t2) return Image.fromarray(edges, "L") self.controlnet_map = { "pose": controlnet_pose, "canny": controlnet_canny } self.controlnet_map_fn = { "pose": openpose, "canny": get_canny_image } pretrained_model_name_or_path = "wangqixun/YamerMIX_v8" self.pipe = StableDiffusionXLInstantIDPipeline.from_pretrained( pretrained_model_name_or_path, controlnet=[self.controlnet_identitynet], torch_dtype=dtype, safety_checker=None, feature_extractor=None, ).to(device) self.pipe.scheduler = diffusers.EulerDiscreteScheduler.from_config( self.pipe.scheduler.config ) # load and disable LCM self.pipe.load_lora_weights(lcm_lora_path) self.pipe.fuse_lora() self.pipe.disable_lora() self.pipe.cuda() self.pipe.load_ip_adapter_instantid(face_adapter) self.pipe.image_proj_model.to("cuda") self.pipe.unet.to("cuda") # Load CLIP model for safety checking self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device) def is_nsfw(self, image: Image.Image) -> bool: """ Check if an image contains NSFW content using CLIP model. Args: image (Image.Image): PIL image to check. Returns: bool: True if the image is NSFW, False otherwise. """ inputs = self.clip_processor(text=["NSFW", "SFW"], images=image, return_tensors="pt", padding=True) inputs = {k: v.to(device) for k, v in inputs.items()} outputs = self.clip_model(**inputs) logits_per_image = outputs.logits_per_image # this is the image-text similarity score probs = logits_per_image.softmax(dim=1) # we take the softmax to get the probabilities nsfw_prob = probs[0, 0].item() # probability of "NSFW" label return nsfw_prob > 0.8 # Adjusted threshold for NSFW detection def preprocess(self, image): # Preprocess the image for ONNX model image = cv2.resize(image, (320, 240)) # Adjust based on model input size image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image = np.transpose(image, (2, 0, 1)) image = image[np.newaxis, :, :, :].astype(np.float32) / 127.5 - 1.0 # Normalize to [-1, 1] return image def get_face_info(self, image): # Preprocess the image image = self.preprocess(image) # Run the ONNX model to get the face detection results input_name = self.ort_session.get_inputs()[0].name outputs = self.ort_session.run(None, {input_name: image}) # Process the output to extract face information bboxes = outputs[0][0] # Adjust based on model output structure face_info_list = [] for bbox in bboxes: score = bbox[2] if score > 0.5: # Confidence threshold x1, y1, x2, y2 = bbox[3:7] * [320, 240, 320, 240] # Scale coordinates face_info_list.append({ "bbox": [x1, y1, x2, y2], "embedding": self.get_face_embedding(image[:, :, int(y1):int(y2), int(x1):int(x2)]) }) return face_info_list def __call__(self, data): def convert_from_cv2_to_image(img: np.ndarray) -> Image: return Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) def convert_from_image_to_cv2(img: Image) -> np.ndarray: return cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) def resize_img( input_image, max_side=1280, min_side=1024, size=None, pad_to_max_side=False, mode=PIL.Image.BILINEAR, base_pixel_number=64, ): w, h = input_image.size if size is not None: w_resize_new, h_resize_new = size else: ratio = min_side / min(h, w) w, h = round(ratio * w), round(ratio * h) ratio = max_side / max(h, w) input_image = input_image.resize([round(ratio * w), round(ratio * h)], mode) w_resize_new = (round(ratio * w) // base_pixel_number) * base_pixel_number h_resize_new = (round(ratio * h) // base_pixel_number) * base_pixel_number input_image = input_image.resize([w_resize_new, h_resize_new], mode) if pad_to_max_side: res = np.ones([max_side, max_side, 3], dtype=np.uint8) * 255 offset_x = (max_side - w_resize_new) // 2 offset_y = (max_side - h_resize_new) // 2 res[ offset_y : offset_y + h_resize_new, offset_x : offset_x + w_resize_new ] = np.array(input_image) input_image = Image.fromarray(res) return input_image def apply_style( style_name: str, positive: str, negative: str = "" ) -> Tuple[str, str]: p, n = styles.get(style_name, styles[DEFAULT_STYLE_NAME]) return p.replace("{prompt}", positive), n + " " + negative request = GenerateImageRequest(**data) inputs = request.inputs negative_prompt = request.negative_prompt style_name = request.style identitynet_strength_ratio = request.identitynet_strength_ratio adapter_strength_ratio = request.adapter_strength_ratio pose_strength = request.pose_strength canny_strength = request.canny_strength num_steps = request.num_steps guidance_scale = request.guidance_scale controlnet_selection = request.controlnet_selection seed = request.seed enhance_face_region = request.enhance_face_region enable_LCM = request.enable_LCM if enable_LCM: self.pipe.enable_lora() self.pipe.scheduler = diffusers.LCMScheduler.from_config(self.pipe.scheduler.config) guidance_scale = min(max(guidance_scale, 0), 1) else: self.pipe.disable_lora() self.pipe.scheduler = diffusers.EulerDiscreteScheduler.from_config(self.pipe.scheduler.config) # apply the style template inputs, negative_prompt = apply_style(style_name, inputs, negative_prompt) # Decode base64 image face_image_base64 = data.get("face_image_base64") face_image_data = base64.b64decode(face_image_base64) face_image = Image.open(io.BytesIO(face_image_data)) pose_image_base64 = data.get("pose_image_base64") pose_image = None if pose_image_base64: pose_image_data = base64.b64decode(pose_image_base64) pose_image = Image.open(io.BytesIO(pose_image_data)) face_image = resize_img(face_image, max_side=1024) face_image_cv2 = convert_from_image_to_cv2(face_image) height, width, _ = face_image_cv2.shape # Extract face features using the ONNX model face_info_list = self.get_face_info(face_image_cv2) if len(face_info_list) == 0: return {"error": "No faces detected."} # Use the largest face detected face_info = max(face_info_list, key=lambda x: (x["bbox"][2] - x["bbox"][0]) * (x["bbox"][3] - x["bbox"][1])) face_emb = face_info["embedding"] face_kps = draw_kps(convert_from_cv2_to_image(face_image_cv2), face_info["kps"]) img_controlnet = face_image if pose_image: pose_image = resize_img(pose_image, max_side=1024) img_controlnet = pose_image pose_image_cv2 = convert_from_image_to_cv2(pose_image) # Extract face features from pose image using the ONNX model face_info_list = self.get_face_info(pose_image_cv2) if len(face_info_list) == 0: return {"error": "No faces detected in pose image."} face_info = max(face_info_list, key=lambda x: (x["bbox"][2] - x["bbox"][0]) * (x["bbox"][3] - x["bbox"][1])) face_emb = face_info["embedding"] face_kps = draw_kps(pose_image, face_info["kps"]) width, height = face_kps.size control_mask = np.zeros([height, width, 3], dtype=np.uint8) # Ensure dtype is uint8 x1, y1, x2, y2 = face_info["bbox"] x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2) control_mask[y1:y2, x1:x2] = 255 control_mask = Image.fromarray(control_mask) controlnet_scales = { "pose": pose_strength, "canny": canny_strength } self.pipe.controlnet = MultiControlNetModel( [self.controlnet_identitynet] + [self.controlnet_map[s] for s in controlnet_selection] ) control_scales = [float(identitynet_strength_ratio)] + [ controlnet_scales[s] for s in controlnet_selection ] control_images = [face_kps] + [ self.controlnet_map_fn[s](img_controlnet).resize((width, height)) for s in controlnet_selection ] generator = torch.Generator(device=device).manual_seed(seed) print("Start inference...") print(f"[Debug] Prompt: {inputs}, \n[Debug] Neg Prompt: {negative_prompt}") self.pipe.set_ip_adapter_scale(adapter_strength_ratio) outputs = self.pipe( prompt=inputs, negative_prompt=negative_prompt, image_embeds=face_emb, image=control_images, control_mask=control_mask, controlnet_conditioning_scale=control_scales, num_inference_steps=num_steps, guidance_scale=guidance_scale, height=height, width=width, generator=generator, enhance_face_region=enhance_face_region ) images = outputs.images # Check for NSFW content if self.is_nsfw(images[0]): return {"error": "Generated image contains NSFW content and was discarded."} # Convert the output image to base64 buffered = io.BytesIO() images[0].save(buffered, format="JPEG") img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") return {"generated_image_base64": img_str}