ravi.naik
initial code push
d0ef04f
raw
history blame
10.5 kB
"""
Creates a PyTorch dataset and a Lightning data module to load the Pascal VOC & MS COCO datasets
"""
import numpy as np
import os
import pandas as pd
import torch
import random
from PIL import Image, ImageFile
import lightning as L
from torch.utils.data import Dataset, DataLoader
import config as config
from utils.utils import xywhn2xyxy, xyxy2xywhn
from utils.utils import (
cells_to_bboxes,
iou_width_height as iou,
non_max_suppression as nms,
plot_image,
)
# Let PIL load image files that are truncated instead of raising an error.
ImageFile.LOAD_TRUNCATED_IMAGES = True
class YOLODataset(Dataset):
    """Dataset yielding an image and per-scale YOLO target tensors.

    Each row of the CSV names an image file (column 0) and a label file
    (column 1). Label files are space-delimited; every row is rolled so that
    its first column ends up last, giving ``[x, y, w, h, class]`` for the
    five-column rows this class unpacks.

    Args:
        csv_file: Path of the CSV read with ``pandas.read_csv``.
        img_dir: Directory joined with the image file names.
        label_dir: Directory joined with the label file names.
        anchors: Three per-scale sequences of ``(w, h)`` anchors; they are
            concatenated into one tensor in the order given.
        image_size: Tile size ``s`` used by :meth:`load_mosaic`; the mosaic
            canvas is ``2s x 2s``.
        S: Grid size for each prediction scale.
        C: Stored as ``self.C``; not used inside this class.
        transform: Optional callable invoked as
            ``transform(image=..., bboxes=...)`` that returns a mapping with
            ``"image"`` and ``"bboxes"`` keys.
    """

    def __init__(
        self,
        csv_file,
        img_dir,
        label_dir,
        anchors,
        image_size=416,
        S=(13, 26, 52),
        C=20,
        transform=None,
    ):
        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.image_size = image_size
        self.mosaic_border = [image_size // 2, image_size // 2]
        self.transform = transform
        # Copy into a fresh list so instances never share (or mutate) the
        # default value or the caller's sequence.
        self.S = list(S)
        self.anchors = torch.tensor(
            anchors[0] + anchors[1] + anchors[2]
        )  # for all 3 scales
        self.num_anchors = self.anchors.shape[0]
        self.num_anchors_per_scale = self.num_anchors // 3
        self.C = C
        self.ignore_iou_thresh = 0.5

    def __len__(self):
        """Return the number of rows in the annotation CSV."""
        return len(self.annotations)

    def _load_sample(self, index):
        """Read the image and label rows for annotation row ``index``.

        Returns:
            ``(image, boxes)`` where ``image`` is the RGB image as a numpy
            array and ``boxes`` is a 2-D float array with the label file's
            first column moved to the end. An empty label file yields a
            ``(0, 5)`` array.
        """
        label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
        raw = np.loadtxt(fname=label_path, delimiter=" ", ndmin=2)
        if raw.size:
            boxes = np.roll(raw, 4, axis=1)
        else:
            # Normalise "no objects" to a well-shaped empty array so callers
            # can concatenate and iterate without special cases.
            boxes = np.zeros((0, 5))

        img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
        with Image.open(img_path) as handle:
            image = np.array(handle.convert("RGB"))
        return image, boxes

    def load_mosaic(self, index):
        """Build a 4-image mosaic (YOLOv5 style) around a random centre.

        The image at ``index`` plus three randomly chosen images are placed,
        in shuffled order, into the four quadrants of a ``2s x 2s`` canvas
        filled with the value 114.

        Returns:
            ``(canvas, labels)`` where ``labels`` is an ``(N, 5)`` array of
            ``[x, y, w, h, class]`` normalised to the canvas, with boxes of
            zero width or height removed.
        """
        s = self.image_size
        yc, xc = (
            int(random.uniform(b, 2 * s - b)) for b in self.mosaic_border
        )  # mosaic center x, y
        indices = [index] + random.choices(
            range(len(self)), k=3
        )  # 3 additional image indices
        random.shuffle(indices)

        # Images are converted to RGB on load, hence the 3 channels.
        canvas = np.full((s * 2, s * 2, 3), 114, dtype=np.uint8)
        collected = []
        for tile, sample_index in enumerate(indices):
            img, labels = self._load_sample(sample_index)
            h, w = img.shape[0], img.shape[1]

            # (x1a, y1a, x2a, y2a): destination rectangle on the canvas.
            # (x1b, y1b, x2b, y2b): source rectangle inside the tile image.
            if tile == 0:  # top left
                x1a, y1a, x2a, y2a = max(xc - w, 0), max(yc - h, 0), xc, yc
                x1b, y1b, x2b, y2b = w - (x2a - x1a), h - (y2a - y1a), w, h
            elif tile == 1:  # top right
                x1a, y1a, x2a, y2a = xc, max(yc - h, 0), min(xc + w, s * 2), yc
                x1b, y1b, x2b, y2b = 0, h - (y2a - y1a), min(w, x2a - x1a), h
            elif tile == 2:  # bottom left
                x1a, y1a, x2a, y2a = max(xc - w, 0), yc, xc, min(s * 2, yc + h)
                x1b, y1b, x2b, y2b = w - (x2a - x1a), 0, w, min(y2a - y1a, h)
            else:  # bottom right
                x1a, y1a, x2a, y2a = xc, yc, min(xc + w, s * 2), min(s * 2, yc + h)
                x1b, y1b, x2b, y2b = 0, 0, min(w, x2a - x1a), min(y2a - y1a, h)

            canvas[y1a:y2a, x1a:x2a] = img[y1b:y2b, x1b:x2b]
            padw = x1a - x1b
            padh = y1a - y1b

            # Only tiles that actually carry boxes contribute labels; an
            # empty array would break the concatenation below.
            if labels.size:
                labels[:, :-1] = xywhn2xyxy(
                    labels[:, :-1], w, h, padw, padh
                )  # normalized xywh to pixel xyxy format
                collected.append(labels)

        if not collected:
            return canvas, np.zeros((0, 5))

        labels4 = np.concatenate(collected, 0)
        coords = labels4[:, :-1]  # view: clipping writes into labels4
        np.clip(coords, 0, 2 * s, out=coords)
        labels4[:, :-1] = xyxy2xywhn(labels4[:, :-1], 2 * s, 2 * s)
        labels4[:, :-1] = np.clip(labels4[:, :-1], 0, 1)
        # Drop boxes that collapsed to zero width or height after clipping.
        keep = (labels4[:, 2] > 0) & (labels4[:, 3] > 0)
        return canvas, labels4[keep]

    def _build_targets(self, bboxes):
        """Encode ``[x, y, w, h, class]`` boxes into one tensor per scale.

        Each target has shape ``(anchors_per_scale, S, S, 6)`` with the last
        axis holding ``[objectness, x_cell, y_cell, w_cell, h_cell, class]``.
        Objectness is 1 for the assigned anchor, -1 for unassigned anchors
        whose IoU with the box exceeds ``ignore_iou_thresh``, and 0 otherwise.
        """
        # Below assumes 3 scale predictions (as paper) and same num of anchors per scale
        targets = [
            torch.zeros((self.num_anchors_per_scale, S, S, 6)) for S in self.S
        ]
        for box in bboxes:
            iou_anchors = iou(torch.tensor(box[2:4]), self.anchors)
            ranked = iou_anchors.argsort(descending=True, dim=0)
            x, y, width, height, class_label = box
            has_anchor = [False] * len(targets)  # each scale should have one anchor
            for anchor_idx in ranked:
                anchor_idx = int(anchor_idx)
                scale_idx, anchor_on_scale = divmod(
                    anchor_idx, self.num_anchors_per_scale
                )
                S = self.S[scale_idx]
                # Clamp so a coordinate of exactly 1.0 falls in the last cell
                # instead of indexing one past the grid.
                i = min(int(S * y), S - 1)
                j = min(int(S * x), S - 1)
                anchor_taken = targets[scale_idx][anchor_on_scale, i, j, 0]
                if not anchor_taken and not has_anchor[scale_idx]:
                    targets[scale_idx][anchor_on_scale, i, j, 0] = 1
                    x_cell, y_cell = S * x - j, S * y - i  # offset inside the cell
                    width_cell, height_cell = (
                        width * S,
                        height * S,
                    )  # can be greater than 1 since it's relative to cell
                    targets[scale_idx][anchor_on_scale, i, j, 1:5] = torch.tensor(
                        [x_cell, y_cell, width_cell, height_cell]
                    )
                    targets[scale_idx][anchor_on_scale, i, j, 5] = int(class_label)
                    has_anchor[scale_idx] = True
                elif (
                    not anchor_taken
                    and iou_anchors[anchor_idx] > self.ignore_iou_thresh
                ):
                    targets[scale_idx][
                        anchor_on_scale, i, j, 0
                    ] = -1  # ignore prediction
        return targets

    def __getitem__(self, index):
        """Return ``(image, targets)`` for sample ``index``.

        ``targets`` is a tuple with one tensor per scale, as produced by
        :meth:`_build_targets`. The image is a mosaic or the plain image,
        chosen by a random draw against ``config.P_MOSAIC``.
        """
        # NOTE(review): mosaic is used when the draw is >= P_MOSAIC, i.e. with
        # probability 1 - P_MOSAIC. If P_MOSAIC is meant to be the probability
        # of applying mosaic, this comparison is inverted - confirm against
        # config before changing.
        if random.random() >= config.P_MOSAIC:
            image, bboxes = self.load_mosaic(index)
        else:
            image, boxes = self._load_sample(index)
            bboxes = boxes.tolist()

        if self.transform:
            augmentations = self.transform(image=image, bboxes=bboxes)
            image = augmentations["image"]
            bboxes = augmentations["bboxes"]

        return image, tuple(self._build_targets(bboxes))
def test():
    """Visual smoke test: decode dataset targets back to boxes and plot them.

    Builds a ``YOLODataset`` from hard-coded ``COCO/...`` paths using
    ``config.ANCHORS`` and ``config.test_transforms``, then for every sample
    converts the per-scale targets to boxes, runs NMS and plots the result.
    """
    anchors = config.ANCHORS
    S = [13, 26, 52]
    dataset = YOLODataset(
        "COCO/train.csv",
        "COCO/images/images/",
        "COCO/labels/labels_new/",
        S=S,
        anchors=anchors,
        transform=config.test_transforms,
    )
    # Dividing by 1/S scales the anchors by the grid size of their scale.
    scaled_anchors = torch.tensor(anchors) / (
        1 / torch.tensor(S).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2)
    )
    loader = DataLoader(dataset=dataset, batch_size=1, shuffle=True)
    for x, y in loader:
        boxes = []
        # Iterate over the scales themselves. The previous
        # range(y[0].shape[1]) walked the anchors-per-scale dimension, which
        # only matched because both counts are 3.
        for scale_idx, scale_target in enumerate(y):
            anchor = scaled_anchors[scale_idx]
            print(anchor.shape)
            print(scale_target.shape)
            boxes += cells_to_bboxes(
                scale_target,
                is_preds=False,
                S=scale_target.shape[2],
                anchors=anchor,
            )[0]
        boxes = nms(boxes, iou_threshold=1, threshold=0.7, box_format="midpoint")
        print(boxes)
        plot_image(x[0].permute(1, 2, 0).to("cpu"), boxes)
