Spaces:
Running
on
T4
Running
on
T4
# -*- coding: utf-8 -*- | |
import argparse | |
import cv2 | |
import torch | |
import numpy as np | |
import os, shutil, time | |
import sys, random | |
from multiprocessing import Pool | |
from os import path as osp | |
from tqdm import tqdm | |
from math import log10, sqrt | |
import torch.nn.functional as F | |
root_path = os.path.abspath('.') | |
sys.path.append(root_path) | |
from degradation.ESR.degradations_functionality import * | |
from degradation.ESR.diffjpeg import * | |
from degradation.ESR.utils import filter2D | |
from degradation.image_compression.jpeg import JPEG | |
from degradation.image_compression.webp import WEBP | |
from degradation.image_compression.heif import HEIF | |
from degradation.image_compression.avif import AVIF | |
from opt import opt | |
def PSNR(original, compressed): | |
mse = np.mean((original - compressed) ** 2) | |
if(mse == 0): # MSE is zero means no noise is present in the signal . | |
# Therefore PSNR have no importance. | |
return 100 | |
max_pixel = 255.0 | |
psnr = 20 * log10(max_pixel / sqrt(mse)) | |
return psnr | |
def downsample_1st(out, opt): | |
# Resize with different mode | |
updown_type = random.choices(['up', 'down', 'keep'], opt['resize_prob'])[0] | |
if updown_type == 'up': | |
scale = np.random.uniform(1, opt['resize_range'][1]) | |
elif updown_type == 'down': | |
scale = np.random.uniform(opt['resize_range'][0], 1) | |
else: | |
scale = 1 | |
mode = random.choice(opt['resize_options']) | |
out = F.interpolate(out, scale_factor=scale, mode=mode) | |
return out | |
def downsample_2nd(out, opt, ori_h, ori_w): | |
# Second Resize for 4x scaling | |
if opt['scale'] == 4: | |
updown_type = random.choices(['up', 'down', 'keep'], opt['resize_prob2'])[0] | |
if updown_type == 'up': | |
scale = np.random.uniform(1, opt['resize_range2'][1]) | |
elif updown_type == 'down': | |
scale = np.random.uniform(opt['resize_range2'][0], 1) | |
else: | |
scale = 1 | |
mode = random.choice(opt['resize_options']) | |
# Resize这边改回来原来的版本,不用连续的resize了 | |
# out = F.interpolate(out, scale_factor=scale, mode=mode) | |
out = F.interpolate( | |
out, size=(int(ori_h / opt['scale'] * scale), int(ori_w / opt['scale'] * scale)), mode=mode | |
) | |
return out | |
def common_degradation(out, opt, kernels, process_id, verbose = False): | |
jpeger = DiffJPEG(differentiable=False).cuda() | |
kernel1, kernel2 = kernels | |
downsample_1st_position = random.choices([0, 1, 2])[0] | |
if opt['scale'] == 4: | |
# Only do the second downsample at 4x scale | |
downsample_2nd_position = random.choices([0, 1, 2])[0] | |
else: | |
# print("We don't use the second resize") | |
downsample_2nd_position = -1 | |
####---------------------------- Frist Degradation ----------------------------------#### | |
batch_size, _, ori_h, ori_w = out.size() | |
if downsample_1st_position == 0: | |
out = downsample_1st(out, opt) | |
# Bluring kernel | |
out = filter2D(out, kernel1) | |
if verbose: print(f"(1st) blur noise") | |
if downsample_1st_position == 1: | |
out = downsample_1st(out, opt) | |
# Noise effect (gaussian / poisson) | |
gray_noise_prob = opt['gray_noise_prob'] | |
if np.random.uniform() < opt['gaussian_noise_prob']: | |
# Gaussian noise | |
out = random_add_gaussian_noise_pt( | |
out, sigma_range=opt['noise_range'], clip=True, rounds=False, gray_prob=gray_noise_prob) | |
name = "gaussian_noise" | |
else: | |
# Poisson noise | |
out = random_add_poisson_noise_pt( | |
out, | |
scale_range=opt['poisson_scale_range'], | |
gray_prob=gray_noise_prob, | |
clip=True, | |
rounds=False) | |
name = "poisson_noise" | |
if verbose: print("(1st) " + str(name)) | |
if downsample_1st_position == 2: | |
out = downsample_1st(out, opt) | |
# Choose an image compression codec (All degradation batch use the same codec) | |
image_codec = random.choices(opt['compression_codec1'], opt['compression_codec_prob1'])[0] # All lower case | |
if image_codec == "jpeg": | |
out = JPEG.compress_tensor(out) | |
elif image_codec == "webp": | |
try: | |
out = WEBP.compress_tensor(out, idx=process_id) | |
except Exception: | |
print("There is exception again in webp!") | |
out = WEBP.compress_tensor(out, idx=process_id) | |
elif image_codec == "heif": | |
out = HEIF.compress_tensor(out, idx=process_id) | |
elif image_codec == "avif": | |
out = AVIF.compress_tensor(out, idx=process_id) | |
else: | |
raise NotImplementedError("We don't have such image compression designed!") | |
# ########################################################################################## | |
# ####---------------------------- Second Degradation ----------------------------------#### | |
if downsample_2nd_position == 0: | |
out = downsample_2nd(out, opt, ori_h, ori_w) | |
# Add blur 2nd time | |
if np.random.uniform() < opt['second_blur_prob']: | |
# 这个bluring不是必定触发的 | |
if verbose: print("(2nd) blur noise") | |
out = filter2D(out, kernel2) | |
if downsample_2nd_position == 1: | |
out = downsample_2nd(out, opt, ori_h, ori_w) | |
# Add noise 2nd time | |
gray_noise_prob = opt['gray_noise_prob2'] | |
if np.random.uniform() < opt['gaussian_noise_prob2']: | |
# gaussian noise | |
if verbose: print("(2nd) gaussian noise") | |
out = random_add_gaussian_noise_pt( | |
out, sigma_range=opt['noise_range2'], clip=True, rounds=False, gray_prob=gray_noise_prob) | |
name = "gaussian_noise" | |
else: | |
# poisson noise | |
if verbose: print("(2nd) poisson noise") | |
out = random_add_poisson_noise_pt( | |
out, scale_range=opt['poisson_scale_range2'], gray_prob=gray_noise_prob, clip=True, rounds=False) | |
name = "poisson_noise" | |
if downsample_2nd_position == 2: | |
out = downsample_2nd(out, opt, ori_h, ori_w) | |
return out |