import gc, time
import numpy as np
import PIL.Image
from diffusers import (
    ControlNetModel,
    DiffusionPipeline,
    StableDiffusionControlNetPipeline,
    StableDiffusionControlNetInpaintPipeline,
    StableDiffusionPipeline,
    AutoencoderKL,
    StableDiffusionXLInpaintPipeline,
    StableDiffusionXLAdapterPipeline,
    T2IAdapter,
    StableDiffusionXLPipeline,
    AutoPipelineForImage2Image
)
from huggingface_hub import hf_hub_download
import torch, random, json
from controlnet_aux import (
    CannyDetector,
    ContentShuffleDetector,
    HEDdetector,
    LineartAnimeDetector,
    LineartDetector,
    MidasDetector,
    MLSDdetector,
    NormalBaeDetector,
    OpenposeDetector,
    PidiNetDetector,
)
from transformers import pipeline
from controlnet_aux.util import HWC3, ade_palette
from transformers import AutoImageProcessor, UperNetForSemanticSegmentation
import cv2
from diffusers import (
    DPMSolverMultistepScheduler,
    DPMSolverSinglestepScheduler,
    KDPM2DiscreteScheduler,
    EulerDiscreteScheduler,
    EulerAncestralDiscreteScheduler,
    HeunDiscreteScheduler,
    LMSDiscreteScheduler,
    DDIMScheduler,
    DEISMultistepScheduler,
    UniPCMultistepScheduler,
    LCMScheduler,
    PNDMScheduler,
    KDPM2AncestralDiscreteScheduler,
    EDMDPMSolverMultistepScheduler,
    EDMEulerScheduler,
)
from .prompt_weights import get_embed_new, add_comma_after_pattern_ti
from .utils import save_pil_image_with_metadata
from .lora_loader import lora_mix_load
from .inpainting_canvas import draw, make_inpaint_condition
from .adetailer import ad_model_process
from ..upscalers.esrgan import UpscalerESRGAN, UpscalerLanczos, UpscalerNearest
from ..logging.logging_setup import logger
from .extra_model_loaders import custom_task_model_loader
from .high_resolution import process_images_high_resolution
from .style_prompt_config import styles_data, STYLE_NAMES, get_json_content, apply_style
import os
from compel import Compel, ReturnedEmbeddingsType
import ipywidgets as widgets, mediapy
from IPython.display import display
from PIL import Image
from typing import Union, Optional, List, Tuple, Dict, Any, Callable
import logging, diffusers, copy, warnings
# Only let ERROR-and-above records through the standard-library "diffusers" logger.
logging.getLogger("diffusers").setLevel(logging.ERROR)
#logging.getLogger("transformers").setLevel(logging.ERROR)
# 40 is the numeric value of logging.ERROR.
diffusers.utils.logging.set_verbosity(40)
# Hide FutureWarning messages attributed to the diffusers / transformers modules.
warnings.filterwarnings(action="ignore", category=FutureWarning, module="diffusers")
warnings.filterwarnings(action="ignore", category=FutureWarning, module="transformers")

# =====================================
# Utils preprocessor
# =====================================
def resize_image(input_image, resolution, interpolation=None):
    """Scale an HxWxC array so its longer side is close to ``resolution``.

    Both output sides are rounded to the nearest multiple of 64. When no
    ``interpolation`` is given, Lanczos is used for upscaling and area
    interpolation otherwise.
    """
    src_h, src_w, _ = input_image.shape
    scale = float(resolution) / max(float(src_h), float(src_w))

    # Snap each scaled side to the nearest multiple of 64.
    dst_h = int(np.round(float(src_h) * scale / 64.0)) * 64
    dst_w = int(np.round(float(src_w) * scale / 64.0)) * 64

    if interpolation is None:
        if scale > 1:
            interpolation = cv2.INTER_LANCZOS4
        else:
            interpolation = cv2.INTER_AREA

    # cv2.resize takes the target size as (width, height).
    return cv2.resize(input_image, (dst_w, dst_h), interpolation=interpolation)


class DepthEstimator:
    """Depth-map annotator built on the transformers "depth-estimation" pipeline."""

    def __init__(self):
        self.model = pipeline("depth-estimation")

    def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image:
        """Return the depth map of ``image`` as a PIL image.

        Recognised keyword arguments are ``detect_resolution`` (size the input
        is resized to before inference) and ``image_resolution`` (size of the
        returned map); both default to 512.
        """
        detect_resolution = kwargs.pop("detect_resolution", 512)
        image_resolution = kwargs.pop("image_resolution", 512)

        # Normalise to a 3-channel array and resize for the detector.
        source = resize_image(HWC3(np.array(image)), resolution=detect_resolution)
        prediction = self.model(PIL.Image.fromarray(source))

        # The pipeline output exposes the depth image under the "depth" key.
        depth = HWC3(np.array(prediction["depth"]))
        depth = resize_image(depth, resolution=image_resolution)
        return PIL.Image.fromarray(depth)


class ImageSegmentor:
    """Semantic-segmentation annotator that paints labels with the ADE palette."""

    def __init__(self):
        checkpoint = "openmmlab/upernet-convnext-small"
        self.image_processor = AutoImageProcessor.from_pretrained(checkpoint)
        self.image_segmentor = UperNetForSemanticSegmentation.from_pretrained(checkpoint)

    @torch.inference_mode()
    def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image:
        """Return a colour-coded segmentation map of ``image`` as a PIL image.

        Recognised keyword arguments are ``detect_resolution`` and
        ``image_resolution``; both default to 512.
        """
        detect_resolution = kwargs.pop("detect_resolution", 512)
        image_resolution = kwargs.pop("image_resolution", 512)

        resized = resize_image(HWC3(image), resolution=detect_resolution)
        pil_input = PIL.Image.fromarray(resized)

        model_inputs = self.image_processor(pil_input, return_tensors="pt")
        outputs = self.image_segmentor(model_inputs.pixel_values)
        # PIL sizes are (width, height); the reversed tuple is passed as the target size.
        target_size = pil_input.size[::-1]
        seg = self.image_processor.post_process_semantic_segmentation(
            outputs, target_sizes=[target_size]
        )[0]

        # Paint every label index with its palette colour.
        height, width = seg.shape[0], seg.shape[1]
        color_seg = np.zeros((height, width, 3), dtype=np.uint8)
        for label, color in enumerate(ade_palette()):
            color_seg[seg == label, :] = color
        color_seg = color_seg.astype(np.uint8)

        # Nearest-neighbour keeps the label colours unblended when resizing.
        color_seg = resize_image(
            color_seg, resolution=image_resolution, interpolation=cv2.INTER_NEAREST
        )
        return PIL.Image.fromarray(color_seg)


class Preprocessor:
    MODEL_ID = "lllyasviel/Annotators"

    def __init__(self):
        self.model = None
        self.name = ""

    def load(self, name: str) -> None:
        if name == self.name:
            return
        if name == "HED":
            self.model = HEDdetector.from_pretrained(self.MODEL_ID)
        elif name == "Midas":
            self.model = MidasDetector.from_pretrained(self.MODEL_ID)
        elif name == "MLSD":
            self.model = MLSDdetector.from_pretrained(self.MODEL_ID)
        elif name == "Openpose":
            self.model = OpenposeDetector.from_pretrained(self.MODEL_ID)
        elif name == "PidiNet":
            self.model = PidiNetDetector.from_pretrained(self.MODEL_ID)
        elif name == "NormalBae":
            self.model = NormalBaeDetector.from_pretrained(self.MODEL_ID)
        elif name == "Lineart":
            self.model = LineartDetector.from_pretrained(self.MODEL_ID)
        elif name == "LineartAnime":
            self.model = LineartAnimeDetector.from_pretrained(self.MODEL_ID)
        elif name == "Canny":
            self.model = CannyDetector()
        elif name == "ContentShuffle":
            self.model = ContentShuffleDetector()
        elif name == "DPT":
            self.model = DepthEstimator()
        elif name == "UPerNet":
            self.model = ImageSegmentor()
        else:
            raise ValueError
        torch.cuda.empty_cache()
        gc.collect()
        self.name = name

    def __call__(self, image: PIL.Image.Image, **kwargs) -> PIL.Image.Image:
        if self.name == "Canny":
            if "detect_resolution" in kwargs:
                detect_resolution = kwargs.pop("detect_resolution")
                image = np.array(image)
                image = HWC3(image)
                image = resize_image(image, resolution=detect_resolution)
            image = self.model(image, **kwargs)
            return PIL.Image.fromarray(image)
        elif self.name == "Midas":
            detect_resolution = kwargs.pop("detect_resolution", 512)
            image_resolution = kwargs.pop("image_resolution", 512)
            image = np.array(image)
            image = HWC3(image)
            image = resize_image(image, resolution=detect_resolution)
            image = self.model(image, **kwargs)
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            return PIL.Image.fromarray(image)
        else:
            return self.model(image, **kwargs)


# =====================================
# Base Model
# =====================================

# Maps a task name to the model repository id loaded for that task.
# The value is looked up for every task in Model_Diffusers.load_pipe, but it is
# only passed to a loader for the "inpaint" task and for tasks other than
# "txt2img"/"img2img"; those two therefore carry a placeholder string.
CONTROLNET_MODEL_IDS = {
    "openpose": "lllyasviel/control_v11p_sd15_openpose",
    "canny": "lllyasviel/control_v11p_sd15_canny",
    "mlsd": "lllyasviel/control_v11p_sd15_mlsd",
    "scribble": "lllyasviel/control_v11p_sd15_scribble",
    "softedge": "lllyasviel/control_v11p_sd15_softedge",
    "segmentation": "lllyasviel/control_v11p_sd15_seg",
    "depth": "lllyasviel/control_v11f1p_sd15_depth",
    "normalbae": "lllyasviel/control_v11p_sd15_normalbae",
    "lineart": "lllyasviel/control_v11p_sd15_lineart",
    "lineart_anime": "lllyasviel/control_v11p_sd15s2_lineart_anime",
    "shuffle": "lllyasviel/control_v11e_sd15_shuffle",
    "ip2p": "lllyasviel/control_v11e_sd15_ip2p",
    "inpaint": "lllyasviel/control_v11p_sd15_inpaint",
    "txt2img": "Nothinghere",  # placeholder, no extra model is loaded for this task
    # T2I-Adapter repositories (names prefixed with "sdxl_").
    "sdxl_canny": "TencentARC/t2i-adapter-canny-sdxl-1.0",
    "sdxl_sketch": "TencentARC/t2i-adapter-sketch-sdxl-1.0",
    "sdxl_lineart": "TencentARC/t2i-adapter-lineart-sdxl-1.0",
    "sdxl_depth-midas": "TencentARC/t2i-adapter-depth-midas-sdxl-1.0",
    "sdxl_openpose": "TencentARC/t2i-adapter-openpose-sdxl-1.0",
    #"sdxl_depth-zoe": "TencentARC/t2i-adapter-depth-zoe-sdxl-1.0",
    #"sdxl_recolor": "TencentARC/t2i-adapter-recolor-sdxl-1.0",
    "img2img": "Nothinghere",  # placeholder, no extra model is loaded for this task
}


# def download_all_controlnet_weights() -> None:
#     for model_id in CONTROLNET_MODEL_IDS.values():
#         ControlNetModel.from_pretrained(model_id)


# Maps a sampler display name to a (scheduler class, keyword overrides) pair.
# NOTE(review): presumably the overrides are applied on top of the pipeline's
# scheduler config when the sampler is selected - the consumer is outside this
# part of the file, confirm there.
SCHEDULER_CONFIG_MAP = {
    "DPM++ 2M": (DPMSolverMultistepScheduler, {}),
    "DPM++ 2M Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True}),
    "DPM++ 2M SDE": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++"}),
    "DPM++ 2M SDE Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True, "algorithm_type": "sde-dpmsolver++"}),
    "DPM++ SDE": (DPMSolverSinglestepScheduler, {}),
    "DPM++ SDE Karras": (DPMSolverSinglestepScheduler, {"use_karras_sigmas": True}),
    "DPM2": (KDPM2DiscreteScheduler, {}),
    "DPM2 Karras": (KDPM2DiscreteScheduler, {"use_karras_sigmas": True}),
    "DPM2 a" : (KDPM2AncestralDiscreteScheduler, {}),
    "DPM2 a Karras" : (KDPM2AncestralDiscreteScheduler, {"use_karras_sigmas": True}),
    "Euler": (EulerDiscreteScheduler, {}),
    "Euler a": (EulerAncestralDiscreteScheduler, {}),
    "Heun": (HeunDiscreteScheduler, {}),
    "LMS": (LMSDiscreteScheduler, {}),
    "LMS Karras": (LMSDiscreteScheduler, {"use_karras_sigmas": True}),
    "DDIM": (DDIMScheduler, {}),
    "DEIS": (DEISMultistepScheduler, {}),
    "UniPC": (UniPCMultistepScheduler, {}),
    "PNDM" : (PNDMScheduler, {}),

    # DPM++ 2M variants using the "use_lu_lambdas" / "euler_at_final" options.
    "DPM++ 2M Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True}),
    "DPM++ 2M Ef": (DPMSolverMultistepScheduler, {"euler_at_final": True}),
    "DPM++ 2M SDE Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True, "algorithm_type": "sde-dpmsolver++"}),
    "DPM++ 2M SDE Ef": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++", "euler_at_final": True}),

    # EDM scheduler classes.
    "EDMDPM": (EDMDPMSolverMultistepScheduler, {}),
    "EDMEuler": (EDMEulerScheduler, {}),

    "LCM" : (LCMScheduler, {}),
}

