import os if os.environ.get("SPACES_ZERO_GPU") is not None: import spaces else: class spaces: @staticmethod def GPU(func): def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper import gradio as gr from huggingface_hub import InferenceClient, HfApi from torch import nn from transformers import AutoModel, AutoProcessor, AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerFast, AutoModelForCausalLM, LlavaForConditionalGeneration from pathlib import Path import torch import torch.amp.autocast_mode from PIL import Image import torchvision.transforms.functional as TVF import gc from peft import PeftModel from typing import Union LOAD_IN_NF4 = True if os.environ.get("SPACES_ZERO_GPU") is not None: import subprocess LOAD_IN_NF4 = False # If true, Custom VLM LoRA doesn't work initially. The rest are fine. subprocess.run('pip install flash-attn --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, shell=True) BASE_DIR = Path(__file__).resolve().parent # Define the base directory device = "cuda" if torch.cuda.is_available() else "cpu" HF_TOKEN = os.environ.get("HF_TOKEN", None) use_inference_client = False PIXTRAL_PATHS = ["SeanScripts/pixtral-12b-nf4", "mistral-community/pixtral-12b"] llm_models = { "Orenguteng/Llama-3.1-8B-Lexi-Uncensored-V2": None, #PIXTRAL_PATHS[0]: None, "bunnycore/LLama-3.1-8B-Matrix": None, "Sao10K/Llama-3.1-8B-Stheno-v3.4": None, "unsloth/Meta-Llama-3.1-8B-bnb-4bit": None, "DevQuasar/HermesNova-Llama-3.1-8B": None, "mergekit-community/L3.1-Boshima-b-FIX": None, #"chuanli11/Llama-3.2-3B-Instruct-uncensored": None, # Error(s) in loading state_dict for ImageAdapter:\n\tsize mismatch for linear1.weight: copying a param with shape torch.Size([4096, 1152]) from checkpoint, the shape in current model is torch.Size([3072, 1152]).\n\tsize mismatch for linear1.bias: copying a param with shape torch.Size([4096]) from checkpoint, "unsloth/Meta-Llama-3.1-8B-Instruct": None, } CLIP_PATH = "google/siglip-so400m-patch14-384" MODEL_PATH = list(llm_models.keys())[0] CHECKPOINT_PATH = BASE_DIR / Path("cgrkzexw-599808") LORA_PATH = CHECKPOINT_PATH / "text_model" TITLE = "

JoyCaption Alpha Two (2024-09-26a)

