File size: 5,997 Bytes
561c629
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# -*- coding: utf-8 -*-

import argparse
import cv2
import torch
import numpy as np
import os, shutil, time
import sys, random
from multiprocessing import Pool
from os import path as osp
from tqdm import tqdm
from math import log10, sqrt
import torch.nn.functional as F

root_path = os.path.abspath('.')
sys.path.append(root_path)
from degradation.ESR.degradations_functionality import *
from degradation.ESR.diffjpeg import *
from degradation.ESR.utils import filter2D
from degradation.image_compression.jpeg import JPEG
from degradation.image_compression.webp import WEBP
from degradation.image_compression.heif import HEIF
from degradation.image_compression.avif import AVIF
from opt import opt


def PSNR(original, compressed):
    mse = np.mean((original - compressed) ** 2)
    if(mse == 0):  # MSE is zero means no noise is present in the signal .
                  # Therefore PSNR have no importance.
        return 100
    max_pixel = 255.0
    psnr = 20 * log10(max_pixel / sqrt(mse))
    return psnr



def downsample_1st(out, opt):
    # Resize with different mode
    updown_type = random.choices(['up', 'down', 'keep'], opt['resize_prob'])[0]
    if updown_type == 'up':
        scale = np.random.uniform(1, opt['resize_range'][1])
    elif updown_type == 'down':
        scale = np.random.uniform(opt['resize_range'][0], 1)
    else:
        scale = 1
    mode = random.choice(opt['resize_options'])
    out = F.interpolate(out, scale_factor=scale, mode=mode)

    return out


def downsample_2nd(out, opt, ori_h, ori_w):
    # Second Resize for 4x scaling
    if opt['scale'] == 4:
        updown_type = random.choices(['up', 'down', 'keep'], opt['resize_prob2'])[0]
        if updown_type == 'up':
            scale = np.random.uniform(1, opt['resize_range2'][1])
        elif updown_type == 'down':
            scale = np.random.uniform(opt['resize_range2'][0], 1)
        else:
            scale = 1
        mode = random.choice(opt['resize_options'])
        # Resize这边改回来原来的版本,不用连续的resize了
        # out = F.interpolate(out, scale_factor=scale, mode=mode)
        out = F.interpolate(
            out, size=(int(ori_h / opt['scale'] * scale), int(ori_w / opt['scale'] * scale)), mode=mode
        )
    
    return out


def common_degradation(out, opt, kernels, process_id, verbose = False):
    jpeger = DiffJPEG(differentiable=False).cuda()
    kernel1, kernel2 = kernels


    downsample_1st_position = random.choices([0, 1, 2])[0]
    if opt['scale'] == 4:
        # Only do the second downsample at 4x scale
        downsample_2nd_position = random.choices([0, 1, 2])[0]
    else:
        # print("We don't use the second resize")
        downsample_2nd_position = -1

        
    ####---------------------------- Frist Degradation ----------------------------------####
    batch_size, _, ori_h, ori_w = out.size()

    if downsample_1st_position == 0:
        out = downsample_1st(out, opt)

    # Bluring kernel
    out = filter2D(out, kernel1)
    if verbose: print(f"(1st) blur noise")


    if downsample_1st_position == 1:
        out = downsample_1st(out, opt)


    # Noise effect (gaussian / poisson)
    gray_noise_prob = opt['gray_noise_prob']
    if np.random.uniform() < opt['gaussian_noise_prob']:
        # Gaussian noise
        out = random_add_gaussian_noise_pt(
            out, sigma_range=opt['noise_range'], clip=True, rounds=False, gray_prob=gray_noise_prob)
        name = "gaussian_noise"
    else:
        # Poisson noise
        out = random_add_poisson_noise_pt(
            out,
            scale_range=opt['poisson_scale_range'],
            gray_prob=gray_noise_prob,
            clip=True,
            rounds=False)
        name = "poisson_noise"
    if verbose: print("(1st) " + str(name))


    if downsample_1st_position == 2:
        out = downsample_1st(out, opt)


    # Choose an image compression codec (All degradation batch use the same codec)
    image_codec = random.choices(opt['compression_codec1'], opt['compression_codec_prob1'])[0]     # All lower case
    if image_codec == "jpeg":
        out = JPEG.compress_tensor(out)
    elif image_codec == "webp":
        try:
            out = WEBP.compress_tensor(out, idx=process_id)
        except Exception:
            print("There is exception again in webp!")
            out = WEBP.compress_tensor(out, idx=process_id)
    elif image_codec == "heif":
        out = HEIF.compress_tensor(out, idx=process_id)
    elif image_codec == "avif":
        out = AVIF.compress_tensor(out, idx=process_id)
    else:
        raise NotImplementedError("We don't have such image compression designed!")
    # ##########################################################################################


    # ####---------------------------- Second Degradation ----------------------------------####
    if downsample_2nd_position == 0:
        out = downsample_2nd(out, opt, ori_h, ori_w)


    # Add blur 2nd time
    if np.random.uniform() < opt['second_blur_prob']:
        # 这个bluring不是必定触发的
        if verbose: print("(2nd) blur noise")
        out = filter2D(out, kernel2)


    if downsample_2nd_position == 1:
        out = downsample_2nd(out, opt, ori_h, ori_w)


    # Add noise 2nd time
    gray_noise_prob = opt['gray_noise_prob2']
    if np.random.uniform() < opt['gaussian_noise_prob2']:
        # gaussian noise
        if verbose: print("(2nd) gaussian noise")
        out = random_add_gaussian_noise_pt(
            out, sigma_range=opt['noise_range2'], clip=True, rounds=False, gray_prob=gray_noise_prob)
        name = "gaussian_noise"
    else:
        # poisson noise
        if verbose: print("(2nd) poisson noise")
        out = random_add_poisson_noise_pt(
            out, scale_range=opt['poisson_scale_range2'], gray_prob=gray_noise_prob, clip=True, rounds=False)
        name = "poisson_noise"


    if downsample_2nd_position == 2:
        out = downsample_2nd(out, opt, ori_h, ori_w)


    return out