# Author: thygate
# https://github.com/thygate/stable-diffusion-webui-depthmap-script

from modules import devices
from modules.shared import opts
from torchvision.transforms import transforms
from operator import getitem

import torch, gc
import cv2
import numpy as np
import skimage.measure

whole_size_threshold = 1600  # R_max from the paper
pix2pixsize = 1024


def scale_torch(img):
    """
    Scale the image and output it in torch.tensor.
    :param img: input rgb is in shape [H, W, C], input depth/disp is in shape [H, W]
    :param scale: the scale factor. float
    :return: img. [C, H, W]
    """
    if len(img.shape) == 2:
        img = img[np.newaxis, :, :]
    if img.shape[2] == 3:
        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))])
        img = transform(img.astype(np.float32))
    else:
        img = img.astype(np.float32)
        img = torch.from_numpy(img)
    return img


def estimateleres(img, model, w, h):
    # leres transform input
    rgb_c = img[:, :, ::-1].copy()
    A_resize = cv2.resize(rgb_c, (w, h))
    img_torch = scale_torch(A_resize)[None, :, :, :]

    # compute
    with torch.no_grad():
        img_torch = img_torch.to(devices.get_device_for("controlnet"))
        prediction = model.depth_model(img_torch)

    prediction = prediction.squeeze().cpu().numpy()
    prediction = cv2.resize(prediction, (img.shape[1], img.shape[0]), interpolation=cv2.INTER_CUBIC)

    return prediction


def generatemask(size):
    # Generates a Gaussian mask
    mask = np.zeros(size, dtype=np.float32)
    sigma = int(size[0]/16)
    k_size = int(2 * np.ceil(2 * int(size[0]/16)) + 1)
    mask[int(0.15*size[0]):size[0] - int(0.15*size[0]), int(0.15*size[1]):size[1] - int(0.15*size[1])] = 1
    mask = cv2.GaussianBlur(mask, (int(k_size), int(k_size)), sigma)
    mask = (mask - mask.min()) / (mask.max() - mask.min())
    mask = mask.astype(np.float32)
    return mask


def resizewithpool(img, size):
    i_size = img.shape[0]
    n = int(np.floor(i_size/size))

    out = skimage.measure.block_reduce(img, (n, n), np.max)
    return out


def rgb2gray(rgb):
    # Converts rgb to gray
    return np.dot(rgb[..., :3], [0.2989, 0.5870, 0.1140])


def calculateprocessingres(img, basesize, confidence=0.1, scale_threshold=3, whole_size_threshold=3000):
    # Returns the R_x resolution described in section 5 of the main paper.

    # Parameters:
    #    img: input rgb image
    #    basesize: size of the dilation kernel, which is equal to the receptive field of the network.
    #    confidence: value of x in R_x; allowed percentage of pixels that are not getting any contextual cue.
    #    scale_threshold: maximum allowed upscaling on the input image; it has been set to 3.
    #    whole_size_threshold: maximum allowed resolution. (R_max from section 6 of the main paper)

    # Returns:
    #    outputsize_scale * speed_scale: the computed R_x resolution
    #    patch_scale: K parameter from section 6 of the paper

    # speed_scale is used to process every image at a smaller size to accelerate the R_x resolution search
    speed_scale = 32
    image_dim = int(min(img.shape[0:2]))

    gray = rgb2gray(img)
    grad = np.abs(cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)) + np.abs(cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3))
    grad = cv2.resize(grad, (image_dim, image_dim), cv2.INTER_AREA)

    # thresholding the gradient map to generate the edge-map as a proxy of the contextual cues
    m = grad.min()
    M = grad.max()
    middle = m + (0.4 * (M - m))
    grad[grad < middle] = 0
    grad[grad >= middle] = 1

    # dilation kernel with size of the receptive field
    kernel = np.ones((int(basesize/speed_scale), int(basesize/speed_scale)), float)
    # dilation kernel with size of a quarter of the receptive field, used to compute k
    # as described in section 6 of the main paper
    kernel2 = np.ones((int(basesize / (4*speed_scale)), int(basesize / (4*speed_scale))), float)

    # Output resolution limit set by the whole_size_threshold and scale_threshold.
    threshold = min(whole_size_threshold, scale_threshold * max(img.shape[:2]))

    outputsize_scale = basesize / speed_scale
    for p_size in range(int(basesize/speed_scale), int(threshold/speed_scale), int(basesize / (2*speed_scale))):
        grad_resized = resizewithpool(grad, p_size)
        grad_resized = cv2.resize(grad_resized, (p_size, p_size), cv2.INTER_NEAREST)
        grad_resized[grad_resized >= 0.5] = 1
        grad_resized[grad_resized < 0.5] = 0

        dilated = cv2.dilate(grad_resized, kernel, iterations=1)
        meanvalue = (1 - dilated).mean()
        if meanvalue > confidence:
            break
        else:
            outputsize_scale = p_size

    grad_region = cv2.dilate(grad_resized, kernel2, iterations=1)
    patch_scale = grad_region.mean()

    return int(outputsize_scale*speed_scale), patch_scale


# Generate a double-input depth estimation
def doubleestimate(img, size1, size2, pix2pixsize, model, net_type, pix2pixmodel):
    # Generate the low resolution estimation
    estimate1 = singleestimate(img, size1, model, net_type)
    # Resize to the inference size of the merge network.
    estimate1 = cv2.resize(estimate1, (pix2pixsize, pix2pixsize), interpolation=cv2.INTER_CUBIC)

    # Generate the high resolution estimation
    estimate2 = singleestimate(img, size2, model, net_type)
    # Resize to the inference size of the merge network.
    estimate2 = cv2.resize(estimate2, (pix2pixsize, pix2pixsize), interpolation=cv2.INTER_CUBIC)

    # Inference on the merge model
    pix2pixmodel.set_input(estimate1, estimate2)
    pix2pixmodel.test()
    visuals = pix2pixmodel.get_current_visuals()
    prediction_mapped = visuals['fake_B']
    prediction_mapped = (prediction_mapped + 1) / 2
    prediction_mapped = (prediction_mapped - torch.min(prediction_mapped)) / (
                torch.max(prediction_mapped) - torch.min(prediction_mapped))
    prediction_mapped = prediction_mapped.squeeze().cpu().numpy()

    return prediction_mapped


# Generate a single-input depth estimation
def singleestimate(img, msize, model, net_type):
    # if net_type == 0:
    return estimateleres(img, model, msize, msize)
    # else:
    #     return estimatemidasBoost(img, model, msize, msize)


def applyGridpatch(blsize, stride, img, box):
    # Extract a simple grid patch.
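    # The returned dictionary maps a running index (stored as a string key) to
    # {'rect': [x, y, w, h], 'size': w}: square boxes of side 2*blsize laid out on a regular
    # grid with the given stride, offset by the top-left corner given in `box`.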
    counter1 = 0
    patch_bound_list = {}
    for k in range(blsize, img.shape[1] - blsize, stride):
        for j in range(blsize, img.shape[0] - blsize, stride):
            patch_bound_list[str(counter1)] = {}
            patchbounds = [j - blsize, k - blsize, j - blsize + 2 * blsize, k - blsize + 2 * blsize]
            patch_bound = [box[0] + patchbounds[1], box[1] + patchbounds[0],
                           patchbounds[3] - patchbounds[1], patchbounds[2] - patchbounds[0]]
            patch_bound_list[str(counter1)]['rect'] = patch_bound
            patch_bound_list[str(counter1)]['size'] = patch_bound[2]
            counter1 = counter1 + 1
    return patch_bound_list


# Generating local patches to perform the local refinement described in section 6 of the main paper.
def generatepatchs(img, base_size):
    # Compute the gradients as a proxy of the contextual cues.
    img_gray = rgb2gray(img)
    whole_grad = np.abs(cv2.Sobel(img_gray, cv2.CV_64F, 0, 1, ksize=3)) + \
                 np.abs(cv2.Sobel(img_gray, cv2.CV_64F, 1, 0, ksize=3))

    threshold = whole_grad[whole_grad > 0].mean()
    whole_grad[whole_grad < threshold] = 0

    # We use the integral image to speed up the evaluation of the amount of gradients for each patch.
    gf = whole_grad.sum() / len(whole_grad.reshape(-1))
    grad_integral_image = cv2.integral(whole_grad)

    # Variables are selected such that the initial patch size would be the receptive field size
    # and the stride is set to 1/3 of the receptive field size.
    blsize = int(round(base_size/2))
    stride = int(round(blsize*0.75))

    # Get initial Grid
    patch_bound_list = applyGridpatch(blsize, stride, img, [0, 0, 0, 0])

    # Refine the initial grid of patches by discarding the flat ones (in terms of gradients of the rgb image). Refine
    # each patch size to ensure that there will be enough depth cues for the network to generate a consistent depth map.
    print("Selecting patches ...")
    patch_bound_list = adaptiveselection(grad_integral_image, patch_bound_list, gf)

    # Sort the patch list to make sure the merging operation will be done in the correct order: starting from the
    # biggest patch
    patchset = sorted(patch_bound_list.items(), key=lambda x: getitem(x[1], 'size'), reverse=True)

    return patchset


def getGF_fromintegral(integralimage, rect):
    # Computes the gradient density of a given patch from the gradient integral image.
    x1 = rect[1]
    x2 = rect[1] + rect[3]
    y1 = rect[0]
    y2 = rect[0] + rect[2]
    value = integralimage[x2, y2] - integralimage[x1, y2] - integralimage[x2, y1] + integralimage[x1, y1]
    return value


# Adaptively select patches
def adaptiveselection(integral_grad, patch_bound_list, gf):
    patchlist = {}
    count = 0
    height, width = integral_grad.shape

    # `factor` is a module-level global set in estimateboost() before patch selection runs.
    search_step = int(32/factor)

    # Go through all patches
    for c in range(len(patch_bound_list)):
        # Get patch
        bbox = patch_bound_list[str(c)]['rect']

        # Compute the amount of gradients present in the patch from the integral image.
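        # getGF_fromintegral() applies the summed-area-table identity: for rect = [x, y, w, h],
        # the gradient mass inside the box is I[y+h, x+w] - I[y, x+w] - I[y+h, x] + I[y, x];
        # dividing by the patch area (w*h) below turns it into a gradient density.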
        cgf = getGF_fromintegral(integral_grad, bbox) / (bbox[2]*bbox[3])

        # Check if patching is beneficial by comparing the gradient density of the patch to
        # the gradient density of the whole image
        if cgf >= gf:
            bbox_test = bbox.copy()
            patchlist[str(count)] = {}

            # Enlarge each patch until the gradient density of the patch is equal
            # to the whole image gradient density
            while True:

                bbox_test[0] = bbox_test[0] - int(search_step/2)
                bbox_test[1] = bbox_test[1] - int(search_step/2)

                bbox_test[2] = bbox_test[2] + search_step
                bbox_test[3] = bbox_test[3] + search_step

                # Check if we are still within the image
                if bbox_test[0] < 0 or bbox_test[1] < 0 or bbox_test[1] + bbox_test[3] >= height \
                        or bbox_test[0] + bbox_test[2] >= width:
                    break

                # Compare gradient density
                cgf = getGF_fromintegral(integral_grad, bbox_test) / (bbox_test[2]*bbox_test[3])
                if cgf < gf:
                    break
                bbox = bbox_test.copy()

            # Add patch to selected patches
            patchlist[str(count)]['rect'] = bbox
            patchlist[str(count)]['size'] = bbox[2]
            count = count + 1

    # Return selected patches
    return patchlist


def impatch(image, rect):
    # Extract the given patch pixels from a given image.
    w1 = rect[0]
    h1 = rect[1]
    w2 = w1 + rect[2]
    h2 = h1 + rect[3]
    image_patch = image[h1:h2, w1:w2]
    return image_patch


class ImageandPatchs:
    def __init__(self, root_dir, name, patchsinfo, rgb_image, scale=1):
        self.root_dir = root_dir
        self.patchsinfo = patchsinfo
        self.name = name
        self.patchs = patchsinfo
        self.scale = scale

        self.rgb_image = cv2.resize(rgb_image, (round(rgb_image.shape[1]*scale), round(rgb_image.shape[0]*scale)),
                                    interpolation=cv2.INTER_CUBIC)

        self.do_have_estimate = False
        self.estimation_updated_image = None
        self.estimation_base_image = None

    def __len__(self):
        return len(self.patchs)

    def set_base_estimate(self, est):
        self.estimation_base_image = est
        if self.estimation_updated_image is not None:
            self.do_have_estimate = True

    def set_updated_estimate(self, est):
        self.estimation_updated_image = est
        if self.estimation_base_image is not None:
            self.do_have_estimate = True

    def __getitem__(self, index):
        patch_id = int(self.patchs[index][0])
        rect = np.array(self.patchs[index][1]['rect'])
        msize = self.patchs[index][1]['size']

        ## applying scale to rect:
        rect = np.round(rect * self.scale)
        rect = rect.astype('int')
        msize = round(msize * self.scale)

        patch_rgb = impatch(self.rgb_image, rect)
        if self.do_have_estimate:
            patch_whole_estimate_base = impatch(self.estimation_base_image, rect)
            patch_whole_estimate_updated = impatch(self.estimation_updated_image, rect)
            return {'patch_rgb': patch_rgb, 'patch_whole_estimate_base': patch_whole_estimate_base,
                    'patch_whole_estimate_updated': patch_whole_estimate_updated, 'rect': rect,
                    'size': msize, 'id': patch_id}
        else:
            return {'patch_rgb': patch_rgb, 'rect': rect, 'size': msize, 'id': patch_id}

    def print_options(self, opt):
        """Print and save options

        It will print both current options and default values (if different).
        It will save options into a text file / [checkpoints_dir] / opt.txt
        """
        message = ''
        message += '----------------- Options ---------------\n'
        for k, v in sorted(vars(opt).items()):
            comment = ''
            default = self.parser.get_default(k)
            if v != default:
                comment = '\t[default: %s]' % str(default)
            message += '{:>25}: {:<30}{}\n'.format(str(k), str(v), comment)
        message += '----------------- End -------------------'
        print(message)

        # save to the disk
        """
        expr_dir = os.path.join(opt.checkpoints_dir, opt.name)
        util.mkdirs(expr_dir)
        file_name = os.path.join(expr_dir, '{}_opt.txt'.format(opt.phase))
        with open(file_name, 'wt') as opt_file:
            opt_file.write(message)
            opt_file.write('\n')
        """

    def parse(self):
        """Parse our options, create checkpoints directory suffix, and set up gpu device."""
        opt = self.gather_options()
        opt.isTrain = self.isTrain   # train or test

        # process opt.suffix
        if opt.suffix:
            suffix = ('_' + opt.suffix.format(**vars(opt))) if opt.suffix != '' else ''
            opt.name = opt.name + suffix

        #self.print_options(opt)

        # set gpu ids
        str_ids = opt.gpu_ids.split(',')
        opt.gpu_ids = []
        for str_id in str_ids:
            id = int(str_id)
            if id >= 0:
                opt.gpu_ids.append(id)
        #if len(opt.gpu_ids) > 0:
        #    torch.cuda.set_device(opt.gpu_ids[0])

        self.opt = opt
        return self.opt


def estimateboost(img, model, model_type, pix2pixmodel, max_res=512):
    global whole_size_threshold

    # get settings
    if hasattr(opts, 'depthmap_script_boost_rmax'):
        whole_size_threshold = opts.depthmap_script_boost_rmax

    if model_type == 0:  # leres
        net_receptive_field_size = 448
        patch_netsize = 2 * net_receptive_field_size
    elif model_type == 1:  # dpt_beit_large_512
        net_receptive_field_size = 512
        patch_netsize = 2 * net_receptive_field_size
    else:  # other midas
        net_receptive_field_size = 384
        patch_netsize = 2 * net_receptive_field_size

    gc.collect()
    devices.torch_gc()

    # Generate the mask used to smoothly blend the local patch estimations into the base estimate.
    # It is arbitrarily large to avoid artifacts during rescaling for each crop.
    mask_org = generatemask((3000, 3000))
    mask = mask_org.copy()

    # Value x of R_x defined in section 5 of the main paper.
    r_threshold_value = 0.2
    #if R0:
    #    r_threshold_value = 0

    input_resolution = img.shape
    scale_threshold = 3  # Allows up-scaling with a scale up to 3

    # Find the best input resolution R_x. The resolution search is described in section 5 (double estimation) of the
    # main paper and section B of the supplementary material.
    whole_image_optimal_size, patch_scale = calculateprocessingres(img, net_receptive_field_size, r_threshold_value,
                                                                   scale_threshold, whole_size_threshold)

    # print('wholeImage being processed in :', whole_image_optimal_size)

    # Generate the base estimate using the double estimation.
    whole_estimate = doubleestimate(img, net_receptive_field_size, whole_image_optimal_size, pix2pixsize, model,
                                    model_type, pix2pixmodel)

    # Compute the multiplier described in section 6 of the main paper to make sure our initial patch can select
    # small high-density regions of the image.
    global factor
    factor = max(min(1, 4 * patch_scale * whole_image_optimal_size / whole_size_threshold), 0.2)
    # print('Adjust factor is:', 1/factor)

    # Check if local boosting is beneficial.
    if max_res < whole_image_optimal_size:
        # print("No Local boosting. Specified Max Res is smaller than R20, Returning doubleestimate result")
        return cv2.resize(whole_estimate, (input_resolution[1], input_resolution[0]), interpolation=cv2.INTER_CUBIC)

    # Compute the default target resolution.
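    # (a, b) is the target (height, width): the longer side is set to twice the optimal whole-image
    # resolution, the shorter side follows the input aspect ratio, and both are then divided by
    # `factor` so the initial grid patches can still isolate small high-gradient regions.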
    if img.shape[0] > img.shape[1]:
        a = 2 * whole_image_optimal_size
        b = round(2 * whole_image_optimal_size * img.shape[1] / img.shape[0])
    else:
        a = round(2 * whole_image_optimal_size * img.shape[0] / img.shape[1])
        b = 2 * whole_image_optimal_size
    b = int(round(b / factor))
    a = int(round(a / factor))

    """
    # recompute a, b and saturate to max res.
    if max(a, b) > max_res:
        print('Default Res is higher than max-res: Reducing final resolution')
        if img.shape[0] > img.shape[1]:
            a = max_res
            b = round(max_res * img.shape[1] / img.shape[0])
        else:
            a = round(max_res * img.shape[0] / img.shape[1])
            b = max_res
        b = int(b)
        a = int(a)
    """

    img = cv2.resize(img, (b, a), interpolation=cv2.INTER_CUBIC)

    # Extract selected patches for local refinement
    base_size = net_receptive_field_size * 2
    patchset = generatepatchs(img, base_size)

    # print('Target resolution: ', img.shape)

    # Compute a scale in case the user requested the results at the same resolution as the input.
    # Notice that our method's output resolution is independent of the input resolution and this parameter will only
    # enable a scaling operation during the local patch merge implementation to generate results with the same
    # resolution as the input.
    """
    if output_resolution == 1:
        mergein_scale = input_resolution[0] / img.shape[0]
        print('Dynamically change merged-in resolution; scale:', mergein_scale)
    else:
        mergein_scale = 1
    """
    # always rescale to input res for now
    mergein_scale = input_resolution[0] / img.shape[0]

    imageandpatchs = ImageandPatchs('', '', patchset, img, mergein_scale)
    whole_estimate_resized = cv2.resize(whole_estimate, (round(img.shape[1]*mergein_scale),
                                        round(img.shape[0]*mergein_scale)), interpolation=cv2.INTER_CUBIC)
    imageandpatchs.set_base_estimate(whole_estimate_resized.copy())
    imageandpatchs.set_updated_estimate(whole_estimate_resized.copy())

    print('Resulting depthmap resolution will be :', whole_estimate_resized.shape[:2])
    print('Patches to process: ' + str(len(imageandpatchs)))

    # Enumerate through all patches, generate their estimations, and refine the base estimate.
    for patch_ind in range(len(imageandpatchs)):

        # Get patch information
        patch = imageandpatchs[patch_ind]                                # patch object
        patch_rgb = patch['patch_rgb']                                   # rgb patch
        patch_whole_estimate_base = patch['patch_whole_estimate_base']  # corresponding patch from base
        rect = patch['rect']                                             # patch size and location
        patch_id = patch['id']                                           # patch ID
        org_size = patch_whole_estimate_base.shape                       # the original size from the unscaled input
        print('\t Processing patch', patch_ind, '/', len(imageandpatchs)-1, '|', rect)

        # We apply double estimation for patches. The high resolution value is fixed to twice the receptive
        # field size of the network for patches to accelerate the process.
        patch_estimation = doubleestimate(patch_rgb, net_receptive_field_size, patch_netsize, pix2pixsize,
                                          model, model_type, pix2pixmodel)

        patch_estimation = cv2.resize(patch_estimation, (pix2pixsize, pix2pixsize), interpolation=cv2.INTER_CUBIC)

        patch_whole_estimate_base = cv2.resize(patch_whole_estimate_base, (pix2pixsize, pix2pixsize),
                                               interpolation=cv2.INTER_CUBIC)

        # Merging the patch estimation into the base estimate using our merge network:
        # We feed the patch estimation and the same region from the updated base estimate to the merge network
        # to generate the target estimate for the corresponding region.
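        # Input order mirrors doubleestimate(): the coarser map (the region cut from the whole-image
        # estimate) goes first, the more detailed patch estimation second.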
        pix2pixmodel.set_input(patch_whole_estimate_base, patch_estimation)

        # Run merging network
        pix2pixmodel.test()
        visuals = pix2pixmodel.get_current_visuals()

        prediction_mapped = visuals['fake_B']
        prediction_mapped = (prediction_mapped + 1) / 2
        prediction_mapped = prediction_mapped.squeeze().cpu().numpy()

        mapped = prediction_mapped

        # We use a simple linear polynomial to make sure the result of the merge network would match the values of
        # the base estimate
        p_coef = np.polyfit(mapped.reshape(-1), patch_whole_estimate_base.reshape(-1), deg=1)
        merged = np.polyval(p_coef, mapped.reshape(-1)).reshape(mapped.shape)

        merged = cv2.resize(merged, (org_size[1], org_size[0]), interpolation=cv2.INTER_CUBIC)

        # Get patch size and location
        w1 = rect[0]
        h1 = rect[1]
        w2 = w1 + rect[2]
        h2 = h1 + rect[3]

        # To speed up the implementation, we only generate the Gaussian mask once with a sufficiently large size
        # and resize it to our needed size while merging the patches.
        if mask.shape != org_size:
            mask = cv2.resize(mask_org, (org_size[1], org_size[0]), interpolation=cv2.INTER_LINEAR)

        tobemergedto = imageandpatchs.estimation_updated_image

        # Update the whole estimation:
        # We use a simple Gaussian mask to blend the merged patch region with the base estimate to ensure seamless
        # blending at the boundaries of the patch region.
        tobemergedto[h1:h2, w1:w2] = np.multiply(tobemergedto[h1:h2, w1:w2], 1 - mask) + np.multiply(merged, mask)
        imageandpatchs.set_updated_estimate(tobemergedto)

    # output
    return cv2.resize(imageandpatchs.estimation_updated_image, (input_resolution[1], input_resolution[0]),
                      interpolation=cv2.INTER_CUBIC)
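
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original pipeline): estimateboost()
# is the entry point used by the extension. The two loader calls below are
# hypothetical placeholders for however the caller obtains the LeReS depth model
# and the pix2pix merge model; only estimateboost() and its signature come from
# this file.
#
#   rgb = cv2.imread('input.png')[:, :, ::-1]      # HxWx3 RGB image
#   model = load_depth_model()                     # placeholder: LeReS wrapper exposing .depth_model
#   pix2pixmodel = load_pix2pix_model()            # placeholder: merge net with set_input/test/get_current_visuals
#   depth = estimateboost(rgb, model, 0, pix2pixmodel, max_res=1024)   # model_type 0 = leres
#   # `depth` is a 2-D float depth/disparity map resized back to the input HxW.
# ---------------------------------------------------------------------------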