Spaces:
Sleeping
Sleeping
from torch import nn | |
from torch import Tensor | |
from typing import Callable, Optional, List | |
from utils.learning import freeze_params | |
__all__ = ['MobileNetV2'] | |
def _make_divisible(v: float, | |
divisor: int, | |
min_value: Optional[int] = None) -> int: | |
""" | |
This function is taken from the original tf repo. | |
It ensures that all layers have a channel number that is divisible by 8 | |
It can be seen here: | |
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py | |
""" | |
if min_value is None: | |
min_value = divisor | |
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor) | |
# Make sure that round down does not go down by more than 10%. | |
if new_v < 0.9 * v: | |
new_v += divisor | |
return new_v | |
class ConvBNActivation(nn.Sequential): | |
def __init__( | |
self, | |
in_planes: int, | |
out_planes: int, | |
kernel_size: int = 3, | |
stride: int = 1, | |
groups: int = 1, | |
padding: int = -1, | |
norm_layer: Optional[Callable[..., nn.Module]] = None, | |
activation_layer: Optional[Callable[..., nn.Module]] = None, | |
dilation: int = 1, | |
) -> None: | |
if padding == -1: | |
padding = (kernel_size - 1) // 2 * dilation | |
if norm_layer is None: | |
norm_layer = nn.BatchNorm2d | |
if activation_layer is None: | |
activation_layer = nn.ReLU6 | |
super().__init__( | |
nn.Conv2d(in_planes, | |
out_planes, | |
kernel_size, | |
stride, | |
padding, | |
dilation=dilation, | |
groups=groups, | |
bias=False), norm_layer(out_planes), | |
activation_layer(inplace=True)) | |
self.out_channels = out_planes | |
# necessary for backwards compatibility | |
ConvBNReLU = ConvBNActivation | |
class InvertedResidual(nn.Module): | |
def __init__( | |
self, | |
inp: int, | |
oup: int, | |
stride: int, | |
dilation: int, | |
expand_ratio: int, | |
norm_layer: Optional[Callable[..., nn.Module]] = None) -> None: | |
super(InvertedResidual, self).__init__() | |
self.stride = stride | |
assert stride in [1, 2] | |
if norm_layer is None: | |
norm_layer = nn.BatchNorm2d | |
self.kernel_size = 3 | |
self.dilation = dilation | |
hidden_dim = int(round(inp * expand_ratio)) | |
self.use_res_connect = self.stride == 1 and inp == oup | |
layers: List[nn.Module] = [] | |
if expand_ratio != 1: | |
# pw | |
layers.append( | |
ConvBNReLU(inp, | |
hidden_dim, | |
kernel_size=1, | |
norm_layer=norm_layer)) | |
layers.extend([ | |
# dw | |
ConvBNReLU(hidden_dim, | |
hidden_dim, | |
stride=stride, | |
dilation=dilation, | |
groups=hidden_dim, | |
norm_layer=norm_layer), | |
# pw-linear | |
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False), | |
norm_layer(oup), | |
]) | |
self.conv = nn.Sequential(*layers) | |
self.out_channels = oup | |
self._is_cn = stride > 1 | |
def forward(self, x: Tensor) -> Tensor: | |
if self.use_res_connect: | |
return x + self.conv(x) | |
else: | |
return self.conv(x) | |
class MobileNetV2(nn.Module): | |
def __init__(self, | |
output_stride=8, | |
norm_layer: Optional[Callable[..., nn.Module]] = None, | |
width_mult: float = 1.0, | |
inverted_residual_setting: Optional[List[List[int]]] = None, | |
round_nearest: int = 8, | |
block: Optional[Callable[..., nn.Module]] = None, | |
freeze_at=0) -> None: | |
""" | |
MobileNet V2 main class | |
Args: | |
num_classes (int): Number of classes | |
width_mult (float): Width multiplier - adjusts number of channels in each layer by this amount | |
inverted_residual_setting: Network structure | |
round_nearest (int): Round the number of channels in each layer to be a multiple of this number | |
Set to 1 to turn off rounding | |
block: Module specifying inverted residual building block for mobilenet | |
norm_layer: Module specifying the normalization layer to use | |
""" | |
super(MobileNetV2, self).__init__() | |
if block is None: | |
block = InvertedResidual | |
if norm_layer is None: | |
norm_layer = nn.BatchNorm2d | |
last_channel = 1280 | |
input_channel = 32 | |
current_stride = 1 | |
rate = 1 | |
if inverted_residual_setting is None: | |
inverted_residual_setting = [ | |
# t, c, n, s | |
[1, 16, 1, 1], | |
[6, 24, 2, 2], | |
[6, 32, 3, 2], | |
[6, 64, 4, 2], | |
[6, 96, 3, 1], | |
[6, 160, 3, 2], | |
[6, 320, 1, 1], | |
] | |
# only check the first element, assuming user knows t,c,n,s are required | |
if len(inverted_residual_setting) == 0 or len( | |
inverted_residual_setting[0]) != 4: | |
raise ValueError("inverted_residual_setting should be non-empty " | |
"or a 4-element list, got {}".format( | |
inverted_residual_setting)) | |
# building first layer | |
input_channel = _make_divisible(input_channel * width_mult, | |
round_nearest) | |
self.last_channel = _make_divisible( | |
last_channel * max(1.0, width_mult), round_nearest) | |
features: List[nn.Module] = [ | |
ConvBNReLU(3, input_channel, stride=2, norm_layer=norm_layer) | |
] | |
current_stride *= 2 | |
# building inverted residual blocks | |
for t, c, n, s in inverted_residual_setting: | |
if current_stride == output_stride: | |
stride = 1 | |
dilation = rate | |
rate *= s | |
else: | |
stride = s | |
dilation = 1 | |
current_stride *= s | |
output_channel = _make_divisible(c * width_mult, round_nearest) | |
for i in range(n): | |
if i == 0: | |
features.append( | |
block(input_channel, output_channel, stride, dilation, | |
t, norm_layer)) | |
else: | |
features.append( | |
block(input_channel, output_channel, 1, rate, t, | |
norm_layer)) | |
input_channel = output_channel | |
# building last several layers | |
features.append( | |
ConvBNReLU(input_channel, | |
self.last_channel, | |
kernel_size=1, | |
norm_layer=norm_layer)) | |
# make it nn.Sequential | |
self.features = nn.Sequential(*features) | |
self._initialize_weights() | |
feature_4x = self.features[0:4] | |
feautre_8x = self.features[4:7] | |
feature_16x = self.features[7:14] | |
feature_32x = self.features[14:] | |
self.stages = [feature_4x, feautre_8x, feature_16x, feature_32x] | |
self.freeze(freeze_at) | |
def forward(self, x): | |
xs = [] | |
for stage in self.stages: | |
x = stage(x) | |
xs.append(x) | |
return xs | |
def _initialize_weights(self): | |
# weight initialization | |
for m in self.modules(): | |
if isinstance(m, nn.Conv2d): | |
nn.init.kaiming_normal_(m.weight, mode='fan_out') | |
if m.bias is not None: | |
nn.init.zeros_(m.bias) | |
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): | |
nn.init.ones_(m.weight) | |
nn.init.zeros_(m.bias) | |
elif isinstance(m, nn.Linear): | |
nn.init.normal_(m.weight, 0, 0.01) | |
nn.init.zeros_(m.bias) | |
def freeze(self, freeze_at): | |
if freeze_at >= 1: | |
for m in self.stages[0][0]: | |
freeze_params(m) | |
for idx, stage in enumerate(self.stages, start=2): | |
if freeze_at >= idx: | |
freeze_params(stage) | |