Spaces:
Running
on
Zero
Running
on
Zero
import os | |
if os.environ.get("SPACES_ZERO_GPU") is not None: | |
import spaces | |
else: | |
class spaces: | |
def GPU(func): | |
def wrapper(*args, **kwargs): | |
return func(*args, **kwargs) | |
return wrapper | |
import gradio as gr | |
from huggingface_hub import InferenceClient, HfApi | |
from torch import nn | |
from transformers import AutoModel, AutoProcessor, AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerFast, AutoModelForCausalLM, LlavaForConditionalGeneration | |
from pathlib import Path | |
import torch | |
import torch.amp.autocast_mode | |
from PIL import Image | |
import torchvision.transforms.functional as TVF | |
import gc | |
from peft import PeftModel | |
from typing import Union | |
LOAD_IN_NF4 = True | |
if os.environ.get("SPACES_ZERO_GPU") is not None: | |
import subprocess | |
LOAD_IN_NF4 = False # If true, Custom VLM LoRA doesn't work initially. The rest are fine. | |
subprocess.run('pip install flash-attn --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, shell=True) | |
BASE_DIR = Path(__file__).resolve().parent # Define the base directory | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
HF_TOKEN = os.environ.get("HF_TOKEN", None) | |
use_inference_client = False | |
PIXTRAL_PATHS = ["SeanScripts/pixtral-12b-nf4", "mistral-community/pixtral-12b"] | |
llm_models = { | |
"Orenguteng/Llama-3.1-8B-Lexi-Uncensored-V2": None, | |
#PIXTRAL_PATHS[0]: None, | |
"bunnycore/LLama-3.1-8B-Matrix": None, | |
"Sao10K/Llama-3.1-8B-Stheno-v3.4": None, | |
"unsloth/Meta-Llama-3.1-8B-bnb-4bit": None, | |
"DevQuasar/HermesNova-Llama-3.1-8B": None, | |
"mergekit-community/L3.1-Boshima-b-FIX": None, | |
#"chuanli11/Llama-3.2-3B-Instruct-uncensored": None, # Error(s) in loading state_dict for ImageAdapter:\n\tsize mismatch for linear1.weight: copying a param with shape torch.Size([4096, 1152]) from checkpoint, the shape in current model is torch.Size([3072, 1152]).\n\tsize mismatch for linear1.bias: copying a param with shape torch.Size([4096]) from checkpoint, | |
"unsloth/Meta-Llama-3.1-8B-Instruct": None, | |
} | |
CLIP_PATH = "google/siglip-so400m-patch14-384" | |
MODEL_PATH = list(llm_models.keys())[0] | |
CHECKPOINT_PATH = BASE_DIR / Path("cgrkzexw-599808") | |
LORA_PATH = CHECKPOINT_PATH / "text_model" | |
TITLE = "<h1><center>JoyCaption Alpha Two (2024-09-26a)</center></h1>" | |
CAPTION_TYPE_MAP = { | |
"Descriptive": [ | |
"Write a descriptive caption for this image in a formal tone.", | |
"Write a descriptive caption for this image in a formal tone within {word_count} words.", | |
"Write a {length} descriptive caption for this image in a formal tone.", | |
], | |
"Descriptive (Informal)": [ | |
"Write a descriptive caption for this image in a casual tone.", | |
"Write a descriptive caption for this image in a casual tone within {word_count} words.", | |
"Write a {length} descriptive caption for this image in a casual tone.", | |
], | |
"Training Prompt": [ | |
"Write a stable diffusion prompt for this image.", | |
"Write a stable diffusion prompt for this image within {word_count} words.", | |
"Write a {length} stable diffusion prompt for this image.", | |
], | |
"MidJourney": [ | |
"Write a MidJourney prompt for this image.", | |
"Write a MidJourney prompt for this image within {word_count} words.", | |
"Write a {length} MidJourney prompt for this image.", | |
], | |
"Booru tag list": [ | |
"Write a list of Booru tags for this image.", | |
"Write a list of Booru tags for this image within {word_count} words.", | |
"Write a {length} list of Booru tags for this image.", | |
], | |
"Booru-like tag list": [ | |
"Write a list of Booru-like tags for this image.", | |
"Write a list of Booru-like tags for this image within {word_count} words.", | |
"Write a {length} list of Booru-like tags for this image.", | |
], | |
"Art Critic": [ | |
"Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc.", | |
"Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc. Keep it within {word_count} words.", | |
"Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc. Keep it {length}.", | |
], | |
"Product Listing": [ | |
"Write a caption for this image as though it were a product listing.", | |
"Write a caption for this image as though it were a product listing. Keep it under {word_count} words.", | |
"Write a {length} caption for this image as though it were a product listing.", | |
], | |
"Social Media Post": [ | |
"Write a caption for this image as if it were being used for a social media post.", | |
"Write a caption for this image as if it were being used for a social media post. Limit the caption to {word_count} words.", | |
"Write a {length} caption for this image as if it were being used for a social media post.", | |
], | |
} | |
class ImageAdapter(nn.Module): | |
def __init__(self, input_features: int, output_features: int, ln1: bool, pos_emb: bool, num_image_tokens: int, deep_extract: bool): | |
super().__init__() | |
self.deep_extract = deep_extract | |
if self.deep_extract: | |
input_features = input_features * 5 | |
self.linear1 = nn.Linear(input_features, output_features) | |
self.activation = nn.GELU() | |
self.linear2 = nn.Linear(output_features, output_features) | |
self.ln1 = nn.Identity() if not ln1 else nn.LayerNorm(input_features) | |
self.pos_emb = None if not pos_emb else nn.Parameter(torch.zeros(num_image_tokens, input_features)) | |
# Other tokens (<|image_start|>, <|image_end|>, <|eot_id|>) | |
self.other_tokens = nn.Embedding(3, output_features) | |
self.other_tokens.weight.data.normal_(mean=0.0, std=0.02) # Matches HF's implementation of llama3 | |
def forward(self, vision_outputs: torch.Tensor): | |
if self.deep_extract: | |
x = torch.concat(( | |
vision_outputs[-2], | |
vision_outputs[3], | |
vision_outputs[7], | |
vision_outputs[13], | |
vision_outputs[20], | |
), dim=-1) | |
assert len(x.shape) == 3, f"Expected 3, got {len(x.shape)}" # batch, tokens, features | |
assert x.shape[-1] == vision_outputs[-2].shape[-1] * 5, f"Expected {vision_outputs[-2].shape[-1] * 5}, got {x.shape[-1]}" | |
else: | |
x = vision_outputs[-2] | |
x = self.ln1(x) | |
if self.pos_emb is not None: | |
assert x.shape[-2:] == self.pos_emb.shape, f"Expected {self.pos_emb.shape}, got {x.shape[-2:]}" | |
x = x + self.pos_emb | |
x = self.linear1(x) | |
x = self.activation(x) | |
x = self.linear2(x) | |
# <|image_start|>, IMAGE, <|image_end|> | |
other_tokens = self.other_tokens(torch.tensor([0, 1], device=self.other_tokens.weight.device).expand(x.shape[0], -1)) | |
assert other_tokens.shape == (x.shape[0], 2, x.shape[2]), f"Expected {(x.shape[0], 2, x.shape[2])}, got {other_tokens.shape}" | |
x = torch.cat((other_tokens[:, 0:1], x, other_tokens[:, 1:2]), dim=1) | |
return x | |
def get_eot_embedding(self): | |
return self.other_tokens(torch.tensor([2], device=self.other_tokens.weight.device)).squeeze(0) | |
# https://huggingface.co/docs/transformers/v4.44.2/gguf | |
# https://github.com/city96/ComfyUI-GGUF/issues/7 | |
# https://github.com/THUDM/ChatGLM-6B/issues/18 | |
# https://github.com/meta-llama/llama/issues/394 | |
# https://huggingface.co/meta-llama/Meta-Llama-3-8B-Instruct/discussions/109 | |
# https://huggingface.co/docs/transformers/main/en/main_classes/quantization#offload-between-cpu-and-gpu | |
# https://huggingface.co/google/flan-ul2/discussions/8 | |
# https://huggingface.co/blog/4bit-transformers-bitsandbytes | |
# https://huggingface.co/docs/transformers/main/en/peft | |
# https://huggingface.co/docs/transformers/main/en/peft#enable-and-disable-adapters | |
# https://huggingface.co/docs/transformers/main/quantization/bitsandbytes?bnb=4-bit | |
# https://huggingface.co/lllyasviel/flux1-dev-bnb-nf4 | |
# https://github.com/huggingface/transformers/issues/28515 | |
# https://gist.github.com/ChrisHayduk/1a53463331f52dca205e55982baf9930 | |
tokenizer = None | |
text_model_client = None | |
text_model = None | |
image_adapter = None | |
pixtral_model = None | |
pixtral_processor = None | |
def load_text_model(model_name: str=MODEL_PATH, gguf_file: Union[str, None]=None, is_nf4: bool=True, is_lora: bool=True): | |
global tokenizer, text_model, image_adapter, pixtral_model, pixtral_processor, text_model_client, use_inference_client | |
try: | |
tokenizer = None | |
text_model_client = None | |
text_model = None | |
image_adapter = None | |
pixtral_model = None | |
pixtral_processor = None | |
torch.cuda.empty_cache() | |
gc.collect() | |
lora_device = "auto" | |
from transformers import BitsAndBytesConfig | |
nf4_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4", | |
bnb_4bit_use_double_quant=True, bnb_4bit_compute_dtype=torch.bfloat16) | |
if model_name in PIXTRAL_PATHS: # Pixtral | |
print(f"Loading LLM: {model_name}") | |
if is_nf4: | |
pixtral_model = LlavaForConditionalGeneration.from_pretrained(model_name, quantization_config=nf4_config, device_map=device, torch_dtype=torch.bfloat16).eval() | |
else: | |
pixtral_model = LlavaForConditionalGeneration.from_pretrained(model_name, device_map=device, torch_dtype=torch.bfloat16).eval() | |
pixtral_processor = AutoProcessor.from_pretrained(model_name) | |
print(f"pixtral_model: {type(pixtral_model)}") # | |
print(f"pixtral_processor: {type(pixtral_processor)}") # | |
return | |
print("Loading tokenizer") | |
tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT_PATH / "text_model", use_fast=True) | |
assert isinstance(tokenizer, PreTrainedTokenizer) or isinstance(tokenizer, PreTrainedTokenizerFast), f"Tokenizer is of type {type(tokenizer)}" | |
print(f"Loading LLM: {model_name}") | |
if gguf_file: | |
if device == "cpu": | |
text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, device_map=device, torch_dtype=torch.bfloat16).eval() | |
elif is_nf4: | |
text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, quantization_config=nf4_config, device_map=device, torch_dtype=torch.bfloat16).eval() | |
else: | |
text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, device_map=lora_device, torch_dtype=torch.bfloat16).eval() | |
else: | |
if device == "cpu": | |
text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, device_map=device, torch_dtype=torch.bfloat16).eval() | |
elif is_nf4: | |
text_model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=nf4_config, device_map=device, torch_dtype=torch.bfloat16).eval() | |
else: | |
text_model = AutoModelForCausalLM.from_pretrained(model_name, device_map=lora_device, torch_dtype=torch.bfloat16).eval() | |
if is_lora and LORA_PATH.exists() and not is_nf4: | |
print("Loading VLM's custom text model") | |
if is_nf4: # omitted | |
text_model = PeftModel.from_pretrained(model=text_model, model_id=LORA_PATH, device_map=device, quantization_config=nf4_config) | |
else: | |
text_model = PeftModel.from_pretrained(model=text_model, model_id=LORA_PATH, device_map=device) | |
text_model = text_model.merge_and_unload(safe_merge=True) # to avoid PEFT bug https://github.com/huggingface/transformers/issues/28515 | |
else: print("VLM's custom text model is not loaded") | |
print("Loading image adapter") | |
image_adapter = ImageAdapter(clip_model.config.hidden_size, text_model.config.hidden_size, False, False, 38, False).eval().to("cpu") | |
image_adapter.load_state_dict(torch.load(CHECKPOINT_PATH / "image_adapter.pt", map_location="cpu", weights_only=False)) | |
image_adapter.eval().to(device) | |
except Exception as e: | |
print(f"LLM load error: {e}") | |
raise Exception(f"LLM load error: {e}") from e | |
finally: | |
torch.cuda.empty_cache() | |
gc.collect() | |
load_text_model.zerogpu = True | |
# Load CLIP | |
print("Loading CLIP") | |
clip_processor = AutoProcessor.from_pretrained(CLIP_PATH) | |
clip_model = AutoModel.from_pretrained(CLIP_PATH).vision_model | |
assert (CHECKPOINT_PATH / "clip_model.pt").exists() | |
if (CHECKPOINT_PATH / "clip_model.pt").exists(): | |
print("Loading VLM's custom vision model") | |
checkpoint = torch.load(CHECKPOINT_PATH / "clip_model.pt", map_location='cpu', weights_only=False) | |
checkpoint = {k.replace("_orig_mod.module.", ""): v for k, v in checkpoint.items()} | |
clip_model.load_state_dict(checkpoint) | |
del checkpoint | |
clip_model.eval().requires_grad_(False).to(device) | |
# Tokenizer | |
# LLM | |
# Image Adapter | |
#load_text_model(PIXTRAL_PATHS[0]) | |
#print(f"pixtral_model: {type(pixtral_model)}") # | |
#print(f"pixtral_processor: {type(pixtral_processor)}") # | |
load_text_model(MODEL_PATH, None, LOAD_IN_NF4, True) | |
#print(f"pixtral_model: {type(pixtral_model)}") # | |
#print(f"pixtral_processor: {type(pixtral_processor)}") # | |
def stream_chat_mod(input_image: Image.Image, caption_type: str, caption_length: Union[str, int], extra_options: list[str], name_input: str, custom_prompt: str, | |
max_new_tokens: int=300, top_p: float=0.9, temperature: float=0.6, model_name: str=MODEL_PATH, progress=gr.Progress(track_tqdm=True)) -> tuple[str, str]: | |
global tokenizer, text_model, image_adapter, pixtral_model, pixtral_processor, text_model_client, use_inference_client | |
torch.cuda.empty_cache() | |
gc.collect() | |
# 'any' means no length specified | |
length = None if caption_length == "any" else caption_length | |
if isinstance(length, str): | |
try: | |
length = int(length) | |
except ValueError: | |
pass | |
# Build prompt | |
if length is None: | |
map_idx = 0 | |
elif isinstance(length, int): | |
map_idx = 1 | |
elif isinstance(length, str): | |
map_idx = 2 | |
else: | |
raise ValueError(f"Invalid caption length: {length}") | |
prompt_str = CAPTION_TYPE_MAP[caption_type][map_idx] | |
# Add extra options | |
if len(extra_options) > 0: | |
prompt_str += " " + " ".join(extra_options) | |
# Add name, length, word_count | |
prompt_str = prompt_str.format(name=name_input, length=caption_length, word_count=caption_length) | |
if custom_prompt.strip() != "": | |
prompt_str = custom_prompt.strip() | |
# For debugging | |
print(f"Prompt: {prompt_str}") | |
# Pixtral | |
if model_name in PIXTRAL_PATHS: | |
print(f"pixtral_model: {type(pixtral_model)}") # | |
print(f"pixtral_processor: {type(pixtral_processor)}") # | |
input_images = [input_image.convert("RGB")] | |
input_prompt = "[INST]Caption this image:\n[IMG][/INST]" | |
inputs = pixtral_processor(images=input_images, text=input_prompt, return_tensors="pt").to(device) | |
generate_ids = pixtral_model.generate(**inputs, max_new_tokens=max_new_tokens) | |
output = pixtral_processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0] | |
return input_prompt, output.strip() | |
# Preprocess image | |
# NOTE: I found the default processor for so400M to have worse results than just using PIL directly | |
#image = clip_processor(images=input_image, return_tensors='pt').pixel_values | |
image = input_image.resize((384, 384), Image.LANCZOS) | |
pixel_values = TVF.pil_to_tensor(image).unsqueeze(0) / 255.0 | |
pixel_values = TVF.normalize(pixel_values, [0.5], [0.5]) | |
pixel_values = pixel_values.to(device) | |
# Embed image | |
# This results in Batch x Image Tokens x Features | |
with torch.amp.autocast_mode.autocast(device, enabled=True): | |
vision_outputs = clip_model(pixel_values=pixel_values, output_hidden_states=True) | |
image_features = vision_outputs.hidden_states | |
embedded_images = image_adapter(image_features) | |
embedded_images = embedded_images.to(device) | |
# Build the conversation | |
convo = [ | |
{ | |
"role": "system", | |
"content": "You are a helpful image captioner.", | |
}, | |
{ | |
"role": "user", | |
"content": prompt_str, | |
}, | |
] | |
# Format the conversation | |
convo_string = tokenizer.apply_chat_template(convo, tokenize = False, add_generation_prompt = True) | |
assert isinstance(convo_string, str) | |
# Tokenize the conversation | |
# prompt_str is tokenized separately so we can do the calculations below | |
convo_tokens = tokenizer.encode(convo_string, return_tensors="pt", add_special_tokens=False, truncation=False) | |
prompt_tokens = tokenizer.encode(prompt_str, return_tensors="pt", add_special_tokens=False, truncation=False) | |
assert isinstance(convo_tokens, torch.Tensor) and isinstance(prompt_tokens, torch.Tensor) | |
convo_tokens = convo_tokens.squeeze(0) # Squeeze just to make the following easier | |
prompt_tokens = prompt_tokens.squeeze(0) | |
# Calculate where to inject the image | |
eot_id_indices = (convo_tokens == tokenizer.convert_tokens_to_ids("<|eot_id|>")).nonzero(as_tuple=True)[0].tolist() | |
assert len(eot_id_indices) == 2, f"Expected 2 <|eot_id|> tokens, got {len(eot_id_indices)}" | |
preamble_len = eot_id_indices[1] - prompt_tokens.shape[0] # Number of tokens before the prompt | |
# Embed the tokens | |
convo_embeds = text_model.model.embed_tokens(convo_tokens.unsqueeze(0).to(device)) | |
# Construct the input | |
input_embeds = torch.cat([ | |
convo_embeds[:, :preamble_len], # Part before the prompt | |
embedded_images.to(dtype=convo_embeds.dtype), # Image | |
convo_embeds[:, preamble_len:], # The prompt and anything after it | |
], dim=1).to(device) | |
input_ids = torch.cat([ | |
convo_tokens[:preamble_len].unsqueeze(0), | |
torch.zeros((1, embedded_images.shape[1]), dtype=torch.long), # Dummy tokens for the image (TODO: Should probably use a special token here so as not to confuse any generation algorithms that might be inspecting the input) | |
convo_tokens[preamble_len:].unsqueeze(0), | |
], dim=1).to(device) | |
attention_mask = torch.ones_like(input_ids) | |
# Debugging | |
#print(f"Input to model: {repr(tokenizer.decode(input_ids[0]))}") | |
text_model.to(device) | |
generate_ids = text_model.generate(input_ids, inputs_embeds=input_embeds, attention_mask=attention_mask, max_new_tokens=max_new_tokens, | |
do_sample=True, suppress_tokens=None, top_p=top_p, temperature=temperature) | |
# Trim off the prompt | |
generate_ids = generate_ids[:, input_ids.shape[1]:] | |
if generate_ids[0][-1] == tokenizer.eos_token_id or generate_ids[0][-1] == tokenizer.convert_tokens_to_ids("<|eot_id|>"): | |
generate_ids = generate_ids[:, :-1] | |
caption = tokenizer.batch_decode(generate_ids, skip_special_tokens=False, clean_up_tokenization_spaces=False)[0] | |
return prompt_str, caption.strip() | |
# https://huggingface.co/docs/transformers/v4.44.2/main_classes/text_generation#transformers.FlaxGenerationMixin.generate | |
# https://github.com/huggingface/transformers/issues/6535 | |
# https://zenn.dev/hijikix/articles/8c445f4373fdcc ja | |
# https://github.com/ggerganov/llama.cpp/discussions/7712 | |
# https://huggingface.co/docs/huggingface_hub/guides/inference#openai-compatibility | |
# https://huggingface.co/docs/huggingface_hub/v0.24.6/en/package_reference/inference_client#huggingface_hub.InferenceClient.text_generation | |
def is_repo_name(s): | |
import re | |
return re.fullmatch(r'^[^/,\s\"\']+/[^/,\s\"\']+$', s) | |
def is_repo_exists(repo_id): | |
try: | |
api = HfApi(token=HF_TOKEN) | |
if api.repo_exists(repo_id=repo_id): return True | |
else: return False | |
except Exception as e: | |
print(f"Error: Failed to connect {repo_id}. {e}") | |
return True # for safe | |
def is_valid_repo(repo_id): | |
import re | |
try: | |
if not re.fullmatch(r'^[^/,\s\"\']+/[^/,\s\"\']+$', repo_id): return False | |
api = HfApi() | |
if api.repo_exists(repo_id=repo_id): return True | |
else: return False | |
except Exception as e: | |
print(f"Failed to connect {repo_id}. {e}") | |
return False | |
def get_text_model(): | |
return list(llm_models.keys()) | |
def is_gguf_repo(repo_id: str): | |
try: | |
api = HfApi(token=HF_TOKEN) | |
if not is_repo_name(repo_id) or not is_repo_exists(repo_id): return False | |
files = api.list_repo_files(repo_id=repo_id) | |
except Exception as e: | |
print(f"Error: Failed to get {repo_id}'s info. {e}") | |
gr.Warning(f"Error: Failed to get {repo_id}'s info. {e}") | |
return False | |
files = [f for f in files if f.endswith(".gguf")] | |
if len(files) == 0: return False | |
else: return True | |
def get_repo_gguf(repo_id: str): | |
try: | |
api = HfApi(token=HF_TOKEN) | |
if not is_repo_name(repo_id) or not is_repo_exists(repo_id): return gr.update(value="", choices=[]) | |
files = api.list_repo_files(repo_id=repo_id) | |
except Exception as e: | |
print(f"Error: Failed to get {repo_id}'s info. {e}") | |
gr.Warning(f"Error: Failed to get {repo_id}'s info. {e}") | |
return gr.update(value="", choices=[]) | |
files = [f for f in files if f.endswith(".gguf")] | |
if len(files) == 0: return gr.update(value="", choices=[]) | |
else: return gr.update(value=files[0], choices=files) | |
def change_text_model(model_name: str=MODEL_PATH, use_client: bool=False, gguf_file: Union[str, None]=None, | |
is_nf4: bool=True, is_lora: bool=True, progress=gr.Progress(track_tqdm=True)): | |
global use_inference_client, llm_models | |
use_inference_client = use_client | |
try: | |
if not is_repo_name(model_name) or not is_repo_exists(model_name): | |
raise gr.Error(f"Repo doesn't exist: {model_name}") | |
if not gguf_file and is_gguf_repo(model_name): | |
gr.Info(f"Please select a gguf file.") | |
return gr.update(visible=True) | |
if use_inference_client: | |
pass # | |
else: | |
load_text_model(model_name, gguf_file, is_nf4, is_lora) | |
if model_name not in llm_models: llm_models[model_name] = gguf_file if gguf_file else None | |
return gr.update(choices=get_text_model()) | |
except Exception as e: | |
raise gr.Error(f"Model load error: {model_name}, {e}") | |