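"""ComfyUI wrapper nodes for MimicMotion: (down)loading the model, extracting DWPose
pose images, sampling pose-guided video latents, and decoding them to frames."""
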
import os
import gc

import torch
import numpy as np

import folder_paths
import comfy.model_management as mm
import comfy.utils

from diffusers.models import AutoencoderKLTemporalDecoder
from diffusers.schedulers import EulerDiscreteScheduler
from transformers import CLIPImageProcessor, CLIPVisionModelWithProjection

from .mimicmotion.pipelines.pipeline_mimicmotion import MimicMotionPipeline, tensor2vid
from .mimicmotion.modules.unet import UNetSpatioTemporalConditionModel
from .mimicmotion.modules.pose_net import PoseNet
from .lcm_scheduler import AnimateLCMSVDStochasticIterativeScheduler

script_directory = os.path.dirname(os.path.abspath(__file__))


def loglinear_interp(t_steps, num_steps):
    """
    Performs log-linear interpolation of a given array of decreasing numbers.
    """
    # Interpolate in log space over the reversed (ascending) schedule...
    xs = np.linspace(0, 1, len(t_steps))
    ys = np.log(t_steps[::-1])

    new_xs = np.linspace(0, 1, num_steps)
    new_ys = np.interp(new_xs, xs, ys)

    # ...then map back to linear space and restore descending order.
    interped_ys = np.exp(new_ys)[::-1].copy()
    return interped_ys
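
# Illustrative usage (values are the AlignYourSteps SVD sigmas used in
# DiffusersScheduler below): resample the 11-entry schedule to 21 entries
# for a 20-step sampling run:
#   ays_sigmas = [700.00, 54.5, 15.886, 7.977, 4.248, 1.789, 0.981, 0.403, 0.173, 0.034, 0.002]
#   sigmas = loglinear_interp(ays_sigmas, 21)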


class MimicMotionModel(torch.nn.Module):
    def __init__(self, base_model_path, lcm=False):
        """Construct the base model components and load the pretrained SVD weights, except for the pose-net.

        Args:
            base_model_path (str): path to the pretrained SVD model
            lcm (bool): load the AnimateLCM UNet config instead of the standard UNet config
        """
        super().__init__()
        unet_subfolder = "unet_lcm" if lcm else "unet"
        # Only the config is loaded for the UNet; its weights come from the MimicMotion
        # checkpoint applied later via load_state_dict.
        self.unet = UNetSpatioTemporalConditionModel.from_config(
            UNetSpatioTemporalConditionModel.load_config(base_model_path, subfolder=unet_subfolder, variant="fp16"))
        self.vae = AutoencoderKLTemporalDecoder.from_pretrained(
            base_model_path, subfolder="vae", variant="fp16")
        self.image_encoder = CLIPVisionModelWithProjection.from_pretrained(
            base_model_path, subfolder="image_encoder", variant="fp16")
        self.noise_scheduler = EulerDiscreteScheduler.from_pretrained(
            base_model_path, subfolder="scheduler")
        self.feature_extractor = CLIPImageProcessor.from_pretrained(
            base_model_path, subfolder="feature_extractor")

        self.pose_net = PoseNet(noise_latent_channels=self.unet.config.block_out_channels[0])


class DownloadAndLoadMimicMotionModel:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
            "model": (
                [
                    'MimicMotion-fp16.safetensors',
                ],
            ),
            "precision": (
                [
                    'fp32',
                    'fp16',
                    'bf16',
                ], {
                    "default": 'fp16'
                }),
            "lcm": ("BOOLEAN", {"default": False}),
            },
        }

    RETURN_TYPES = ("MIMICPIPE",)
    RETURN_NAMES = ("mimic_pipeline",)
    FUNCTION = "loadmodel"
    CATEGORY = "MimicMotionWrapper"

    def loadmodel(self, precision, model, lcm):
        device = mm.get_torch_device()
        mm.soft_empty_cache()
        dtype = {"bf16": torch.bfloat16, "fp16": torch.float16, "fp32": torch.float32}[precision]

        pbar = comfy.utils.ProgressBar(3)

        download_path = os.path.join(folder_paths.models_dir, "mimicmotion")
        model_path = os.path.join(download_path, model)

        if not os.path.exists(model_path):
            print(f"Downloading model to: {model_path}")
            from huggingface_hub import snapshot_download
            snapshot_download(repo_id="Kijai/MimicMotion_pruned",
                              allow_patterns=[f"*{model}*"],
                              local_dir=download_path,
                              local_dir_use_symlinks=False)

        print(f"Loading model from: {model_path}")
        pbar.update(1)

        svd_path = os.path.join(folder_paths.models_dir, "diffusers", "stable-video-diffusion-img2vid-xt-1-1")
        # The LCM UNet lives in the "unet_lcm" subfolder of the SVD directory, which is
        # where MimicMotionModel looks for it when lcm=True.
        svd_lcm_path = os.path.join(svd_path, "unet_lcm")

        if lcm and not os.path.exists(svd_lcm_path):
            print(f"Downloading AnimateLCM SVD model to: {svd_path}")
            from huggingface_hub import snapshot_download
            snapshot_download(repo_id="Kijai/AnimateLCM-SVD-Comfy",
                              allow_patterns=["*.json", "*diffusion_pytorch_model.fp16.safetensors*"],
                              local_dir=svd_path,
                              local_dir_use_symlinks=False)
        elif not os.path.exists(svd_path):
            print(f"Downloading SVD model to: {svd_path}")
            from huggingface_hub import snapshot_download
            snapshot_download(repo_id="vdo/stable-video-diffusion-img2vid-xt-1-1",
                              allow_patterns=["*.json", "*fp16*"],
                              local_dir=svd_path,
                              local_dir_use_symlinks=False)
        pbar.update(1)

        mimicmotion_models = MimicMotionModel(svd_path, lcm=lcm).to(device=device).eval()
        mimic_motion_sd = comfy.utils.load_torch_file(model_path)
        mimicmotion_models.load_state_dict(mimic_motion_sd, strict=False)

        if lcm:
            scheduler = AnimateLCMSVDStochasticIterativeScheduler(
                num_train_timesteps=40,
                sigma_min=0.002,
                sigma_max=700.0,
                sigma_data=1.0,
                s_noise=1.0,
                rho=7,
                clip_denoised=False,
            )
        else:
            scheduler = mimicmotion_models.noise_scheduler

        pipeline = MimicMotionPipeline(
            vae=mimicmotion_models.vae,
            image_encoder=mimicmotion_models.image_encoder,
            unet=mimicmotion_models.unet,
            scheduler=scheduler,
            feature_extractor=mimicmotion_models.feature_extractor,
            pose_net=mimicmotion_models.pose_net,
        )

        pipeline.unet.to(dtype)
        pipeline.pose_net.to(dtype)
        pipeline.vae.to(dtype)
        pipeline.image_encoder.to(dtype)

        mimic_model = {
            'pipeline': pipeline,
            'dtype': dtype
        }
        pbar.update(1)
        return (mimic_model,)


class DiffusersScheduler:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
            "scheduler": (
                [
                    'EulerDiscreteScheduler',
                    'AnimateLCM_SVD'
                ],
            ),
            "sigma_min": ("FLOAT", {"default": 0.002, "min": 0.0, "max": 700.0, "step": 0.001}),
            "sigma_max": ("FLOAT", {"default": 700.0, "min": 0.0, "max": 700.0, "step": 0.001}),
            "align_your_steps": ("BOOLEAN", {"default": False}),
            },
        }

    RETURN_TYPES = ("DIFFUSERS_SCHEDULER",)
    RETURN_NAMES = ("scheduler",)
    FUNCTION = "loadmodel"
    CATEGORY = "MimicMotionWrapper"

    def loadmodel(self, scheduler, sigma_min, sigma_max, align_your_steps):
        scheduler_config = {
            "beta_end": 0.012,
            "beta_schedule": "scaled_linear",
            "beta_start": 0.00085,
            "clip_sample": False,
            "interpolation_type": "linear",
            "num_train_timesteps": 1000,
            "prediction_type": "v_prediction",
            "set_alpha_to_one": False,
            "sigma_max": sigma_max,
            "sigma_min": sigma_min,
            "skip_prk_steps": True,
            "steps_offset": 1,
            "timestep_spacing": "leading",
            "timestep_type": "continuous",
            "trained_betas": None,
            "use_karras_sigmas": True
        }
        if scheduler == 'EulerDiscreteScheduler':
            noise_scheduler = EulerDiscreteScheduler.from_config(scheduler_config)
        elif scheduler == 'AnimateLCM_SVD':
            noise_scheduler = AnimateLCMSVDStochasticIterativeScheduler(
                num_train_timesteps=40,
                sigma_min=sigma_min,
                sigma_max=sigma_max,
                sigma_data=1.0,
                s_noise=1.0,
                rho=7,
                clip_denoised=False,
            )

        sigmas = None
        if align_your_steps:
            # AlignYourSteps sigma schedule for SVD (10 steps plus the terminal sigma)
            sigmas = [700.00, 54.5, 15.886, 7.977, 4.248, 1.789, 0.981, 0.403, 0.173, 0.034, 0.002]

        scheduler_options = {
            "noise_scheduler": noise_scheduler,
            "sigmas": sigmas,
        }

        return (scheduler_options,)
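
# The scheduler_options dict returned above plugs into MimicMotionSampler's
# optional_scheduler input; when "sigmas" is set, the sampler resamples the schedule
# to the requested step count with loglinear_interp before handing it to the pipeline.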


class MimicMotionSampler:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
            "mimic_pipeline": ("MIMICPIPE",),
            "ref_image": ("IMAGE",),
            "pose_images": ("IMAGE",),
            "steps": ("INT", {"default": 25, "min": 1, "max": 200, "step": 1}),
            "cfg_min": ("FLOAT", {"default": 2.0, "min": 0.0, "max": 20.0, "step": 0.01}),
            "cfg_max": ("FLOAT", {"default": 2.0, "min": 0.0, "max": 20.0, "step": 0.01}),
            "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "fps": ("INT", {"default": 15, "min": 2, "max": 100, "step": 1}),
            "noise_aug_strength": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 10.0, "step": 0.01}),
            "context_size": ("INT", {"default": 16, "min": 1, "max": 128, "step": 1}),
            "context_overlap": ("INT", {"default": 6, "min": 1, "max": 128, "step": 1}),
            "keep_model_loaded": ("BOOLEAN", {"default": True}),
            },
            "optional": {
                "optional_scheduler": ("DIFFUSERS_SCHEDULER",),
                "pose_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                "pose_start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}),
                "pose_end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
                "image_embed_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
            }
        }

    RETURN_TYPES = ("LATENT",)
    RETURN_NAMES = ("samples",)
    FUNCTION = "process"
    CATEGORY = "MimicMotionWrapper"

    def process(self, mimic_pipeline, ref_image, pose_images, cfg_min, cfg_max, steps, seed, noise_aug_strength, fps, keep_model_loaded,
                context_size, context_overlap, optional_scheduler=None, pose_strength=1.0, image_embed_strength=1.0, pose_start_percent=0.0, pose_end_percent=1.0):
        device = mm.get_torch_device()
        offload_device = mm.unet_offload_device()
        mm.unload_all_models()
        mm.soft_empty_cache()
        dtype = mimic_pipeline['dtype']
        pipeline = mimic_pipeline['pipeline']

        original_scheduler = pipeline.scheduler

        if optional_scheduler is not None:
            print("Using optional scheduler: ", optional_scheduler)
            pipeline.scheduler = optional_scheduler['noise_scheduler']
            sigmas = optional_scheduler['sigmas']

            # If a fixed sigma schedule doesn't match the requested step count,
            # resample it log-linearly to steps + 1 entries (the final entry is sigma = 0).
            if sigmas is not None and (steps + 1) != len(sigmas):
                sigmas = loglinear_interp(sigmas, steps + 1)
                sigmas = sigmas[-(steps + 1):]
                sigmas[-1] = 0
                print("Using sigmas: ", sigmas)
        else:
            pipeline.scheduler = original_scheduler
            sigmas = None

        B, H, W, C = pose_images.shape

        assert B >= context_size, "The number of pose frames must be at least the context size"

        # B, H, W, C -> B, C, H, W
        ref_image = ref_image.permute(0, 3, 1, 2)
        pose_images = pose_images.permute(0, 3, 1, 2)

        # Normalize pose images from [0, 1] to [-1, 1]
        pose_images = pose_images * 2 - 1

        ref_image = ref_image.to(device).to(dtype)
        pose_images = pose_images.to(device).to(dtype)

        generator = torch.Generator(device=device)
        generator.manual_seed(seed)

        frames = pipeline(
            ref_image,
            image_pose=pose_images,
            num_frames=B,
            tile_size=context_size,
            tile_overlap=context_overlap,
            height=H,
            width=W,
            fps=fps,
            noise_aug_strength=noise_aug_strength,
            num_inference_steps=steps,
            generator=generator,
            min_guidance_scale=cfg_min,
            max_guidance_scale=cfg_max,
            decode_chunk_size=4,
            output_type="latent",
            device=device,
            sigmas=sigmas,
            pose_strength=pose_strength,
            pose_start_percent=pose_start_percent,
            pose_end_percent=pose_end_percent,
            image_embed_strength=image_embed_strength
        ).frames

        if not keep_model_loaded:
            pipeline.unet.to(offload_device)
            pipeline.vae.to(offload_device)
            mm.soft_empty_cache()
            gc.collect()

        return ({"samples": frames},)


class MimicMotionDecode:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
            "mimic_pipeline": ("MIMICPIPE",),
            "samples": ("LATENT",),
            "decode_chunk_size": ("INT", {"default": 4, "min": 1, "max": 200, "step": 1})
            },
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
    FUNCTION = "process"
    CATEGORY = "MimicMotionWrapper"

    def process(self, mimic_pipeline, samples, decode_chunk_size):
        mm.soft_empty_cache()

        pipeline = mimic_pipeline['pipeline']
        num_frames = samples['samples'].shape[0]
        try:
            frames = pipeline.decode_latents(samples['samples'], num_frames, decode_chunk_size)
        except Exception:
            # Decoding can run out of memory at larger chunk sizes; retry one frame at a time.
            frames = pipeline.decode_latents(samples['samples'], num_frames, 1)
        frames = tensor2vid(frames, pipeline.image_processor, output_type="pt")

        # Drop the first frame (it corresponds to the reference pose) and convert to B, H, W, C.
        frames = frames.squeeze(1)[1:].permute(0, 2, 3, 1).cpu().float()

        return (frames,)


class MimicMotionGetPoses:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
            "ref_image": ("IMAGE",),
            "pose_images": ("IMAGE",),
            "include_body": ("BOOLEAN", {"default": True}),
            "include_hand": ("BOOLEAN", {"default": True}),
            "include_face": ("BOOLEAN", {"default": True}),
            },
        }

    RETURN_TYPES = ("IMAGE", "IMAGE",)
    RETURN_NAMES = ("poses_with_ref", "pose_images")
    FUNCTION = "process"
    CATEGORY = "MimicMotionWrapper"

    def process(self, ref_image, pose_images, include_body, include_hand, include_face):
        device = mm.get_torch_device()
        from .mimicmotion.dwpose.util import draw_pose
        from .mimicmotion.dwpose.dwpose_detector import DWposeDetector

        assert ref_image.shape[1:3] == pose_images.shape[1:3], "ref_image and pose_images must have the same resolution"

        yolo_model = "yolox_l.onnx"
        dw_pose_model = "dw-ll_ucoco_384.onnx"
        model_base_path = os.path.join(script_directory, "models", "DWPose")

        model_det = os.path.join(model_base_path, yolo_model)
        model_pose = os.path.join(model_base_path, dw_pose_model)

        if not os.path.exists(model_det):
            print(f"Downloading yolo model to: {model_base_path}")
            from huggingface_hub import snapshot_download
            snapshot_download(repo_id="yzd-v/DWPose",
                              allow_patterns=[f"*{yolo_model}*"],
                              local_dir=model_base_path,
                              local_dir_use_symlinks=False)

        if not os.path.exists(model_pose):
            print(f"Downloading dwpose model to: {model_base_path}")
            from huggingface_hub import snapshot_download
            snapshot_download(repo_id="yzd-v/DWPose",
                              allow_patterns=[f"*{dw_pose_model}*"],
                              local_dir=model_base_path,
                              local_dir_use_symlinks=False)

        dwprocessor = DWposeDetector(
            model_det=model_det,
            model_pose=model_pose,
            device=device)

        ref_image = ref_image.squeeze(0).cpu().numpy() * 255

        ref_pose = dwprocessor(ref_image)
        ref_keypoint_id = [0, 1, 2, 5, 8, 11, 14, 15, 16, 17]
        # Keep only the reference keypoints detected with sufficient confidence
        ref_keypoint_id = [i for i in ref_keypoint_id
                           if ref_pose['bodies']['score'].shape[0] > 0 and ref_pose['bodies']['score'][0][i] > 0.3]
        ref_body = ref_pose['bodies']['candidate'][ref_keypoint_id]

        height, width, _ = ref_image.shape
        pose_images_np = pose_images.cpu().numpy() * 255

        pbar = comfy.utils.ProgressBar(len(pose_images_np))
        detected_poses_np_list = []
        for img_np in pose_images_np:
            detected_poses_np_list.append(dwprocessor(img_np))
            pbar.update(1)

        # Stack the bodies from frames where all 18 keypoints were detected,
        # restricted to the reference keypoint set
        detected_bodies = np.stack(
            [p['bodies']['candidate'] for p in detected_poses_np_list if p['bodies']['candidate'].shape[0] == 18])[:,
            ref_keypoint_id]

        # Least-squares fit of a linear map from the detected y-coordinates to the
        # reference body's y-coordinates; the x-scale is derived from the y-scale,
        # corrected for the aspect ratios of the pose frames and the reference image.
        ay, by = np.polyfit(detected_bodies[:, :, 1].flatten(), np.tile(ref_body[:, 1], len(detected_bodies)), 1)
        fh, fw, _ = pose_images_np[0].shape
        ax = ay / (fh / fw / height * width)
        bx = np.mean(np.tile(ref_body[:, 0], len(detected_bodies)) - detected_bodies[:, :, 0].flatten() * ax)
        a = np.array([ax, ay])
        b = np.array([bx, by])
        output_pose = []

        # Rescale every detected pose into the reference image's coordinate frame.
        # Note: hands are rescaled under include_hand and faces under include_face
        # (the original code had the two keys swapped).
        for detected_pose in detected_poses_np_list:
            if include_body:
                detected_pose['bodies']['candidate'] = detected_pose['bodies']['candidate'] * a + b
            if include_hand:
                detected_pose['hands'] = detected_pose['hands'] * a + b
            if include_face:
                detected_pose['faces'] = detected_pose['faces'] * a + b
            im = draw_pose(detected_pose, height, width, include_body=include_body, include_hand=include_hand, include_face=include_face)
            output_pose.append(np.array(im))

        output_pose_tensors = [torch.from_numpy(im) for im in output_pose]
        output_tensor = torch.stack(output_pose_tensors) / 255

        # Prepend the drawn reference pose as frame 0, then convert to B, H, W, C
        ref_pose_img = draw_pose(ref_pose, height, width, include_body=include_body, include_hand=include_hand, include_face=include_face)
        ref_pose_tensor = torch.tensor(np.array(ref_pose_img)) / 255
        output_tensor = torch.cat((ref_pose_tensor.unsqueeze(0), output_tensor))
        output_tensor = output_tensor.permute(0, 2, 3, 1).cpu().float()

        return (output_tensor, output_tensor[1:])


NODE_CLASS_MAPPINGS = {
    "DownloadAndLoadMimicMotionModel": DownloadAndLoadMimicMotionModel,
    "MimicMotionSampler": MimicMotionSampler,
    "MimicMotionGetPoses": MimicMotionGetPoses,
    "MimicMotionDecode": MimicMotionDecode,
    "DiffusersScheduler": DiffusersScheduler,
}
NODE_DISPLAY_NAME_MAPPINGS = {
    "DownloadAndLoadMimicMotionModel": "(Down)Load MimicMotionModel",
    "MimicMotionSampler": "MimicMotion Sampler",
    "MimicMotionGetPoses": "MimicMotion GetPoses",
    "MimicMotionDecode": "MimicMotion Decode",
    "DiffusersScheduler": "Diffusers Scheduler",
}