Реализация слоев в NN (часть 1)

В первой статье речь пойдет о реализации модуля Sequential и слоёв Dense и Input через библиотеку Numpy для многослойной NN.

Данная статья нацелена на практическую реализацию слоев с минимумом теории, и предполагается что читатель знаком с базовой теорией обучения нейронных сетей.

Начнём с импорта библиотек:

import numpy as np

Реализация Dense слоя

Dense-слой

Dense-слой

class DenseLayer():
    def __init__(self,  units=1, activation='relu', weights=np.array([]), b=np.array([])):
        self.units = units
        self.fl_init = True
        self.activation = activation
        self.weights = weights
        self.b_new = b
        self.w, self.b = np.array([]), np.array([])

Поясню параметры:

  • units — количество нейронов

  • activation — функция активации

  • weights и b_new — скорректированные веса и смещения, которые мы в дальнейшем будем передавать модели

  • w, b — начальные веса и смещения

  • fl_init -флаг, показывающий созданы ли начальные веса и смещения

Далее воспользуемся магическим методом call, чтобы превратить наш класс в функтор:

    def __call__(self, x):
        if (self.fl_init == True) and (self.weights.shape[0] == 0):
            self.w = np.random.normal(loc=0.0, scale=1.0, size=(x.shape[-1], self.units))/np.sqrt(2.0/x.shape[-1])
            self.b = np.ones(shape=(self.units, ), dtype=np.float32)
            self.fl_init = False
            # print(self.w.shape, self.weights)

        elif self.weights.shape[0] != 0:
            self.weights = self.weights.reshape((x.shape[-1], self.units))
            self.w = self.weights
            self.fl_init = False

            self.b_new = self.b_new.reshape((self.units, ))
            self.b = self.b_new
            self.fl_init = False

        y = x.dot(self.w) + self.b

        if self.activation == 'relu':
            return np.maximum(np.zeros(shape=y.shape), y), self.w, self.b, 1, self.units, self.activation
        if self.activation == 'Leaky_relu':
            return np.maximum(0.01*y, y), self.w, self.b, 1, self.units, self.activation
        if self.activation == 'softmax':
            return np.exp(y)/np.sum(np.exp(y), axis=0), self.w, self.b, 1, self.units, self.activation
        if self.activation == 'sigmoid':
            return 1 / (1 + np.exp(-y)), self.w, self.b, 1, self.units, self.activation
        if self.activation == 'tanh':
            return (np.exp(2*y) - 1)/(np.exp(2*y) + 1), self.w, self.b, 1, self.units, self.activation
        if self.activation == 'linear':
            return y, self.w, self.b, 1, self.units, self.activation

Принцип работы такой:

  1. Сначала идет проверка, что начальные веса еще не созданы и нам не передавали уже скорректированные.

  2. Если условие выполняется, то начальные веса создаются с нормальным распределением (математическое ожидание = 0, дисперсия = 1), а начальные смещения задаём как единицы → Флаг переводим в значение False.

  3. Если же скорректированные веса были переданы, то заменяем начальные веса на них.

  4. Вычисляем y

  5. Пропускаем её через переданную функцию активации (если не передана — то пропускаем через 'relu')

Реализуем простенький класс Input:

class Input():
    def __init__(self, shape=None):
        self.shape = shape

    def __call__(self, x):
        if self.shape is not None:
            if x.shape != self.shape:
                return x.reshape(shape=self.shape), 0
            else:
                return x, 0
        return x, 0

Перейдем к написанию модуля Sequential:

class Sequential():
    def __init__(self, layers):
        self.layers = layers # слои в NN

Начнем с метода fit:

Сперва реализуем вспомогательную функцию predict, которая будет возвращать для каждого слоя выходы, функции активации, веса и смещения, используемые слой и количество нейронов. Она нам потребуется в дальнейшем для метода обратного распространения ошибка(BP).

        def predict(x):
            activations = []
            predict_for_layers = []
            weights = []
            b_coef = []
            layer_2 = []
            units = []
            predict = self.layers[0](x)
            layer_2.append(predict[1])
            predict_for_layers.append(predict[0])
            for i in range(1, len(self.layers)):
                predict = self.layers[i](predict[0])
                activations.append(predict[-1])
                predict_for_layers.append(predict[0])
                weights.append(predict[1])
                b_coef.append(predict[2])
                layer_2.append(predict[3])
                units.append(predict[4])
            #print(len(units))

            return predict_for_layers, activations, weights, b_coef, layer_2, units

Далее реализуем функции подсчета градиента:

        def sigmoid_gradient(output):
            return output * (1 - output)

        def tanh_gradient(out):
            return 1/((np.exp(out) + np.exp(-out)/2)**2)

        def relu_gradient(x):
            return (x > 0) * 1

        def leaky_relu_gradient(x):
            return (x > 0) * 1 + (x <= 0) * 0.01

        def linear_gradient(x):
            return 1

Перейдем к самой реализации Backpropagation:

list_back = self.layers[::-1]
for elem in range(x_input.shape[0]):
  x, y = x_input[elem].reshape(1, -1), y_input[elem]
  for epoch in range(epochs):
    predict_layers = predict(x) # 1 - y, 2 - w, 3 - b, 4 - слой, 5 - кол. нейронов
    predict_for_layers, activations, weights, b_coef, layers = predict_layers[0][::-1], predict_layers[1][::-1], predict_layers[2][::-1], predict_layers[3][::-1], predict_layers[4]
    units = predict_layers[5]
    layer_error = predict_for_layers[0] - y
    if len(layer_error.shape) == 1:
      layer_error = layer_error.reshape(1, -1)
    for ind in range(len(list_back) - 1):
      delta_weights = 0
      if activations[ind] == 'linear':
        delta_weights = layer_error * relu_gradient(predict_for_layers[ind])
      if activations[ind] == 'Leaky_relu':
        delta_weights = layer_error * leaky_relu_gradient(predict_for_layers[ind])
      if activations[ind] == 'relu':
        delta_weights = layer_error * relu_gradient(predict_for_layers[ind])
      if activations[ind] == 'sigmoid':
        delta_weights = layer_error * sigmoid_gradient(predict_for_layers[ind])
      if activations[ind] == 'tanh':
        delta_weights = layer_error * tanh_gradient(predict_for_layers[ind])

      b_coef[ind] -= alpha * (np.full(b_coef[ind].shape, layer_error.sum()))
      layer_error = delta_weights.dot(np.transpose(weights[ind]))
      weights[ind] -= alpha * (np.transpose(predict_for_layers[ind + 1]).dot(delta_weights))

    weights_inp = weights[::-1]
    b_inp = b_coef[::-1]
    activations_inp = activations[::-1]

    for indx in range(1, len(self.layers)):
      if layers[indx] == 1:
        self.layers[indx] = DenseLayer(units=units[indx - 1], weights=weights_inp[indx - 1], b=b_inp[indx - 1], activation=activations_inp[indx - 1])

Опишу принцип работы:

  • берется пара элементов — label + input.

  • В цикле по количеству эпох:

    1. С помощью ранее написанной функции predict считается выход (списки переворачиваются, чтобы начинать с последнего слоя).

    2. Считаем ошибку на последнем слое.

    3. delta_weights — производная, взвешенная по ошибкам — (уменьшаем ошибки предсказаний, сделанных с высокой уверенностью. Если наклон касательной линии (значение производной) был небольшим, то в сети содержится либо очень большое, либо очень малое значение) — считаем локальный градиент.

    4. Далее перезаписываем layer_error.

    5. Обновляем веса по следующему правилу:

      обновление весов в NN

      обновление весов в NN

  • Проходимся по слоям и перезаписываем их с новыми весами и смещениями.

Остается написать функцию predict без вспомогательных выходов:

    def predict(self, x):
        predict = self.layers[0](x)
        for i in range(1, len(self.layers)):
            predict = self.layers[i](predict[0])

        return predict

Код полностью:

