Source code for src.convolution

from typing import Literal
from .module import Module
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

r"""
We tried to vectorize our convolutions as much as possible, prioritizing performance.

This implies creating special views of our arrays with the `numpy.lib.stride_tricks`
functions. `sliding_window_view` is the easiest to understand, although perhaps not
as fast as `as_strided` (but also less risky).

The calculations are done with `np.einsum`, which is relatively easy to understand
and use. The key lies in understanding the shapes of your inputs/outputs (see the
einsum sketch at the end of the Examples section below).

Shape
-----
Reminder for 1D:
input : ndarray (batch, length, chan_in)
d_out : ndarray (batch, length, chan_in) == input.shape
X_view : ndarray (batch, out_length, chan_in, self.k_size)
delta : ndarray (batch, out_length, chan_out)
_gradient["weight"] : ndarray (k_size, chan_in, chan_out)
_parameters["weight"] : ndarray (k_size, chan_in, chan_out)

Notes
-----
Notation used for `np.einsum`:
- b : batch_size
- w : width (2D) / length (1D)
- h : height (2D)
- o : out_width (2D) / out_length (1D)
- p : out_height (2D)
- c : chan_in
- d : chan_out
- k : k_size (ij for 2D)
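
For reference, the three contractions used by `Conv1D` below are::

    "bock, kcd -> bod"    (forward pass)
    "bock, bod -> kcd"    (weight gradient)
    "bod, kcd -> kboc"    (gradient w.r.t. the input)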

Examples
--------
Quick demonstration of `sliding_window_view` in 1D:

>>> batch, length, chan_in, k_size = 1, 8, 1, 3
>>> input = np.random.randn(batch, length, chan_in)
>>> input
array([[[-0.41982262],
        [ 1.10111123],
        [-0.41115195],
        [ 1.18733225],
        [-1.93463567],
        [-0.22472025],
        [-0.30581971],
        [ 0.40578667]]])

>>> window = np.lib.stride_tricks.sliding_window_view(input, (1, k_size, chan_in))
>>> window
array([[[[[[-0.41982262],
           [ 1.10111123],
           [-0.41115195]]]],
        [[[[ 1.10111123],
           [-0.41115195],
           [ 1.18733225]]]],
    ...

How to deal with stride != 1?
>>> stride = 3
>>> window = np.lib.stride_tricks.sliding_window_view(input, (1, k_size, chan_in))[::1, ::stride, ::1]
>>> window
array([[[[[[-0.41982262],
           [ 1.10111123],
           [-0.41115195]]]],
        [[[[ 1.18733225],
           [-1.93463567],
           [-0.22472025]]]]]])

Then it is just a matter of reshaping to drop the unnecessary dimensions, e.g.:
>>> out_length = (length - k_size) // stride + 1
>>> window = window.reshape(batch, out_length, chan_in, k_size)
>>> window
array([[[[-0.41982262,  1.10111123, -0.41115195]],
        [[ 1.18733225, -1.93463567, -0.22472025]]]])

And voilà!
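
Finally, the forward pass is a single `np.einsum` contraction over this view.
A minimal illustrative sketch (the weight below is a random placeholder, not a
trained parameter):

>>> chan_out = 2
>>> weight = np.random.randn(k_size, chan_in, chan_out)
>>> np.einsum("bock, kcd -> bod", window, weight).shape
(1, 2, 2)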
"""


class Conv1D(Module):
    r"""1D convolution.

    Parameters
    ----------
    k_size : int
        Size of the convolving kernel.
    chan_in : int
        Number of channels in the input.
    chan_out : int
        Number of channels produced by the convolution.
    stride : int, optional, default=1
        Stride of the convolution.
    bias : bool, optional, default=False
        If True, adds a learnable bias to the output.
    init_type : str, optional, default="xavier_normal"
        Initialization scheme for the parameters.

    Shape
    -----
    - Input : ndarray (batch, length, chan_in)
    - Output : ndarray (batch, (length - k_size) // stride + 1, chan_out)
    - Weight : ndarray (k_size, chan_in, chan_out)
    - Bias : ndarray (chan_out)
    """

    def __init__(
        self,
        k_size: int,
        chan_in: int,
        chan_out: int,
        stride: int = 1,
        bias: bool = False,
        init_type: Literal[
            "normal",
            "uniform",
            "zeros",
            "ones",
            "he_normal",
            "he_uniform",
            "xavier_normal",
            "xavier_uniform",
        ] = "xavier_normal",
    ):
        super().__init__()
        self.k_size = k_size
        self.chan_in = chan_in
        self.chan_out = chan_out
        self.stride = stride
        self.include_bias = bias
        self.__init_params(init_type)

    def __init_params(self, init_type):
        gain = self.calculate_gain()
        if init_type == "normal":
            self._parameters["weight"] = np.random.randn(
                self.k_size, self.chan_in, self.chan_out
            )
            self._parameters["bias"] = np.random.randn(self.chan_out)
        elif init_type == "uniform":
            self._parameters["weight"] = np.random.uniform(
                0.0, 1.0, (self.k_size, self.chan_in, self.chan_out)
            )
            self._parameters["bias"] = np.random.uniform(0.0, 1.0, (self.chan_out))
        elif init_type == "zeros":
            self._parameters["weight"] = np.zeros(
                (self.k_size, self.chan_in, self.chan_out)
            )
            self._parameters["bias"] = np.zeros(self.chan_out)
        elif init_type == "ones":
            self._parameters["weight"] = np.ones(
                (self.k_size, self.chan_in, self.chan_out)
            )
            self._parameters["bias"] = np.ones(self.chan_out)
        elif init_type == "he_normal":
            std_dev = gain * np.sqrt(2 / self.chan_in)
            self._parameters["weight"] = np.random.normal(
                0, std_dev, (self.k_size, self.chan_in, self.chan_out)
            )
            self._parameters["bias"] = np.random.normal(0, std_dev, (self.chan_out))
        elif init_type == "he_uniform":
            limit = gain * np.sqrt(6 / self.chan_in)
            self._parameters["weight"] = np.random.uniform(
                -limit, limit, (self.k_size, self.chan_in, self.chan_out)
            )
            self._parameters["bias"] = np.random.uniform(-limit, limit, (self.chan_out))
        elif init_type == "xavier_normal":
            std_dev = gain * np.sqrt(2 / (self.chan_in + self.chan_out))
            self._parameters["weight"] = np.random.normal(
                0, std_dev, (self.k_size, self.chan_in, self.chan_out)
            )
            self._parameters["bias"] = np.random.normal(0, std_dev, (self.chan_out))
        elif init_type == "xavier_uniform":
            limit = gain * np.sqrt(6 / (self.chan_in + self.chan_out))
            self._parameters["weight"] = np.random.uniform(
                -limit, limit, (self.k_size, self.chan_in, self.chan_out)
            )
            self._parameters["bias"] = np.random.uniform(-limit, limit, (self.chan_out))
        else:
            raise ValueError(f"Unknown initialization type: {init_type}")

        self._gradient["weight"] = np.zeros_like(self._parameters["weight"])
        self._gradient["bias"] = np.zeros_like(self._parameters["bias"])

        if not self.include_bias:
            self._parameters["bias"] = None
            self._gradient["bias"] = None

    def zero_grad(self):
        self._gradient["weight"] = np.zeros_like(self._parameters["weight"])
        if self.include_bias:
            self._gradient["bias"] = np.zeros_like(self._parameters["bias"])

    def forward(self, X):
        batch_size, length, chan_in = X.shape
        assert chan_in == self.chan_in
        out_length = (length - self.k_size) // self.stride + 1

        # Prepare the input view for the convolution operation
        X_view = sliding_window_view(X, (1, self.k_size, self.chan_in))[
            ::1, :: self.stride, ::1
        ]
        X_view = X_view.reshape(batch_size, out_length, self.chan_in, self.k_size)

        # Perform the convolution
        self.output = np.einsum("bock, kcd -> bod", X_view, self._parameters["weight"])
        if self.include_bias:
            self.output += self._parameters["bias"]
        return self.output

    def backward_update_gradient(self, input, delta):
        batch_size, length, chan_in = input.shape
        assert chan_in == self.chan_in
        out_length = (length - self.k_size) // self.stride + 1

        X_view = sliding_window_view(input, (1, self.k_size, self.chan_in))[
            ::1, :: self.stride, ::1
        ]
        X_view = X_view.reshape(batch_size, out_length, self.chan_in, self.k_size)

        self._gradient["weight"] += (
            np.einsum("bock, bod -> kcd", X_view, delta) / batch_size
        )
        if self.include_bias:
            self._gradient["bias"] += np.sum(delta, axis=(0, 1)) / batch_size

    def backward_delta(self, input, delta):
        _, length, chan_in = input.shape
        assert chan_in == self.chan_in
        out_length = (length - self.k_size) // self.stride + 1

        self.d_out = np.zeros_like(input)
        d_in = np.einsum("bod, kcd -> kboc", delta, self._parameters["weight"])
        for i in range(self.k_size):
            self.d_out[:, i : i + out_length * self.stride : self.stride, :] += d_in[i]
        return self.d_out

    def update_parameters(self, learning_rate):
        self._parameters["weight"] -= learning_rate * self._gradient["weight"]
        if self.include_bias:
            self._parameters["bias"] -= learning_rate * self._gradient["bias"]
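# Illustrative shape check for Conv1D (hypothetical sizes; see the __main__
# sketch at the end of this module for a runnable version): with k_size=3,
# chan_in=4, chan_out=8, stride=2 and an input of shape (16, 32, 4), the
# weight is (3, 4, 8), the bias is (8,), and forward() returns
# (16, (32 - 3) // 2 + 1, 8) == (16, 15, 8).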
class MaxPool1D(Module):
    r"""1D max pooling.

    Parameters
    ----------
    k_size : int
        Size of the pooling window.
    stride : int
        Stride of the pooling window.

    Shape
    -----
    - Input : ndarray (batch, length, chan_in)
    - Output : ndarray (batch, (length - k_size) // stride + 1, chan_in)
    """

    def __init__(self, k_size, stride):
        self.k_size = k_size
        self.stride = stride

    def forward(self, X):
        batch_size, length, chan_in = X.shape
        out_length = (length - self.k_size) // self.stride + 1

        X_view = sliding_window_view(X, (1, self.k_size, 1))[::1, :: self.stride, ::1]
        X_view = X_view.reshape(batch_size, out_length, chan_in, self.k_size)

        self.output = np.max(X_view, axis=-1)
        return self.output

    def zero_grad(self):
        pass  # No gradient in MaxPool1D

    def backward_update_gradient(self, input, delta):
        pass  # No gradient to update in MaxPool1D

    def backward_delta(self, input, delta):
        batch_size, length, chan_in = input.shape
        out_length = (length - self.k_size) // self.stride + 1

        input_view = sliding_window_view(input, (1, self.k_size, 1))[
            ::1, :: self.stride, ::1
        ]
        input_view = input_view.reshape(batch_size, out_length, chan_in, self.k_size)
        max_indices = np.argmax(input_view, axis=-1)

        # Create indices for the batch, output and channel dimensions
        batch_indices, out_indices, chan_indices = np.meshgrid(
            np.arange(batch_size),
            np.arange(out_length),
            np.arange(chan_in),
            indexing="ij",
        )

        # Route each output gradient back to the argmax position of its window
        self.d_out = np.zeros_like(input)
        self.d_out[
            batch_indices, out_indices * self.stride + max_indices, chan_indices
        ] += delta[batch_indices, out_indices, chan_indices]
        return self.d_out

    def update_parameters(self, learning_rate):
        pass  # No parameters to update in MaxPool1D
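# Illustrative note on MaxPool1D.backward_delta (hypothetical numbers): with
# k_size=2, stride=2 and a window holding [3., 7.] at output position o, the
# argmax is 1, so delta[b, o, c] is added to d_out[b, o * 2 + 1, c] and the
# non-maximal position receives no gradient.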
class AvgPool1D(Module):
    r"""1D average pooling.

    Parameters
    ----------
    k_size : int
        Size of the pooling window.
    stride : int
        Stride of the pooling window.

    Shape
    -----
    - Input : ndarray (batch, length, chan_in)
    - Output : ndarray (batch, (length - k_size) // stride + 1, chan_in)
    """

    def __init__(self, k_size, stride):
        self.k_size = k_size
        self.stride = stride

    def forward(self, X):
        batch_size, length, chan_in = X.shape
        out_length = (length - self.k_size) // self.stride + 1

        X_view = sliding_window_view(X, (1, self.k_size, 1))[::1, :: self.stride, ::1]
        X_view = X_view.reshape(batch_size, out_length, chan_in, self.k_size)

        self.output = np.mean(X_view, axis=-1)
        return self.output

    def zero_grad(self):
        pass  # No gradient in AvgPool1D

    def backward_update_gradient(self, x, delta):
        pass  # No gradient to update in AvgPool1D

    def backward_delta(self, input, delta):
        batch_size, length, chan_in = input.shape
        out_length = (length - self.k_size) // self.stride + 1

        self.d_out = np.zeros_like(input)
        # Each position in a window receives an equal share of the gradient
        delta_repeated = (
            np.repeat(delta[:, :, np.newaxis], self.k_size, axis=2) / self.k_size
        )
        for i in range(self.k_size):
            self.d_out[
                :, i : i + out_length * self.stride : self.stride
            ] += delta_repeated[:, :, i]
        return self.d_out

    def update_parameters(self, learning_rate):
        pass  # No parameters to update in AvgPool1D
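# Illustrative note on AvgPool1D.backward_delta (hypothetical numbers): each
# of the k_size positions in a window receives delta / k_size, e.g. with
# k_size=4 an output gradient of 1.0 contributes 0.25 to every input position
# it covers (overlapping windows simply accumulate their contributions).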
class Flatten(Module):
    r"""Flatten an output.

    Shape
    -----
    - Input : ndarray (batch, length, chan_in)
    - Output : ndarray (batch, length * chan_in)
    """

    def forward(self, X):
        return X.reshape(X.shape[0], -1)

    def zero_grad(self):
        pass

    def backward_update_gradient(self, input, delta):
        pass

    def backward_delta(self, input, delta):
        return delta.reshape(input.shape)

    def update_parameters(self, learning_rate):
        pass
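if __name__ == "__main__":
    # Illustrative smoke test, not part of the library: chain the layers above
    # on random data and check the shapes stated in the docstrings. Run it as
    # `python -m src.convolution` so the relative import of Module resolves.
    x = np.random.randn(16, 32, 4)  # (batch, length, chan_in)

    conv = Conv1D(k_size=3, chan_in=4, chan_out=8, stride=2, bias=True)
    out = conv.forward(x)
    assert out.shape == (16, (32 - 3) // 2 + 1, 8)  # (16, 15, 8)

    pool = MaxPool1D(k_size=3, stride=3)
    pooled = pool.forward(out)
    assert pooled.shape == (16, (15 - 3) // 3 + 1, 8)  # (16, 5, 8)

    flat = Flatten()
    flattened = flat.forward(pooled)
    assert flattened.shape == (16, 5 * 8)

    # Backward pass with a dummy delta of ones, then one gradient step.
    delta = flat.backward_delta(pooled, np.ones_like(flattened))
    delta = pool.backward_delta(out, delta)
    conv.backward_update_gradient(x, delta)
    conv.backward_delta(x, delta)
    conv.update_parameters(learning_rate=1e-2)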