# Sampler names in the order they are declared above.
scheduler_names = list(SCHEDULER_CONFIG_MAP.keys())

def process_prompts_valid(specific_prompt, specific_negative_prompt, prompt, negative_prompt):
    """Pick the prompts to use, falling back to the general ones when needed.

    A specific prompt counts as missing when it is ``None`` or ``""``.

    Returns:
        A 4-tuple ``(specific_prompt_missing, specific_negative_missing,
        prompt_to_use, negative_prompt_to_use)``.
    """
    blank_values = (None, "")
    positive_missing = specific_prompt in blank_values
    negative_missing = specific_negative_prompt in blank_values

    if positive_missing:
        chosen_prompt = prompt
    else:
        chosen_prompt = specific_prompt

    if negative_missing:
        chosen_negative = negative_prompt
    else:
        chosen_negative = specific_negative_prompt

    return positive_missing, negative_missing, chosen_prompt, chosen_negative

class Model_Diffusers:
    def __init__(
        self,
        base_model_id: str = "runwayml/stable-diffusion-v1-5",
        task_name: str = "txt2img",
        vae_model=None,
        type_model_precision=torch.float16,
        sdxl_safetensors = False,
    ):
        """Set up the device, load the initial pipeline and attach the helpers.

        Args:
            base_model_id: Checkpoint path or repo id handed to ``load_pipe``.
            task_name: Initial task handed to ``load_pipe``.
            vae_model: Optional VAE handed to ``load_pipe``.
            type_model_precision: Requested torch dtype; float32 is recorded
                instead when CUDA is not available.
            sdxl_safetensors: Forwarded to ``load_pipe``.
        """
        cuda_ready = torch.cuda.is_available()
        self.device = torch.device("cuda:0" if cuda_ready else "cpu")

        # Empty placeholders so the first load_pipe call never matches a "previous" model.
        self.base_model_id = ""
        self.task_name = ""
        self.vae_model = None
        # For SD 1.5
        self.type_model_precision = type_model_precision if cuda_ready else torch.float32

        self.load_pipe(
            base_model_id,
            task_name,
            vae_model,
            type_model_precision,
            sdxl_safetensors=sdxl_safetensors,
        )
        self.preprocessor = Preprocessor()

        # Prompt-style presets.
        self.styles_data = styles_data
        self.STYLE_NAMES = STYLE_NAMES
        self.style_json_file = ""


    def load_pipe(
        self,
        base_model_id: str,
        task_name="txt2img",
        vae_model=None,
        type_model_precision=torch.float16,
        reload=False,
        sdxl_safetensors = False,
        retain_model_in_memory = True,
    ) -> DiffusionPipeline:
        if (
            base_model_id == self.base_model_id
            and task_name == self.task_name
            and hasattr(self, "pipe")
            and self.vae_model == vae_model
            and self.pipe is not None
            and reload == False
        ):
            if self.type_model_precision == type_model_precision or self.device.type == "cpu":
                return

        if hasattr(self, "pipe") and os.path.isfile(base_model_id):
            unload_model = False
            if self.pipe == None:
                unload_model = True
            elif type_model_precision != self.type_model_precision and self.device.type != "cpu":
                unload_model = True
        else:
            if hasattr(self, "pipe"):
                unload_model = False
                if self.pipe == None:
                    unload_model = True
            else:
                unload_model = True
        self.type_model_precision = (
            type_model_precision if torch.cuda.is_available() else torch.float32
        )

        if self.type_model_precision == torch.float32 and os.path.isfile(base_model_id):
            logger.info(f"Working with full precision {str(self.type_model_precision)}")

        # Load model
        if self.base_model_id == base_model_id and self.pipe is not None and reload == False and self.vae_model == vae_model and unload_model == False:
            #logger.info("Previous loaded base model") # not return
            class_name = self.class_name
        else:
            # Unload previous model and stuffs
            self.pipe = None
            self.model_memory = {}
            self.lora_memory = [None, None, None, None, None]
            self.lora_scale_memory = [1.0, 1.0, 1.0, 1.0, 1.0]
            self.LCMconfig = None
            self.embed_loaded = []
            self.FreeU = False
            torch.cuda.empty_cache()
            gc.collect()

            # Load new model
            if os.path.isfile(base_model_id): # exists or not same # if os.path.exists(base_model_id):

                if sdxl_safetensors:
                    logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix")
                    self.pipe = StableDiffusionXLPipeline.from_single_file(
                        base_model_id,
                        vae=AutoencoderKL.from_pretrained(
                            "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
                        ),
                        torch_dtype=self.type_model_precision,
                    )
                    class_name = "StableDiffusionXLPipeline"
                else:
                    self.pipe = StableDiffusionPipeline.from_single_file(
                        base_model_id,
                        # vae=None
                        # if vae_model == None
                        # else AutoencoderKL.from_single_file(
                        #     vae_model
                        # ),
                        torch_dtype=self.type_model_precision,
                    )
                    class_name = "StableDiffusionPipeline"
            else:
                file_config = hf_hub_download(repo_id=base_model_id, filename="model_index.json")

                # Reading data from the JSON file
                with open(file_config, 'r') as json_config:
                    data_config = json.load(json_config)

                # Searching for the value of the "_class_name" key
                if '_class_name' in data_config:
                    class_name = data_config['_class_name']

                match class_name:
                    case "StableDiffusionPipeline":
                        self.pipe = StableDiffusionPipeline.from_pretrained(
                            base_model_id,
                            torch_dtype=self.type_model_precision,
                        )

                    case "StableDiffusionXLPipeline":
                        logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix")
                        try:
                            self.pipe = DiffusionPipeline.from_pretrained(
                                base_model_id,
                                vae=AutoencoderKL.from_pretrained(
                                    "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
                                ),
                                torch_dtype=torch.float16,
                                use_safetensors=True,
                                variant="fp16",
                                add_watermarker=False,
                            )
                        except Exception as e:
                            logger.debug(e)
                            logger.debug("Loading model without parameter variant=fp16")
                            self.pipe = DiffusionPipeline.from_pretrained(
                                base_model_id,
                                vae=AutoencoderKL.from_pretrained(
                                    "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
                                ),
                                torch_dtype=torch.float16,
                                use_safetensors=True,
                                add_watermarker=False,
                            )
            self.base_model_id = base_model_id
            self.class_name = class_name

            # Load VAE after loaded model
            if vae_model is None :
                logger.debug("Default VAE")
                pass
            else:
                if os.path.isfile(vae_model):
                    self.pipe.vae = AutoencoderKL.from_single_file(
                        vae_model
                    )
                else:
                    self.pipe.vae = AutoencoderKL.from_pretrained(
                        vae_model,
                        subfolder = "vae",
                    )
                try:
                  self.pipe.vae.to(self.type_model_precision)
                except:
                  logger.warning(f"VAE: not in {self.type_model_precision}")
            self.vae_model = vae_model

            # Define base scheduler
            self.default_scheduler = copy.deepcopy(self.pipe.scheduler)
            logger.debug(f"Base sampler: {self.default_scheduler}")

        if task_name in self.model_memory:
            self.pipe = self.model_memory[task_name]
            # Create new base values
            #self.pipe.to(self.device)
            # torch.cuda.empty_cache()
            # gc.collect()
            self.base_model_id = base_model_id
            self.task_name = task_name
            self.vae_model = vae_model
            self.class_name = class_name
            self.pipe.watermark = None
            return

        # Load task
        model_id = CONTROLNET_MODEL_IDS[task_name]

        if task_name == "inpaint":
            match class_name:
                case "StableDiffusionPipeline":

                    controlnet = ControlNetModel.from_pretrained(
                        model_id, torch_dtype=self.type_model_precision
                    )

                    self.pipe = StableDiffusionControlNetInpaintPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        tokenizer=self.pipe.tokenizer,
                        unet=self.pipe.unet,
                        controlnet=controlnet,
                        scheduler=self.pipe.scheduler,
                        safety_checker=self.pipe.safety_checker,
                        feature_extractor=self.pipe.feature_extractor,
                        requires_safety_checker=self.pipe.config.requires_safety_checker,
                    )
                case "StableDiffusionXLPipeline":

                    self.pipe = StableDiffusionXLInpaintPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        text_encoder_2=self.pipe.text_encoder_2,
                        tokenizer=self.pipe.tokenizer,
                        tokenizer_2=self.pipe.tokenizer_2,
                        unet=self.pipe.unet,
                        # controlnet=self.controlnet,
                        scheduler=self.pipe.scheduler,
                    )


        if task_name not in ["txt2img", "inpaint", "img2img"]:
            match class_name:
                case "StableDiffusionPipeline":

                    controlnet = ControlNetModel.from_pretrained(
                        model_id, torch_dtype=self.type_model_precision
                    )

                    self.pipe = StableDiffusionControlNetPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        tokenizer=self.pipe.tokenizer,
                        unet=self.pipe.unet,
                        controlnet=controlnet,
                        scheduler=self.pipe.scheduler,
                        safety_checker=self.pipe.safety_checker,
                        feature_extractor=self.pipe.feature_extractor,
                        requires_safety_checker=self.pipe.config.requires_safety_checker,
                    )
                    self.pipe.scheduler = UniPCMultistepScheduler.from_config(self.pipe.scheduler.config)

                case "StableDiffusionXLPipeline":

                    adapter = T2IAdapter.from_pretrained(
                        model_id,
                        torch_dtype=torch.float16,
                        varient="fp16",
                    ).to(self.device)

                    self.pipe = StableDiffusionXLAdapterPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        text_encoder_2=self.pipe.text_encoder_2,
                        tokenizer=self.pipe.tokenizer,
                        tokenizer_2=self.pipe.tokenizer_2,
                        unet=self.pipe.unet,
                        adapter=adapter,
                        scheduler=self.pipe.scheduler,
                    ).to(self.device)


        if task_name in ["txt2img", "img2img"]:
            match class_name:

                case "StableDiffusionPipeline":
                    self.pipe = StableDiffusionPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        tokenizer=self.pipe.tokenizer,
                        unet=self.pipe.unet,
                        scheduler=self.pipe.scheduler,
                        safety_checker=self.pipe.safety_checker,
                        feature_extractor=self.pipe.feature_extractor,
                        requires_safety_checker=self.pipe.config.requires_safety_checker,
                    )

                case "StableDiffusionXLPipeline":
                    self.pipe = StableDiffusionXLPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        text_encoder_2=self.pipe.text_encoder_2,
                        tokenizer=self.pipe.tokenizer,
                        tokenizer_2=self.pipe.tokenizer_2,
                        unet=self.pipe.unet,
                        scheduler=self.pipe.scheduler,
                    )

            if task_name == "img2img":
                self.pipe = AutoPipelineForImage2Image.from_pipe(self.pipe)

        # Create new base values
        self.pipe.to(self.device)
        torch.cuda.empty_cache()
        gc.collect()

        self.base_model_id = base_model_id
        self.task_name = task_name
        self.vae_model = vae_model
        self.class_name = class_name

        if self.class_name == "StableDiffusionXLPipeline":
            self.pipe.enable_vae_slicing()
            self.pipe.enable_vae_tiling()
            self.pipe.watermark = None

        if retain_model_in_memory == True and task_name not in self.model_memory:
            self.model_memory[task_name] = self.pipe

        return

    def load_controlnet_weight(self, task_name: str) -> None:
        """Load the ControlNet registered for ``task_name`` into the current pipeline.

        The model id comes from ``CONTROLNET_MODEL_IDS``; the loaded network is
        moved to ``self.device`` and assigned to ``self.pipe.controlnet``.
        """
        def _free_memory():
            torch.cuda.empty_cache()
            gc.collect()

        _free_memory()
        controlnet = ControlNetModel.from_pretrained(
            CONTROLNET_MODEL_IDS[task_name],
            torch_dtype=self.type_model_precision,
        )
        controlnet.to(self.device)
        _free_memory()
        self.pipe.controlnet = controlnet
        #self.task_name = task_name

    @torch.autocast("cuda")
    def run_pipe(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        control_image: PIL.Image.Image,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        generator,
        controlnet_conditioning_scale,
        control_guidance_start,
        control_guidance_end,
    ) -> list[PIL.Image.Image]:
        # Return PIL images
        # generator = torch.Generator().manual_seed(seed)
        return self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            generator=generator,
            controlnet_conditioning_scale=controlnet_conditioning_scale,
            control_guidance_start=control_guidance_start,
            control_guidance_end=control_guidance_end,
            image=control_image,
        ).images

    @torch.autocast("cuda")
    def run_pipe_SD(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        height: int,
        width: int,
        generator,
    ) -> list[PIL.Image.Image]:
        # Return PIL images
        # generator = torch.Generator().manual_seed(seed)
        self.preview_handle = None
        return self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            generator=generator,
            height=height,
            width=width,
            callback=self.callback_pipe if self.image_previews else None,
            callback_steps=10 if self.image_previews else 100,
        ).images

    # @torch.autocast('cuda')
    # def run_pipe_SDXL(
    #     self,
    #     prompt: str,
    #     negative_prompt: str,
    #     prompt_embeds,
    #     negative_prompt_embeds,
    #     num_images: int,
    #     num_steps: int,
    #     guidance_scale: float,
    #     clip_skip: int,
    #     height : int,
    #     width : int,
    #     generator,
    #     seddd,
    #     conditioning,
    #     pooled,
    # ) -> list[PIL.Image.Image]:
    #     # Return PIL images
    #     #generator = torch.Generator("cuda").manual_seed(seddd) # generator = torch.Generator("cuda").manual_seed(seed),
    #     return self.pipe(
    #         prompt = None,
    #         negative_prompt = None,
    #         prompt_embeds=conditioning[0:1],
    #         pooled_prompt_embeds=pooled[0:1],
    #         negative_prompt_embeds=conditioning[1:2],
    #         negative_pooled_prompt_embeds=pooled[1:2],
    #         height = height,
    #         width = width,
    #         num_inference_steps = num_steps,
    #         guidance_scale = guidance_scale,
    #         clip_skip = clip_skip,
    #         num_images_per_prompt = num_images,
    #         generator = generator,
    #         ).images

    @torch.autocast("cuda")
    def run_pipe_inpaint(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        control_image: PIL.Image.Image,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        strength: float,
        init_image,
        control_mask,
        controlnet_conditioning_scale,
        control_guidance_start,
        control_guidance_end,
        generator,
    ) -> list[PIL.Image.Image]:
        # Return PIL images
        # generator = torch.Generator().manual_seed(seed)
        return self.pipe(
            prompt=None,
            negative_prompt=None,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            eta=1.0,
            strength=strength,
            image=init_image,  # original image
            mask_image=control_mask,  # mask, values of 0 to 255
            control_image=control_image,  # tensor control image
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            generator=generator,
            controlnet_conditioning_scale=controlnet_conditioning_scale,
            control_guidance_start=control_guidance_start,
            control_guidance_end=control_guidance_end,
        ).images

    @torch.autocast("cuda")
    def run_pipe_img2img(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        strength: float,
        init_image,
        generator,
    ) -> list[PIL.Image.Image]:
        # Return PIL images
        # generator = torch.Generator().manual_seed(seed)
        return self.pipe(
            prompt=None,
            negative_prompt=None,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            eta=1.0,
            strength=strength,
            image=init_image,  # original image
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            generator=generator,
        ).images

    ### self.x_process return image_preprocessor###
    @torch.inference_mode()
    def process_canny(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        low_threshold: int,
        high_threshold: int,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError

        self.preprocessor.load("Canny")
        control_image = self.preprocessor(
            image=image,
            low_threshold=low_threshold,
            high_threshold=high_threshold,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
        )

        return control_image

    @torch.inference_mode()
    def process_mlsd(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        value_threshold: float,
        distance_threshold: float,
    ) -> list[PIL.Image.Image]:
        """Load the "MLSD" preprocessor and apply it to ``image``.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Forwarded as ``image_resolution``.
            preprocess_resolution: Forwarded as ``detect_resolution``.
            value_threshold: Forwarded as ``thr_v``.
            distance_threshold: Forwarded as ``thr_d``.

        Returns:
            Whatever ``self.preprocessor`` returns for these arguments.

        Raises:
            ValueError: If ``image`` is ``None``.
        """
        if image is None:
            raise ValueError

        self.preprocessor.load("MLSD")
        mlsd_options = dict(
            image=image,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
            thr_v=value_threshold,
            thr_d=distance_threshold,
        )
        return self.preprocessor(**mlsd_options)

    @torch.inference_mode()
    def process_scribble(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        """Build the scribble control image for ``image``.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Target resolution (``resolution`` for
                ``resize_image`` or ``image_resolution`` for the preprocessor).
            preprocess_resolution: Forwarded as ``detect_resolution``.
            preprocessor_name: One of ``"None"``, ``"HED"`` or ``"PidiNet"``.

        Returns:
            For ``"None"``, a PIL image built from the resized input; otherwise
            whatever ``self.preprocessor`` returns.
            NOTE(review): the annotation says ``list`` but the ``"None"`` path
            returns a single PIL image.

        Raises:
            ValueError: If ``image`` is ``None`` or ``preprocessor_name`` is not
                one of the supported names.
        """
        if image is None:
            raise ValueError

        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        elif preprocessor_name == "HED":
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                scribble=False,
            )
        elif preprocessor_name == "PidiNet":
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                safe=False,
            )
        else:
            # Previously an unknown name fell through and crashed with an
            # UnboundLocalError on `control_image`; fail explicitly instead,
            # like process_softedge does.
            raise ValueError(f"Unsupported scribble preprocessor: {preprocessor_name}")

        return control_image

    @torch.inference_mode()
    def process_scribble_interactive(
        self,
        image_and_mask: dict[str, np.ndarray],
        image_resolution: int,
    ) -> list[PIL.Image.Image]:
        """Turn the ``"mask"`` entry of ``image_and_mask`` into a control image.

        Args:
            image_and_mask: Mapping whose ``"mask"`` value is used as the image.
            image_resolution: Passed to ``resize_image`` as ``resolution``.

        Returns:
            A PIL image built from the mask after ``HWC3`` and ``resize_image``.
            NOTE(review): the annotation says ``list`` but a single image is
            returned.

        Raises:
            ValueError: If ``image_and_mask`` is ``None``.
        """
        if image_and_mask is None:
            raise ValueError

        mask_array = HWC3(image_and_mask["mask"])
        resized_mask = resize_image(mask_array, resolution=image_resolution)
        return PIL.Image.fromarray(resized_mask)

    @torch.inference_mode()
    def process_softedge(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        """Build the soft-edge control image for ``image``.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Target resolution (``resolution`` for
                ``resize_image`` or ``image_resolution`` for the preprocessor).
            preprocess_resolution: Forwarded as ``detect_resolution``.
            preprocessor_name: ``"None"``, ``"HED"``, ``"HED safe"``,
                ``"PidiNet"`` or ``"PidiNet safe"``.

        Returns:
            For ``"None"``, a PIL image built from the resized input; otherwise
            whatever ``self.preprocessor`` returns.

        Raises:
            ValueError: If ``image`` is ``None`` or the name is not supported.
        """
        if image is None:
            raise ValueError

        if preprocessor_name == "None":
            resized = resize_image(HWC3(image), resolution=image_resolution)
            return PIL.Image.fromarray(resized)

        # Pick the model to load and the keyword that carries the "safe" flag:
        # HED receives it as `scribble`, PidiNet as `safe`.
        if preprocessor_name in ("HED", "HED safe"):
            model_name, flag_keyword = "HED", "scribble"
        elif preprocessor_name in ("PidiNet", "PidiNet safe"):
            model_name, flag_keyword = "PidiNet", "safe"
        else:
            raise ValueError

        safe = "safe" in preprocessor_name
        self.preprocessor.load(model_name)
        return self.preprocessor(
            image=image,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
            **{flag_keyword: safe},
        )

    @torch.inference_mode()
    def process_openpose(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        """Build the pose control image for ``image``.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Target resolution (``resolution`` for
                ``resize_image`` or ``image_resolution`` for the preprocessor).
            preprocess_resolution: Forwarded as ``detect_resolution``.
            preprocessor_name: ``"None"`` skips detection; any other value
                loads the "Openpose" preprocessor.

        Returns:
            For ``"None"``, a PIL image built from the resized input; otherwise
            whatever ``self.preprocessor`` returns (called with
            ``hand_and_face=True``).

        Raises:
            ValueError: If ``image`` is ``None``.
        """
        if image is None:
            raise ValueError

        if preprocessor_name == "None":
            resized = resize_image(HWC3(image), resolution=image_resolution)
            return PIL.Image.fromarray(resized)

        self.preprocessor.load("Openpose")
        return self.preprocessor(
            image=image,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
            hand_and_face=True,
        )

    @torch.inference_mode()
    def process_segmentation(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        """Build the segmentation control image for ``image``.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Target resolution (``resolution`` for
                ``resize_image`` or ``image_resolution`` for the preprocessor).
            preprocess_resolution: Forwarded as ``detect_resolution``.
            preprocessor_name: ``"None"`` skips preprocessing; any other value
                is passed to ``self.preprocessor.load``.

        Returns:
            For ``"None"``, a PIL image built from the resized input; otherwise
            whatever ``self.preprocessor`` returns.

        Raises:
            ValueError: If ``image`` is ``None``.
        """
        if image is None:
            raise ValueError

        if preprocessor_name == "None":
            resized = resize_image(HWC3(image), resolution=image_resolution)
            return PIL.Image.fromarray(resized)

        self.preprocessor.load(preprocessor_name)
        segmentation_options = dict(
            image=image,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
        )
        return self.preprocessor(**segmentation_options)

    @torch.inference_mode()
    def process_depth(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        """Build the depth control image for ``image``.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Target resolution (``resolution`` for
                ``resize_image`` or ``image_resolution`` for the preprocessor).
            preprocess_resolution: Forwarded as ``detect_resolution``.
            preprocessor_name: ``"None"`` skips preprocessing; any other value
                is passed to ``self.preprocessor.load``.

        Returns:
            For ``"None"``, a PIL image built from the resized input; otherwise
            whatever ``self.preprocessor`` returns.

        Raises:
            ValueError: If ``image`` is ``None``.
        """
        if image is None:
            raise ValueError

        if preprocessor_name == "None":
            resized = resize_image(HWC3(image), resolution=image_resolution)
            return PIL.Image.fromarray(resized)

        self.preprocessor.load(preprocessor_name)
        depth_options = dict(
            image=image,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
        )
        return self.preprocessor(**depth_options)

    @torch.inference_mode()
    def process_normal(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        """Build the normal-map control image for ``image``.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Target resolution (``resolution`` for
                ``resize_image`` or ``image_resolution`` for the preprocessor).
            preprocess_resolution: Forwarded as ``detect_resolution``.
            preprocessor_name: ``"None"`` skips preprocessing; any other value
                loads the "NormalBae" preprocessor.

        Returns:
            For ``"None"``, a PIL image built from the resized input; otherwise
            whatever ``self.preprocessor`` returns.

        Raises:
            ValueError: If ``image`` is ``None``.
        """
        if image is None:
            raise ValueError

        if preprocessor_name == "None":
            resized = resize_image(HWC3(image), resolution=image_resolution)
            return PIL.Image.fromarray(resized)

        self.preprocessor.load("NormalBae")
        normal_options = dict(
            image=image,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
        )
        return self.preprocessor(**normal_options)

    @torch.inference_mode()
    def process_lineart(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        """Build the line-art control image and select the matching weights.

        For ``StableDiffusionPipeline`` models the ControlNet weights are
        switched via ``self.load_controlnet_weight``: ``"lineart_anime"`` when
        the preprocessor name contains ``"anime"``, ``"lineart"`` otherwise.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Target resolution (``resolution`` for
                ``resize_image`` or ``image_resolution`` for the preprocessor).
            preprocess_resolution: Forwarded as ``detect_resolution``.
            preprocessor_name: ``"None"``, ``"None (anime)"``, ``"Lineart"``,
                ``"Lineart coarse"`` or ``"Lineart (anime)"``.

        Returns:
            For the ``"None"`` variants, a PIL image built from the resized
            input; otherwise whatever ``self.preprocessor`` returns.

        Raises:
            ValueError: If ``image`` is ``None`` or ``preprocessor_name`` is not
                one of the supported names.
        """
        if image is None:
            raise ValueError

        if preprocessor_name in ["None", "None (anime)"]:
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        elif preprocessor_name in ["Lineart", "Lineart coarse"]:
            coarse = "coarse" in preprocessor_name
            self.preprocessor.load("Lineart")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                coarse=coarse,
            )
        elif preprocessor_name == "Lineart (anime)":
            self.preprocessor.load("LineartAnime")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
            )
        else:
            # Previously an unknown name swapped the ControlNet weights and then
            # crashed with an UnboundLocalError on `control_image`; reject it
            # before any side effect happens.
            raise ValueError(f"Unsupported lineart preprocessor: {preprocessor_name}")

        if self.class_name == "StableDiffusionPipeline":
            if "anime" in preprocessor_name:
                self.load_controlnet_weight("lineart_anime")
                logger.info("Linear anime")
            else:
                self.load_controlnet_weight("lineart")

        return control_image

    @torch.inference_mode()
    def process_shuffle(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        """Build the shuffle control image for ``image``.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Target resolution (``resolution`` for
                ``resize_image`` or ``image_resolution`` for the preprocessor).
            preprocessor_name: ``"None"`` skips preprocessing; any other value
                is passed to ``self.preprocessor.load``.

        Returns:
            For ``"None"``, a PIL image built from the resized input; otherwise
            whatever ``self.preprocessor`` returns.

        Raises:
            ValueError: If ``image`` is ``None``.
        """
        if image is None:
            raise ValueError

        if preprocessor_name == "None":
            resized = resize_image(HWC3(image), resolution=image_resolution)
            return PIL.Image.fromarray(resized)

        self.preprocessor.load(preprocessor_name)
        shuffle_options = dict(image=image, image_resolution=image_resolution)
        return self.preprocessor(**shuffle_options)

    @torch.inference_mode()
    def process_ip2p(
        self,
        image: np.ndarray,
        image_resolution: int,
    ) -> list[PIL.Image.Image]:
        """Resize ``image`` and return it as the ip2p control image.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Passed to ``resize_image`` as ``resolution``.

        Returns:
            A PIL image built from the input after ``HWC3`` and ``resize_image``.
            NOTE(review): the annotation says ``list`` but a single image is
            returned.

        Raises:
            ValueError: If ``image`` is ``None``.
        """
        if image is None:
            raise ValueError

        prepared = resize_image(HWC3(image), resolution=image_resolution)
        return PIL.Image.fromarray(prepared)

    @torch.inference_mode()
    def process_inpaint(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        image_mask: np.ndarray,
    ) -> tuple:
        """Prepare the init image, mask and control image for inpainting.

        Both ``image`` and ``image_mask`` go through ``HWC3`` and
        ``resize_image`` before being converted to PIL images, so the mask must
        already be an array here (the former ``str`` annotation was inaccurate).

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Passed to ``resize_image`` as ``resolution`` for
                both the image and the mask.
            preprocess_resolution: Unused; kept for signature compatibility.
            image_mask: Mask array; must not be ``None``.

        Returns:
            A 3-tuple ``(init_image, control_mask, control_image)`` where the
            first two are PIL images and the last is the result of
            ``make_inpaint_condition(init_image, control_mask)``.

        Raises:
            ValueError: If ``image`` or ``image_mask`` is ``None``.
        """
        if image is None:
            raise ValueError
        if image_mask is None:
            # Fail the same way as a missing image instead of crashing inside HWC3.
            raise ValueError("image_mask is required for inpainting")

        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        init_image = PIL.Image.fromarray(image)

        image_mask = HWC3(image_mask)
        image_mask = resize_image(image_mask, resolution=image_resolution)
        control_mask = PIL.Image.fromarray(image_mask)

        control_image = make_inpaint_condition(init_image, control_mask)

        return init_image, control_mask, control_image

    @torch.inference_mode()
    def process_img2img(
        self,
        image: np.ndarray,
        image_resolution: int,
    ) -> list[PIL.Image.Image]:
        """Resize ``image`` and return it as the img2img init image.

        Args:
            image: Input image; must not be ``None``.
            image_resolution: Passed to ``resize_image`` as ``resolution``.

        Returns:
            A PIL image built from the input after ``HWC3`` and ``resize_image``.
            NOTE(review): the annotation says ``list`` but a single image is
            returned.

        Raises:
            ValueError: If ``image`` is ``None``.
        """
        if image is None:
            raise ValueError

        prepared = resize_image(HWC3(image), resolution=image_resolution)
        return PIL.Image.fromarray(prepared)

    def get_scheduler(self, name):
        """Instantiate the scheduler registered under ``name``.

        The scheduler class and its extra config come from
        ``SCHEDULER_CONFIG_MAP``; the instance is built from
        ``self.default_scheduler.config`` with those extras applied on top.

        Args:
            name: Key into ``SCHEDULER_CONFIG_MAP``.

        Returns:
            The result of ``scheduler_class.from_config(...)``.

        Raises:
            ValueError: If ``name`` is not a key of ``SCHEDULER_CONFIG_MAP``.
        """
        if name not in SCHEDULER_CONFIG_MAP:
            raise ValueError(f"Scheduler with name {name} not found. Valid schedulers: {', '.join(scheduler_names)}")

        scheduler_class, extra_config = SCHEDULER_CONFIG_MAP[name]
        # Built from the default scheduler's config (beta), not from the
        # scheduler currently attached to the pipe.
        base_config = self.default_scheduler.config
        return scheduler_class.from_config(base_config, **extra_config)

    def create_prompt_embeds(
        self,
        prompt,
        negative_prompt,
        textual_inversion,
        clip_skip,
        syntax_weights,
        ):
        """Load textual-inversion embeddings and encode the prompt pair.

        Two code paths exist, selected by ``self.class_name``:

        * ``"StableDiffusionPipeline"``: returns
          ``(prompt_emb, negative_prompt_emb)``, padded to the same shape when
          they differ.
        * anything else (SDXL): returns ``(conditioning, pooled)`` as produced
          by ``self.compel([prompt, negative_prompt])``.

        ``self.compel`` is created only when the instance has no ``compel``
        attribute yet, so ``clip_skip`` is only consulted at that moment.

        Args:
            prompt: Positive prompt text.
            negative_prompt: Negative prompt text.
            textual_inversion: Iterable of ``(token, path)`` pairs. Loading is
                skipped when it equals ``self.embed_loaded`` or is ``[]``.
            clip_skip: Truthy selects the penultimate hidden states when the
                Compel instance is created.
            syntax_weights: ``"Classic"`` selects the classic weight syntax
                handling; any other value uses the Compel path.

        Returns:
            A 2-tuple as described above.
        """
        if self.class_name == "StableDiffusionPipeline":
            if self.embed_loaded != textual_inversion and textual_inversion != []:
                # Textual Inversion
                for name, directory_name in textual_inversion:
                    try:
                        if directory_name.endswith(".pt"):
                            # SECURITY NOTE: torch.load unpickles the file, which can
                            # execute arbitrary code; only load trusted embeddings.
                            model = torch.load(directory_name, map_location=self.device)
                            model_tensors = model.get("string_to_param").get("*")
                            s_model = {"emb_params": model_tensors}
                            # save_file(s_model, directory_name[:-3] + '.safetensors')
                            self.pipe.load_textual_inversion(s_model, token=name)

                        else:
                            # self.pipe.text_encoder.resize_token_embeddings(len(self.pipe.tokenizer),pad_to_multiple_of=128)
                            # self.pipe.load_textual_inversion("./bad_prompt.pt", token="baddd")
                            self.pipe.load_textual_inversion(directory_name, token=name)
                        if not self.gui_active:
                            logger.info(f"Applied : {name}")

                    except Exception as e:
                        # Best effort: an error message mentioning the token is
                        # treated as "already loaded"; anything else is reported.
                        exception = str(e)
                        if name in exception:
                            logger.debug(f"Previous loaded embed {name}")
                        else:
                            logger.error(exception)
                            logger.error(f"Can't apply embed {name}")
                self.embed_loaded = textual_inversion

            # Clip skip
            # clip_skip_diffusers = None #clip_skip - 1 # future update
            if not hasattr(self, "compel"):
                self.compel = Compel(
                    tokenizer=self.pipe.tokenizer,
                    text_encoder=self.pipe.text_encoder,
                    truncate_long_prompts=False,
                    returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NORMALIZED if clip_skip else ReturnedEmbeddingsType.LAST_HIDDEN_STATES_NORMALIZED,
                )

            # Prompt weights for textual inversion
            prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer)
            negative_prompt_ti = self.pipe.maybe_convert_prompt(
                negative_prompt, self.pipe.tokenizer
            )

            # separate the multi-vector textual inversion by comma
            if self.embed_loaded != []:
                prompt_ti = add_comma_after_pattern_ti(prompt_ti)
                negative_prompt_ti = add_comma_after_pattern_ti(negative_prompt_ti)

            # Syntax weights
            self.pipe.to(self.device)
            if syntax_weights == "Classic":
                prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel)
                negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel)
            else:
                prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel, compel_process_sd=True)
                negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel, compel_process_sd=True)

            # Fix error shape
            if prompt_emb.shape != negative_prompt_emb.shape:
                (
                    prompt_emb,
                    negative_prompt_emb,
                ) = self.compel.pad_conditioning_tensors_to_same_length(
                    [prompt_emb, negative_prompt_emb]
                )

            return prompt_emb, negative_prompt_emb

        else:
            # SDXL embed
            if self.embed_loaded != textual_inversion and textual_inversion != []:
                # Textual Inversion
                for name, directory_name in textual_inversion:
                    try:
                        # Imported inside the try so a missing safetensors install
                        # is logged per embedding instead of aborting generation.
                        from safetensors.torch import load_file
                        state_dict = load_file(directory_name)
                        self.pipe.load_textual_inversion(state_dict["clip_g"], token=name, text_encoder=self.pipe.text_encoder_2, tokenizer=self.pipe.tokenizer_2)
                        self.pipe.load_textual_inversion(state_dict["clip_l"], token=name, text_encoder=self.pipe.text_encoder, tokenizer=self.pipe.tokenizer)
                        if not self.gui_active:
                            logger.info(f"Applied : {name}")
                    except Exception as e:
                        exception = str(e)
                        if name in exception:
                            logger.debug(f"Previous loaded embed {name}")
                        else:
                            logger.error(exception)
                            logger.error(f"Can't apply embed {name}")
                self.embed_loaded = textual_inversion

            if not hasattr(self, "compel"):
                # Clip skip
                if clip_skip:
                    # clip_skip_diffusers = None #clip_skip - 1 # future update
                    self.compel = Compel(
                        tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2],
                        text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2],
                        returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NON_NORMALIZED,
                        requires_pooled=[False, True],
                        truncate_long_prompts=False,
                    )
                else:
                    # clip_skip_diffusers = None # clip_skip = None # future update
                    self.compel = Compel(
                        tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2],
                        text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2],
                        requires_pooled=[False, True],
                        truncate_long_prompts=False,
                    )

            # Prompt weights for textual inversion
            try:
                prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer)
                negative_prompt_ti = self.pipe.maybe_convert_prompt(negative_prompt, self.pipe.tokenizer)
            except Exception as e:
                # Was a bare `except:`, which also swallowed KeyboardInterrupt
                # and SystemExit; fall back to the raw prompts on ordinary errors.
                prompt_ti = prompt
                negative_prompt_ti = negative_prompt
                logger.error("FAILED: Convert prompt for textual inversion")
                logger.debug(str(e))

            # prompt syntax style a1...
            if syntax_weights == "Classic":
                # Use the configured device, as the SD branch does, instead of
                # a hard-coded "cuda".
                self.pipe.to(self.device)
                prompt_ti = get_embed_new(prompt_ti, self.pipe, self.compel, only_convert_string=True)
                negative_prompt_ti = get_embed_new(negative_prompt_ti, self.pipe, self.compel, only_convert_string=True)
            else:
                # NOTE(review): this discards the converted prompts computed
                # above and uses the raw text — confirm that is intended.
                prompt_ti = prompt
                negative_prompt_ti = negative_prompt

            conditioning, pooled = self.compel([prompt_ti, negative_prompt_ti])

            return conditioning, pooled



    def process_lora(self, select_lora, lora_weights_scale, unload=False):
        """Apply or remove a LoRA on ``self.pipe`` and return the pipe.

        Removal re-applies the same LoRA with the negated scale. This is
        numerically unstable but fast and needs less memory.

        Failures are best effort: a load failure is logged as an error, an
        unload failure is logged at debug level, and the current pipe is
        returned either way.

        Args:
            select_lora: LoRA identifier passed to ``lora_mix_load``. ``None``
                means nothing to do.
            lora_weights_scale: Scale for the LoRA weights; negated on unload.
            unload: When true, remove the LoRA instead of applying it.

        Returns:
            ``self.pipe`` (replaced by the result of ``lora_mix_load`` on
            success).
        """
        if select_lora is None:
            return self.pipe

        device = "cuda" if torch.cuda.is_available() else "cpu"
        try:
            # Negation stays inside the try so a bad scale value remains a
            # logged, non-fatal failure as before.
            scale = -lora_weights_scale if unload else lora_weights_scale
            self.pipe = lora_mix_load(
                self.pipe,
                select_lora,
                scale,
                device=device,
                dtype=self.type_model_precision,
            )
            if unload:
                logger.debug(f"Unload LoRA: {select_lora}")
            else:
                logger.info(select_lora)
        except Exception as e:
            if unload:
                # Previously swallowed silently by a bare `except: pass`.
                logger.debug(f"Unload LoRA failed: {select_lora}: {str(e)}")
            else:
                logger.error(f"ERROR: LoRA not compatible: {select_lora}")
                logger.debug(f"{str(e)}")

        return self.pipe

    def load_style_file(self, style_json_file):
        """Load prompt styles from a JSON file into the instance.

        On success sets ``self.styles_data`` (name -> ``(prompt,
        negative_prompt)``), ``self.STYLE_NAMES`` and ``self.style_json_file``.
        A missing file or any error while reading/parsing is logged and leaves
        the instance attributes untouched.

        Args:
            style_json_file: Path to the styles JSON file.
        """
        if not os.path.exists(style_json_file):
            logger.error("Not found styles json file in directory")
            return

        try:
            style_entries = get_json_content(style_json_file)
            collected = {}
            for entry in style_entries:
                style_name = entry["name"]
                collected[style_name] = (entry["prompt"], entry["negative_prompt"])
            self.styles_data = collected
            self.STYLE_NAMES = list(collected.keys())
            self.style_json_file = style_json_file
            logger.info(f"Styles json file loaded with {len(self.STYLE_NAMES)} styles")
            logger.debug(str(self.STYLE_NAMES))
        except Exception as e:
            logger.error(str(e))

    def callback_pipe(self, iter, t, latents):
        """Decode ``latents`` with the pipe's VAE and show the first image.

        The first call creates a display handle via ``display(...,
        display_id=True)`` and stores it in ``self.preview_handle``; later
        calls update that handle in place.

        Args:
            iter: Unused (presumably the step index passed by the pipeline
                callback).
            t: Unused (presumably the timestep passed by the pipeline callback).
            latents: Latent tensor to decode.
        """
        vae = self.pipe.vae
        # Use the VAE's own scaling factor when its config exposes one, so
        # models whose factor differs from SD 1.x decode correctly; fall back
        # to the previously hard-coded 0.18215.
        scaling_factor = getattr(getattr(vae, "config", None), "scaling_factor", 0.18215)

        # convert latents to image
        with torch.no_grad():
            latents = 1 / scaling_factor * latents
            image = vae.decode(latents).sample

            image = (image / 2 + 0.5).clamp(0, 1)

            # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
            image = image.cpu().permute(0, 2, 3, 1).float().numpy()

            # convert to PIL Images
            image = self.pipe.numpy_to_pil(image)

            # show one image
            if self.preview_handle is None:
                self.preview_handle = display(image[0], display_id=True)
            else:
                self.preview_handle.update(image[0])

    def __call__(
        self,
        prompt: str = "",
        negative_prompt: str = "",
        img_height: int = 512,
        img_width: int = 512,
        num_images: int = 1,
        num_steps: int = 30,
        guidance_scale: float = 7.5,
        clip_skip: Optional[bool] = True,
        seed: int = -1,
        sampler: str = "DPM++ 2M",
        syntax_weights: str = "Classic",

        lora_A: Optional[str] = None,
        lora_scale_A: float = 1.0,
        lora_B: Optional[str] = None,
        lora_scale_B: float = 1.0,
        lora_C: Optional[str] = None,
        lora_scale_C: float = 1.0,
        lora_D: Optional[str] = None,
        lora_scale_D: float = 1.0,
        lora_E: Optional[str] = None,
        lora_scale_E: float = 1.0,
        textual_inversion: List[Tuple[str, str]] = [],
        FreeU: bool = False,
        adetailer_A: bool = False,
        adetailer_A_params: Dict[str, Any] = {},
        adetailer_B: bool = False,
        adetailer_B_params: Dict[str, Any] = {},
        style_prompt: Optional[Any] = [""],
        style_json_file: Optional[Any] = "",

        image: Optional[Any] = None,
        preprocessor_name: Optional[str] = "None",
        preprocess_resolution: int = 512,
        image_resolution: int = 512,
        image_mask: Optional[Any] = None,
        strength: float = 0.35,
        low_threshold: int = 100,
        high_threshold: int = 200,
        value_threshold: float = 0.1,
        distance_threshold: float = 0.1,
        controlnet_conditioning_scale: float = 1.0,
        control_guidance_start: float = 0.0,
        control_guidance_end: float = 1.0,
        t2i_adapter_preprocessor: bool = True,
        t2i_adapter_conditioning_scale: float = 1.0,
        t2i_adapter_conditioning_factor: float = 1.0,

        upscaler_model_path: Optional[str] = None, # add latent
        upscaler_increases_size: float = 1.5,
        esrgan_tile: int = 100,
        esrgan_tile_overlap: int = 10,
        hires_steps: int = 25,
        hires_denoising_strength: float = 0.35,
        hires_prompt: str = "",
        hires_negative_prompt: str = "",
        hires_sampler: str = "Use same sampler",

        loop_generation: int = 1,
        display_images: bool = False,
        save_generated_images: bool = True,
        image_storage_location: str = "./images",
        generator_in_cpu: bool = False,
        leave_progress_bar: bool = False,
        disable_progress_bar: bool = False,
        hires_before_adetailer: bool = False,
        hires_after_adetailer: bool = True,
        retain_compel_previous_load: bool = False,
        retain_detailfix_model_previous_load: bool = False,
        retain_hires_model_previous_load: bool = False,
        image_previews: bool = False,
        xformers_memory_efficient_attention: bool = False,
        gui_active: bool = False,
    ):

        """
        The call function for the generation.

        Args:
            prompt (str , optional):
                The prompt or prompts to guide image generation.
            negative_prompt (str , optional):
                The prompt or prompts to guide what to not include in image generation. Ignored when not using guidance (`guidance_scale < 1`).
            img_height (int, optional, defaults to 512):
                The height in pixels of the generated image.
            img_width (int, optional, defaults to 512):
                The width in pixels of the generated image.
            num_images (int, optional, defaults to 1):
                The number of images to generate per prompt.
            num_steps (int, optional, defaults to 30):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            guidance_scale (float, optional, defaults to 7.5):
                A higher guidance scale value encourages the model to generate images closely linked to the text
                `prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`.
            clip_skip (bool, optional):
                Number of layers to be skipped from CLIP while computing the prompt embeddings. It can be placed on
                the penultimate (True) or last layer (False).
            seed (int, optional, defaults to -1):
                A seed for controlling the randomness of the image generation process. -1 design a random seed.
            sampler (str, optional, defaults to "DPM++ 2M"):
                The sampler used for the generation process. Available samplers: DPM++ 2M, DPM++ 2M Karras, DPM++ 2M SDE,
                DPM++ 2M SDE Karras, DPM++ SDE, DPM++ SDE Karras, DPM2, DPM2 Karras, Euler, Euler a, Heun, LMS, LMS Karras,
                DDIM, DEIS, UniPC, DPM2 a, DPM2 a Karras, PNDM, LCM, DPM++ 2M Lu, DPM++ 2M Ef, DPM++ 2M SDE Lu and DPM++ 2M SDE Ef.
            syntax_weights (str, optional, defaults to "Classic"):
                Specifies the type of syntax weights used during generation. "Classic" is (word:weight), "Compel" is (word)weight
            lora_A (str, optional):
                Placeholder for lora A parameter.
            lora_scale_A (float, optional, defaults to 1.0):
                Placeholder for lora scale A parameter.
            lora_B (str, optional):
                Placeholder for lora B parameter.
            lora_scale_B (float, optional, defaults to 1.0):
                Placeholder for lora scale B parameter.
            lora_C (str, optional):
                Placeholder for lora C parameter.
            lora_scale_C (float, optional, defaults to 1.0):
                Placeholder for lora scale C parameter.
            lora_D (str, optional):
                Placeholder for lora D parameter.
            lora_scale_D (float, optional, defaults to 1.0):
                Placeholder for lora scale D parameter.
            lora_E (str, optional):
                Placeholder for lora E parameter.
            lora_scale_E (float, optional, defaults to 1.0):
                Placeholder for lora scale E parameter.
            textual_inversion (List[Tuple[str, str]], optional, defaults to []):
                Placeholder for textual inversion list of tuples. Help the model to adapt to a particular
                style. [("<token_activation>","<path_embeding>"),...]
            FreeU (bool, optional, defaults to False):
                Is a method that substantially improves diffusion model sample quality at no costs.
            adetailer_A (bool, optional, defaults to False):
                Guided Inpainting to Correct Image, it is preferable to use low values for strength.
            adetailer_A_params (Dict[str, Any], optional, defaults to {}):
                Placeholder for adetailer_A parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}.
                If not specified, default values will be used:
                - face_detector_ad (bool): Indicates whether face detection is enabled. Defaults to True.
                - person_detector_ad (bool): Indicates whether person detection is enabled. Defaults to True.
                - hand_detector_ad (bool): Indicates whether hand detection is enabled. Defaults to False.
                - prompt (str): A prompt for the adetailer_A. Defaults to an empty string.
                - negative_prompt (str): A negative prompt for the adetailer_A. Defaults to an empty string.
                - strength (float): The strength parameter value. Defaults to 0.35.
                - mask_dilation (int): The mask dilation value. Defaults to 4.
                - mask_blur (int): The mask blur value. Defaults to 4.
                - mask_padding (int): The mask padding value. Defaults to 32.
                - inpaint_only (bool): Indicates if only inpainting is to be performed. Defaults to True. False is img2img mode
                - sampler (str): The sampler type to be used. Defaults to "Use same sampler".
            adetailer_B (bool, optional, defaults to False):
                Guided Inpainting to Correct Image, it is preferable to use low values for strength.
            adetailer_B_params (Dict[str, Any], optional, defaults to {}):
                Placeholder for adetailer_B parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}.
                If not specified, default values will be used.
            style_prompt (str, optional):
                If a style that is in STYLE_NAMES is specified, it will be added to the original prompt and negative prompt.
            style_json_file (str, optional):
                JSON with styles to be applied and used in style_prompt.
            upscaler_model_path (str, optional):
                Placeholder for upscaler model path.
            upscaler_increases_size (float, optional, defaults to 1.5):
                Placeholder for upscaler increases size parameter.
            esrgan_tile (int, optional, defaults to 100):
                Tile if use a ESRGAN model.
            esrgan_tile_overlap (int, optional, defaults to 100):
                Tile overlap if use a ESRGAN model.
            hires_steps (int, optional, defaults to 25):
                The number of denoising steps for hires. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            hires_denoising_strength (float, optional, defaults to 0.35):
                Strength parameter for the hires.
            hires_prompt (str , optional):
                The prompt for hires. If not specified, the main prompt will be used.
            hires_negative_prompt (str , optional):
                The negative prompt for hires. If not specified, the main negative prompt will be used.
            hires_sampler (str, optional, defaults to "Use same sampler"):
                The sampler used for the hires generation process. If not specified, the main sampler will be used.
            image (Any, optional):
                The image to be used for the Inpaint, ControlNet, or T2I adapter.
            preprocessor_name (str, optional, defaults to "None"):
                Preprocessor name for ControlNet.
            preprocess_resolution (int, optional, defaults to 512):
                Preprocess resolution for the Inpaint, ControlNet, or T2I adapter.
            image_resolution (int, optional, defaults to 512):
                Image resolution for the Img2Img, Inpaint, ControlNet, or T2I adapter.
            image_mask (Any, optional):
                Path image mask for the Inpaint.
            strength (float, optional, defaults to 0.35):
                Strength parameter for the Inpaint and Img2Img.
            low_threshold (int, optional, defaults to 100):
                Low threshold parameter for ControlNet and T2I Adapter Canny.
            high_threshold (int, optional, defaults to 200):
                High threshold parameter for ControlNet and T2I Adapter Canny.
            value_threshold (float, optional, defaults to 0.1):
                Value threshold parameter for ControlNet MLSD.
            distance_threshold (float, optional, defaults to 0.1):
                Distance threshold parameter for ControlNet MLSD.
            controlnet_conditioning_scale (float, optional, defaults to 1.0):
                The outputs of the ControlNet are multiplied by `controlnet_conditioning_scale` before they are added
                to the residual in the original `unet`. Used in ControlNet and Inpaint
            control_guidance_start (float, optional, defaults to 0.0):
                The percentage of total steps at which the ControlNet starts applying. Used in ControlNet and Inpaint
            control_guidance_end (float, optional, defaults to 1.0):
                The percentage of total steps at which the ControlNet stops applying. Used in ControlNet and Inpaint
            t2i_adapter_preprocessor (bool, optional, defaults to True):
                Whether to run the preprocessor on the image for the T2I adapter tasks. For sdxl_canny the
                preprocessor is always active.
            t2i_adapter_conditioning_scale (float, optional, defaults to 1.0):
                The outputs of the adapter are multiplied by `t2i_adapter_conditioning_scale` before they are added to the
                residual in the original unet.
            t2i_adapter_conditioning_factor (float, optional, defaults to 1.0):
                The fraction of timesteps for which adapter should be applied. If `t2i_adapter_conditioning_factor` is
                `0.0`, adapter is not applied at all. If `t2i_adapter_conditioning_factor` is `1.0`, adapter is applied for
                all timesteps. If `t2i_adapter_conditioning_factor` is `0.5`, adapter is applied for half of the timesteps.
            loop_generation (int, optional, defaults to 1):
                The number of times the specified `num_images` will be generated.
            display_images (bool, optional, defaults to False):
                When running in a notebook, enabling this parameter displays the generated images.
            save_generated_images (bool, optional, defaults to True):
                By default, the generated images are saved in the current location within the 'images' folder. You can disable this with this parameter.
            image_storage_location (str, optional, defaults to "./images"):
                The directory where the generated images are saved.
            generator_in_cpu (bool, optional, defaults to False):
                The generator by default is specified on the GPU. To obtain more consistent results across various environments,
                it is preferable to use the generator on the CPU.
            leave_progress_bar (bool, optional, defaults to False):
                Leave the progress bar after generating the image.
            disable_progress_bar (bool, optional, defaults to False):
                Do not display the progress bar during image generation.
            hires_before_adetailer (bool, optional, defaults to False):
                Apply an upscale and high-resolution fix before adetailer.
            hires_after_adetailer (bool, optional, defaults to True):
                Apply an upscale and high-resolution fix after adetailer.
            retain_compel_previous_load (bool, optional, defaults to False):
                The previous compel remains preloaded in memory.
            retain_detailfix_model_previous_load (bool, optional, defaults to False):
                The previous adetailer model remains preloaded in memory.
            retain_hires_model_previous_load (bool, optional, defaults to False):
                The previous hires model remains preloaded in memory.
            image_previews (bool, optional, defaults to False):
                Displaying the image denoising process.
            xformers_memory_efficient_attention (bool, optional, defaults to False):
                Improves generation time, currently disabled.
            gui_active (bool, optional, defaults to False):
                Set this when running under a GUI; it changes some behavior, mainly how confirmation messages or options are displayed.

        Specific parameter usage details:

            Additional parameters that will be used in Inpaint:
                - image
                - image_mask
                - image_resolution
                - strength
                for SD 1.5:
                  - controlnet_conditioning_scale
                  - control_guidance_start
                  - control_guidance_end

            Additional parameters that will be used in img2img:
                - image
                - image_resolution
                - strength

            Additional parameters that will be used in ControlNet for SD 1.5 depending on the task:
                - image
                - preprocessor_name
                - preprocess_resolution
                - image_resolution
                - controlnet_conditioning_scale
                - control_guidance_start
                - control_guidance_end
                for Canny:
                    - low_threshold
                    - high_threshold
                for MLSD:
                    - value_threshold
                    - distance_threshold

            Additional parameters that will be used in T2I adapter for SDXL depending on the task:
                - image
                - preprocess_resolution
                - image_resolution
                - t2i_adapter_preprocessor
                - t2i_adapter_conditioning_scale
                - t2i_adapter_conditioning_factor

        """

        if self.task_name != "txt2img" and image == None:
            raise ValueError(
                "You need to specify the <image> for this task."
            )
        if img_height % 8 != 0:
            img_height = img_height + (8 - img_height % 8)
            logger.warning(f"Height must be divisible by 8, changed to {str(img_height)}")
        if img_width % 8 != 0:
            img_width = img_width + (8 - img_width % 8)
            logger.warning(f"Width must be divisible by 8, changed to {str(img_width)}")
        if image_resolution % 8 != 0:
            image_resolution = image_resolution + (8 - image_resolution % 8)
            logger.warning(f"Image resolution must be divisible by 8, changed to {str(image_resolution)}")
        if control_guidance_start >= control_guidance_end:
            logger.error(
                "Control guidance start (ControlNet Start Threshold) cannot be larger or equal to control guidance end (ControlNet Stop Threshold). The default values 0.0 and 1.0 will be used."
            )
            control_guidance_start, control_guidance_end = 0.0, 1.0

        self.gui_active = gui_active
        self.image_previews = image_previews

        if self.pipe == None:
            self.load_pipe(
                self.base_model_id,
                task_name=self.task_name,
                vae_model=self.vae_model,
                reload=True,
            )

        self.pipe.set_progress_bar_config(leave=leave_progress_bar)
        self.pipe.set_progress_bar_config(disable=disable_progress_bar)

        xformers_memory_efficient_attention=False # disabled
        if xformers_memory_efficient_attention and torch.cuda.is_available():
            self.pipe.disable_xformers_memory_efficient_attention()
        self.pipe.to(self.device)

        # Load style prompt file
        if style_json_file != "" and style_json_file != self.style_json_file:
            self.load_style_file(style_json_file)
        # Set style
        if isinstance(style_prompt, str):
            style_prompt = [style_prompt]
        if style_prompt != [""]:
            prompt, negative_prompt = apply_style(style_prompt, prompt, negative_prompt, self.styles_data, self.STYLE_NAMES)

        # LoRA load
        if self.lora_memory == [
            lora_A,
            lora_B,
            lora_C,
            lora_D,
            lora_E,
        ] and self.lora_scale_memory == [
            lora_scale_A,
            lora_scale_B,
            lora_scale_C,
            lora_scale_D,
            lora_scale_E,
        ]:
            for single_lora in self.lora_memory:
                if single_lora != None:
                    logger.info(f"LoRA in memory: {single_lora}")
            pass

        else:
            logger.debug("_un, re and load_ lora")
            self.pipe = self.process_lora(
                self.lora_memory[0], self.lora_scale_memory[0], unload=True
            )
            self.pipe = self.process_lora(
                self.lora_memory[1], self.lora_scale_memory[1], unload=True
            )
            self.pipe = self.process_lora(
                self.lora_memory[2], self.lora_scale_memory[2], unload=True
            )
            self.pipe = self.process_lora(
                self.lora_memory[3], self.lora_scale_memory[3], unload=True
            )
            self.pipe = self.process_lora(
                self.lora_memory[4], self.lora_scale_memory[4], unload=True
            )

            self.pipe = self.process_lora(lora_A, lora_scale_A)
            self.pipe = self.process_lora(lora_B, lora_scale_B)
            self.pipe = self.process_lora(lora_C, lora_scale_C)
            self.pipe = self.process_lora(lora_D, lora_scale_D)
            self.pipe = self.process_lora(lora_E, lora_scale_E)

        self.lora_memory = [lora_A, lora_B, lora_C, lora_D, lora_E]
        self.lora_scale_memory = [
            lora_scale_A,
            lora_scale_B,
            lora_scale_C,
            lora_scale_D,
            lora_scale_E,
        ]

        # LCM config
        if sampler == "LCM" and self.LCMconfig == None:
            if self.class_name == "StableDiffusionPipeline":
                adapter_id = "latent-consistency/lcm-lora-sdv1-5"
            elif self.class_name == "StableDiffusionXLPipeline":
                adapter_id = "latent-consistency/lcm-lora-sdxl"

            self.process_lora(adapter_id, 1.0)
            self.LCMconfig = adapter_id
            logger.info("LCM")
        elif sampler != "LCM" and self.LCMconfig != None:
            self.process_lora(self.LCMconfig, 1.0, unload=True)
            self.LCMconfig = None
        elif self.LCMconfig != None:
            logger.info("LCM")

        # FreeU
        if FreeU:
            logger.info("FreeU active")
            if self.class_name == "StableDiffusionPipeline":
                # sd
                self.pipe.enable_freeu(s1=0.9, s2=0.2, b1=1.2, b2=1.4)
            else:
                # sdxl
                self.pipe.enable_freeu(s1=0.6, s2=0.4, b1=1.1, b2=1.2)
            self.FreeU = True
        elif self.FreeU:
            self.pipe.disable_freeu()
            self.FreeU = False

        # Prompt Optimizations
        if hasattr(self, "compel") and not retain_compel_previous_load:
          del self.compel

        prompt_emb, negative_prompt_emb = self.create_prompt_embeds(
            prompt=prompt,
            negative_prompt=negative_prompt,
            textual_inversion=textual_inversion,
            clip_skip=clip_skip,
            syntax_weights=syntax_weights,
        )

        if self.class_name != "StableDiffusionPipeline":
            # Additional prompt for SDXL
            conditioning, pooled = prompt_emb.clone(), negative_prompt_emb.clone()
            prompt_emb = negative_prompt_emb = None


        if torch.cuda.is_available() and xformers_memory_efficient_attention:
            if xformers_memory_efficient_attention:
                self.pipe.enable_xformers_memory_efficient_attention()
            else:
                self.pipe.disable_xformers_memory_efficient_attention()

        try:
            #self.pipe.scheduler = DPMSolverSinglestepScheduler() # fix default params by random scheduler, not recomn
            self.pipe.scheduler = self.get_scheduler(sampler)
        except Exception as e:
            logger.debug(f"{e}")
            logger.warning(f"Error in sampler, please try again")
            #self.pipe = None
            torch.cuda.empty_cache()
            gc.collect()
            return

        self.pipe.safety_checker = None

        # Get image Global
        if self.task_name != "txt2img":
            if isinstance(image, str):
                # If the input is a string (file path), open it as an image
                image_pil = Image.open(image)
                numpy_array = np.array(image_pil, dtype=np.uint8)
            elif isinstance(image, Image.Image):
                # If the input is already a PIL Image, convert it to a NumPy array
                numpy_array = np.array(image, dtype=np.uint8)
            elif isinstance(image, np.ndarray):
                # If the input is a NumPy array, np.uint8
                numpy_array = image.astype(np.uint8)
            else:
                if gui_active:
                    logger.info(
                        "Not found image"
                    )
                    return
                else:
                    raise ValueError(
                        "Unsupported image type or not control image found; Bug report to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
                    )

            # Extract the RGB channels
            try:
                array_rgb = numpy_array[:, :, :3]
            except:
                logger.error("Unsupported image type")
                raise ValueError(
                    "Unsupported image type; Bug report to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
                )  # return

        # Get params preprocess Global SD 1.5
        preprocess_params_config = {}
        if self.task_name not in ["txt2img", "inpaint", "img2img"]:
            preprocess_params_config["image"] = array_rgb
            preprocess_params_config["image_resolution"] = image_resolution

            if self.task_name != "ip2p":
                if self.task_name != "shuffle":
                    preprocess_params_config[
                        "preprocess_resolution"
                    ] = preprocess_resolution
                if self.task_name != "mlsd" and self.task_name != "canny":
                    preprocess_params_config["preprocessor_name"] = preprocessor_name

        # RUN Preprocess SD 1.5
        if self.task_name == "inpaint":
            # Get mask for Inpaint
            if gui_active or os.path.exists(str(image_mask)):
                # Read image mask from gui
                mask_control_img = Image.open(image_mask)
                numpy_array_mask = np.array(mask_control_img, dtype=np.uint8)
                array_rgb_mask = numpy_array_mask[:, :, :3]
            elif not gui_active:
                # Convert control image to draw
                import base64
                import matplotlib.pyplot as plt
                name_without_extension = os.path.splitext(image.split("/")[-1])[0]
                image64 = base64.b64encode(open(image, "rb").read())
                image64 = image64.decode("utf-8")
                img = np.array(plt.imread(f"{image}")[:, :, :3])

                # Create mask interactive
                logger.info(f"Draw the mask on this canvas using the mouse. When you finish, press 'Finish' in the bottom side of the canvas.")
                draw(
                    image64,
                    filename=f"./{name_without_extension}_draw.png",
                    w=img.shape[1],
                    h=img.shape[0],
                    line_width=0.04 * img.shape[1],
                )

                # Create mask and save
                with_mask = np.array(
                    plt.imread(f"./{name_without_extension}_draw.png")[:, :, :3]
                )
                mask = (
                    (with_mask[:, :, 0] == 1)
                    * (with_mask[:, :, 1] == 0)
                    * (with_mask[:, :, 2] == 0)
                )
                plt.imsave(f"./{name_without_extension}_mask.png", mask, cmap="gray")
                mask_control = f"./{name_without_extension}_mask.png"
                logger.info(f"Mask saved: {mask_control}")

                # Read image mask
                mask_control_img = Image.open(mask_control)
                numpy_array_mask = np.array(mask_control_img, dtype=np.uint8)
                array_rgb_mask = numpy_array_mask[:, :, :3]
            else:
                raise ValueError("No images found")

            init_image, control_mask, control_image = self.process_inpaint(
                image=array_rgb,
                image_resolution=image_resolution,
                preprocess_resolution=preprocess_resolution,  # Not used
                image_mask=array_rgb_mask,
            )

        elif self.task_name == "openpose":
            logger.info("Openpose")
            control_image = self.process_openpose(**preprocess_params_config)

        elif self.task_name == "canny":
            logger.info("Canny")
            control_image = self.process_canny(
                **preprocess_params_config,
                low_threshold=low_threshold,
                high_threshold=high_threshold,
            )

        elif self.task_name == "mlsd":
            logger.info("MLSD")
            control_image = self.process_mlsd(
                **preprocess_params_config,
                value_threshold=value_threshold,
                distance_threshold=distance_threshold,
            )

        elif self.task_name == "scribble":
            logger.info("Scribble")
            control_image = self.process_scribble(**preprocess_params_config)

        elif self.task_name == "softedge":
            logger.info("Softedge")
            control_image = self.process_softedge(**preprocess_params_config)

        elif self.task_name == "segmentation":
            logger.info("Segmentation")
            control_image = self.process_segmentation(**preprocess_params_config)

        elif self.task_name == "depth":
            logger.info("Depth")
            control_image = self.process_depth(**preprocess_params_config)

        elif self.task_name == "normalbae":
            logger.info("NormalBae")
            control_image = self.process_normal(**preprocess_params_config)

        elif self.task_name == "lineart":
            logger.info("Lineart")
            control_image = self.process_lineart(**preprocess_params_config)

        elif self.task_name == "shuffle":
            logger.info("Shuffle")
            control_image = self.process_shuffle(**preprocess_params_config)

        elif self.task_name == "ip2p":
            logger.info("Ip2p")
            control_image = self.process_ip2p(**preprocess_params_config)

        elif self.task_name == "img2img":
            preprocess_params_config["image"] = array_rgb
            preprocess_params_config["image_resolution"] = image_resolution
            init_image = self.process_img2img(**preprocess_params_config)

        # RUN Preprocess T2I for SDXL
        if self.class_name == "StableDiffusionXLPipeline":
            # Get params preprocess XL
            preprocess_params_config_xl = {}
            if self.task_name not in ["txt2img", "inpaint", "img2img"]:
                preprocess_params_config_xl["image"] = array_rgb
                preprocess_params_config_xl["preprocess_resolution"] = preprocess_resolution
                preprocess_params_config_xl["image_resolution"] = image_resolution
                # preprocess_params_config_xl["additional_prompt"] = additional_prompt # ""

            if self.task_name == "sdxl_canny": # preprocessor true default
                logger.info("SDXL Canny: Preprocessor active by default")
                control_image = self.process_canny(
                    **preprocess_params_config_xl,
                    low_threshold=low_threshold,
                    high_threshold=high_threshold,
                )
            elif self.task_name == "sdxl_openpose":
                logger.info("SDXL Openpose")
                control_image = self.process_openpose(
                    preprocessor_name = "Openpose" if t2i_adapter_preprocessor else "None",
                    **preprocess_params_config_xl,
                )
            elif self.task_name == "sdxl_sketch":
                logger.info("SDXL Scribble")
                control_image = self.process_scribble(
                    preprocessor_name = "PidiNet" if t2i_adapter_preprocessor else "None",
                    **preprocess_params_config_xl,
                )
            elif self.task_name == "sdxl_depth-midas":
                logger.info("SDXL Depth")
                control_image = self.process_depth(
                    preprocessor_name = "Midas" if t2i_adapter_preprocessor else "None",
                    **preprocess_params_config_xl,
                )
            elif self.task_name == "sdxl_lineart":
                logger.info("SDXL Lineart")
                control_image = self.process_lineart(
                    preprocessor_name = "Lineart" if t2i_adapter_preprocessor else "None",
                    **preprocess_params_config_xl,
                )

        # Get general params for TASK
        if self.class_name == "StableDiffusionPipeline":
            # Base params pipe sd
            pipe_params_config = {
                "prompt": None,  # prompt, 
                "negative_prompt": None,  # negative_prompt,
                "prompt_embeds": prompt_emb,
                "negative_prompt_embeds": negative_prompt_emb,
                "num_images": num_images,
                "num_steps": num_steps,
                "guidance_scale": guidance_scale,
                "clip_skip": None,  # clip_skip, because we use clip skip of compel
            }
        else:
            # Base params pipe sdxl
            pipe_params_config = {
                "prompt" : None,
                "negative_prompt" : None,
                "num_inference_steps" : num_steps,
                "guidance_scale" : guidance_scale,
                "clip_skip" : None,
                "num_images_per_prompt" : num_images,
            }

        # New params
        if self.class_name == "StableDiffusionXLPipeline":
            # pipe sdxl
            if self.task_name == "txt2img":
                pipe_params_config["height"] = img_height
                pipe_params_config["width"] = img_width
            elif self.task_name == "inpaint":
                pipe_params_config["strength"] = strength
                pipe_params_config["image"] = init_image
                pipe_params_config["mask_image"] = control_mask
                logger.info(f"Image resolution: {str(init_image.size)}")
            elif self.task_name not in ["txt2img", "inpaint", "img2img"]:
                pipe_params_config["image"] = control_image
                pipe_params_config["adapter_conditioning_scale"] = t2i_adapter_conditioning_scale
                pipe_params_config["adapter_conditioning_factor"] = t2i_adapter_conditioning_factor
                logger.info(f"Image resolution: {str(control_image.size)}")
            elif self.task_name == "img2img":
                pipe_params_config["strength"] = strength
                pipe_params_config["image"] = init_image
                logger.info(f"Image resolution: {str(init_image.size)}")
        elif self.task_name == "txt2img":
            pipe_params_config["height"] = img_height
            pipe_params_config["width"] = img_width
        elif self.task_name == "inpaint":
            pipe_params_config["strength"] = strength
            pipe_params_config["init_image"] = init_image
            pipe_params_config["control_mask"] = control_mask
            pipe_params_config["control_image"] = control_image
            pipe_params_config[
                "controlnet_conditioning_scale"
            ] = controlnet_conditioning_scale
            pipe_params_config["control_guidance_start"] = control_guidance_start
            pipe_params_config["control_guidance_end"] = control_guidance_end
            logger.info(f"Image resolution: {str(init_image.size)}")
        elif self.task_name not in ["txt2img", "inpaint", "img2img"]:
            pipe_params_config["control_image"] = control_image
            pipe_params_config[
                "controlnet_conditioning_scale"
            ] = controlnet_conditioning_scale
            pipe_params_config["control_guidance_start"] = control_guidance_start
            pipe_params_config["control_guidance_end"] = control_guidance_end
            logger.info(f"Image resolution: {str(control_image.size)}")
        elif self.task_name == "img2img":
            pipe_params_config["strength"] = strength
            pipe_params_config["init_image"] = init_image
            logger.info(f"Image resolution: {str(init_image.size)}")

        # detailfix params and pipe global
        if adetailer_A or adetailer_B:

            # global params detailfix
            default_params_detailfix = {
                "face_detector_ad" : True,
                "person_detector_ad" : True,
                "hand_detector_ad" : False,
                "prompt": "",
                "negative_prompt" : "",
                "strength" : 0.35,
                "mask_dilation" : 4,
                "mask_blur" : 4,
                "mask_padding" : 32,
                #"sampler" : "Use same sampler",
                #"inpaint_only" : True,
            }

            # Pipe detailfix_pipe
            if not hasattr(self, "detailfix_pipe") or not retain_detailfix_model_previous_load:
                if  adetailer_A_params.get("inpaint_only", False) == True or adetailer_B_params.get("inpaint_only", False) == True:
                    detailfix_pipe = custom_task_model_loader(
                        pipe=self.pipe,
                        model_category="detailfix",
                        task_name=self.task_name,
                        torch_dtype=self.type_model_precision
                    )
                else:
                    detailfix_pipe = custom_task_model_loader(
                        pipe=self.pipe,
                        model_category="detailfix_img2img",
                        task_name=self.task_name,
                        torch_dtype=self.type_model_precision
                    )
                if hasattr(self, "detailfix_pipe"):
                    del self.detailfix_pipe
            if retain_detailfix_model_previous_load:
                if hasattr(self, "detailfix_pipe"):
                    detailfix_pipe = self.detailfix_pipe
                else:
                    self.detailfix_pipe = detailfix_pipe
            adetailer_A_params.pop("inpaint_only", None)
            adetailer_B_params.pop("inpaint_only", None)

            # Define base scheduler detailfix
            detailfix_pipe.default_scheduler = copy.deepcopy(self.default_scheduler)
            if  adetailer_A_params.get("sampler", "Use same sampler") != "Use same sampler":
                logger.debug("detailfix_pipe will use the sampler from adetailer_A")
                detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"])
            adetailer_A_params.pop("sampler", None)
            if adetailer_B_params.get("sampler", "Use same sampler") != "Use same sampler":
                logger.debug("detailfix_pipe will use the sampler from adetailer_B")
                detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"])
            adetailer_B_params.pop("sampler", None)

            detailfix_pipe.set_progress_bar_config(leave=leave_progress_bar)
            detailfix_pipe.set_progress_bar_config(disable=disable_progress_bar)
            detailfix_pipe.to(self.device)
            torch.cuda.empty_cache()
            gc.collect()

        if adetailer_A:
            for key_param, default_value in default_params_detailfix.items():
                if key_param not in adetailer_A_params:
                    adetailer_A_params[key_param] = default_value
                elif type(default_value) != type(adetailer_A_params[key_param]):
                    logger.warning(f"DetailFix A: Error type param, set default {str(key_param)}")
                    adetailer_A_params[key_param] = default_value

            detailfix_params_A = {
                "prompt": adetailer_A_params["prompt"],
                "negative_prompt" : adetailer_A_params["negative_prompt"],
                "strength" : adetailer_A_params["strength"],
                "num_inference_steps" : num_steps,
                "guidance_scale" : guidance_scale,
            }

            # clear params yolo
            adetailer_A_params.pop('strength', None)
            adetailer_A_params.pop('prompt', None)
            adetailer_A_params.pop('negative_prompt', None)

            # Verify prompt detailfix_params_A and get valid
            prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A, prompt_df_A, negative_prompt_df_A = process_prompts_valid(
                detailfix_params_A["prompt"], detailfix_params_A["negative_prompt"], prompt, negative_prompt
            )

            # Params detailfix
            if self.class_name == "StableDiffusionPipeline":
                # SD detailfix
                # detailfix_params_A["controlnet_conditioning_scale"] = controlnet_conditioning_scale
                # detailfix_params_A["control_guidance_start"] = control_guidance_start
                # detailfix_params_A["control_guidance_end"] = control_guidance_end

                if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A:
                    detailfix_params_A["prompt_embeds"] = prompt_emb
                    detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb
                else:
                    prompt_emb_ad, negative_prompt_emb_ad = self.create_prompt_embeds(
                        prompt=prompt_df_A,
                        negative_prompt=negative_prompt_df_A,
                        textual_inversion=textual_inversion,
                        clip_skip=clip_skip,
                        syntax_weights=syntax_weights,
                    )
                    detailfix_params_A["prompt_embeds"] = prompt_emb_ad
                    detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb_ad

                detailfix_params_A["prompt"] = None
                detailfix_params_A["negative_prompt"] = None

            else:
                # SDXL detailfix
                if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A:
                    conditioning_detailfix_A, pooled_detailfix_A = conditioning, pooled
                else:
                    conditioning_detailfix_A, pooled_detailfix_A = self.create_prompt_embeds(
                        prompt=prompt_df_A,
                        negative_prompt=negative_prompt_df_A,
                        textual_inversion=textual_inversion,
                        clip_skip=clip_skip,
                        syntax_weights=syntax_weights,
                    )

                detailfix_params_A.pop('prompt', None)
                detailfix_params_A.pop('negative_prompt', None)
                detailfix_params_A["prompt_embeds"] = conditioning_detailfix_A[0:1]
                detailfix_params_A["pooled_prompt_embeds"] = pooled_detailfix_A[0:1]
                detailfix_params_A["negative_prompt_embeds"] = conditioning_detailfix_A[1:2]
                detailfix_params_A["negative_pooled_prompt_embeds"] = pooled_detailfix_A[1:2]

            logger.debug(f"detailfix A prompt empty {prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A}")
            if not prompt_empty_detailfix_A or not negative_prompt_empty_detailfix_A:
                logger.debug(f"Prompts detailfix A {prompt_df_A, negative_prompt_df_A}")
            logger.debug(f"Pipe params detailfix A \n{detailfix_params_A}")
            logger.debug(f"Params detailfix A \n{adetailer_A_params}")

        if adetailer_B:
            for key_param, default_value in default_params_detailfix.items():
                if key_param not in adetailer_B_params:
                    adetailer_B_params[key_param] = default_value
                elif type(default_value) != type(adetailer_B_params[key_param]):
                    logger.warning(f"DetailfFix B: Error type param, set default {str(key_param)}")
                    adetailer_B_params[key_param] = default_value

            detailfix_params_B = {
                "prompt": adetailer_B_params["prompt"],
                "negative_prompt" : adetailer_B_params["negative_prompt"],
                "strength" : adetailer_B_params["strength"],
                "num_inference_steps" : num_steps,
                "guidance_scale" : guidance_scale,
            }

            # clear params yolo
            adetailer_B_params.pop('strength', None)
            adetailer_B_params.pop('prompt', None)
            adetailer_B_params.pop('negative_prompt', None)

            # Verify prompt detailfix_params_B and get valid
            prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B, prompt_df_B, negative_prompt_df_B = process_prompts_valid(
                detailfix_params_B["prompt"], detailfix_params_B["negative_prompt"], prompt, negative_prompt
            )

            # Params detailfix
            if self.class_name == "StableDiffusionPipeline":
                # SD detailfix
                # detailfix_params_B["controlnet_conditioning_scale"] = controlnet_conditioning_scale
                # detailfix_params_B["control_guidance_start"] = control_guidance_start
                # detailfix_params_B["control_guidance_end"] = control_guidance_end

                if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B:
                    detailfix_params_B["prompt_embeds"] = prompt_emb
                    detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb
                else:
                    prompt_emb_ad_b, negative_prompt_emb_ad_b = self.create_prompt_embeds(
                        prompt=prompt_df_B,
                        negative_prompt=negative_prompt_df_B,
                        textual_inversion=textual_inversion,
                        clip_skip=clip_skip,
                        syntax_weights=syntax_weights,
                    )
                    detailfix_params_B["prompt_embeds"] = prompt_emb_ad_b
                    detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb_ad_b
                detailfix_params_B["prompt"] = None
                detailfix_params_B["negative_prompt"] = None
            else:
                # SDXL detailfix
                if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B:
                    conditioning_detailfix_B, pooled_detailfix_B = conditioning, pooled
                else:
                    conditioning_detailfix_B, pooled_detailfix_B = self.create_prompt_embeds(
                        prompt=prompt_df_B,
                        negative_prompt=negative_prompt_df_B,
                        textual_inversion=textual_inversion,
                        clip_skip=clip_skip,
                        syntax_weights=syntax_weights,
                    )
                detailfix_params_B.pop('prompt', None)
                detailfix_params_B.pop('negative_prompt', None)
                detailfix_params_B["prompt_embeds"] = conditioning_detailfix_B[0:1]
                detailfix_params_B["pooled_prompt_embeds"] = pooled_detailfix_B[0:1]
                detailfix_params_B["negative_prompt_embeds"] = conditioning_detailfix_B[1:2]
                detailfix_params_B["negative_pooled_prompt_embeds"] = pooled_detailfix_B[1:2]

            logger.debug(f"detailfix B prompt empty {prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B}")
            if not prompt_empty_detailfix_B or not negative_prompt_empty_detailfix_B:
                logger.debug(f"Prompts detailfix B {prompt_df_B, negative_prompt_df_B}")
            logger.debug(f"Pipe params detailfix B \n{detailfix_params_B}")
            logger.debug(f"Params detailfix B \n{adetailer_B_params}")

        if hires_steps > 1 and upscaler_model_path != None:
            # Hires params BASE
            hires_params_config = {
                "prompt" : None,
                "negative_prompt" : None,
                "num_inference_steps" : hires_steps,
                "guidance_scale" : guidance_scale,
                "clip_skip" : None,
                "strength" : hires_denoising_strength,
            }
            if self.class_name == "StableDiffusionPipeline":
                hires_params_config["eta"] = 1.0

            # Verify prompt hires and get valid
            hires_prompt_empty, hires_negative_prompt_empty, prompt_hires_valid, negative_prompt_hires_valid = process_prompts_valid(
                hires_prompt, hires_negative_prompt, prompt, negative_prompt
            )

            # Hires embed params
            if self.class_name == "StableDiffusionPipeline":
                if hires_prompt_empty and hires_negative_prompt_empty:
                    hires_params_config["prompt_embeds"] = prompt_emb
                    hires_params_config["negative_prompt_embeds"] = negative_prompt_emb
                else:
                    prompt_emb_hires, negative_prompt_emb_hires = self.create_prompt_embeds(
                        prompt=prompt_hires_valid,
                        negative_prompt=negative_prompt_hires_valid,
                        textual_inversion=textual_inversion,
                        clip_skip=clip_skip,
                        syntax_weights=syntax_weights,
                    )

                    hires_params_config["prompt_embeds"] = prompt_emb_hires
                    hires_params_config["negative_prompt_embeds"] = negative_prompt_emb_hires
            else:
                if hires_prompt_empty and hires_negative_prompt_empty:
                    hires_conditioning, hires_pooled = conditioning, pooled
                else:
                    hires_conditioning, hires_pooled = self.create_prompt_embeds(
                        prompt=prompt_hires_valid,
                        negative_prompt=negative_prompt_hires_valid,
                        textual_inversion=textual_inversion,
                        clip_skip=clip_skip,
                        syntax_weights=syntax_weights,
                    )

                hires_params_config.pop('prompt', None)
                hires_params_config.pop('negative_prompt', None)
                hires_params_config["prompt_embeds"] = hires_conditioning[0:1]
                hires_params_config["pooled_prompt_embeds"] = hires_pooled[0:1]
                hires_params_config["negative_prompt_embeds"] = hires_conditioning[1:2]
                hires_params_config["negative_pooled_prompt_embeds"] = hires_pooled[1:2]

            # Hires pipe
            if not hasattr(self, "hires_pipe") or not retain_hires_model_previous_load:
                hires_pipe = custom_task_model_loader(
                    pipe=self.pipe,
                    model_category="hires",
                    task_name=self.task_name,
                    torch_dtype=self.type_model_precision
                )
                if hasattr(self, "hires_pipe"):
                    del self.hires_pipe
            if retain_hires_model_previous_load:
                if hasattr(self, "hires_pipe"):
                    hires_pipe = self.hires_pipe
                else:
                    self.hires_pipe = hires_pipe

            # Hires scheduler
            if  hires_sampler != "Use same sampler":
                logger.debug("New hires sampler")
                hires_pipe.scheduler = self.get_scheduler(hires_sampler)

            hires_pipe.set_progress_bar_config(leave=leave_progress_bar)
            hires_pipe.set_progress_bar_config(disable=disable_progress_bar)
            hires_pipe.to(self.device)
            torch.cuda.empty_cache()
            gc.collect()
        else:
            hires_params_config = {}
            hires_pipe = None

        # Debug info
        try:
            logger.debug(f"INFO PIPE: {self.pipe.__class__.__name__}")
            logger.debug(f"text_encoder_type: {self.pipe.text_encoder.dtype}")
            logger.debug(f"unet_type: {self.pipe.unet.dtype}")
            logger.debug(f"vae_type: {self.pipe.vae.dtype}")
            logger.debug(f"pipe_type: {self.pipe.dtype}")
            logger.debug(f"scheduler_main_pipe: {self.pipe.scheduler}")
            if adetailer_A or adetailer_B:
                logger.debug(f"scheduler_detailfix: {detailfix_pipe.scheduler}")
            if hires_steps > 1 and upscaler_model_path != None:
                logger.debug(f"scheduler_hires: {hires_pipe.scheduler}")
        except Exception as e:
            logger.debug(f"{str(e)}")

        # === RUN PIPE === #
        for i in range(loop_generation):

            # number seed
            if seed == -1:
                seeds = [random.randint(0, 2147483647) for _ in range(num_images)]
            else:
                if num_images == 1:
                    seeds = [seed]
                else:
                    seeds = [seed] + [random.randint(0, 2147483647) for _ in range(num_images-1)]
                    
            # generators 
            generators = []  # List to store all the generators
            for calculate_seed in seeds:
                if generator_in_cpu or self.device.type == "cpu":
                    generator = torch.Generator().manual_seed(calculate_seed)
                else:
                    try:
                        generator = torch.Generator("cuda").manual_seed(calculate_seed)
                    except:
                        logger.warning("Generator in CPU")
                        generator = torch.Generator().manual_seed(calculate_seed)

                generators.append(generator)

            # fix img2img bug need concat tensor prompts with generator same number (only in batch inference)
            pipe_params_config["generator"] = generators if self.task_name != "img2img" else generators[0] # no list
            seeds = seeds if self.task_name != "img2img" else [seeds[0]] * num_images

            try:
                if self.class_name == "StableDiffusionXLPipeline":
                    # sdxl pipe
                    images = self.pipe(
                        prompt_embeds=conditioning[0:1],
                        pooled_prompt_embeds=pooled[0:1],
                        negative_prompt_embeds=conditioning[1:2],
                        negative_pooled_prompt_embeds=pooled[1:2],
                        #generator=pipe_params_config["generator"],
                        **pipe_params_config,
                    ).images
                    if self.task_name not in ["txt2img", "inpaint", "img2img"]:
                        images = [control_image] + images
                elif self.task_name == "txt2img":
                    images = self.run_pipe_SD(**pipe_params_config)
                elif self.task_name == "inpaint":
                    images = self.run_pipe_inpaint(**pipe_params_config)
                elif self.task_name not in ["txt2img", "inpaint", "img2img"]:
                    results = self.run_pipe(
                        **pipe_params_config
                    )  ## pipe ControlNet add condition_weights
                    images = [control_image] + results
                    del results
                elif self.task_name == "img2img":
                    images = self.run_pipe_img2img(**pipe_params_config)
            except Exception as e:
                e = str(e)
                if "Tensor with 2 elements cannot be converted to Scalar" in e:
                    logger.debug(e)
                    logger.error("Error in sampler; trying with DDIM sampler")
                    self.pipe.scheduler = self.default_scheduler
                    self.pipe.scheduler = DDIMScheduler.from_config(self.pipe.scheduler.config)
                    if self.class_name == "StableDiffusionXLPipeline":
                        # sdxl pipe
                        images = self.pipe(
                            prompt_embeds=conditioning[0:1],
                            pooled_prompt_embeds=pooled[0:1],
                            negative_prompt_embeds=conditioning[1:2],
                            negative_pooled_prompt_embeds=pooled[1:2],
                            #generator=pipe_params_config["generator"],
                            **pipe_params_config,
                        ).images
                        if self.task_name not in ["txt2img", "inpaint", "img2img"]:
                            images = [control_image] + images
                    elif self.task_name == "txt2img":
                        images = self.run_pipe_SD(**pipe_params_config)
                    elif self.task_name == "inpaint":
                        images = self.run_pipe_inpaint(**pipe_params_config)
                    elif self.task_name not in ["txt2img", "inpaint", "img2img"]:
                        results = self.run_pipe(
                            **pipe_params_config
                        )  ## pipe ControlNet add condition_weights
                        images = [control_image] + results
                        del results
                    elif self.task_name == "img2img":
                        images = self.run_pipe_img2img(**pipe_params_config)
                elif "The size of tensor a (0) must match the size of tensor b (3) at non-singleton" in e:
                    raise ValueError(f"steps / strength too low for the model to produce a satisfactory response")
                else:
                    raise ValueError(e)
                    
            torch.cuda.empty_cache()
            gc.collect()

            if hires_before_adetailer and upscaler_model_path != None:
                logger.debug(f"Hires before; same seed for each image (no batch)")
                images = process_images_high_resolution(
                    images,
                    upscaler_model_path,
                    upscaler_increases_size,
                    esrgan_tile, esrgan_tile_overlap,
                    hires_steps, hires_params_config,
                    self.task_name,
                    generators[0], #pipe_params_config["generator"][0], # no generator
                    hires_pipe,
                )

            # Adetailer stuff
            if adetailer_A or adetailer_B:
                # image_pil_list = []
                # for img_single in images:
                # image_ad = img_single.convert("RGB")
                # image_pil_list.append(image_ad)
                if self.task_name not in ["txt2img", "inpaint", "img2img"]:
                    images = images[1:]

                if adetailer_A:
                    images = ad_model_process(
                        pipe_params_df=detailfix_params_A,
                        detailfix_pipe=detailfix_pipe,
                        image_list_task=images,
                        **adetailer_A_params,
                    )
                if adetailer_B:
                    images = ad_model_process(
                        pipe_params_df=detailfix_params_B,
                        detailfix_pipe=detailfix_pipe,
                        image_list_task=images,
                        **adetailer_B_params,
                    )

                if self.task_name not in ["txt2img", "inpaint", "img2img"]:
                    images = [control_image] + images
                # del detailfix_pipe
                torch.cuda.empty_cache()
                gc.collect()

            if hires_after_adetailer and upscaler_model_path != None:
                logger.debug(f"Hires after; same seed for each image (no batch)")
                images = process_images_high_resolution(
                    images,
                    upscaler_model_path,
                    upscaler_increases_size,
                    esrgan_tile, esrgan_tile_overlap,
                    hires_steps, hires_params_config,
                    self.task_name,
                    generators[0], #pipe_params_config["generator"][0], # no generator
                    hires_pipe,
                )

            logger.info(f"Seeds: {seeds}")

            # Show images if loop
            if display_images:
                mediapy.show_images(images)
                # logger.info(image_list)
                # del images
                if loop_generation > 1:
                    time.sleep(0.5)

            # List images and save
            image_list = []
            metadata = [
                prompt,
                negative_prompt,
                self.base_model_id,
                self.vae_model,
                num_steps,
                guidance_scale,
                sampler,
                0000000000, #calculate_seed,
                img_width,
                img_height,
                clip_skip,
            ]

            valid_seeds = [0] + seeds if self.task_name not in ["txt2img", "inpaint", "img2img"] else seeds
            for image_, seed_ in zip(images, valid_seeds):
                image_path = "not saved in storage"
                if save_generated_images:
                    metadata[7] = seed_
                    image_path = save_pil_image_with_metadata(image_, image_storage_location, metadata)
                image_list.append(image_path)

            torch.cuda.empty_cache()
            gc.collect()

            if image_list[0] != "not saved in storage":
                logger.info(image_list)

        if hasattr(self, "compel") and not retain_compel_previous_load:
          del self.compel
        torch.cuda.empty_cache()
        gc.collect()

        return images, image_list