|
import torch |
|
from . import a1111_compat |
|
import comfy |
|
from .libs import common |
|
from comfy import model_management |
|
from comfy.samplers import CFGGuider |
|
from comfy_extras.nodes_perpneg import Guider_PerpNeg |
|
import math |
|
|
|
class KSampler_progress(a1111_compat.KSampler_inspire): |
|
@classmethod |
|
def INPUT_TYPES(s): |
|
return {"required": { |
|
"model": ("MODEL",), |
|
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), |
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}), |
|
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}), |
|
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), |
|
"scheduler": (common.SCHEDULERS, ), |
|
"positive": ("CONDITIONING", ), |
|
"negative": ("CONDITIONING", ), |
|
"latent_image": ("LATENT", ), |
|
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), |
|
"noise_mode": (["GPU(=A1111)", "CPU"],), |
|
"interval": ("INT", {"default": 1, "min": 1, "max": 10000}), |
|
"omit_start_latent": ("BOOLEAN", {"default": True, "label_on": "True", "label_off": "False"}), |
|
}, |
|
"optional": { |
|
"scheduler_func_opt": ("SCHEDULER_FUNC",), |
|
} |
|
} |
|
|
|
CATEGORY = "InspirePack/analysis" |
|
|
|
RETURN_TYPES = ("LATENT", "LATENT") |
|
RETURN_NAMES = ("latent", "progress_latent") |
|
|
|
@staticmethod |
|
def doit(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise, noise_mode, interval, omit_start_latent, scheduler_func_opt=None): |
|
adv_steps = int(steps / denoise) |
|
|
|
if omit_start_latent: |
|
result = [] |
|
else: |
|
result = [latent_image['samples']] |
|
|
|
result = [] |
|
|
|
def progress_callback(step, x0, x, total_steps): |
|
if (total_steps-1) != step and step % interval != 0: |
|
return |
|
|
|
x = model.model.process_latent_out(x) |
|
x = x.to(model_management.intermediate_device()) |
|
result.append(x) |
|
|
|
latent_image, noise = a1111_compat.KSamplerAdvanced_inspire.sample(model, True, seed, adv_steps, cfg, sampler_name, scheduler, positive, negative, latent_image, (adv_steps-steps), |
|
adv_steps, noise_mode, False, callback=progress_callback, scheduler_func_opt=scheduler_func_opt) |
|
|
|
if len(result) > 0: |
|
result = torch.cat(result) |
|
result = {'samples': result} |
|
else: |
|
result = latent_image |
|
|
|
return latent_image, result |
|
|
|
|
|
class KSamplerAdvanced_progress(a1111_compat.KSamplerAdvanced_inspire): |
|
@classmethod |
|
def INPUT_TYPES(s): |
|
return {"required": { |
|
"model": ("MODEL",), |
|
"add_noise": ("BOOLEAN", {"default": True, "label_on": "enable", "label_off": "disable"}), |
|
"noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), |
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}), |
|
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.5, "round": 0.01}), |
|
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), |
|
"scheduler": (common.SCHEDULERS, ), |
|
"positive": ("CONDITIONING", ), |
|
"negative": ("CONDITIONING", ), |
|
"latent_image": ("LATENT", ), |
|
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), |
|
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), |
|
"noise_mode": (["GPU(=A1111)", "CPU"],), |
|
"return_with_leftover_noise": ("BOOLEAN", {"default": False, "label_on": "enable", "label_off": "disable"}), |
|
"interval": ("INT", {"default": 1, "min": 1, "max": 10000}), |
|
"omit_start_latent": ("BOOLEAN", {"default": False, "label_on": "True", "label_off": "False"}), |
|
}, |
|
"optional": { |
|
"prev_progress_latent_opt": ("LATENT",), |
|
"scheduler_func_opt": ("SCHEDULER_FUNC",), |
|
} |
|
} |
|
|
|
FUNCTION = "doit" |
|
|
|
CATEGORY = "InspirePack/analysis" |
|
|
|
RETURN_TYPES = ("LATENT", "LATENT") |
|
RETURN_NAMES = ("latent", "progress_latent") |
|
|
|
def doit(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, |
|
noise_mode, return_with_leftover_noise, interval, omit_start_latent, prev_progress_latent_opt=None, scheduler_func_opt=None): |
|
if omit_start_latent: |
|
result = [] |
|
else: |
|
result = [latent_image['samples']] |
|
|
|
result = [] |
|
|
|
def progress_callback(step, x0, x, total_steps): |
|
if (total_steps-1) != step and step % interval != 0: |
|
return |
|
|
|
x = model.model.process_latent_out(x) |
|
x = x.to(model_management.intermediate_device()) |
|
result.append(x) |
|
|
|
latent_image, noise = a1111_compat.KSamplerAdvanced_inspire.sample(model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, |
|
noise_mode, False, callback=progress_callback, scheduler_func_opt=scheduler_func_opt) |
|
|
|
if len(result) > 0: |
|
result = torch.cat(result) |
|
result = {'samples': result} |
|
else: |
|
result = latent_image |
|
|
|
if prev_progress_latent_opt is not None: |
|
result['samples'] = torch.cat((prev_progress_latent_opt['samples'], result['samples']), dim=0) |
|
|
|
return latent_image, result |
|
|
|
|
|
def exponential_interpolation(from_cfg, to_cfg, i, steps): |
|
if i == steps-1: |
|
return to_cfg |
|
|
|
if from_cfg == to_cfg: |
|
return from_cfg |
|
|
|
if from_cfg == 0: |
|
return to_cfg * (1 - math.exp(-5 * i / steps)) / (1 - math.exp(-5)) |
|
elif to_cfg == 0: |
|
return from_cfg * (math.exp(-5 * i / steps) - math.exp(-5)) / (1 - math.exp(-5)) |
|
else: |
|
log_from = math.log(from_cfg) |
|
log_to = math.log(to_cfg) |
|
log_value = log_from + (log_to - log_from) * i / steps |
|
return math.exp(log_value) |
|
|
|
|
|
def logarithmic_interpolation(from_cfg, to_cfg, i, steps): |
|
if i == 0: |
|
return from_cfg |
|
|
|
if i == steps-1: |
|
return to_cfg |
|
|
|
log_i = math.log(i + 1) |
|
log_steps = math.log(steps + 1) |
|
|
|
t = log_i / log_steps |
|
|
|
return from_cfg + (to_cfg - from_cfg) * t |
|
|
|
|
|
class Guider_scheduled(CFGGuider): |
|
def __init__(self, model_patcher, sigmas, from_cfg, to_cfg, schedule): |
|
super().__init__(model_patcher) |
|
self.default_cfg = self.cfg |
|
self.sigmas = sigmas |
|
self.cfg_sigmas = None |
|
self.cfg_sigmas_i = None |
|
self.from_cfg = from_cfg |
|
self.to_cfg = to_cfg |
|
self.schedule = schedule |
|
self.last_i = 0 |
|
self.renew_cfg_sigmas() |
|
|
|
def set_cfg(self, cfg): |
|
self.default_cfg = cfg |
|
self.renew_cfg_sigmas() |
|
|
|
def renew_cfg_sigmas(self): |
|
self.cfg_sigmas = {} |
|
self.cfg_sigmas_i = {} |
|
i = 0 |
|
steps = len(self.sigmas) - 1 |
|
for x in self.sigmas: |
|
k = float(x) |
|
delta = self.to_cfg - self.from_cfg |
|
if self.schedule == 'exp': |
|
self.cfg_sigmas[k] = exponential_interpolation(self.from_cfg, self.to_cfg, i, steps), i |
|
elif self.schedule == 'log': |
|
self.cfg_sigmas[k] = logarithmic_interpolation(self.from_cfg, self.to_cfg, i, steps), i |
|
else: |
|
self.cfg_sigmas[k] = self.from_cfg + delta * i / steps, i |
|
|
|
self.cfg_sigmas_i[i] = self.cfg_sigmas[k] |
|
i += 1 |
|
|
|
def predict_noise(self, x, timestep, model_options={}, seed=None): |
|
k = float(timestep[0]) |
|
|
|
v = self.cfg_sigmas.get(k) |
|
if v is None: |
|
|
|
v = self.cfg_sigmas_i[self.last_i+1] |
|
self.cfg_sigmas[k] = v |
|
|
|
self.last_i = v[1] |
|
self.cfg = v[0] |
|
|
|
return super().predict_noise(x, timestep, model_options, seed) |
|
|
|
|
|
class Guider_PerpNeg_scheduled(Guider_PerpNeg): |
|
def __init__(self, model_patcher, sigmas, from_cfg, to_cfg, schedule, neg_scale): |
|
super().__init__(model_patcher) |
|
self.default_cfg = self.cfg |
|
self.sigmas = sigmas |
|
self.cfg_sigmas = None |
|
self.cfg_sigmas_i = None |
|
self.from_cfg = from_cfg |
|
self.to_cfg = to_cfg |
|
self.schedule = schedule |
|
self.neg_scale = neg_scale |
|
self.last_i = 0 |
|
self.renew_cfg_sigmas() |
|
|
|
def set_cfg(self, cfg): |
|
self.default_cfg = cfg |
|
self.renew_cfg_sigmas() |
|
|
|
def renew_cfg_sigmas(self): |
|
self.cfg_sigmas = {} |
|
self.cfg_sigmas_i = {} |
|
i = 0 |
|
steps = len(self.sigmas) - 1 |
|
for x in self.sigmas: |
|
k = float(x) |
|
delta = self.to_cfg - self.from_cfg |
|
if self.schedule == 'exp': |
|
self.cfg_sigmas[k] = exponential_interpolation(self.from_cfg, self.to_cfg, i, steps), i |
|
elif self.schedule == 'log': |
|
self.cfg_sigmas[k] = logarithmic_interpolation(self.from_cfg, self.to_cfg, i, steps), i |
|
else: |
|
self.cfg_sigmas[k] = self.from_cfg + delta * i / steps, i |
|
|
|
self.cfg_sigmas_i[i] = self.cfg_sigmas[k] |
|
i += 1 |
|
|
|
def predict_noise(self, x, timestep, model_options={}, seed=None): |
|
k = float(timestep[0]) |
|
|
|
v = self.cfg_sigmas.get(k) |
|
if v is None: |
|
|
|
v = self.cfg_sigmas_i[self.last_i+1] |
|
self.cfg_sigmas[k] = v |
|
|
|
self.last_i = v[1] |
|
self.cfg = v[0] |
|
|
|
return super().predict_noise(x, timestep, model_options, seed) |
|
|
|
|
|
class ScheduledCFGGuider: |
|
@classmethod |
|
def INPUT_TYPES(s): |
|
return {"required": { |
|
"model": ("MODEL", ), |
|
"positive": ("CONDITIONING", ), |
|
"negative": ("CONDITIONING", ), |
|
"sigmas": ("SIGMAS", ), |
|
"from_cfg": ("FLOAT", {"default": 6.5, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}), |
|
"to_cfg": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}), |
|
"schedule": (["linear", "log", "exp"], {'default': 'log'}) |
|
} |
|
} |
|
|
|
RETURN_TYPES = ("GUIDER", "SIGMAS") |
|
|
|
FUNCTION = "get_guider" |
|
CATEGORY = "sampling/custom_sampling/guiders" |
|
|
|
def get_guider(self, model, positive, negative, sigmas, from_cfg, to_cfg, schedule): |
|
guider = Guider_scheduled(model, sigmas, from_cfg, to_cfg, schedule) |
|
guider.set_conds(positive, negative) |
|
return guider, sigmas |
|
|
|
|
|
class ScheduledPerpNegCFGGuider: |
|
@classmethod |
|
def INPUT_TYPES(s): |
|
return {"required": { |
|
"model": ("MODEL", ), |
|
"positive": ("CONDITIONING", ), |
|
"negative": ("CONDITIONING", ), |
|
"empty_conditioning": ("CONDITIONING", ), |
|
"neg_scale": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.01}), |
|
"sigmas": ("SIGMAS", ), |
|
"from_cfg": ("FLOAT", {"default": 6.5, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}), |
|
"to_cfg": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}), |
|
"schedule": (["linear", "log", "exp"], {'default': 'log'}) |
|
} |
|
} |
|
|
|
RETURN_TYPES = ("GUIDER", "SIGMAS") |
|
|
|
FUNCTION = "get_guider" |
|
CATEGORY = "sampling/custom_sampling/guiders" |
|
|
|
def get_guider(self, model, positive, negative, empty_conditioning, neg_scale, sigmas, from_cfg, to_cfg, schedule): |
|
guider = Guider_PerpNeg_scheduled(model, sigmas, from_cfg, to_cfg, schedule, neg_scale) |
|
guider.set_conds(positive, negative, empty_conditioning) |
|
return guider, sigmas |
|
|
|
|
|
NODE_CLASS_MAPPINGS = { |
|
"KSamplerProgress //Inspire": KSampler_progress, |
|
"KSamplerAdvancedProgress //Inspire": KSamplerAdvanced_progress, |
|
"ScheduledCFGGuider //Inspire": ScheduledCFGGuider, |
|
"ScheduledPerpNegCFGGuider //Inspire": ScheduledPerpNegCFGGuider |
|
} |
|
|
|
NODE_DISPLAY_NAME_MAPPINGS = { |
|
"KSamplerProgress //Inspire": "KSampler Progress (Inspire)", |
|
"KSamplerAdvancedProgress //Inspire": "KSampler Advanced Progress (Inspire)", |
|
"ScheduledCFGGuider //Inspire": "Scheduled CFGGuider (Inspire)", |
|
"ScheduledPerpNegCFGGuider //Inspire": "Scheduled PerpNeg CFGGuider (Inspire)" |
|
} |
|
|