import os import gradio as gr import numpy as np import random import spaces from diffusers import DiffusionPipeline import torch import json import logging from diffusers import DiffusionPipeline, AutoencoderTiny, AutoencoderKL from live_preview_helpers import calculate_shift, retrieve_timesteps, flux_pipe_call_that_returns_an_iterable_of_images from huggingface_hub import login from huggingface_hub import hf_hub_download, HfFileSystem, ModelCard, snapshot_download import copy import random import time import boto3 from io import BytesIO from datetime import datetime from transformers import AutoTokenizer from diffusers import UNet2DConditionModel HF_TOKEN = os.environ.get("HF_TOKEN") login(token=HF_TOKEN) # init dtype = torch.bfloat16 device = "cuda" if torch.cuda.is_available() else "cpu" base_model = "black-forest-labs/FLUX.1-dev" # unet = UNet2DConditionModel.from_pretrained( # base_model, # torch_dtype=torch.float16, # use_safetensors=True, # variant="fp16", # subfolder="unet", # # ).to("cuda") # tokenizer = AutoTokenizer.from_pretrained("openai/clip-vit-base-patch32") pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype).to(device) MAX_SEED = 2**32-1 class calculateDuration: def __init__(self, activity_name=""): self.activity_name = activity_name def __enter__(self): self.start_time = time.time() return self def __exit__(self, exc_type, exc_value, traceback): self.end_time = time.time() self.elapsed_time = self.end_time - self.start_time if self.activity_name: print(f"Elapsed time for {self.activity_name}: {self.elapsed_time:.6f} seconds") else: print(f"Elapsed time: {self.elapsed_time:.6f} seconds") def upload_image_to_r2(image, account_id, access_key, secret_key, bucket_name): print("upload_image_to_r2", account_id, access_key, secret_key, bucket_name) connectionUrl = f"https://{account_id}.r2.cloudflarestorage.com" s3 = boto3.client( 's3', endpoint_url=connectionUrl, region_name='auto', aws_access_key_id=access_key, aws_secret_access_key=secret_key ) current_time = datetime.now().strftime("%Y/%m/%d/%H%M%S") image_file = f"generated_images/{current_time}_{random.randint(0, MAX_SEED)}.png" buffer = BytesIO() image.save(buffer, "PNG") buffer.seek(0) s3.upload_fileobj(buffer, bucket_name, image_file) print("upload finish", image_file) return image_file @spaces.GPU def generate_image(prompt, steps, seed, cfg_scale, width, height, progress): pipe.to("cuda") text_inputs = pipe.tokenizer(prompt, return_tensors="pt").to("cuda") input_ids = text_inputs.input_ids[0] # 获取每个主体对应的令牌 ID boy_token_id = pipe.tokenizer.convert_tokens_to_ids("boy_asia_05") print(boy_token_id) girl_token_id = pipe.tokenizer.convert_tokens_to_ids("girl_asia_04") print(girl_token_id) # 找到每个主体在输入中的索引位置 boy_indices = (input_ids == boy_token_id).nonzero(as_tuple=True)[0] girl_indices = (input_ids == girl_token_id).nonzero(as_tuple=True)[0] # 准备 cross_attention_kwargs def attention_control(attention_probs, adapter_name): # 根据 adapter_name 和令牌索引控制注意力 print("attention_control", adapter_name) if adapter_name == "boy_asia_05": # 对女孩的令牌注意力设为零 attention_probs[:, :, :, girl_indices] = 0 elif adapter_name == "girl_asia_04": # 对男孩的令牌注意力设为零 attention_probs[:, :, :, boy_indices] = 0 return attention_probs joint_attention_kwargs = {"attention_control": attention_control} generator = torch.Generator(device="cuda").manual_seed(seed) with calculateDuration("Generating image"): # Generate image generate_image = pipe( prompt=prompt, num_inference_steps=steps, guidance_scale=cfg_scale, width=width, height=height, generator=generator, joint_attention_kwargs=joint_attention_kwargs ).images[0] progress(99, "Generate success!") return generate_image # 在 Transformer 中,自定义注意力处理器 class CustomAttentionProcessor(torch.nn.Module): def __init__(self, attention_control, adapter_name): super().__init__() self.attention_control = attention_control self.adapter_name = adapter_name def forward(self, attention_probs): # 调用自定义的注意力控制函数 attention_probs = self.attention_control(attention_probs, self.adapter_name) return attention_probs def run_lora(prompt, cfg_scale, steps, lora_strings, randomize_seed, seed, width, height, lora_scale, upload_to_r2, account_id, access_key, secret_key, bucket, progress=gr.Progress(track_tqdm=True)): # Load LoRA weights if lora_strings: with calculateDuration(f"Loading LoRA weights for {lora_strings}"): pipe.unload_lora_weights() lora_array = lora_strings.split(',') adapter_names = [] for lora_string in lora_array: parts = lora_string.split(':') if len(parts) == 3: lora_repo, weights, adapter_name = parts # 调用 pipe.load_lora_weights() 方法加载权重 pipe.load_lora_weights(lora_repo, weight_name=weights, adapter_name=adapter_name) adapter_names.append(adapter_name) else: print(f"Invalid format for lora_string: {lora_string}") adapter_weights = [lora_scale] * len(adapter_names) # 调用 pipeline.set_adapters 方法设置 adapter 和对应权重 pipe.set_adapters(adapter_names, adapter_weights=adapter_weights) # Set random seed for reproducibility if randomize_seed: seed = random.randint(0, MAX_SEED) final_image = generate_image(prompt, steps, seed, cfg_scale, width, height, progress) if upload_to_r2: with calculateDuration("upload r2"): url = upload_image_to_r2(final_image, account_id, access_key, secret_key, bucket) result = {"status": "success", "url": url} else: result = {"status": "success", "message": "Image generated but not uploaded"} progress(100, "Completed!") yield final_image, seed, json.dumps(result) css=""" #col-container { margin: 0 auto; max-width: 640px; } """ with gr.Blocks(css=css) as demo: gr.Markdown("Flux with lora") with gr.Row(): with gr.Column(): prompt = gr.Text(label="Prompt", show_label=False, max_lines=1, placeholder="Enter your prompt", container=False) lora_strings = gr.Text( label="lora_strings", max_lines=1, placeholder="Enter a lora strings", visible=True) run_button = gr.Button("Run", scale=0) with gr.Accordion("Advanced Settings", open=False): with gr.Row(): seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0, randomize=True) randomize_seed = gr.Checkbox(label="Randomize seed", value=True) lora_scale = gr.Slider(label="LoRA Scale", minimum=0, maximum=1, step=0.01, value=0.5) with gr.Row(): width = gr.Slider(label="Width", minimum=256, maximum=1536, step=64, value=1024) height = gr.Slider(label="Height", minimum=256, maximum=1536, step=64, value=1024) with gr.Row(): cfg_scale = gr.Slider(label="CFG Scale", minimum=1, maximum=20, step=0.5, value=3.5) steps = gr.Slider(label="Steps", minimum=1, maximum=50, step=1, value=28) upload_to_r2 = gr.Checkbox(label="Upload to R2", value=False) account_id = gr.Textbox(label="Account Id", placeholder="Enter R2 account id") access_key = gr.Textbox(label="Access Key", placeholder="Enter R2 access key here") secret_key = gr.Textbox(label="Secret Key", placeholder="Enter R2 secret key here") bucket = gr.Textbox(label="Bucket Name", placeholder="Enter R2 bucket name here") with gr.Column(): result = gr.Image(label="Result", show_label=False) json_text = gr.Text() gr.on( triggers=[run_button.click, prompt.submit], fn = run_lora, inputs = [prompt, cfg_scale, steps, lora_strings, randomize_seed, seed, width, height, lora_scale, upload_to_r2, account_id, access_key, secret_key, bucket], outputs=[result, seed, json_text] ) demo.queue().launch()