# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import math import paddle import paddle.nn as nn import paddle.nn.functional as F from paddleseg import utils from paddleseg.cvlibs import manager, param_init from paddleseg.models import layers @manager.MODELS.add_component class ESPNetV2(nn.Layer): """ The ESPNetV2 implementation based on PaddlePaddle. The original article refers to Sachin Mehta, Mohammad Rastegari, Linda Shapiro, and Hannaneh Hajishirzi. "ESPNetv2: A Light-weight, Power Efficient, and General Purpose Convolutional Neural Network" (https://arxiv.org/abs/1811.11431). Args: num_classes (int): The unique number of target classes. in_channels (int, optional): Number of input channels. Default: 3. scale (float, optional): The scale of channels, only support scale <= 1.5 and scale == 2. Default: 1.0. drop_prob (floa, optional): The probability of dropout. Default: 0.1. pretrained (str, optional): The path or url of pretrained model. Default: None. """ def __init__(self, num_classes, in_channels=3, scale=1.0, drop_prob=0.1, pretrained=None): super().__init__() self.backbone = EESPNetBackbone(in_channels, drop_prob, scale) self.in_channels = self.backbone.out_channels self.proj_l4_c = layers.ConvBNPReLU( self.in_channels[3], self.in_channels[2], 1, stride=1, bias_attr=False) psp_size = 2 * self.in_channels[2] self.eesp_psp = nn.Sequential( EESP( psp_size, psp_size // 2, stride=1, branches=4, kernel_size_maximum=7), PSPModule(psp_size // 2, psp_size // 2), ) self.project_l3 = nn.Sequential( nn.Dropout2D(p=drop_prob), nn.Conv2D( psp_size // 2, num_classes, 1, 1, bias_attr=False), ) self.act_l3 = BNPReLU(num_classes) self.project_l2 = layers.ConvBNPReLU( self.in_channels[1] + num_classes, num_classes, 1, stride=1, bias_attr=False) self.project_l1 = nn.Sequential( nn.Dropout2D(p=drop_prob), nn.Conv2D( self.in_channels[0] + num_classes, num_classes, 1, 1, bias_attr=False), ) self.pretrained = pretrained self.init_weight() def init_weight(self): if self.pretrained is not None: utils.load_entire_model(self, self.pretrained) def hierarchical_upsample(self, x, factor=3): for i in range(factor): x = F.interpolate( x, scale_factor=2, mode='bilinear', align_corners=True) return x def forward(self, x): out_l1, out_l2, out_l3, out_l4 = self.backbone(x) out_l4_proj = self.proj_l4_c(out_l4) l4_to_l3 = F.interpolate( out_l4_proj, scale_factor=2, mode='bilinear', align_corners=True) merged_l3 = self.eesp_psp(paddle.concat([out_l3, l4_to_l3], axis=1)) proj_merge_l3 = self.project_l3(merged_l3) proj_merge_l3 = self.act_l3(proj_merge_l3) l3_to_l2 = F.interpolate( proj_merge_l3, scale_factor=2, mode='bilinear', align_corners=True) merged_l2 = self.project_l2(paddle.concat([out_l2, l3_to_l2], axis=1)) l2_to_l1 = F.interpolate( merged_l2, scale_factor=2, mode='bilinear', align_corners=True) merged_l1 = self.project_l1(paddle.concat([out_l1, l2_to_l1], axis=1)) if self.training: return [ F.interpolate( merged_l1, scale_factor=2, mode='bilinear', align_corners=True), self.hierarchical_upsample(proj_merge_l3), ] else: return [ F.interpolate( merged_l1, scale_factor=2, mode='bilinear', align_corners=True) ] class BNPReLU(nn.Layer): def __init__(self, out_channels, **kwargs): super().__init__() if 'data_format' in kwargs: data_format = kwargs['data_format'] else: data_format = 'NCHW' self._batch_norm = layers.SyncBatchNorm( out_channels, data_format=data_format) self._prelu = layers.Activation("prelu") def forward(self, x): x = self._batch_norm(x) x = self._prelu(x) return x class EESP(nn.Layer): """ EESP block, principle: reduce -> split -> transform -> merge Args: in_channels (int): Number of input channels. out_channels (int): Number of output channels. stride (int, optional): Factor by which we should skip (useful for down-sampling). If 2, then down-samples the feature map by 2. Default: 1. branches (int, optional): Number of branches. Default: 4. kernel_size_maximum (int, optional): A maximum value of receptive field allowed for EESP block. Default: 7. down_method (str, optional): Down sample or not, only support 'avg' and 'esp'(equivalent to stride is 2 or not). Default: 'esp'. """ def __init__(self, in_channels, out_channels, stride=1, branches=4, kernel_size_maximum=7, down_method='esp'): super(EESP, self).__init__() if out_channels % branches != 0: raise RuntimeError( "The out_channes for EESP should be factorized by branches, but out_channels={} cann't be factorized by branches={}" .format(out_channels, branches)) assert down_method in [ 'avg', 'esp' ], "The down_method for EESP only support 'avg' or 'esp', but got down_method={}".format( down_method) self.in_channels = in_channels self.stride = stride in_branch_channels = int(out_channels / branches) self.group_conv_in = layers.ConvBNPReLU( in_channels, in_branch_channels, 1, stride=1, groups=branches, bias_attr=False) map_ksize_dilation = { 3: 1, 5: 2, 7: 3, 9: 4, 11: 5, 13: 6, 15: 7, 17: 8 } self.kernel_sizes = [] for i in range(branches): kernel_size = 3 + 2 * i kernel_size = kernel_size if kernel_size <= kernel_size_maximum else 3 self.kernel_sizes.append(kernel_size) self.kernel_sizes.sort() self.spp_modules = nn.LayerList() for i in range(branches): dilation = map_ksize_dilation[self.kernel_sizes[i]] self.spp_modules.append( nn.Conv2D( in_branch_channels, in_branch_channels, kernel_size=3, padding='same', stride=stride, dilation=dilation, groups=in_branch_channels, bias_attr=False)) self.group_conv_out = layers.ConvBN( out_channels, out_channels, kernel_size=1, stride=1, groups=branches, bias_attr=False) self.bn_act = BNPReLU(out_channels) self._act = nn.PReLU() self.down_method = True if down_method == 'avg' else False @paddle.jit.not_to_static def convert_group_x(self, group_merge, x): if x.shape == group_merge.shape: group_merge += x return group_merge def forward(self, x): group_out = self.group_conv_in(x) output = [self.spp_modules[0](group_out)] for k in range(1, len(self.spp_modules)): output_k = self.spp_modules[k](group_out) output_k = output_k + output[k - 1] output.append(output_k) group_merge = self.group_conv_out( self.bn_act(paddle.concat( output, axis=1))) if self.stride == 2 and self.down_method: return group_merge group_merge = self.convert_group_x(group_merge, x) out = self._act(group_merge) return out class PSPModule(nn.Layer): def __init__(self, in_channels, out_channels, sizes=4): super().__init__() self.stages = nn.LayerList([ nn.Conv2D( in_channels, in_channels, kernel_size=3, stride=1, groups=in_channels, padding='same', bias_attr=False) for _ in range(sizes) ]) self.project = layers.ConvBNPReLU( in_channels * (sizes + 1), out_channels, 1, stride=1, bias_attr=False) def forward(self, feats): h, w = paddle.shape(feats)[2:4] out = [feats] for stage in self.stages: feats = F.avg_pool2d(feats, kernel_size=3, stride=2, padding='same') upsampled = F.interpolate( stage(feats), size=[h, w], mode='bilinear', align_corners=True) out.append(upsampled) return self.project(paddle.concat(out, axis=1)) class DownSampler(nn.Layer): """ Down sampler. Args: in_channels (int): Number of input channels. out_channels (int): Number of output channels. branches (int, optional): Number of branches. Default: 9. kernel_size_maximum (int, optional): A maximum value of kernel_size for EESP block. Default: 9. shortcut (bool, optional): Use shortcut or not. Default: True. """ def __init__(self, in_channels, out_channels, branches=4, kernel_size_maximum=9, shortcut=True): super().__init__() if out_channels < in_channels: raise RuntimeError( "The out_channes for DownSampler should be bigger than in_channels, but got in_channles={}, out_channels={}" .format(in_channels, out_channels)) self.eesp = EESP( in_channels, out_channels - in_channels, stride=2, branches=branches, kernel_size_maximum=kernel_size_maximum, down_method='avg') self.avg = nn.AvgPool2D(kernel_size=3, padding=1, stride=2) if shortcut: self.shortcut_layer = nn.Sequential( layers.ConvBNPReLU( 3, 3, 3, stride=1, bias_attr=False), layers.ConvBN( 3, out_channels, 1, stride=1, bias_attr=False), ) self._act = nn.PReLU() def forward(self, x, inputs=None): avg_out = self.avg(x) eesp_out = self.eesp(x) output = paddle.concat([avg_out, eesp_out], axis=1) if inputs is not None: w1 = paddle.shape(avg_out)[2] w2 = paddle.shape(inputs)[2] while w2 != w1: inputs = F.avg_pool2d( inputs, kernel_size=3, padding=1, stride=2) w2 = paddle.shape(inputs)[2] # import pdb # pdb.set_trace() output = output + self.shortcut_layer(inputs) return self._act(output) class EESPNetBackbone(nn.Layer): """ The EESPNetBackbone implementation based on PaddlePaddle. The original article refers to Sachin Mehta, Mohammad Rastegari, Linda Shapiro, and Hannaneh Hajishirzi. "ESPNetv2: A Light-weight, Power Efficient, and General Purpose Convolutional Neural Network" (https://arxiv.org/abs/1811.11431). Args: in_channels (int, optional): Number of input channels. Default: 3. drop_prob (float, optional): The probability of dropout. Default: 3. scale (float, optional): The scale of channels, only support scale <= 1.5 and scale == 2. Default: 1.0. """ def __init__(self, in_channels=3, drop_prob=0.1, scale=1.0): super().__init__() reps = [0, 3, 7, 3] num_level = 4 # 1/2, 1/4, 1/8, 1/16 kernel_size_limitations = [13, 11, 9, 7] # kernel size limitation branch_list = [4] * len( kernel_size_limitations) # branches at different levels base_channels = 32 # first conv output channels channels_config = [base_channels] * num_level for i in range(num_level): if i == 0: channels = int(base_channels * scale) channels = math.ceil(channels / branch_list[0]) * branch_list[0] channels_config[ i] = base_channels if channels > base_channels else channels else: channels_config[i] = channels * pow(2, i) self.level1 = layers.ConvBNPReLU( in_channels, channels_config[0], 3, stride=2, bias_attr=False) self.level2 = DownSampler( channels_config[0], channels_config[1], branches=branch_list[0], kernel_size_maximum=kernel_size_limitations[0], shortcut=True) self.level3_0 = DownSampler( channels_config[1], channels_config[2], branches=branch_list[1], kernel_size_maximum=kernel_size_limitations[1], shortcut=True) self.level3 = nn.LayerList() for i in range(reps[1]): self.level3.append( EESP( channels_config[2], channels_config[2], stride=1, branches=branch_list[2], kernel_size_maximum=kernel_size_limitations[2])) self.level4_0 = DownSampler( channels_config[2], channels_config[3], branches=branch_list[2], kernel_size_maximum=kernel_size_limitations[2], shortcut=True) self.level4 = nn.LayerList() for i in range(reps[2]): self.level4.append( EESP( channels_config[3], channels_config[3], stride=1, branches=branch_list[3], kernel_size_maximum=kernel_size_limitations[3])) self.out_channels = channels_config self.init_params() def init_params(self): for m in self.sublayers(): if isinstance(m, nn.Conv2D): param_init.kaiming_normal_init(m.weight) if m.bias is not None: param_init.constant_init(m.bias, value=0.0) elif isinstance(m, nn.BatchNorm2D): param_init.constant_init(m.weight, value=1.0) param_init.constant_init(m.bias, value=0.0) elif isinstance(m, nn.Linear): param_init.normal_init(m.weight, std=0.001) if m.bias is not None: param_init.constant_init(m.bias, value=0.0) def forward(self, x): out_l1 = self.level1(x) out_l2 = self.level2(out_l1, x) out_l3 = self.level3_0(out_l2, x) for i, layer in enumerate(self.level3): out_l3 = layer(out_l3) out_l4 = self.level4_0(out_l3, x) for i, layer in enumerate(self.level4): out_l4 = layer(out_l4) return out_l1, out_l2, out_l3, out_l4 if __name__ == '__main__': import paddle import numpy as np paddle.enable_static() startup_prog = paddle.static.default_startup_program() exe = paddle.static.Executor(paddle.CPUPlace()) exe.run(startup_prog) path_prefix = "./output/model" [inference_program, feed_target_names, fetch_targets] = ( paddle.static.load_inference_model(path_prefix, exe)) print('inference_program:', inference_program) tensor_img = np.array( np.random.random((1, 3, 1024, 2048)), dtype=np.float32) results = exe.run(inference_program, feed={feed_target_names[0]: tensor_img}, fetch_list=fetch_targets)