import json |
import os |
import folder_paths as comfy_paths |
from PIL.PngImagePlugin import PngInfo |
from .categories import NodeCategories |
from .shared import hashed_as_strings, DreamImageProcessor, DreamImage, \ |
list_images_in_directory, DreamConfig |
from .dreamtypes import SharedTypes, FrameCounter, AnimationSequence, LogEntry |
# Module-wide configuration handle; this file reads the "encoding.jpeg_quality"
# setting from it when writing JPEG frames.
CONFIG = DreamConfig()
def _save_png(pil_image, filepath, embed_info, prompt, extra_pnginfo):
    """Write ``pil_image`` to ``filepath`` as an optimized PNG.

    Args:
        pil_image: Object exposing a PIL-style ``save(filepath, **options)`` method.
        filepath: Destination path, handed to ``save`` unchanged.
        embed_info: When true, ``prompt`` and ``extra_pnginfo`` are embedded in the
            file as PNG text entries; when false they are ignored entirely.
        prompt: Optional JSON-serializable value stored under the ``"prompt"`` key.
        extra_pnginfo: Optional mapping; every value is JSON-encoded and stored
            under its own key.
    """
    if not embed_info:
        # Nothing gets embedded, so do not build or JSON-encode the metadata at
        # all: it would be wasted work, and a non-serializable value would raise
        # for data that is never written.
        pil_image.save(filepath, optimize=True)
        return

    info = PngInfo()
    if extra_pnginfo is not None:
        for key, value in extra_pnginfo.items():
            info.add_text(key, json.dumps(value))
    if prompt is not None:
        info.add_text("prompt", json.dumps(prompt))
    pil_image.save(filepath, pnginfo=info, optimize=True)
def _save_jpg(pil_image, filepath, quality):
    """Write ``pil_image`` to ``filepath`` as an optimized JPEG.

    Args:
        pil_image: Object exposing a PIL-style ``save`` method (and ``mode`` /
            ``convert`` when it is a real PIL image).
        filepath: Destination path, handed to ``save`` unchanged.
        quality: JPEG quality setting forwarded to the encoder.
    """
    # Pillow's JPEG writer rejects alpha-channel and palette images with an
    # OSError, so those modes are flattened to RGB first. Any other mode is
    # passed through untouched, exactly as before.
    if getattr(pil_image, "mode", None) in ("RGBA", "LA", "PA", "P"):
        pil_image = pil_image.convert("RGB")
    pil_image.save(filepath, quality=quality, optimize=True)
