import os
import re
import json
import time
import random
import logging
from datetime import datetime
from io import BytesIO

import spaces  # required for the @spaces.GPU decorator on ZeroGPU Spaces
import numpy as np
import torch
import gradio as gr
from diffusers import DiffusionPipeline
from diffusers.models.attention_processor import AttnProcessor
from huggingface_hub import login
# Log in to the Hugging Face Hub
HF_TOKEN = os.environ.get("HF_TOKEN")
login(token=HF_TOKEN)

# Initialization
dtype = torch.float16  # adjust the dtype to your hardware if needed
device = "cuda" if torch.cuda.is_available() else "cpu"
base_model = "black-forest-labs/FLUX.1-dev"  # replace with your own model if desired

# Load the pipeline
pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype).to(device)

MAX_SEED = 2**32 - 1
class calculateDuration:
def __init__(self, activity_name=""):
self.activity_name = activity_name
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.end_time = time.time()
self.elapsed_time = self.end_time - self.start_time
if self.activity_name:
print(f"Elapsed time for {self.activity_name}: {self.elapsed_time:.6f} seconds")
else:
print(f"Elapsed time: {self.elapsed_time:.6f} seconds")
# Mappings for locations, offsets, and areas (coordinates on a 90x90 virtual canvas)
valid_locations = { # x, y in 90*90
'in the center': (45, 45),
'on the left': (15, 45),
'on the right': (75, 45),
'on the top': (45, 15),
'on the bottom': (45, 75),
'on the top-left': (15, 15),
'on the top-right': (75, 15),
'on the bottom-left': (15, 75),
'on the bottom-right': (75, 75)
}
valid_offsets = { # x, y in 90*90
'no offset': (0, 0),
'slightly to the left': (-10, 0),
'slightly to the right': (10, 0),
'slightly to the upper': (0, -10),
'slightly to the lower': (0, 10),
'slightly to the upper-left': (-10, -10),
'slightly to the upper-right': (10, -10),
'slightly to the lower-left': (-10, 10),
'slightly to the lower-right': (10, 10)
}
valid_areas = { # w, h in 90*90
"a small square area": (50, 50),
"a small vertical area": (40, 60),
"a small horizontal area": (60, 40),
"a medium-sized square area": (60, 60),
"a medium-sized vertical area": (50, 80),
"a medium-sized horizontal area": (80, 50),
"a large square area": (70, 70),
"a large vertical area": (60, 90),
"a large horizontal area": (90, 60)
}
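# The three tables above place boxes on a 90x90 virtual canvas. For example,
# 'on the left' + 'slightly to the upper' + 'a large vertical area' yields a
# 60x90 box centered at (15, 35), later rescaled to the real image size.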
# Parse a character position string into its location/offset/area parts
def parse_character_position(character_position):
    # Build regex patterns from the valid keys, longest first, so that e.g.
    # 'on the top-left' is matched before its prefix 'on the top'
    location_pattern = '|'.join(re.escape(key) for key in sorted(valid_locations, key=len, reverse=True))
    offset_pattern = '|'.join(re.escape(key) for key in sorted(valid_offsets, key=len, reverse=True))
    area_pattern = '|'.join(re.escape(key) for key in sorted(valid_areas, key=len, reverse=True))

    # Extract the location
    location_match = re.search(location_pattern, character_position, re.IGNORECASE)
    location = location_match.group(0) if location_match else 'in the center'

    # Extract the offset
    offset_match = re.search(offset_pattern, character_position, re.IGNORECASE)
    offset = offset_match.group(0) if offset_match else 'no offset'

    # Extract the area
    area_match = re.search(area_pattern, character_position, re.IGNORECASE)
    area = area_match.group(0) if area_match else 'a medium-sized square area'

    return {
        'location': location,
        'offset': offset,
        'area': area
    }
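# Example: any phrase that matches no table entry falls back to the defaults
# ('in the center', 'no offset', 'a medium-sized square area'):
#   parse_character_position("on the left, slightly to the upper, a large vertical area")
#   -> {'location': 'on the left', 'offset': 'slightly to the upper', 'area': 'a large vertical area'}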
# Build a (flattened) spatial attention mask for one character
def create_attention_mask(image_width, image_height, location, offset, area):
    # Placement is defined on a 90x90 virtual canvas, then rescaled
    base_size = 90

    # Look up the location coordinates
    loc_x, loc_y = valid_locations.get(location, (45, 45))
    # Look up the offset
    offset_x, offset_y = valid_offsets.get(offset, (0, 0))
    # Look up the area size
    area_width, area_height = valid_areas.get(area, (60, 60))

    # Final position on the virtual canvas
    final_x = loc_x + offset_x
    final_y = loc_y + offset_y

    # Map coordinates and sizes to the actual image resolution
    scale_x = image_width / base_size
    scale_y = image_height / base_size
    center_x = final_x * scale_x
    center_y = final_y * scale_y
    width = area_width * scale_x
    height = area_height * scale_y

    # Clamp the box corners to the image bounds
    x_start = int(max(center_x - width / 2, 0))
    y_start = int(max(center_y - height / 2, 0))
    x_end = int(min(center_x + width / 2, image_width))
    y_end = int(min(center_y + height / 2, image_height))

    # Create the mask: 1 inside the character's box, 0 elsewhere
    mask = torch.zeros((image_height, image_width), dtype=torch.float32, device=device)
    mask[y_start:y_end, x_start:x_end] = 1.0

    # Flatten to 1D, shape (image_height * image_width,)
    mask_flat = mask.view(-1)
    return mask_flat
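# Worked example (512x512 image, 'on the left', 'no offset', 'a medium-sized
# square area'): scale = 512/90 ≈ 5.69, so the box is centered near (85, 256)
# with size ~341x341, clamped to x in [0, 256) and y in [85, 427) --
# roughly the left half of the frame at mid-height.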
# Custom attention processor: boosts cross-attention for each character
# inside the spatial region assigned to its LoRA adapter
class CustomCrossAttentionProcessor(AttnProcessor):
    def __init__(self, masks, embeddings, adapter_names):
        super().__init__()
        self.masks = masks  # list of per-character spatial masks
        self.embeddings = embeddings  # list of per-character prompt embeddings
        self.adapter_names = adapter_names  # list of per-character LoRA adapter names

    def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None, **kwargs):
        # Read the adapter name tagged onto this attention module
        adapter_name = getattr(attn, 'adapter_name', None)
        if adapter_name is None or adapter_name not in self.adapter_names:
            # No adapter tag: fall back to the default attention computation
            return super().__call__(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)

        # Pick the mask belonging to this adapter
        idx = self.adapter_names.index(adapter_name)
        mask = self.masks[idx]

        # Standard attention computation
        batch_size, sequence_length, _ = hidden_states.shape
        query = attn.to_q(hidden_states)
        key = attn.to_k(encoder_hidden_states)
        value = attn.to_v(encoder_hidden_states)

        # Reshape for multi-head attention (head_dim derived from the projection width)
        head_dim = query.shape[-1] // attn.heads
        query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
        key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
        value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)

        # Attention scores, shape (batch, heads, query_len, key_len)
        attention_scores = torch.matmul(query, key.transpose(-1, -2)) * attn.scale

        # Boost the scores at the masked positions. This broadcasts the mask
        # over the key axis and assumes key_len equals the mask length.
        mask_expanded = mask.unsqueeze(0).unsqueeze(0).unsqueeze(0)  # (1, 1, 1, key_len)
        attention_scores = attention_scores + mask_expanded * 1e6

        # Attention probabilities
        attention_probs = torch.nn.functional.softmax(attention_scores, dim=-1)

        # Context vectors
        context = torch.matmul(attention_probs, value)

        # Reshape back to (batch, seq_len, inner_dim)
        context = context.transpose(1, 2).reshape(batch_size, -1, attn.heads * head_dim)

        # Output projection (to_out is a ModuleList: [linear, dropout])
        hidden_states = attn.to_out[0](context)
        hidden_states = attn.to_out[1](hidden_states)
        return hidden_states
# Swap in the custom attention processor on every cross-attention module
def replace_attention_processors(pipe, masks, embeddings, adapter_names):
    custom_processor = CustomCrossAttentionProcessor(masks, embeddings, adapter_names)
    # FLUX pipelines expose the denoiser as `transformer`; UNet pipelines as `unet`
    denoiser = getattr(pipe, "unet", None) or pipe.transformer
    for name, module in denoiser.named_modules():
        if hasattr(module, 'attn2'):
            # Tag the attention module with the adapter name of its parent module
            module.attn2.adapter_name = getattr(module, 'adapter_name', None)
            module.attn2.processor = custom_processor
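# Note: one shared processor instance serves every 'attn2' module; it selects a
# per-character mask via the module's `adapter_name` tag and falls back to the
# default attention computation when no tag is present.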
# Image generation
@spaces.GPU
@torch.inference_mode()
def generate_image_with_embeddings(prompt_embeddings, steps, seed, cfg_scale, width, height, progress):
    pipe.to("cuda")
    generator = torch.Generator(device="cuda").manual_seed(seed)

    with calculateDuration("Generating image"):
        # Generate the image (note: FluxPipeline additionally expects
        # `pooled_prompt_embeds` when `prompt_embeds` is passed directly)
        generated_image = pipe(
            prompt_embeds=prompt_embeddings,
            num_inference_steps=steps,
            guidance_scale=cfg_scale,
            width=width,
            height=height,
            generator=generator,
        ).images[0]

    progress(0.99, desc="Generate success!")
    return generated_image
# Main entry point
def run_lora(prompt_bg, character_prompts_json, character_positions_json, lora_strings_json, prompt_details, cfg_scale, steps, randomize_seed, seed, width, height, lora_scale, upload_to_r2, account_id, access_key, secret_key, bucket, progress=gr.Progress(track_tqdm=True)):
    # Parse the character prompts, positions, and LoRA strings
    try:
        character_prompts = json.loads(character_prompts_json)
        character_positions = json.loads(character_positions_json)
        lora_strings = json.loads(lora_strings_json)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON input: {e}")

    # The number of prompts, positions, and LoRA strings must match
    if len(character_prompts) != len(character_positions) or len(character_prompts) != len(lora_strings):
        raise ValueError("The number of character prompts, positions, and LoRA strings must be the same.")

    # Number of characters
    num_characters = len(character_prompts)

    # Load LoRA weights
    with calculateDuration("Loading LoRA weights"):
        pipe.unload_lora_weights()
        denoiser = getattr(pipe, "unet", None) or pipe.transformer
        adapter_names = []
        for lora_info in lora_strings:
            lora_repo = lora_info.get("repo")
            weights = lora_info.get("weights")
            adapter_name = lora_info.get("adapter_name")
            if lora_repo and weights and adapter_name:
                # Load the weights via pipe.load_lora_weights()
                pipe.load_lora_weights(lora_repo, weight_name=weights, adapter_name=adapter_name)
                adapter_names.append(adapter_name)
                # Tag the denoiser with the adapter name
                setattr(denoiser, 'adapter_name', adapter_name)
            else:
                raise ValueError("Invalid LoRA string format. Each item must have 'repo', 'weights', and 'adapter_name' keys.")
        adapter_weights = [lora_scale] * len(adapter_names)
        # Activate the adapters with their per-adapter weights
        pipe.set_adapters(adapter_names, adapter_weights=adapter_weights)

    # The number of adapters must match the number of characters
    if len(adapter_names) != num_characters:
        raise ValueError("The number of LoRA adapters must match the number of characters.")

    # Set random seed for reproducibility
    if randomize_seed:
        with calculateDuration("Set random seed"):
            seed = random.randint(0, MAX_SEED)

    # Encode the prompts (note: this uses the pipeline's CLIP encoder; FLUX
    # normally derives its sequence embeddings from the T5 `text_encoder_2`)
    with calculateDuration("Encoding prompts"):
        # Background prompt
        bg_text_input = pipe.tokenizer(prompt_bg, return_tensors="pt", truncation=True).to(device)
        bg_embeddings = pipe.text_encoder(bg_text_input.input_ids)[0]

        # Character prompts
        character_embeddings = []
        for prompt in character_prompts:
            char_text_input = pipe.tokenizer(prompt, return_tensors="pt", truncation=True).to(device)
            char_embeddings = pipe.text_encoder(char_text_input.input_ids)[0]
            character_embeddings.append(char_embeddings)

        # Interaction-detail prompt
        details_text_input = pipe.tokenizer(prompt_details, return_tensors="pt", truncation=True).to(device)
        details_embeddings = pipe.text_encoder(details_text_input.input_ids)[0]

        # Concatenate background and interaction-detail embeddings along the sequence axis
        prompt_embeddings = torch.cat([bg_embeddings, details_embeddings], dim=1)

    # Parse the character positions
    character_infos = []
    for position_str in character_positions:
        info = parse_character_position(position_str)
        character_infos.append(info)

    # Build a spatial mask per character
    masks = []
    for info in character_infos:
        mask = create_attention_mask(width, height, info['location'], info['offset'], info['area'])
        masks.append(mask)

    # Swap in the custom attention processors
    replace_attention_processors(pipe, masks, character_embeddings, adapter_names)

    # Generate image
    final_image = generate_image_with_embeddings(prompt_embeddings, steps, seed, cfg_scale, width, height, progress)

    # Upload code (e.g. to R2) could be added here
    result = {"status": "success", "message": "Image generated"}
    progress(1.0, desc="Completed!")
    return final_image, seed, json.dumps(result)
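# Example inputs (hypothetical repo/weight/adapter names, for illustration only):
#   prompt_bg                = "a misty medieval courtyard at dawn"
#   character_prompts_json   = '["a knight in silver armor", "a red-haired sorceress"]'
#   character_positions_json = '["on the left, a large vertical area", "on the right, a large vertical area"]'
#   lora_strings_json        = '[{"repo": "user/knight-lora", "weights": "knight.safetensors", "adapter_name": "knight"}, {"repo": "user/sorceress-lora", "weights": "sorceress.safetensors", "adapter_name": "sorceress"}]'
#   prompt_details           = "the two characters face each other, mid-conversation"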
# Gradio UI
css="""
#col-container {
margin: 0 auto;
max-width: 640px;
}
"""
with gr.Blocks(css=css) as demo:
gr.Markdown("Flux with LoRA")
with gr.Row():
with gr.Column():
prompt_bg = gr.Text(label="Background Prompt", placeholder="Enter background/scene prompt", lines=2)
character_prompts = gr.Text(label="Character Prompts (JSON List)", placeholder='["Character 1 prompt", "Character 2 prompt"]', lines=5)
character_positions = gr.Text(label="Character Positions (JSON List)", placeholder='["Character 1 position", "Character 2 position"]', lines=5)
lora_strings_json = gr.Text(label="LoRA Strings (JSON List)", placeholder='[{"repo": "lora_repo1", "weights": "weights1", "adapter_name": "adapter_name1"}, {"repo": "lora_repo2", "weights": "weights2", "adapter_name": "adapter_name2"}]', lines=5)
prompt_details = gr.Text(label="Interaction Details", placeholder="Enter interaction details between characters", lines=2)
run_button = gr.Button("Run", scale=0)
with gr.Accordion("Advanced Settings", open=False):
with gr.Row():
seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0, randomize=True)
randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
lora_scale = gr.Slider(label="LoRA Scale", minimum=0, maximum=1, step=0.01, value=0.5)
with gr.Row():
width = gr.Slider(label="Width", minimum=256, maximum=1536, step=64, value=512)
height = gr.Slider(label="Height", minimum=256, maximum=1536, step=64, value=512)
with gr.Row():
cfg_scale = gr.Slider(label="CFG Scale", minimum=1, maximum=20, step=0.5, value=7.5)
steps = gr.Slider(label="Steps", minimum=1, maximum=50, step=1, value=28)
upload_to_r2 = gr.Checkbox(label="Upload to R2", value=False)
account_id = gr.Textbox(label="Account Id", placeholder="Enter R2 account id")
access_key = gr.Textbox(label="Access Key", placeholder="Enter R2 access key here")
secret_key = gr.Textbox(label="Secret Key", placeholder="Enter R2 secret key here")
bucket = gr.Textbox(label="Bucket Name", placeholder="Enter R2 bucket name here")
with gr.Column():
result = gr.Image(label="Result", show_label=False)
seed_output = gr.Text(label="Seed")
json_text = gr.Text(label="Result JSON")
inputs = [
prompt_bg,
character_prompts,
character_positions,
lora_strings_json,
prompt_details,
cfg_scale,
steps,
randomize_seed,
seed,
width,
height,
lora_scale,
upload_to_r2,
account_id,
access_key,
secret_key,
bucket
]
outputs = [result, seed_output, json_text]
run_button.click(
fn=run_lora,
inputs=inputs,
outputs=outputs
)
demo.queue().launch()