" CAPTION_TYPE_MAP = { "Descriptive": [ "Write a descriptive caption for this image in a formal tone.", "Write a descriptive caption for this image in a formal tone within {word_count} words.", "Write a {length} descriptive caption for this image in a formal tone.", ], "Descriptive (Informal)": [ "Write a descriptive caption for this image in a casual tone.", "Write a descriptive caption for this image in a casual tone within {word_count} words.", "Write a {length} descriptive caption for this image in a casual tone.", ], "Training Prompt": [ "Write a stable diffusion prompt for this image.", "Write a stable diffusion prompt for this image within {word_count} words.", "Write a {length} stable diffusion prompt for this image.", ], "MidJourney": [ "Write a MidJourney prompt for this image.", "Write a MidJourney prompt for this image within {word_count} words.", "Write a {length} MidJourney prompt for this image.", ], "Booru tag list": [ "Write a list of Booru tags for this image.", "Write a list of Booru tags for this image within {word_count} words.", "Write a {length} list of Booru tags for this image.", ], "Booru-like tag list": [ "Write a list of Booru-like tags for this image.", "Write a list of Booru-like tags for this image within {word_count} words.", "Write a {length} list of Booru-like tags for this image.", ], "Art Critic": [ "Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc.", "Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc. Keep it within {word_count} words.", "Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc. Keep it {length}.", ], "Product Listing": [ "Write a caption for this image as though it were a product listing.", "Write a caption for this image as though it were a product listing. Keep it under {word_count} words.", "Write a {length} caption for this image as though it were a product listing.", ], "Social Media Post": [ "Write a caption for this image as if it were being used for a social media post.", "Write a caption for this image as if it were being used for a social media post. Limit the caption to {word_count} words.", "Write a {length} caption for this image as if it were being used for a social media post.", ], } class ImageAdapter(nn.Module): def __init__(self, input_features: int, output_features: int, ln1: bool, pos_emb: bool, num_image_tokens: int, deep_extract: bool): super().__init__() self.deep_extract = deep_extract if self.deep_extract: input_features = input_features * 5 self.linear1 = nn.Linear(input_features, output_features) self.activation = nn.GELU() self.linear2 = nn.Linear(output_features, output_features) self.ln1 = nn.Identity() if not ln1 else nn.LayerNorm(input_features) self.pos_emb = None if not pos_emb else nn.Parameter(torch.zeros(num_image_tokens, input_features)) # Other tokens (<|image_start|>, <|image_end|>, <|eot_id|>) self.other_tokens = nn.Embedding(3, output_features) self.other_tokens.weight.data.normal_(mean=0.0, std=0.02) # Matches HF's implementation of llama3 def forward(self, vision_outputs: torch.Tensor): if self.deep_extract: x = torch.concat(( vision_outputs[-2], vision_outputs[3], vision_outputs[7], vision_outputs[13], vision_outputs[20], ), dim=-1) assert len(x.shape) == 3, f"Expected 3, got {len(x.shape)}" # batch, tokens, features assert x.shape[-1] == vision_outputs[-2].shape[-1] * 5, f"Expected {vision_outputs[-2].shape[-1] * 5}, got {x.shape[-1]}" else: x = vision_outputs[-2] x = self.ln1(x) if self.pos_emb is not None: assert x.shape[-2:] == self.pos_emb.shape, f"Expected {self.pos_emb.shape}, got {x.shape[-2:]}" x = x + self.pos_emb x = self.linear1(x) x = self.activation(x) x = self.linear2(x) # <|image_start|>, IMAGE, <|image_end|> other_tokens = self.other_tokens(torch.tensor([0, 1], device=self.other_tokens.weight.device).expand(x.shape[0], -1)) assert other_tokens.shape == (x.shape[0], 2, x.shape[2]), f"Expected {(x.shape[0], 2, x.shape[2])}, got {other_tokens.shape}" x = torch.cat((other_tokens[:, 0:1], x, other_tokens[:, 1:2]), dim=1) return x def get_eot_embedding(self): return self.other_tokens(torch.tensor([2], device=self.other_tokens.weight.device)).squeeze(0) # https://huggingface.co/docs/transformers/v4.44.2/gguf # https://github.com/city96/ComfyUI-GGUF/issues/7 # https://github.com/THUDM/ChatGLM-6B/issues/18 # https://github.com/meta-llama/llama/issues/394 # https://huggingface.co/meta-llama/Meta-Llama-3-8B-Instruct/discussions/109 # https://huggingface.co/docs/transformers/main/en/main_classes/quantization#offload-between-cpu-and-gpu # https://huggingface.co/google/flan-ul2/discussions/8 # https://huggingface.co/blog/4bit-transformers-bitsandbytes # https://huggingface.co/docs/transformers/main/en/peft # https://huggingface.co/docs/transformers/main/en/peft#enable-and-disable-adapters # https://huggingface.co/docs/transformers/main/quantization/bitsandbytes?bnb=4-bit # https://huggingface.co/lllyasviel/flux1-dev-bnb-nf4 # https://github.com/huggingface/transformers/issues/28515 # https://gist.github.com/ChrisHayduk/1a53463331f52dca205e55982baf9930 tokenizer = None text_model_client = None text_model = None image_adapter = None pixtral_model = None pixtral_processor = None def load_text_model(model_name: str=MODEL_PATH, gguf_file: Union[str, None]=None, is_nf4: bool=True, is_lora: bool=True): global tokenizer, text_model, image_adapter, pixtral_model, pixtral_processor, text_model_client, use_inference_client try: tokenizer = None text_model_client = None text_model = None image_adapter = None pixtral_model = None pixtral_processor = None torch.cuda.empty_cache() gc.collect() lora_device = "auto" from transformers import BitsAndBytesConfig nf4_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_use_double_quant=True, bnb_4bit_compute_dtype=torch.bfloat16) if model_name in PIXTRAL_PATHS: # Pixtral print(f"Loading LLM: {model_name}") if is_nf4: pixtral_model = LlavaForConditionalGeneration.from_pretrained(model_name, quantization_config=nf4_config, device_map=device, torch_dtype=torch.bfloat16).eval() else: pixtral_model = LlavaForConditionalGeneration.from_pretrained(model_name, device_map=device, torch_dtype=torch.bfloat16).eval() pixtral_processor = AutoProcessor.from_pretrained(model_name) print(f"pixtral_model: {type(pixtral_model)}") # print(f"pixtral_processor: {type(pixtral_processor)}") # return print("Loading tokenizer") tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT_PATH / "text_model", use_fast=True) assert isinstance(tokenizer, PreTrainedTokenizer) or isinstance(tokenizer, PreTrainedTokenizerFast), f"Tokenizer is of type {type(tokenizer)}" print(f"Loading LLM: {model_name}") if gguf_file: if device == "cpu": text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, device_map=device, torch_dtype=torch.bfloat16).eval() elif is_nf4: text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, quantization_config=nf4_config, device_map=device, torch_dtype=torch.bfloat16).eval() else: text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, device_map=lora_device, torch_dtype=torch.bfloat16).eval() else: if device == "cpu": text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, device_map=device, torch_dtype=torch.bfloat16).eval() elif is_nf4: text_model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=nf4_config, device_map=device, torch_dtype=torch.bfloat16).eval() else: text_model = AutoModelForCausalLM.from_pretrained(model_name, device_map=lora_device, torch_dtype=torch.bfloat16).eval() if is_lora and LORA_PATH.exists() and not is_nf4: print("Loading VLM's custom text model") if is_nf4: # omitted text_model = PeftModel.from_pretrained(model=text_model, model_id=LORA_PATH, device_map=device, quantization_config=nf4_config) else: text_model = PeftModel.from_pretrained(model=text_model, model_id=LORA_PATH, device_map=device) text_model = text_model.merge_and_unload(safe_merge=True) # to avoid PEFT bug https://github.com/huggingface/transformers/issues/28515 else: print("VLM's custom text model is not loaded") print("Loading image adapter") image_adapter = ImageAdapter(clip_model.config.hidden_size, text_model.config.hidden_size, False, False, 38, False).eval().to("cpu") image_adapter.load_state_dict(torch.load(CHECKPOINT_PATH / "image_adapter.pt", map_location="cpu", weights_only=False)) image_adapter.eval().to(device) except Exception as e: print(f"LLM load error: {e}") raise Exception(f"LLM load error: {e}") from e finally: torch.cuda.empty_cache() gc.collect() load_text_model.zerogpu = True # Load CLIP print("Loading CLIP") clip_processor = AutoProcessor.from_pretrained(CLIP_PATH) clip_model = AutoModel.from_pretrained(CLIP_PATH).vision_model assert (CHECKPOINT_PATH / "clip_model.pt").exists() if (CHECKPOINT_PATH / "clip_model.pt").exists(): print("Loading VLM's custom vision model") checkpoint = torch.load(CHECKPOINT_PATH / "clip_model.pt", map_location='cpu', weights_only=False) checkpoint = {k.replace("_orig_mod.module.", ""): v for k, v in checkpoint.items()} clip_model.load_state_dict(checkpoint) del checkpoint clip_model.eval().requires_grad_(False).to(device) # Tokenizer # LLM # Image Adapter #load_text_model(PIXTRAL_PATHS[0]) #print(f"pixtral_model: {type(pixtral_model)}") # #print(f"pixtral_processor: {type(pixtral_processor)}") # load_text_model(MODEL_PATH, None, LOAD_IN_NF4, True) #print(f"pixtral_model: {type(pixtral_model)}") # #print(f"pixtral_processor: {type(pixtral_processor)}") # @spaces.GPU @torch.inference_mode() def stream_chat_mod(input_image: Image.Image, caption_type: str, caption_length: Union[str, int], extra_options: list[str], name_input: str, custom_prompt: str, max_new_tokens: int=300, top_p: float=0.9, temperature: float=0.6, model_name: str=MODEL_PATH, progress=gr.Progress(track_tqdm=True)) -> tuple[str, str]: global tokenizer, text_model, image_adapter, pixtral_model, pixtral_processor, text_model_client, use_inference_client torch.cuda.empty_cache() gc.collect() # 'any' means no length specified length = None if caption_length == "any" else caption_length if isinstance(length, str): try: length = int(length) except ValueError: pass # Build prompt if length is None: map_idx = 0 elif isinstance(length, int): map_idx = 1 elif isinstance(length, str): map_idx = 2 else: raise ValueError(f"Invalid caption length: {length}") prompt_str = CAPTION_TYPE_MAP[caption_type][map_idx] # Add extra options if len(extra_options) > 0: prompt_str += " " + " ".join(extra_options) # Add name, length, word_count prompt_str = prompt_str.format(name=name_input, length=caption_length, word_count=caption_length) if custom_prompt.strip() != "": prompt_str = custom_prompt.strip() # For debugging print(f"Prompt: {prompt_str}") # Pixtral if model_name in PIXTRAL_PATHS: print(f"pixtral_model: {type(pixtral_model)}") # print(f"pixtral_processor: {type(pixtral_processor)}") # input_images = [input_image.convert("RGB")] input_prompt = "[INST]Caption this image:\n[IMG][/INST]" inputs = pixtral_processor(images=input_images, text=input_prompt, return_tensors="pt").to(device) generate_ids = pixtral_model.generate(**inputs, max_new_tokens=max_new_tokens) output = pixtral_processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0] return input_prompt, output.strip() # Preprocess image # NOTE: I found the default processor for so400M to have worse results than just using PIL directly #image = clip_processor(images=input_image, return_tensors='pt').pixel_values image = input_image.resize((384, 384), Image.LANCZOS) pixel_values = TVF.pil_to_tensor(image).unsqueeze(0) / 255.0 pixel_values = TVF.normalize(pixel_values, [0.5], [0.5]) pixel_values = pixel_values.to(device) # Embed image # This results in Batch x Image Tokens x Features with torch.amp.autocast_mode.autocast(device, enabled=True): vision_outputs = clip_model(pixel_values=pixel_values, output_hidden_states=True) image_features = vision_outputs.hidden_states embedded_images = image_adapter(image_features) embedded_images = embedded_images.to(device) # Build the conversation convo = [ { "role": "system", "content": "You are a helpful image captioner.", }, { "role": "user", "content": prompt_str, }, ] # Format the conversation convo_string = tokenizer.apply_chat_template(convo, tokenize = False, add_generation_prompt = True) assert isinstance(convo_string, str) # Tokenize the conversation # prompt_str is tokenized separately so we can do the calculations below convo_tokens = tokenizer.encode(convo_string, return_tensors="pt", add_special_tokens=False, truncation=False) prompt_tokens = tokenizer.encode(prompt_str, return_tensors="pt", add_special_tokens=False, truncation=False) assert isinstance(convo_tokens, torch.Tensor) and isinstance(prompt_tokens, torch.Tensor) convo_tokens = convo_tokens.squeeze(0) # Squeeze just to make the following easier prompt_tokens = prompt_tokens.squeeze(0) # Calculate where to inject the image eot_id_indices = (convo_tokens == tokenizer.convert_tokens_to_ids("<|eot_id|>")).nonzero(as_tuple=True)[0].tolist() assert len(eot_id_indices) == 2, f"Expected 2 <|eot_id|> tokens, got {len(eot_id_indices)}" preamble_len = eot_id_indices[1] - prompt_tokens.shape[0] # Number of tokens before the prompt # Embed the tokens convo_embeds = text_model.model.embed_tokens(convo_tokens.unsqueeze(0).to(device)) # Construct the input input_embeds = torch.cat([ convo_embeds[:, :preamble_len], # Part before the prompt embedded_images.to(dtype=convo_embeds.dtype), # Image convo_embeds[:, preamble_len:], # The prompt and anything after it ], dim=1).to(device) input_ids = torch.cat([ convo_tokens[:preamble_len].unsqueeze(0), torch.zeros((1, embedded_images.shape[1]), dtype=torch.long), # Dummy tokens for the image (TODO: Should probably use a special token here so as not to confuse any generation algorithms that might be inspecting the input) convo_tokens[preamble_len:].unsqueeze(0), ], dim=1).to(device) attention_mask = torch.ones_like(input_ids) # Debugging #print(f"Input to model: {repr(tokenizer.decode(input_ids[0]))}") text_model.to(device) generate_ids = text_model.generate(input_ids, inputs_embeds=input_embeds, attention_mask=attention_mask, max_new_tokens=max_new_tokens, do_sample=True, suppress_tokens=None, top_p=top_p, temperature=temperature) # Trim off the prompt generate_ids = generate_ids[:, input_ids.shape[1]:] if generate_ids[0][-1] == tokenizer.eos_token_id or generate_ids[0][-1] == tokenizer.convert_tokens_to_ids("<|eot_id|>"): generate_ids = generate_ids[:, :-1] caption = tokenizer.batch_decode(generate_ids, skip_special_tokens=False, clean_up_tokenization_spaces=False)[0] return prompt_str, caption.strip() # https://huggingface.co/docs/transformers/v4.44.2/main_classes/text_generation#transformers.FlaxGenerationMixin.generate # https://github.com/huggingface/transformers/issues/6535 # https://zenn.dev/hijikix/articles/8c445f4373fdcc ja # https://github.com/ggerganov/llama.cpp/discussions/7712 # https://huggingface.co/docs/huggingface_hub/guides/inference#openai-compatibility # https://huggingface.co/docs/huggingface_hub/v0.24.6/en/package_reference/inference_client#huggingface_hub.InferenceClient.text_generation def is_repo_name(s): import re return re.fullmatch(r'^[^/,\s\"\']+/[^/,\s\"\']+$', s) def is_repo_exists(repo_id): try: api = HfApi(token=HF_TOKEN) if api.repo_exists(repo_id=repo_id): return True else: return False except Exception as e: print(f"Error: Failed to connect {repo_id}. {e}") return True # for safe def is_valid_repo(repo_id): import re try: if not re.fullmatch(r'^[^/,\s\"\']+/[^/,\s\"\']+$', repo_id): return False api = HfApi() if api.repo_exists(repo_id=repo_id): return True else: return False except Exception as e: print(f"Failed to connect {repo_id}. {e}") return False def get_text_model(): return list(llm_models.keys()) def is_gguf_repo(repo_id: str): try: api = HfApi(token=HF_TOKEN) if not is_repo_name(repo_id) or not is_repo_exists(repo_id): return False files = api.list_repo_files(repo_id=repo_id) except Exception as e: print(f"Error: Failed to get {repo_id}'s info. {e}") gr.Warning(f"Error: Failed to get {repo_id}'s info. {e}") return False files = [f for f in files if f.endswith(".gguf")] if len(files) == 0: return False else: return True def get_repo_gguf(repo_id: str): try: api = HfApi(token=HF_TOKEN) if not is_repo_name(repo_id) or not is_repo_exists(repo_id): return gr.update(value="", choices=[]) files = api.list_repo_files(repo_id=repo_id) except Exception as e: print(f"Error: Failed to get {repo_id}'s info. {e}") gr.Warning(f"Error: Failed to get {repo_id}'s info. {e}") return gr.update(value="", choices=[]) files = [f for f in files if f.endswith(".gguf")] if len(files) == 0: return gr.update(value="", choices=[]) else: return gr.update(value=files[0], choices=files) @spaces.GPU def change_text_model(model_name: str=MODEL_PATH, use_client: bool=False, gguf_file: Union[str, None]=None, is_nf4: bool=True, is_lora: bool=True, progress=gr.Progress(track_tqdm=True)): global use_inference_client, llm_models use_inference_client = use_client try: if not is_repo_name(model_name) or not is_repo_exists(model_name): raise gr.Error(f"Repo doesn't exist: {model_name}") if not gguf_file and is_gguf_repo(model_name): gr.Info(f"Please select a gguf file.") return gr.update(visible=True) if use_inference_client: pass # else: load_text_model(model_name, gguf_file, is_nf4, is_lora) if model_name not in llm_models: llm_models[model_name] = gguf_file if gguf_file else None return gr.update(choices=get_text_model()) except Exception as e: raise gr.Error(f"Model load error: {model_name}, {e}")