class Sequential():
    def __init__(self, layers):
        self.layers = layers

    def fit(self, x_input, y_input, epochs=50, alpha=0.01):

        def predict(x):
            activations = []
            predict_for_layers = []
            weights = []
            b_coef = []
            layer_2 = []
            units = []
            predict = self.layers[0](x)
            layer_2.append(predict[1])
            predict_for_layers.append(predict[0])
            for i in range(1, len(self.layers)):
                predict = self.layers[i](predict[0])
                activations.append(predict[-1])
                predict_for_layers.append(predict[0])
                weights.append(predict[1])
                b_coef.append(predict[2])
                layer_2.append(predict[3])
                units.append(predict[4])

            return predict_for_layers, activations, weights, b_coef, layer_2, units

          
        def sigmoid_gradient(output):
            return output * (1 - output)

        def tanh_gradient(out):
            return 1/((np.exp(out) + np.exp(-out)/2)**2)

        def relu_gradient(x):
            return (x > 0) * 1

        def leaky_relu_gradient(x):
            return (x > 0) * 1 + (x <= 0) * 0.01

        def linear_gradient(x):
            return 1

          
        list_back = self.layers[::-1]
        for elem in range(x_input.shape[0]):
            x, y = x_input[elem].reshape(1, -1), y_input[elem]
            for epoch in range(epochs):
                predict_layers = predict(x) # 1 - y, 2 - w, 3 - b, 4 - слой, 5 - кол. нейронов
                predict_for_layers, activations, weights, b_coef, layers = predict_layers[0][::-1], predict_layers[1][::-1], predict_layers[2][::-1], predict_layers[3][::-1], predict_layers[4]
                units = predict_layers[5]
                layer_error = predict_for_layers[0] - y
                if len(layer_error.shape) == 1:
                    layer_error = layer_error.reshape(1, -1)
                for ind in range(len(list_back) - 1):
                    delta_weights = 0
                    if activations[ind] == 'linear':
                        delta_weights = layer_error * relu_gradient(predict_for_layers[ind])
                    if activations[ind] == 'Leaky_relu':
                        delta_weights = layer_error * leaky_relu_gradient(predict_for_layers[ind])
                    if activations[ind] == 'relu':
                        delta_weights = layer_error * relu_gradient(predict_for_layers[ind])
                    if activations[ind] == 'sigmoid':
                        delta_weights = layer_error * sigmoid_gradient(predict_for_layers[ind])
                    if activations[ind] == 'tanh':
                        delta_weights = layer_error * tanh_gradient(predict_for_layers[ind])

                    b_coef[ind] -= alpha * (np.full(b_coef[ind].shape, layer_error.sum()))
                    layer_error = delta_weights.dot(np.transpose(weights[ind]))
                    weights[ind] -= alpha * (np.transpose(predict_for_layers[ind + 1]).dot(delta_weights))


                weights_inp = weights[::-1]
                b_inp = b_coef[::-1]
                activations_inp = activations[::-1]

                for indx in range(1, len(self.layers)):
                    if layers[indx] == 1:
                        self.layers[indx] = DenseLayer(units=units[indx - 1], weights=weights_inp[indx - 1], b=b_inp[indx - 1], activation=activations_inp[indx - 1])

    # Предсказание значений
    def predict(self, x):
        predict = self.layers[0](x)
        for i in range(1, len(self.layers)):
            #print(predict[0].shape)
            predict = self.layers[i](predict[0])

        return predict

Приведу простой пример использования:

x = np.array([[3., 2.], [2., 2.], [3., 3.], [4., 4.]])
y = [5, 4, 6, 8]
#print(x.shape)
model.fit(x, y, epochs=40)
x_test = np.array([[3., 4.], [4., 4.]])

print(model.predict(x_test)[0])

Выход:

[[7.13999622]
[8. ]]

Выводы

Как можно заметить, нейронная сеть после обучения смогла выдать практически правильный ответ на пример, который не встречала при обучении.

В реализации есть много недочётов, так что если есть комментарии, что исправить — пишите.

© Habrahabr.ru