class PascalDataModule(L.LightningDataModule):
    """Lightning data module wrapping ``YOLODataset`` for train/val/test.

    Args:
        train_csv_path: CSV passed to the training dataset.
        test_csv_path: CSV passed to both the validation and test datasets.
        batch_size: Batch size for every loader; ``None`` (default) falls
            back to ``config.BATCH_SIZE``.
        shuffle: Whether the training loader shuffles. Validation and test
            loaders never shuffle.
        num_workers: Worker count for every loader; ``None`` (default) falls
            back to ``config.NUM_WORKERS``.
    """

    def __init__(
        self,
        train_csv_path=None,
        test_csv_path=None,
        batch_size=None,
        shuffle=True,
        num_workers=None,
    ) -> None:
        super().__init__()
        self.train_csv_path = train_csv_path
        self.test_csv_path = test_csv_path
        # Explicit arguments are honoured; omitted ones keep the config
        # values the loaders have always used.
        self.batch_size = config.BATCH_SIZE if batch_size is None else batch_size
        self.shuffle = shuffle
        self.num_workers = (
            config.NUM_WORKERS if num_workers is None else num_workers
        )
        self.IMAGE_SIZE = config.IMAGE_SIZE

    def prepare_data(self) -> None:
        """No download or preprocessing step is performed."""
        pass

    def _make_dataset(self, csv_path, transform):
        """Build a ``YOLODataset`` with grids at strides 32, 16 and 8."""
        size = self.IMAGE_SIZE
        # NOTE(review): image_size is not forwarded, so YOLODataset uses its
        # own default for the mosaic canvas regardless of config.IMAGE_SIZE -
        # confirm this is intended.
        return YOLODataset(
            csv_path,
            transform=transform,
            S=[size // 32, size // 16, size // 8],
            img_dir=config.IMG_DIR,
            label_dir=config.LABEL_DIR,
            anchors=config.ANCHORS,
        )

    def _make_loader(self, dataset, shuffle):
        """Wrap ``dataset`` in a ``DataLoader`` with the module's settings."""
        return DataLoader(
            dataset=dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            pin_memory=config.PIN_MEMORY,
            shuffle=shuffle,
            drop_last=False,
        )

    def setup(self, stage=None):
        """Create the train, validation and test datasets.

        All three are built regardless of ``stage``; validation and test
        both read ``test_csv_path`` with ``config.test_transforms``.
        """
        self.train_dataset = self._make_dataset(
            self.train_csv_path, config.train_transforms
        )
        self.val_dataset = self._make_dataset(
            self.test_csv_path, config.test_transforms
        )
        self.test_dataset = self._make_dataset(
            self.test_csv_path, config.test_transforms
        )

    def train_dataloader(self):
        """Return the training loader (shuffled according to ``self.shuffle``)."""
        return self._make_loader(self.train_dataset, shuffle=self.shuffle)

    def val_dataloader(self):
        """Return the validation loader (never shuffled)."""
        return self._make_loader(self.val_dataset, shuffle=False)

    def test_dataloader(self):
        """Return the test loader (never shuffled)."""
        return self._make_loader(self.test_dataset, shuffle=False)