Spaces:
Paused
Paused
import glob | |
import json | |
import os | |
import random | |
import cv2 | |
import numpy as np | |
import torch | |
import torch.nn.functional as F | |
from transformers import CLIPImageProcessor | |
from model.llava import conversation as conversation_lib | |
from model.segment_anything.utils.transforms import ResizeLongestSide | |
from .data_processing import get_mask_from_json | |
from .utils import (ANSWER_LIST, DEFAULT_IMAGE_TOKEN, | |
EXPLANATORY_QUESTION_LIST, LONG_QUESTION_LIST, | |
SHORT_QUESTION_LIST) | |
class ReasonSegDataset(torch.utils.data.Dataset): | |
pixel_mean = torch.Tensor([123.675, 116.28, 103.53]).view(-1, 1, 1) | |
pixel_std = torch.Tensor([58.395, 57.12, 57.375]).view(-1, 1, 1) | |
img_size = 1024 | |
ignore_label = 255 | |
def __init__( | |
self, | |
base_image_dir, | |
tokenizer, | |
vision_tower, | |
samples_per_epoch=500 * 8 * 2 * 10, | |
precision: str = "fp32", | |
image_size: int = 224, | |
num_classes_per_sample: int = 3, | |
exclude_val=False, | |
reason_seg_data="ReasonSeg|train", | |
explanatory=0.1, | |
): | |
self.exclude_val = exclude_val | |
self.reason_seg_data = reason_seg_data | |
self.samples_per_epoch = samples_per_epoch | |
self.explanatory = explanatory | |
self.num_classes_per_sample = num_classes_per_sample | |
self.base_image_dir = base_image_dir | |
self.image_size = image_size | |
self.tokenizer = tokenizer | |
self.precision = precision | |
self.transform = ResizeLongestSide(image_size) | |
self.clip_image_processor = CLIPImageProcessor.from_pretrained(vision_tower) | |
self.short_question_list = SHORT_QUESTION_LIST | |
self.long_question_list = LONG_QUESTION_LIST | |
self.answer_list = ANSWER_LIST | |
reason_seg_data, splits = reason_seg_data.split("|") | |
splits = splits.split("_") | |
images = [] | |
for split in splits: | |
images_split = glob.glob( | |
os.path.join( | |
base_image_dir, "reason_seg", reason_seg_data, split, "*.jpg" | |
) | |
) | |
images.extend(images_split) | |
jsons = [path.replace(".jpg", ".json") for path in images] | |
self.reason_seg_data = (images, jsons) | |
print("number of reason_seg samples: ", len(images)) | |
if explanatory != -1: | |
self.explanatory_question_list = EXPLANATORY_QUESTION_LIST | |
self.img_to_explanation = {} | |
with open( | |
os.path.join( | |
base_image_dir, | |
"reason_seg", | |
reason_seg_data, | |
"explanatory", | |
"train.json", | |
) | |
) as f: | |
items = json.load(f) | |
for item in items: | |
img_name = item["image"] | |
self.img_to_explanation[img_name] = { | |
"query": item["query"], | |
"outputs": item["outputs"], | |
} | |
print("len(self.img_to_explanation): ", len(self.img_to_explanation)) | |
def __len__(self): | |
return self.samples_per_epoch | |
def preprocess(self, x: torch.Tensor) -> torch.Tensor: | |
"""Normalize pixel values and pad to a square input.""" | |
# Normalize colors | |
x = (x - self.pixel_mean) / self.pixel_std | |
# Pad | |
h, w = x.shape[-2:] | |
padh = self.img_size - h | |
padw = self.img_size - w | |
x = F.pad(x, (0, padw, 0, padh)) | |
return x | |
def __getitem__(self, idx): | |
images, jsons = self.reason_seg_data | |
idx = random.randint(0, len(images) - 1) | |
image_path = images[idx] | |
json_path = jsons[idx] | |
image = cv2.imread(image_path) | |
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
ori_size = image.shape[:2] | |
# preprocess image for clip | |
image_clip = self.clip_image_processor.preprocess(image, return_tensors="pt")[ | |
"pixel_values" | |
][0] | |
mask, sents, is_sentence = get_mask_from_json(json_path, image) | |
if len(sents) >= self.num_classes_per_sample: | |
sampled_inds = np.random.choice( | |
list(range(len(sents))), size=self.num_classes_per_sample, replace=False | |
) | |
else: | |
sampled_inds = list(range(len(sents))) | |
sampled_sents = np.vectorize(sents.__getitem__)(sampled_inds).tolist() | |
sampled_masks = [ | |
(mask == 1).astype(np.float32) for _ in range(len(sampled_inds)) | |
] | |
image = self.transform.apply_image(image) # preprocess image for sam | |
resize = image.shape[:2] | |
image_name = image_path.split("/")[-1] | |
if self.explanatory != -1 and image_name in self.img_to_explanation: | |
if random.random() < self.explanatory: | |
choice = 2 | |
else: | |
choice = random.randint(0, 1) | |
questions = [] | |
answers = [] | |
for text in sampled_sents: | |
if is_sentence: | |
question_template = random.choice(self.long_question_list) | |
questions.append(question_template.format(sent=text)) | |
else: | |
question_template = random.choice(self.short_question_list) | |
questions.append(question_template.format(class_name=text.lower())) | |
# add explanation if applicable | |
img_name = image_path.split("/")[-1] | |
if self.explanatory != -1 and img_name in self.img_to_explanation: | |
if choice == 0: # [SEG] token | |
answers.append(random.choice(self.answer_list)) | |
elif choice == 1: # [SEG] token + text answer | |
image_name = image_path.split("/")[-1] | |
answer = self.img_to_explanation[image_name]["outputs"] | |
answer = random.choice(self.answer_list) + " {}".format(answer) | |
questions[-1] = ( | |
DEFAULT_IMAGE_TOKEN | |
+ "\n" | |
+ text | |
+ " {}".format(random.choice(self.explanatory_question_list)) | |
) | |
answers.append(answer) | |
elif choice == 2: # vanilla text answer | |
image_name = image_path.split("/")[-1] | |
answer = self.img_to_explanation[image_name]["outputs"] | |
questions[-1] = DEFAULT_IMAGE_TOKEN + "\n" + text | |
answers.append(answer) | |
else: | |
raise ValueError("Not implemented yet.") | |
else: | |
answers.append(random.choice(self.answer_list)) | |
conversations = [] | |
conv = conversation_lib.default_conversation.copy() | |
roles = {"human": conv.roles[0], "gpt": conv.roles[1]} | |
i = 0 | |
while i < len(questions): | |
conv.messages = [] | |
conv.append_message(conv.roles[0], questions[i]) | |
conv.append_message(conv.roles[1], answers[i]) | |
conversations.append(conv.get_prompt()) | |
i += 1 | |
image = self.preprocess(torch.from_numpy(image).permute(2, 0, 1).contiguous()) | |
image_name = image_path.split("/")[-1] | |
if ( | |
self.explanatory != -1 | |
and image_name in self.img_to_explanation | |
and choice == 2 | |
): | |
masks = torch.rand(0, *ori_size) | |
label = torch.ones(ori_size) * self.ignore_label | |
else: | |
masks = np.stack(sampled_masks, axis=0) | |
masks = torch.from_numpy(masks) | |
label = torch.ones(masks.shape[1], masks.shape[2]) * self.ignore_label | |
return ( | |
image_path, | |
image, | |
image_clip, | |
conversations, | |
masks, | |
label, | |
resize, | |
questions, | |
sampled_sents, | |
) | |