Utilities
Utility and helper functions for working with 🤗 Diffusers.
numpy_to_pil
Convert a numpy image or a batch of images to a PIL image.
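A conversion of this kind can be sketched with NumPy and Pillow alone. The helper name `numpy_batch_to_pil` is hypothetical, and the sketch assumes float inputs in the range [0, 1] with shape (H, W, C) or (N, H, W, C); it is an illustration, not the library's implementation.

```python
import numpy as np
from PIL import Image


def numpy_batch_to_pil(images):
    """Hypothetical helper: float array(s) in [0, 1] -> list of PIL images."""
    # Promote a single (H, W, C) image to a batch of one.
    if images.ndim == 3:
        images = images[None, ...]
    # Scale to 8-bit values, the range Pillow expects for RGB data.
    images = (images * 255).round().astype("uint8")
    return [Image.fromarray(img) for img in images]


# A batch of two 4x6 RGB images: one all black, one all white.
batch = np.zeros((2, 4, 6, 3), dtype=np.float32)
batch[1] = 1.0
pil_images = numpy_batch_to_pil(batch)
```

Note that Pillow reports sizes as (width, height), so each 4x6 array becomes an image of size (6, 4).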
pt_to_pil
Convert a torch image to a PIL image.
load_image
diffusers.utils.load_image
( image: typing.Union[str, PIL.Image.Image], convert_method: typing.Optional[typing.Callable[[PIL.Image.Image], PIL.Image.Image]] = None ) → PIL.Image.Image
Parameters
- image (str or PIL.Image.Image) — The image to convert to the PIL Image format.
- convert_method (Callable[[PIL.Image.Image], PIL.Image.Image], optional) — A conversion method to apply to the image after loading it. When set to None, the image is converted to "RGB".
Returns
PIL.Image.Image
A PIL Image.
Loads image to a PIL Image.
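The role of convert_method can be illustrated with a small Pillow-only sketch. The function `load_sketch` is hypothetical and handles only local paths and in-memory images; it mirrors the documented contract (apply convert_method if given, otherwise convert to "RGB") and is not the library's code.

```python
from PIL import Image


def load_sketch(image, convert_method=None):
    """Hypothetical stand-in showing the documented convert_method contract."""
    # Assumption: a string is treated as a local file path only.
    if isinstance(image, str):
        image = Image.open(image)
    # A user-supplied conversion takes precedence over the default.
    if convert_method is not None:
        return convert_method(image)
    return image.convert("RGB")


gray = Image.new("L", (4, 4))
default_result = load_sketch(gray)
custom_result = load_sketch(gray, convert_method=lambda im: im.convert("RGBA"))
```

Here the default path yields an "RGB" image, while the custom callable decides the output mode itself.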
export_to_gif
diffusers.utils.export_to_gif
( image: typing.List[PIL.Image.Image], output_gif_path: str = None, fps: int = 10 )
export_to_video
diffusers.utils.export_to_video
( video_frames: typing.Union[typing.List[numpy.ndarray], typing.List[PIL.Image.Image]], output_video_path: str = None, fps: int = 10 )
make_image_grid
diffusers.utils.make_image_grid
( images: typing.List[PIL.Image.Image], rows: int, cols: int, resize: int = None )
Prepares a single grid of images. Useful for visualization purposes.
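To make the idea of an image grid concrete, here is a minimal Pillow-only sketch. The name `grid_sketch` is hypothetical, and the sketch assumes all images share one size and that exactly rows * cols images are passed; it is not the library's implementation.

```python
from PIL import Image


def grid_sketch(images, rows, cols):
    """Hypothetical helper: paste equally sized images into a rows x cols grid."""
    if len(images) != rows * cols:
        raise ValueError("expected exactly rows * cols images")
    w, h = images[0].size
    grid = Image.new("RGB", (cols * w, rows * h))
    for i, img in enumerate(images):
        # Fill row by row: column index is i % cols, row index is i // cols.
        grid.paste(img, box=((i % cols) * w, (i // cols) * h))
    return grid


colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 255)]
tiles = [Image.new("RGB", (8, 8), c) for c in colors]
grid = grid_sketch(tiles, rows=2, cols=2)
```

The resulting image is 16x16 pixels, with the four tiles laid out left to right, top to bottom.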
randn_tensor
diffusers.utils.torch_utils.randn_tensor
( shape: typing.Union[typing.Tuple, typing.List], generator: typing.Union[typing.List[ForwardRef('torch.Generator')], ForwardRef('torch.Generator'), NoneType] = None, device: typing.Optional[ForwardRef('torch.device')] = None, dtype: typing.Optional[ForwardRef('torch.dtype')] = None, layout: typing.Optional[ForwardRef('torch.layout')] = None )
A helper function to create random tensors on the desired device with the desired dtype. When passing a list of generators, you can seed each sample in the batch individually. If CPU generators are passed, the tensor is always created on the CPU.
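The effect of seeding each sample individually can be sketched in plain PyTorch. The helper `randn_per_sample` is hypothetical and assumes one CPU generator per batch element; it illustrates the idea rather than reproducing the library's code.

```python
import torch


def randn_per_sample(shape, generators, dtype=torch.float32):
    """Hypothetical helper: draw each batch element from its own generator."""
    if len(generators) != shape[0]:
        raise ValueError("need one generator per batch element")
    per_sample_shape = (1,) + tuple(shape[1:])
    samples = [
        torch.randn(per_sample_shape, generator=g, dtype=dtype) for g in generators
    ]
    return torch.cat(samples, dim=0)


# Two samples seeded with 0 and 1 respectively.
generators = [torch.Generator().manual_seed(seed) for seed in (0, 1)]
batch = randn_per_sample((2, 4, 8, 8), generators)

# Re-drawing only the second sample with the same seed reproduces it exactly.
second_only = randn_per_sample((1, 4, 8, 8), [torch.Generator().manual_seed(1)])
```

Because every sample has its own seed, one element of a batch can be regenerated later without regenerating the others.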
apply_layerwise_casting
diffusers.hooks.apply_layerwise_casting
( module: Module, storage_dtype: dtype, compute_dtype: dtype, skip_modules_pattern: typing.Union[str, typing.Tuple[str, ...]] = 'auto', skip_modules_classes: typing.Optional[typing.Tuple[typing.Type[torch.nn.modules.module.Module], ...]] = None, non_blocking: bool = False )
Parameters
- module (torch.nn.Module) — The module whose leaf modules will be cast to a high precision dtype for computation, and to a low precision dtype for storage.
- storage_dtype (torch.dtype) — The dtype to cast the module to before/after the forward pass for storage.
- compute_dtype (torch.dtype) — The dtype to cast the module to during the forward pass for computation.
- skip_modules_pattern (str or Tuple[str, ...], defaults to "auto") — A list of patterns to match the names of the modules to skip during the layerwise casting process. If set to "auto", the default patterns are used. If set to None, no modules are skipped. If set to None alongside skip_modules_classes being None, the layerwise casting is applied directly to the module instead of its internal submodules.
- skip_modules_classes (Tuple[Type[torch.nn.Module], ...], defaults to None) — A list of module classes to skip during the layerwise casting process.
- non_blocking (bool, defaults to False) — If True, the weight casting operations are non-blocking.
Applies layerwise casting to a given module. The module expected here is a Diffusers ModelMixin, but it can be any nn.Module that uses Diffusers layers or PyTorch primitives.
Example:
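>>> import torch
>>> from diffusers import CogVideoXTransformer3DModel
>>> from diffusers.hooks import apply_layerwise_casting
>>> model_id = "THUDM/CogVideoX-5b"  # assumed checkpoint; substitute your own model ID
>>> transformer = CogVideoXTransformer3DModel.from_pretrained(
...     model_id, subfolder="transformer", torch_dtype=torch.bfloat16
... )
>>> # Store weights in float8 and upcast to bfloat16 only during each forward pass.
>>> apply_layerwise_casting(
...     transformer,
...     storage_dtype=torch.float8_e4m3fn,
...     compute_dtype=torch.bfloat16,
...     skip_modules_pattern=["patch_embed", "norm", "proj_out"],
...     non_blocking=True,
... )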
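The storage/compute split described above can be approximated with plain PyTorch forward hooks. The following is a minimal sketch under stated assumptions, not the library's implementation: the helper `attach_casting_hooks` is hypothetical, and float16/float32 stand in for the storage and compute dtypes so the snippet runs on CPU.

```python
import torch
from torch import nn


def attach_casting_hooks(layer, storage_dtype, compute_dtype):
    """Hypothetical helper: keep weights in storage_dtype between forward passes."""
    layer.to(storage_dtype)

    def upcast(module, args):
        # Runs just before forward: switch weights to the compute dtype.
        module.to(compute_dtype)

    def downcast(module, args, output):
        # Runs just after forward: return weights to the storage dtype.
        module.to(storage_dtype)

    layer.register_forward_pre_hook(upcast)
    layer.register_forward_hook(downcast)
    return layer


linear = attach_casting_hooks(
    nn.Linear(4, 2), storage_dtype=torch.float16, compute_dtype=torch.float32
)
output = linear(torch.randn(3, 4))
```

After the call, the layer's weights are back in the storage dtype while the output was computed in the compute dtype.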
apply_group_offloading
diffusers.hooks.apply_group_offloading
( module: Module, onload_device: device, offload_device: device = device(type='cpu'), offload_type: str = 'block_level', num_blocks_per_group: typing.Optional[int] = None, non_blocking: bool = False, use_stream: bool = False )
Parameters
- module (torch.nn.Module) — The module to which group offloading is applied.
- onload_device (torch.device) — The device to which each group of modules is onloaded.
- offload_device (torch.device, defaults to torch.device("cpu")) — The device to which each group of modules is offloaded. This should typically be the CPU.
- offload_type (str, defaults to "block_level") — The type of offloading to be applied. Can be one of "block_level" or "leaf_level".
- num_blocks_per_group (int, optional) — The number of blocks per group. Required when using offload_type="block_level".
- non_blocking (bool, defaults to False) — If True, offloading and onloading are done with non-blocking data transfer.
- use_stream (bool, defaults to False) — If True, offloading and onloading are done asynchronously using a CUDA stream. This can be useful for overlapping computation and data transfer.
Applies group offloading to the internal layers of a torch.nn.Module. To understand what group offloading is, and where it is beneficial, we need to first provide some context on how other supported offloading methods work.
Typically, offloading is done at two levels:
- Module-level: In Diffusers, this can be enabled using the DiffusionPipeline.enable_model_cpu_offload() method. It works by offloading each component of a pipeline to the CPU for storage, and onloading it to the accelerator device when needed for computation. This method is more memory-efficient than keeping all components on the accelerator, but the memory requirements are still quite high. To complete the forward pass, it needs memory equal to the size of the model in its runtime dtype plus the size of the largest intermediate activation tensors.
- Leaf-level: In Diffusers, this can be enabled using the DiffusionPipeline.enable_sequential_cpu_offload() method. It works by offloading the lowest leaf-level parameters of the computation graph to the CPU for storage, and onloading only the leaves to the accelerator device for computation. This uses the lowest amount of accelerator memory, but can be slower due to the excessive number of device synchronizations.
Group offloading is a middle ground between the two methods. It works by offloading groups of internal layers (either torch.nn.ModuleList or torch.nn.Sequential). This method uses less memory than module-level offloading. It is also faster than leaf-level/sequential offloading, as the number of device synchronizations is reduced.
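The grouping idea can be pictured with a small pure-Python sketch. The function `group_blocks` and the block names are hypothetical, and the handling of a trailing partial group is an assumption made for illustration; the library's actual grouping logic may differ.

```python
def group_blocks(block_names, num_blocks_per_group):
    """Hypothetical helper: split an ordered list of blocks into fixed-size groups."""
    if num_blocks_per_group < 1:
        raise ValueError("num_blocks_per_group must be at least 1")
    return [
        block_names[i : i + num_blocks_per_group]
        for i in range(0, len(block_names), num_blocks_per_group)
    ]


# Five transformer blocks, moved between devices two at a time.
blocks = [f"block_{i}" for i in range(5)]
groups = group_blocks(blocks, num_blocks_per_group=2)

# Each group is onloaded and offloaded as a unit, so the number of
# transfers scales with the number of groups rather than the number of blocks.
num_transfers = len(groups)
```

Larger groups mean fewer transfers but more accelerator memory held at once, which is the trade-off between the module-level and leaf-level extremes.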
Another supported feature (for CUDA devices with support for asynchronous data transfer streams) is the ability to overlap data transfer and computation to reduce the overall execution time compared to sequential offloading. This is enabled using layer prefetching with streams, i.e., the layer that is to be executed next starts onloading to the accelerator device while the current layer is being executed. This increases the memory requirements slightly. Note that this implementation also supports leaf-level offloading, which can be made much faster when using streams.
Example:
>>> import torch
>>> from diffusers import CogVideoXTransformer3DModel
>>> from diffusers.hooks import apply_group_offloading
>>> transformer = CogVideoXTransformer3DModel.from_pretrained(
...     "THUDM/CogVideoX-5b", subfolder="transformer", torch_dtype=torch.bfloat16
... )
>>> # Offload blocks to the CPU in groups of two, prefetching the next group on a CUDA stream.
>>> apply_group_offloading(
...     transformer,
...     onload_device=torch.device("cuda"),
...     offload_device=torch.device("cpu"),
...     offload_type="block_level",
...     num_blocks_per_group=2,
...     use_stream=True,
... )