jiuface's picture
try to custom attention control
53d0f2f
raw
history blame
8.95 kB
import os
import gradio as gr
import numpy as np
import random
import spaces
from diffusers import DiffusionPipeline
import torch
import json
import logging
from diffusers import DiffusionPipeline, AutoencoderTiny, AutoencoderKL
from live_preview_helpers import calculate_shift, retrieve_timesteps, flux_pipe_call_that_returns_an_iterable_of_images
from huggingface_hub import login
from huggingface_hub import hf_hub_download, HfFileSystem, ModelCard, snapshot_download
import copy
import random
import time
import boto3
from io import BytesIO
from datetime import datetime
from transformers import AutoTokenizer
from diffusers import UNet2DConditionModel
HF_TOKEN = os.environ.get("HF_TOKEN")
login(token=HF_TOKEN)
# init
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"
base_model = "black-forest-labs/FLUX.1-dev"
# unet = UNet2DConditionModel.from_pretrained(
# base_model,
# torch_dtype=torch.float16,
# use_safetensors=True,
# variant="fp16",
# subfolder="unet",
# # ).to("cuda")
# tokenizer = AutoTokenizer.from_pretrained("openai/clip-vit-base-patch32")
pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype).to(device)
MAX_SEED = 2**32-1
class calculateDuration:
def __init__(self, activity_name=""):
self.activity_name = activity_name
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.end_time = time.time()
self.elapsed_time = self.end_time - self.start_time
if self.activity_name:
print(f"Elapsed time for {self.activity_name}: {self.elapsed_time:.6f} seconds")
else:
print(f"Elapsed time: {self.elapsed_time:.6f} seconds")
def upload_image_to_r2(image, account_id, access_key, secret_key, bucket_name):
print("upload_image_to_r2", account_id, access_key, secret_key, bucket_name)
connectionUrl = f"https://{account_id}.r2.cloudflarestorage.com"
s3 = boto3.client(
's3',
endpoint_url=connectionUrl,
region_name='auto',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key
)
current_time = datetime.now().strftime("%Y/%m/%d/%H%M%S")
image_file = f"generated_images/{current_time}_{random.randint(0, MAX_SEED)}.png"
buffer = BytesIO()
image.save(buffer, "PNG")
buffer.seek(0)
s3.upload_fileobj(buffer, bucket_name, image_file)
print("upload finish", image_file)
return image_file
@spaces.GPU
def generate_image(prompt, steps, seed, cfg_scale, width, height, progress):
pipe.to("cuda")
text_inputs = pipe.tokenizer(prompt, return_tensors="pt").to("cuda")
input_ids = text_inputs.input_ids[0]
# 获取每个主体对应的令牌 ID
boy_token_id = pipe.tokenizer.convert_tokens_to_ids("boy_asia_05")
print(boy_token_id)
girl_token_id = pipe.tokenizer.convert_tokens_to_ids("girl_asia_04")
print(girl_token_id)
# 找到每个主体在输入中的索引位置
boy_indices = (input_ids == boy_token_id).nonzero(as_tuple=True)[0]
girl_indices = (input_ids == girl_token_id).nonzero(as_tuple=True)[0]
# 准备 cross_attention_kwargs
def attention_control(attention_probs, adapter_name):
# 根据 adapter_name 和令牌索引控制注意力
print("attention_control", adapter_name)
if adapter_name == "boy_asia_05":
# 对女孩的令牌注意力设为零
attention_probs[:, :, :, girl_indices] = 0
elif adapter_name == "girl_asia_04":
# 对男孩的令牌注意力设为零
attention_probs[:, :, :, boy_indices] = 0
return attention_probs
joint_attention_kwargs = {"attention_control": attention_control}
generator = torch.Generator(device="cuda").manual_seed(seed)
with calculateDuration("Generating image"):
# Generate image
generate_image = pipe(
prompt=prompt,
num_inference_steps=steps,
guidance_scale=cfg_scale,
width=width,
height=height,
generator=generator,
joint_attention_kwargs=joint_attention_kwargs
).images[0]
progress(99, "Generate success!")
return generate_image
# 在 Transformer 中,自定义注意力处理器
class CustomAttentionProcessor(torch.nn.Module):
def __init__(self, attention_control, adapter_name):
super().__init__()
self.attention_control = attention_control
self.adapter_name = adapter_name
def forward(self, attention_probs):
# 调用自定义的注意力控制函数
attention_probs = self.attention_control(attention_probs, self.adapter_name)
return attention_probs
def run_lora(prompt, cfg_scale, steps, lora_strings, randomize_seed, seed, width, height, lora_scale, upload_to_r2, account_id, access_key, secret_key, bucket, progress=gr.Progress(track_tqdm=True)):
# Load LoRA weights
if lora_strings:
with calculateDuration(f"Loading LoRA weights for {lora_strings}"):
pipe.unload_lora_weights()
lora_array = lora_strings.split(',')
adapter_names = []
for lora_string in lora_array:
parts = lora_string.split(':')
if len(parts) == 3:
lora_repo, weights, adapter_name = parts
# 调用 pipe.load_lora_weights() 方法加载权重
pipe.load_lora_weights(lora_repo, weight_name=weights, adapter_name=adapter_name)
adapter_names.append(adapter_name)
else:
print(f"Invalid format for lora_string: {lora_string}")
adapter_weights = [lora_scale] * len(adapter_names)
# 调用 pipeline.set_adapters 方法设置 adapter 和对应权重
pipe.set_adapters(adapter_names, adapter_weights=adapter_weights)
# Set random seed for reproducibility
if randomize_seed:
seed = random.randint(0, MAX_SEED)
final_image = generate_image(prompt, steps, seed, cfg_scale, width, height, progress)
if upload_to_r2:
with calculateDuration("upload r2"):
url = upload_image_to_r2(final_image, account_id, access_key, secret_key, bucket)
result = {"status": "success", "url": url}
else:
result = {"status": "success", "message": "Image generated but not uploaded"}
progress(100, "Completed!")
yield final_image, seed, json.dumps(result)
css="""
#col-container {
margin: 0 auto;
max-width: 640px;
}
"""
with gr.Blocks(css=css) as demo:
gr.Markdown("Flux with lora")
with gr.Row():
with gr.Column():
prompt = gr.Text(label="Prompt", show_label=False, max_lines=1, placeholder="Enter your prompt", container=False)
lora_strings = gr.Text( label="lora_strings", max_lines=1, placeholder="Enter a lora strings", visible=True)
run_button = gr.Button("Run", scale=0)
with gr.Accordion("Advanced Settings", open=False):
with gr.Row():
seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0, randomize=True)
randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
lora_scale = gr.Slider(label="LoRA Scale", minimum=0, maximum=1, step=0.01, value=0.5)
with gr.Row():
width = gr.Slider(label="Width", minimum=256, maximum=1536, step=64, value=1024)
height = gr.Slider(label="Height", minimum=256, maximum=1536, step=64, value=1024)
with gr.Row():
cfg_scale = gr.Slider(label="CFG Scale", minimum=1, maximum=20, step=0.5, value=3.5)
steps = gr.Slider(label="Steps", minimum=1, maximum=50, step=1, value=28)
upload_to_r2 = gr.Checkbox(label="Upload to R2", value=False)
account_id = gr.Textbox(label="Account Id", placeholder="Enter R2 account id")
access_key = gr.Textbox(label="Access Key", placeholder="Enter R2 access key here")
secret_key = gr.Textbox(label="Secret Key", placeholder="Enter R2 secret key here")
bucket = gr.Textbox(label="Bucket Name", placeholder="Enter R2 bucket name here")
with gr.Column():
result = gr.Image(label="Result", show_label=False)
json_text = gr.Text()
gr.on(
triggers=[run_button.click, prompt.submit],
fn = run_lora,
inputs = [prompt, cfg_scale, steps, lora_strings, randomize_seed, seed, width, height, lora_scale, upload_to_r2, account_id, access_key, secret_key, bucket],
outputs=[result, seed, json_text]
)
demo.queue().launch()