class DreamImageSequenceOutput:
    """Node that writes incoming frames to disk as a numbered image sequence.

    The class follows the ComfyUI node layout (``INPUT_TYPES``, ``RETURN_TYPES``,
    ``FUNCTION``); ``save`` is the entry point named by ``FUNCTION``.

    NOTE(review): no ``CATEGORY`` or ``OUTPUT_NODE`` attribute is defined in this
    class - confirm whether they are supplied elsewhere at registration time.
    """
    NODE_NAME = "Image Sequence Saver"
    ICON = "💾"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input specification: frame counter inputs plus saver options."""
        return {
            "required": SharedTypes.frame_counter | {
                "image": ("IMAGE",),
                "directory_path": ("STRING", {"default": comfy_paths.output_directory, "multiline": False}),
                "prefix": ("STRING", {"default": 'frame', "multiline": False}),
                "digits": ("INT", {"default": 5}),
                "at_end": (["stop output", "raise error", "keep going"],),
                "filetype": (['png with embedded workflow', "png", 'jpg'],),
            },
            "hidden": {
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO"
            },
        }

    RETURN_TYPES = (AnimationSequence.ID, LogEntry.ID)
    RETURN_NAMES = ("sequence", "log_entry")
    FUNCTION = "save"

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` of all input values.

        Presumably this makes the node re-run whenever any input changes -
        confirm against ``hashed_as_strings``.
        """
        return hashed_as_strings(*values)

    def _get_new_filename(self, current_frame, prefix, digits, filetype):
        """Build ``<prefix>_<zero-padded frame>.<ext>``.

        The extension is the first space-separated word of ``filetype``, so
        ``'png with embedded workflow'`` yields ``png``.
        """
        extension = filetype.split(" ")[0]
        frame_number = str(current_frame).zfill(digits)
        return f"{prefix}_{frame_number}.{extension}"

    def _save_single_image(self, dream_image: DreamImage, batch_counter, frame_counter: FrameCounter,
                           directory_path,
                           prefix, digits, filetype, prompt, extra_pnginfo, at_end, logger):
        """Save one image of the current frame; passed as callback to ``DreamImageProcessor.process``.

        Args:
            dream_image: Image wrapper providing ``save_png`` / ``save_jpg``.
            batch_counter: When ``>= 0`` the file goes into a ``batch_NNNN``
                subdirectory of ``directory_path``; otherwise directly into it.
            frame_counter: Supplies ``current_frame`` and ``is_after_last_frame``.
            directory_path: Base output directory.
            prefix: Filename prefix.
            digits: Zero-padding width of the frame number.
            filetype: One of the ``filetype`` choices declared in ``INPUT_TYPES``.
            prompt: Forwarded to ``save_png``.
            extra_pnginfo: Forwarded to ``save_png``.
            at_end: Policy once ``frame_counter.is_after_last_frame`` is true:
                ``"stop output"`` skips saving, ``"raise error"`` raises, anything
                else keeps saving.
            logger: Callable receiving one log message string.

        Returns:
            An empty tuple in every non-raising case.

        Raises:
            Exception: If ``at_end`` is ``"raise error"`` and the frame counter is
                past the last frame.
        """
        # The frame counter is only consulted for the two policies that care about it.
        if at_end in ("stop output", "raise error") and frame_counter.is_after_last_frame:
            if at_end == "stop output":
                logger("Reached end of animation - not saving output!")
                return ()
            logger("Reached end of animation - raising error to stop processing!")
            raise Exception("Reached end of animation!")

        is_png = filetype.startswith("png")
        if not is_png and filetype != "jpg":
            # No writer exists for this type; say so instead of reporting a
            # "Saved ..." message for a file that was never written.
            logger("Unsupported filetype '{}' - not saving output!".format(filetype))
            return ()

        filename = self._get_new_filename(frame_counter.current_frame, prefix, digits, filetype)
        if batch_counter >= 0:
            filepath = os.path.join(directory_path, "batch_" + str(batch_counter).zfill(4), filename)
        else:
            filepath = os.path.join(directory_path, filename)

        # Derived from the full path so that a prefix containing path separators
        # still gets its directory created.
        save_dir = os.path.dirname(filepath)
        # exist_ok avoids the check-then-create race of a separate isdir() test.
        os.makedirs(save_dir, exist_ok=True)

        if is_png:
            dream_image.save_png(filepath, filetype == 'png with embedded workflow', prompt, extra_pnginfo)
        else:
            dream_image.save_jpg(filepath, int(CONFIG.get("encoding.jpeg_quality", 95)))

        logger("Saved {} in {}".format(filename, os.path.abspath(save_dir)))
        return ()

    def _generate_animation_sequence(self, filetype, directory_path, frame_counter):
        """Build an ``AnimationSequence`` from the image files found in ``directory_path``.

        Files are selected with ``*.png`` for the png file types and ``*.jpg``
        otherwise.

        NOTE(review): frames saved with ``batch_counter >= 0`` live in ``batch_NNNN``
        subdirectories; whether ``list_images_in_directory`` (called with ``False``
        as third argument) finds them is not visible here - confirm.
        """
        pattern = "*.png" if filetype.startswith("png") else "*.jpg"
        frames = list_images_in_directory(directory_path, pattern, False)
        return AnimationSequence(frame_counter, frames)

    def save(self, image, **args):
        """Node entry point: save ``image`` and report the result.

        Args:
            image: Image input handed to ``DreamImageProcessor``.
            **args: Remaining node inputs; must include ``frame_counter`` and
                ``filetype``. An empty or missing ``directory_path`` falls back to
                ``comfy_paths.output_directory``.

        Returns:
            A ``(sequence, log_entry)`` tuple. On the final frame
            (``frame_counter.is_final_frame``) the sequence lists the image files
            found in the output directory; otherwise it carries only the frame
            counter. ``log_entry`` holds every message logged while saving.
        """
        log_texts = []
        if not args.get("directory_path", ""):
            args["directory_path"] = comfy_paths.output_directory
        # The per-image callback reports through this collector.
        args["logger"] = log_texts.append

        proc = DreamImageProcessor(image, **args)
        proc.process(self._save_single_image)

        frame_counter = args["frame_counter"]
        log_entry = LogEntry([])
        for text in log_texts:
            # add() returns the updated entry, so rebind on every step.
            log_entry = log_entry.add(text)

        if frame_counter.is_final_frame:
            sequence = self._generate_animation_sequence(args["filetype"], args["directory_path"],
                                                         frame_counter)
        else:
            sequence = AnimationSequence(frame_counter)
        return (sequence, log_entry)