import os import gradio as gr import numpy as np import random import spaces from diffusers import DiffusionPipeline import torch import json import logging from diffusers import DiffusionPipeline, AutoencoderTiny, AutoencoderKL from live_preview_helpers import calculate_shift, retrieve_timesteps, flux_pipe_call_that_returns_an_iterable_of_images from huggingface_hub import login from huggingface_hub import hf_hub_download, HfFileSystem, ModelCard, snapshot_download import copy import random import time import boto3 from io import BytesIO from datetime import datetime HF_TOKEN = os.environ.get("HF_TOKEN") login(token=HF_TOKEN) # init dtype = torch.bfloat16 device = "cuda" if torch.cuda.is_available() else "cpu" base_model = "black-forest-labs/FLUX.1-dev" taef1 = AutoencoderTiny.from_pretrained("madebyollin/taef1", torch_dtype=dtype).to(device) good_vae = AutoencoderKL.from_pretrained(base_model, subfolder="vae", torch_dtype=dtype).to(device) pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype, vae=taef1).to(device) MAX_SEED = 2**32-1 # pipe.flux_pipe_call_that_returns_an_iterable_of_images = flux_pipe_call_that_returns_an_iterable_of_images.__get__(pipe) class calculateDuration: def __init__(self, activity_name=""): self.activity_name = activity_name def __enter__(self): self.start_time = time.time() return self def __exit__(self, exc_type, exc_value, traceback): self.end_time = time.time() self.elapsed_time = self.end_time - self.start_time if self.activity_name: print(f"Elapsed time for {self.activity_name}: {self.elapsed_time:.6f} seconds") else: print(f"Elapsed time: {self.elapsed_time:.6f} seconds") def upload_image_to_r2(image, account_id, access_key, secret_key, bucket_name): print("upload_image_to_r2", account_id, access_key, secret_key, bucket_name) connectionUrl = f"https://{account_id}.r2.cloudflarestorage.com" s3 = boto3.client( 's3', endpoint_url=connectionUrl, region_name='auto', aws_access_key_id=access_key, aws_secret_access_key=secret_key ) current_time = datetime.now().strftime("%Y/%m/%d/%H%M%S") image_file = f"generated_images/{current_time}_{random.randint(0, MAX_SEED)}.png" buffer = BytesIO() image.save(buffer, "PNG") buffer.seek(0) s3.upload_fileobj(buffer, bucket_name, image_file) print("upload finish", image_file) return image_file @spaces.GPU def generate_image(prompt, steps, seed, cfg_scale, width, height, lora_scale, progress): pipe.to("cuda") generator = torch.Generator(device="cuda").manual_seed(seed) with calculateDuration("Generating image"): # Generate image generate_image = pipe( prompt=prompt, num_inference_steps=steps, guidance_scale=cfg_scale, width=width, height=height, generator=generator, joint_attention_kwargs={"scale": lora_scale}, output_type="pil", good_vae=good_vae, ) return generate_image def run_lora(prompt, cfg_scale, steps, lora_repo, lora_name, randomize_seed, seed, width, height, lora_scale, upload_to_r2, account_id, access_key, secret_key, bucket, progress=gr.Progress(track_tqdm=True)): with calculateDuration("Unloading LoRA"): pipe.unload_lora_weights() # Load LoRA weights if lora_repo and lora_name: with calculateDuration(f"Loading LoRA weights for {lora_repo} {lora_name}"): pipe.load_lora_weights(lora_repo, weight_name=lora_name) # Set random seed for reproducibility if randomize_seed: seed = random.randint(0, MAX_SEED) final_image = generate_image(prompt, steps, seed, cfg_scale, width, height, lora_scale, progress) if upload_to_r2: url = upload_image_to_r2(final_image, account_id, access_key, secret_key, bucket) result = {"status": "success", "url": url} else: result = {"status": "success", "message": "Image generated but not uploaded"} yield final_image, seed, gr.update(value=progress_bar, visible=False), json.dumps(result) css=""" #col-container { margin: 0 auto; max-width: 640px; } """ with gr.Blocks(css=css) as demo: gr.Markdown("Flux with lora") with gr.Row(): with gr.Column(): prompt = gr.Text(label="Prompt", show_label=False, max_lines=1, placeholder="Enter your prompt", container=False) lora_repo = gr.Text( label="Repo", max_lines=1, placeholder="Enter a lora repo", visible=True) lora_name = gr.Text( label="Weights", max_lines=1, placeholder="Enter a lora weights",visible=True) run_button = gr.Button("Run", scale=0) with gr.Accordion("Advanced Settings", open=False): with gr.Row(): seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0, randomize=True) randomize_seed = gr.Checkbox(label="Randomize seed", value=True) lora_scale = gr.Slider(label="LoRA Scale", minimum=0, maximum=3, step=0.01, value=0.95) with gr.Row(): width = gr.Slider(label="Width", minimum=256, maximum=1536, step=64, value=1024) height = gr.Slider(label="Height", minimum=256, maximum=1536, step=64, value=1024) with gr.Row(): cfg_scale = gr.Slider(label="CFG Scale", minimum=1, maximum=20, step=0.5, value=3.5) steps = gr.Slider(label="Steps", minimum=1, maximum=50, step=1, value=28) upload_to_r2 = gr.Checkbox(label="Upload to R2", value=False) account_id = gr.Textbox(label="Account Id", placeholder="Enter R2 account id") access_key = gr.Textbox(label="Access Key", placeholder="Enter R2 access key here") secret_key = gr.Textbox(label="Secret Key", placeholder="Enter R2 secret key here") bucket = gr.Textbox(label="Bucket Name", placeholder="Enter R2 bucket name here") with gr.Column(): progress_bar = gr.Markdown(elem_id="progress",visible=False) result = gr.Image(label="Result", show_label=False) json_text = gr.Text() gr.on( triggers=[run_button.click, prompt.submit], fn = run_lora, inputs = [prompt, cfg_scale, steps, lora_repo, lora_name, randomize_seed, seed, width, height, lora_scale, upload_to_r2, account_id, access_key, secret_key, bucket], outputs=[result, seed, progress_bar, json_text] ) demo.queue().launch()