import os import hashlib import numpy as np import torch from PIL import Image, ImageOps import itertools import folder_paths from comfy.k_diffusion.utils import FolderOfImages from comfy.utils import common_upscale, ProgressBar from .logger import logger from .utils import BIGMAX, calculate_file_hash, get_sorted_dir_files_from_directory, validate_path, strip_path def is_changed_load_images(directory: str, image_load_cap: int = 0, skip_first_images: int = 0, select_every_nth: int = 1, **kwargs): if not os.path.isdir(directory): return False dir_files = get_sorted_dir_files_from_directory(directory, skip_first_images, select_every_nth, FolderOfImages.IMG_EXTENSIONS) if image_load_cap != 0: dir_files = dir_files[:image_load_cap] m = hashlib.sha256() for filepath in dir_files: m.update(calculate_file_hash(filepath).encode()) # strings must be encoded before hashing return m.digest().hex() def validate_load_images(directory: str): if not os.path.isdir(directory): return f"Directory '{directory}' cannot be found." dir_files = os.listdir(directory) if len(dir_files) == 0: return f"No files in directory '{directory}'." return True def images_generator(directory: str, image_load_cap: int = 0, skip_first_images: int = 0, select_every_nth: int = 1, meta_batch=None, unique_id=None): if not os.path.isdir(directory): raise FileNotFoundError(f"Directory '{directory} cannot be found.") dir_files = get_sorted_dir_files_from_directory(directory, skip_first_images, select_every_nth, FolderOfImages.IMG_EXTENSIONS) if len(dir_files) == 0: raise FileNotFoundError(f"No files in directory '{directory}'.") if image_load_cap > 0: dir_files = dir_files[:image_load_cap] sizes = {} has_alpha = False for image_path in dir_files: i = Image.open(image_path) #exif_transpose can only ever rotate, but rotating can swap width/height i = ImageOps.exif_transpose(i) has_alpha |= 'A' in i.getbands() count = sizes.get(i.size, 0) sizes[i.size] = count +1 size = max(sizes.items(), key=lambda x: x[1])[0] yield size[0], size[1], has_alpha iformat = "RGBA" if has_alpha else "RGB" def load_image(file_path): i = Image.open(file_path) i = ImageOps.exif_transpose(i) i = i.convert(iformat) i = np.array(i, dtype=np.float32) #This nonsense provides a nearly 50% speedup on my system torch.from_numpy(i).div_(255) if i.shape[0] != size[1] or i.shape[1] != size[0]: i = torch.from_numpy(i).movedim(-1, 0).unsqueeze(0) i = common_upscale(i, size[0], size[1], "lanczos", "center") i = i.squeeze(0).movedim(0, -1).numpy() if has_alpha: i[:,:,-1] = 1 - i[:,:,-1] return i total_images = len(dir_files) processed_images = 0 pbar = ProgressBar(total_images) images = map(load_image, dir_files) try: prev_image = next(images) while True: next_image = next(images) yield prev_image processed_images += 1 pbar.update_absolute(processed_images, total_images) prev_image = next_image except StopIteration: pass if meta_batch is not None: meta_batch.inputs.pop(unique_id) meta_batch.has_closed_inputs = True if prev_image is not None: yield prev_image def load_images(directory: str, image_load_cap: int = 0, skip_first_images: int = 0, select_every_nth: int = 1, meta_batch=None, unique_id=None): if meta_batch is None or unique_id not in meta_batch.inputs: gen = images_generator(directory, image_load_cap, skip_first_images, select_every_nth, meta_batch, unique_id) (width, height, has_alpha) = next(gen) if meta_batch is not None: meta_batch.inputs[unique_id] = (gen, width, height, has_alpha) else: gen, width, height, has_alpha = meta_batch.inputs[unique_id] if meta_batch is not None: gen = itertools.islice(gen, meta_batch.frames_per_batch) images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (height, width, 3 + has_alpha))))) if has_alpha: #tensors are not continuous. Rewrite will be required if this is an issue masks = images[:,:,:,3] images = images[:,:,:,:3] else: masks = torch.zeros((images.size(0), 64, 64), dtype=torch.float32, device="cpu") if len(images) == 0: raise FileNotFoundError(f"No images could be loaded from directory '{directory}'.") return images, masks, images.size(0) class LoadImagesFromDirectoryUpload: @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() directories = [] for item in os.listdir(input_dir): if not os.path.isfile(os.path.join(input_dir, item)) and item != "clipspace": directories.append(item) return { "required": { "directory": (directories,), }, "optional": { "image_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}), "skip_first_images": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}), "select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}), "meta_batch": ("VHS_BatchManager",), }, "hidden": { "unique_id": "UNIQUE_ID" }, } RETURN_TYPES = ("IMAGE", "MASK", "INT") RETURN_NAMES = ("IMAGE", "MASK", "frame_count") FUNCTION = "load_images" CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" def load_images(self, directory: str, **kwargs): directory = folder_paths.get_annotated_filepath(strip_path(directory)) return load_images(directory, **kwargs) @classmethod def IS_CHANGED(s, directory: str, **kwargs): directory = folder_paths.get_annotated_filepath(strip_path(directory)) return is_changed_load_images(directory, **kwargs) @classmethod def VALIDATE_INPUTS(s, directory: str, **kwargs): directory = folder_paths.get_annotated_filepath(strip_path(directory)) return validate_load_images(directory) class LoadImagesFromDirectoryPath: @classmethod def INPUT_TYPES(s): return { "required": { "directory": ("STRING", {"default": "X://path/to/images", "vhs_path_extensions": []}), }, "optional": { "image_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}), "skip_first_images": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}), "select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}), "meta_batch": ("VHS_BatchManager",), }, "hidden": { "unique_id": "UNIQUE_ID" }, } RETURN_TYPES = ("IMAGE", "MASK", "INT") RETURN_NAMES = ("IMAGE", "MASK", "frame_count") FUNCTION = "load_images" CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" def load_images(self, directory: str, **kwargs): directory = strip_path(directory) if directory is None or validate_load_images(directory) != True: raise Exception("directory is not valid: " + directory) return load_images(directory, **kwargs) @classmethod def IS_CHANGED(s, directory: str, **kwargs): if directory is None: return "input" return is_changed_load_images(directory, **kwargs) @classmethod def VALIDATE_INPUTS(s, directory: str, **kwargs): if directory is None: return True return validate_load_images(strip_path(directory))