Пишем свой PyTorch на NumPy. Часть 3. Строим граф вычислений
PyTorch — это мощный и гибкий фреймворк для машинного обучения, широко используемый для создания нейронных сетей. Он особенно популярен благодаря простоте использования, динамическим вычислительным графам и богатой экосистеме инструментов для обучения моделей. Для использования этого фреймворка, часто достаточно поверхностно понимать работу алгоритмов машинного обучения.
Но Андрей Карпаты, известный исследователь в области ИИ, считает, что реализация алгоритмов с нуля позволяет понять их суть и детали работы, что сложно осознать, используя только готовые библиотеки. Это помогает развить интуицию для дальнейшего применения и улучшения методов. Андрей посвящает много собственного времени, чтобы объяснять ключевые принципы работы нейросетей в своих блогах и на своём ютуб-канале. Он также не раз подчеркивал, что на его курсе в Cтэнфорде есть задачи по реализации различных алгоритмов, например, обратное распространение.
Я хотел бы посвятить данную статью этой идеи, потому что мне самому особенно интересно копаться в алгоритмах глубокого обучения. Эта статья продолжение второй статьи
Сегодня мы:
представим аналог
pytorch.tensor()
переведём все вычисления на динамический вычислительный граф
проведём рефакторинг библиотеки
Поехали!
Note!
Перед началом чтения статьи я крайне рекомендую к просмотру нескольких видео по теме вычислительный граф и чтению статьи
Я буду использовать идеи из этой статьи как базу для своих реализаций
Давайте еще раз глянем отрывок нашего последнего кода!
class SimpleConvNet(Module):
def __init__(self):
...
def forward(self, x):
x = self.conv1(x)
x = self.relu(x)
x = self.maxpool1(x)
x = self.conv2(x)
x = self.relu(x)
x = self.maxpool2(x)
x = self.flatten(x)
x = self.linear1(x)
x = self.relu(x)
x = self.linear2(x)
return x
Подумайте, какие потенциальные случаи мы можем упустить из рассмотрения?
Я предложу несколько идей:
Что если мы производим вычисления вне определенных нами слоёв? Например:
class SimpleConvNet(Module):
def __init__(self):
...
def forward(self, x):
x = self.conv1(x)
x = self.relu(x)
x = self.maxpool1(x)
x = self.conv2(x)
x = self.relu(x)
x = x * 2
x = self.maxpool2(x)
x = x ** 2
x = self.flatten(x)
x = self.linear1(x)
x = self.relu(x)
x = x + 1
x = self.linear2(x)
return x
Если вы вглядитесь в метод .backward()
класса CrossEntropyLoss
, то поймете, что мы не умеем обрабатывать случаи когда наши значения изменяются вне классов Linear
, Conv2d
, BatchNorm2d
и т.д. Если запустим градиентный спуск, то он будет работать так, как будто этих промежуточных вычислений не было, что очевидно не приведёт ни к чему хорошему. А если мы еще каким-то образом меняли размер наших матриц, то программа вообще упадёт с ошибкой.
Вспомните, как мы производили алгоритм вычисления градиентов? Мы их считали на бумаге, а потом переписывали в коде. Так было с линейными слоями, свёрточными слоями, слоями нормализации. Но как только вы захотим что-то более сложное реализовать, мы просто утоним в бесконечных вычислениях градиентов! Вот, например, я считал градиенты для
RNN
.И это только самая простая реализация рекуррентных слоев, для LSTM и GRU — у которых ещё более сложные зависимости, я сомневаюсь что-то вообще реально выписать формулы. А использовать их очень хочется! Значит надо что-то придумать!
Нет гибкости! Для каждой операции нам нужно продумать градиент!
Нам хочется, чтобы программа сама считала градиенты! Точно также, как мы возложили вычисление градиентов на метод
.backward()
в слоях, теперь на.backward()
— мы хотим возложить вообще все вычисления!
Все наши проблемы поможет решить граф вычислений!
Computational Graph
Давайте представлять все наши вычисления в виде графа, например!
В узлах будут храниться значения при инициализации либо результат действия операции. Узел в синем квадратике будет хранить в себе информацию, что была произведена операция «сложение», двух чисел a
и b
, и полученный результат — число c
.
Узел в красном квадратике будет хранить в себе информацию, что была произведена операция «умножение», двух чисел b
и c
, и полученный результат — число f
.
Посчитаем производные такой функции.
На самом деле мы просто получили красивую визуализацию chain rule!
Ключевая идея заключается в том, чтобы после каждой операции получать не только значение, но и сразу считать производные.
Например, c
= a
+ b
. Тогда в c
мы будем хранить само значение c
, а также производные от a
и b
, то есть (c
, 1
, 1
)
Для c = a * b
будем хранить (c
, b
, a
), для c = a / b
, будем хранить (c
, 1 / b
, -a / b^2
) — логика я надеюсь стала понятна.
Как только мы определим все базовые операции и их производные, мы сможем производить любые вычисления и брать любые производные от них, потому что мы на каждом этапе считаем свой локальный градиент -, а это задача сильно проще, чем брать градиент от итогового выражения. Пример из жизни. Вы читаете много книг разных жанров. Но вы также хотите, чтобы каждый жанр лежал в своей полке. Что проще — после прочтения каждой книги класть в нужную полку или перебирать целую стопку накопившихся книг? Также например и здесь
Проще посчитать производную всей функции или сначала от x1/x2
, потом от sin(x)
, потом exp(x2)
и в конце просто перемножить их по правилу chain rule?
Заглянем сюда
Переменная (или узел) содержит две части данных:
value — значение переменной.
local_gradients — дочерние переменные и соответствующие «локальные производные».
Функция get_gradients
использует данные из local_gradients
переменных для рекурсивного обхода графа, вычисляя градиенты. (Т. е. local_gradients
содержит ссылки на дочерние переменные, у которых есть свои local_gradients
, которые содержат ссылки на дочерние переменные, у которых есть свои local_gradients
, и так далее.)
Градиент переменной относительно дочерней переменной вычисляется с использованием следующих правил:
Для каждого пути от переменной к дочерней переменной умножьте значения рёбер пути (что даёт
path_value
).Сложите все
path_value
для каждого пути.
… Это даёт частную производную первого порядка переменной относительно дочерней переменной.
Давайте реализуем скелет будущего класса!
class Tensor:
def __init__(self, value, local_gradients=None):
self.value = value
self.local_gradients = local_gradients
Попробуем перезагрузить операцию сложения!
class Tensor:
def __init__(self, value, local_gradients=None):
self.value = value
self.local_gradients = local_gradients
def __add__(self, other):
value = self.value + other.value
local_gradients = ((self, 1), (other, 1))
return Tensor(value, local_gradients)
Посмотрим на работу
a = Tensor(5)
b = Tensor(10)
c = a + b
c, c.value, c.local_gradients
>>> (<__main__.Tensor at 0x7d85aad85810>,
15,
((<__main__.Tensor at 0x7d85aad87160>, 1),
(<__main__.Tensor at 0x7d85aad85660>, 1)))
Попробуем теперь посчитать производные! Добавим метод .backward()
from collections import defaultdict
class Tensor:
def backward(self):
# словарь в котором будем хранить градиенты для всех переменных
gradients = defaultdict(lambda: 0)
# рекурсивно вызываемая функция для вычисления градиентов у детей, потом у их детей и т.д.
def compute_gradients(obj, path_value):
if obj.local_gradients: # проверяем не является ли узел листом (leaf)
# получаем ссылку на ребенка и его предпосчитанный градиент
for child, local_grad_value in obj.local_gradients:
# используем chain rule и умножаем накопленный градиент на градиент child
path_value_to_child = path_value * local_grad_value
# добавляем градиенты от разных листьев
gradients[child] += path_value_to_child
# считаем градиенты для детей текущего child
compute_gradients(child, path_value_to_child)
compute_gradients(self, path_value=1)
return gradients
Смотрим
a = Tensor(5)
b = Tensor(10)
c = a + b
gradients = c.backward()
gradients[a], gradients[b]
>>> (1, 1)
Теперь перегрузим операцию умножения
def __mul__(self, other):
value = self.value * other.value
local_gradients = ((self, other.value), (other, self.value))
return Tensor(value, local_gradients)
Обратите внимание, в качестве производной по первому объекту, будет значение второго и наоборот!
a = Tensor(4)
b = Tensor(3)
c = a + b # = 4 + 3 = 7
d = a * c # = 4 * 7 = 28
gradients = d.backward()
print('d.value =', d.value)
print("The partial derivative of d with respect to a =", gradients[a])
>>> d.value = 28
The partial derivative of d with respect to a = 11
print('gradients[b] =', gradients[b])
print('gradients[c] =', gradients[c])
>>> gradients[b] = 4
gradients[c] = 4
Посмотрим на промежуточные градиенты
print('dict(d.local_gradients)[a] =', dict(d.local_gradients)[a])
print('dict(d.local_gradients)[c] =', dict(d.local_gradients)[c])
print('dict(c.local_gradients)[a] =', dict(c.local_gradients)[a])
print('dict(c.local_gradients)[b] =', dict(c.local_gradients)[b])
>>> dict(d.local_gradients)[a] = 7
dict(d.local_gradients)[c] = 4
dict(c.local_gradients)[a] = 1
dict(c.local_gradients)[b] = 1
Всё верно, можете перепроверить на бумаге!
Добавим еще несколько базовых операций!
# вычитание
def __sub__(self, other):
value = self.value - other.value
local_gradients = ((self, 1), (other, -1))
return Tensor(value, local_gradients)
# унарный минус
def __neg__(self):
value = -self.value
local_gradients = ((self, -1),)
return Tensor(value, local_gradients)
# деление
def __truediv__(self, other):
value = self.value / other.value
local_gradients = ((self, 1 / other.value), (other, - self.value / (other.value**2)))
return Tensor(value, local_gradients)
Посчитаем производную по аргументам более сложной функции
def f(a, b):
return (a / b - a) * (b / a + a + b) * (a - b)
a = Tensor(230.3)
b = Tensor(33.2)
y = f(a, b)
gradients = y.backward()
print("The partial derivative of y with respect to a =", gradients[a])
print("The partial derivative of y with respect to b =", gradients[b])
>>> The partial derivative of y with respect to a = -153284.83150602411
The partial derivative of y with respect to b = 3815.0389441500956
Мы можем использовать численные оценки, чтобы проверить правильность получаемых результатов.
delta = Tensor(1e-10)
numerical_grad_a = (f(a + delta, b) - f(a, b)) / delta
numerical_grad_b = (f(a, b + delta) - f(a, b)) / delta
print("The numerical estimate for a =", numerical_grad_a.value)
print("The numerical estimate for b =", numerical_grad_b.value)
>>> The numerical estimate for a = -153258.44287872314
The numerical estimate for b = 3837.0490074157715
Добавим еще несколько базовых математических функций
# возведение в степень
def __pow__(self, power):
value = self.value ** power
local_gradients = ((self, power * (self.value**(power-1))),)
return Tensor(value, local_gradients)
@classmethod
def sin(cls, obj):
value = np.sin(obj.value)
local_gradients = ((obj, np.cos(obj.value)),)
return Tensor(value, local_gradients)
@classmethod
def cos(cls, obj):
value = np.cos(obj.value)
local_gradients = ((obj, -np.sin(obj.value)),)
return Tensor(value, local_gradients)
@classmethod
def exp(cls, obj):
value = np.exp(obj.value)
local_gradients = ((obj, value),)
return Tensor(value, local_gradients)
@classmethod
def log(cls, a):
value = np.log(a.value)
local_gradients = (
('log', a, lambda x: x * 1. / a.value),
)
return Tensor(value, local_gradients)
И ещё раз проверим!
def f(a, b):
return ((a ** 2) / Tensor.sin(b) - a) * (b / a + Tensor.cos(a) + b) * (a - Tensor.exp(b))
a = Tensor(230.3)
b = Tensor(33.2)
y = f(a, b)
gradients = y.backward()
print("The partial derivative of y with respect to a =", gradients[a])
print("The partial derivative of y with respect to b =", gradients[b])
delta = Tensor(1e-10)
numerical_grad_a = (f(a + delta, b) - f(a, b)) / delta
numerical_grad_b = (f(a, b + delta) - f(a, b)) / delta
print("The numerical estimate for a =", numerical_grad_a.value)
print("The numerical estimate for b =", numerical_grad_b.value)
>>> The partial derivative of y with respect to a = -1.5667411882581273e+19
The partial derivative of y with respect to b = -5.795077766989229e+20
The numerical estimate for a = -1.566703616e+19
The numerical estimate for b = -5.79518464e+20
Круто! Мы с вами только что реализовали одну из самых фундаментальных идей в нейронных сетях — граф вычислений. Дальше мы будем опираться полностью на него, всё что нам останется сделать с полученным классом Tensor
— это постепенно усложнять его добавляя новые операции и функционал!
Как вы помните, буквально ВСЁ в нейронках — это произведения матриц. Реализуем её в нашем новом классе. Производные мы уже знаем! Если ,
Для вычисления производной по матрице (где ):
Таким образом:
Аналогично для производной по матрице :
Но есть одна загвоздочка! Посмотрим ещё раз на реализацию метода .backward()
for child, local_grad_value in obj.local_gradients:
path_value_to_child = path_value * local_grad_value
gradients[child] += path_value_to_child
compute_gradients(child, path_value_to_child)
Для получения нового значения, мы умножаем path_value
и local_grad_value
, проблема в том, что в случае матриц нам нужен не оператор *
, а оператор @
. Можно конечно обрабатывать каждый случай отдельно, но предлагаю поступить более умно. Покажу на примере:
def __add__(self, other):
value = self.value + other.value
local_gradients = ((self, lambda x: x), (other, lambda x: x))
return Tensor(value, local_gradients=local_gradients)
Давайте хранить в local_gradients
— не само значение производной, а функцию, которая будет получать значение с предыдущего шага и преобразовывать его для следующего шага. В данном случае функция получает x
и возвращает также x
, так как производная равна 1
. Для вычитания
def __sub__(self, other):
value = self.value + other.value
local_gradients = ((self, lambda x: x), (other, lambda x: -x))
return Tensor(value, local_gradients=local_gradients)
Для первого слагаемого она получает x
и возвращает также x
, а для второго она получает x
, а возвращает -x
, так как производная равна -1
. Также предлагаю хранить название операции, это будет очень полезно для отладки!
def __add__(self, other):
value = self.value + other.value
local_gradients = (('add', self, lambda x: x), ('add', other, lambda x: x))
return Tensor(value, local_gradients=local_gradients)
Итак, матричное умножение будет выглядеть в конечном итоге так!
def __matmul__(self, other):
value = self.value @ other.value
local_gradients = (('matmul', self, lambda x: x @ other.value.T), ('matmul', other, lambda x: self.value.T @ x))
return Tensor(value, local_gradients=local_gradients)
И метод .backward()
тоже немного преобразится с учётом наших изменений.
for operation, child, child_gradient_func in obj.local_gradients:
# child_gradient_func как раз та самая lambda функция
path_value_to_child = child_gradient_func(path_value)
gradients[child] += path_value_to_child
compute_gradients(child, path_value_to_child)
Теперь мы готовы переходить к нейронным слоям. Нужно будет немного перестроить логику!
Вернемся к самому первому примеру с «изображением» собаки. Теперь это будет не объект numpy.ndarray
, а объект нашего нового класса Tensor
input_x = np.array([[ 0.99197708, -0.77980023, -0.8391331 , -0.41970686, 0.72636492],
[ 0.85901409, -0.22374584, -1.95850625, -0.81685145, 0.96359871],
[-0.42707937, -0.50053309, 0.34049477, 0.62106931, -0.76039365],
[ 0.34206742, 2.15131285, 0.80851759, 0.28673013, 0.84706839],
[-1.70231094, 0.36473216, 0.33631525, -0.92515589, -2.57602677]])
target_x = [1.0, 0.0]
input_tensor = Tensor(input_x)
target_tensor = Tensor(target_x)
Класс Module
и Linear
оставим такими же, только теперь веса модели это также объекты классаTensor
class Module:
def __init__(self):
self._constructor_Parameter = ParameterObj()
global Parameter
Parameter = self._constructor_Parameter
def forward(self):
pass
def __call__(self, x):
return self.forward(x)
def parameters(self):
return self
class Linear:
def __init__(self, input_channels: int, output_channels: int, bias = True):
self.input_channels = input_channels
self.output_channels = output_channels
self.bias_flag = bias
self.backward_list = []
# теперь объекты класса Tensor
self.weight = Tensor(np.random.uniform(- 0.5, 0.5, size=(self.input_channels, self.output_channels)))
self.bias = Tensor(np.random.uniform(- 0.5, 0.5, size=self.output_channels) * bias)
Parameter([self, self.weight, self.bias])
def __call__(self, x: Tensor):
self.x = Tensor(x)
result = x @ Parameter.calling[self][0] + Parameter.calling[self][1]
return result
Давайте при инициализации объекта класса Tensor
, проверять является ли он объектом этого же класса или является объектом другого класса (число или numpy.ndarray
) и переводить его в объект numpy.ndarray
.Также добавим информацию о форме объекта в атрибут shape
class Tensor:
def __init__(self, value, local_gradients=None):
if isinstance(value, Tensor):
self.value = value.value
self.local_gradients = value.local_gradients
else:
self.value = np.array(value)
self.local_gradients = local_gradients
self.shape = self.value.shape
Соберём модель
class SimpleNet(Module):
def __init__(self):
super().__init__()
self.linear1 = Linear(input_channels=5, output_channels=10, bias=True)
def forward(self, x):
return self.linear1(x)
model = SimpleNet()
model(input_tensor).shape
>>> (5, 10)
Отлично, модель выдаёт значения! Добавим ещё несколько полезных команд!
Метод reshape
, он нам пригодится, так как мы часто будем менять размеры наших тензоров:
def reshape(self, *args):
local_gradients = (('reshape', self, lambda x: x.reshape(self.shape)),)
return Tensor(self.value.reshape(*args), local_gradients=local_gradients)
Отображение:
сейчас вывод нашего тензоры выглядит так, не очень красиво и информативно: <__main__.Tensor at 0x7c2122fce0e0>
Давайте поправим
def __repr__(self):
return np.array_repr(self.value)
Теперь: >>> array(4)
Добавим ещё несколько полезных команд:
# Создать тензор нулей. В целом полезный метод для инициализации тензора
@classmethod
def zeros(cls, shape):
return cls(np.zeros(shape))
# Cоздать тензор нормального распределения. Очень часто используется для инициализации весов
@classmethod
def randn(cls, shape):
return cls(np.random.normal(size=shape))
# Определить знак для каждого значения, будем использовать в relu
@classmethod
def sign(cls, a):
value = np.sign(a.value)
return cls(value)
# поможет перевести 5.0 в 5
@classmethod
def int_(cls, *args):
return cls(np.int_(*args))
# Тоже полезный метод для инициализации последовательности чисел
@classmethod
def arange(cls, *args):
return cls(np.arange(*args))
# Суммирование, одна из самых важных функций. Почти везде используется
@classmethod
def sum(cls, array, axis=None, keepdims=False):
if not keepdims: # Не хотим сохранить размерность
if axis is not None:
local_gradients = (('sum', array, lambda x: np.expand_dims(np.array(x), axis=axis) + np.zeros(array.shape)),)
return Tensor(np.sum(array.value, axis=axis), local_gradients=local_gradients)
else:
local_gradients = (('sum', array, lambda x: x + np.zeros(array.shape)),)
return Tensor(np.sum(array.value, axis=axis), local_gradients=local_gradients)
else: # Хотим сохранить размерность
value = np.sum(array.value, axis=axis, keepdims=True) * np.ones_like(array.value)
local_gradients = (('sum', array, lambda x: x),)
return cls(value, local_gradients=local_gradients)
# код может быть немного запутанным из-за того, что нужно учесть разную размерность матриц при расчете градиентов
# классический уже знакомый нам softmax
@classmethod
def softmax(cls, z, axis=-1,):
return cls.exp(z) / cls.sum(cls.exp(z), axis=axis, keepdims=True)
Попробуем более сложную модель, немного поменяв наши слои
class Flatten:
def __init__(self):
Parameter([self, []])
def __call__(self, x):
self.init_shape = x.shape
return x.reshape(self.init_shape[0], -1)
class ReLU:
def __init__(self):
pass
def __call__(self, x):
return x * (Tensor.sign(x) + 1) / 2
class SimpleNet(Module):
def __init__(self):
super().__init__()
self.linear1 = Linear(input_channels=25, output_channels=10, bias=True)
self.linear2 = Linear(input_channels=10, output_channels=2, bias=True)
self.flatten = Flatten()
self.relu = ReLU()
def forward(self, x):
x_1 = self.flatten(x)
x_2 = self.linear1(x_1)
x_3 = self.relu(x_2)
x_4 = self.linear2(x_3)
return x_4
model = SimpleNet()
model(input_tensor.reshape(1, -1))
>>> array([[ 0.2440679 , -1.75806267]])
Круто! Всё работает, осталось обучить! Дальше считаем значение loss
-функции.
class CrossEntropyLoss:
def __init__(self):
self.predicted = None
self.true = None
def __call__(self, logits, true):
predicted = Tensor.exp(logits) / Tensor.sum(Tensor.exp(logits), axis=1).reshape(-1, 1) # softmax
self.true = true
# вычисляем значение лосс-функции прямо по формуле
self.loss = Tensor.sum(self.true * Tensor.log(predicted + 1e-5), axis=1) * -1
return self
Заметьте, эта та же самая реализация, что и в прошлых статьях, но мы поменяли np
на Tensor
.
loss = loss_fn(model(input_tensor.reshape(1, -1)), target_tensor)
loss.loss
>>> array([0.2581563])
Что ж, мы получили значение и мы уже знаем, что можем вызвать .backward()
прямо с тензора loss.loss
, чтобы посчитать все градиенты (спойлер: не сможем, у нас вылетит ошибка)! Но открою для вас небольшой секретик. Градиент для кросс-энтропии + softmax
можно считать не через граф, а через формулу. Вот так вот! Мы убегали от формульных вычислений, а сами же вернулись к ним. Но здесь это оправдано, ведь вспомните какая там простая производная получается, а значит мы может сделать небольшой трюк
Для него нам потребуется добавить метод detach
— он вытаскивает тензор из графа. То есть это просто матрица значений.
def detach(self):
return Tensor(self.value)
class CrossEntropyLoss:
def __call__(self, predicted, true):
### сохраним значения выхода модели
self.logits = Tensor(predicted, local_gradients=predicted.local_gradients)
###
self.predicted = Tensor.softmax(predicted) # softmax
#number_of_classes = predicted.shape[1]
#self.true = Tensor.int_(Tensor.arange(0, number_of_classes) == true)
self.true = true
# вычисляем значение лосс-функции прямо по формуле
self.loss = Tensor.sum(self.true * Tensor.log(self.predicted + 1e-5), axis=1) * -1
return self
def backward(self):
# Посчитаем градиент по формуле
self.analytics = (self.predicted - self.true)
# Вытащим из графа, то есть по факту просто получим значения и домножим на self.logits, который всё еще находится в графе.
self.analytics = self.analytics.detach() * self.logits
self.gradients = self.analytics.backward()
То есть мы с помощью self.analytics
подменили вычисление производной внутри графа. А домножив self.analytics
наself.logits
, мы вернулись в граф, который был еще до применения softmax
и кросс-энтропии, и уже отсюда можем честно считать градиенты внутри графа!
Ещё раз: self.logits.backward()
— посчитает градиенты для графа, в котором нет softmax
+ кросс-энтропии, а ((self.predicted - self.true).detach() * self.logits).backward()
— также посчитает градиенты для графа, в котором нет softmax
+ кросс-энтропии, но при этом неявно учтёт их существование за счет множителя (self.predicted - self.true).detach()
Получаем
loss.backward()
loss.gradients[model.linear1.weight].shape, loss.gradients[model.linear1.bias].shape
>>> ((25, 10), (1, 10))
Теперь давайте снова ручками сделаем градиентный спуск и обучим модель!
model = SimpleNet()
loss_fn = CrossEntropyLoss()
lr = 0.01
for i in range(100):
output = model(input_tensor.reshape(1, -1))
loss = loss_fn(output, target_tensor)
loss.backward()
gradients = loss.gradients
for layer in [model.linear1, model.linear2]:
layer.weight.value = layer.weight.value - lr * gradients[layer.weight]
layer.bias.value = layer.bias.value - lr * gradients[layer.bias]
if i % 10 == 0:
print(loss.loss)
>>> array([1.29812516])
array([0.46082039])
array([0.21713806])
array([0.13151886])
array([0.0906402])
array([0.06659202])
array([0.05139489])
array([0.04118782])
array([0.03398361])
array([0.0286905])
Ура. Наша модель обучается! Идем дальше и запихнём градиентный спуск в уже знакомый нам SGD
class Tensor:
def __init__(self, value, local_gradients=None):
self.shape = self.value.shape
self.grad = 0
class CrossEntropyLoss
def backward(self):
self.analytics = (self.predicted - self.true)
self.analytics = self.analytics.detach() * self.logits
self.gradients = self.analytics.backward()
global Parameter
for index, layer in enumerate(Parameter.layers[::-1]):
if type(layer).__name__ == 'Linear':
layer.weight.grad += self.gradients[layer.weight] / self.loss.shape[0]
layer.bias.grad += self.gradients[layer.bias] / self.loss.shape[0]
class SGD:
def __init__(self, model, lr=2e-4):
self.model = model
self.lr = lr
def step(self):
for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]):
if type(layer).__name__ == 'Linear':
layer.weight.value -= self.lr * layer.weight.grad
layer.bias.value -= self.lr * layer.b.grad.mean(axis=0)
Но посмотрите сюда
layer.weight.grad += self.gradients[layer.weight] / self.loss.shape[0]
layer.bias.grad += self.gradients[layer.bias] / self.loss.shape[0]
Это же накопление градиентов! А разве оно нам нужно? Нет! Значит нам нужно обнулять накопленные градиенты в каждой операции, для этого введём новый метод .zero_grad()
class Module:
def zero_grad(self):
for index, layer in enumerate(self._constructor_Parameter.layers):
if type(layer).__name__ == 'Linear':
layer.weight.grad = 0
layer.bias.grad = 0
Обучаем!
model = SimpleNet()
loss_fn = CrossEntropyLoss()
optim = SGD(model.parameters(), lr=1e-3)
lr = 0.001
for i in range(100):
output = model(input_tensor.reshape(1, -1))
loss = loss_fn(output, target_tensor)
model.zero_grad()
loss.backward()
optim.step()
if i % 10 == 0:
print(loss.loss)
>>> array([0.51065697])
array([0.15970178])
array([0.01386941])
array([0.00090227])
array([4.67924761e-05])
array([-6.95378636e-06])
array([-9.88413122e-06])
array([-9.99684238e-06])
array([-9.99989122e-06])
array([-9.99994927e-06])
Круто! Мы обучили нашу первую нейронку на графе вычислений!
NOTE!
В следующем блоке я буду рассказывать реализацию свёрточной нейронки на графе вычислений. По итогу она работает, но не обучается. Ошибку я не успел найти, но очень постараюсь отладить код и дополнить статью. Я решил оставить эту часть, так как хотел донести именно идейную составляющую моего рассказа. И пусть обучить модель не получится, я надеюсь понимание происходящего у читателя останется!
Conv2d
Оказывается, NumPy позволяет провести свертку при помощи обычного перемножения матриц. Для этого используется numpy.lib.stride_tricks.sliding_window_view
Функция numpy.lib.stride_tricks.sliding_window_view
в NumPy используется для создания представления массивов с окнами скользящих данных. Это полезный инструмент для анализа временных рядов, вычисления свёрток и других операций, где требуется работать с подмножествами данных в скользящем окне. В результате каждого окно у нас представимо в вытянутого вектора.
Например, для картинки (2, 5, 5)
и фильтра (2, 3, 3)
получим представление в виде (3, 3, 2, 3, 3)
, и для расчета свёртки для позиции (i, j)
, возьмём [i][j]
, вытянем в вектор, [i][j].reshape(-1)
и умножим на вытянутый вектор фильтра [i][j].reshape(-1) @ kernel.reshape(-1)
Проверим
image = np.array([[0, 50, 0, 29],
[0, 80, 31, 2],
[33, 90, 0, 75],
[0, 9, 0, 95]
])
kernel = np.ones((3, 3))
v = sliding_window_view(image, (kernel.shape[0], kernel.shape[1]), axis=(-1, -2))
*not_used, a, b = v.shape
v = v.reshape(*not_used, -1)
kernel_s = kernel.reshape(-1)
result = v @ kernel_s
np.allclose(result, scipy.signal.fftconvolve(image, kernel, mode='valid'))
>>> True
Круто! Усложним задачу
image = np.random.randn(3, 7, 7)
kernel = np.ones((3, 3, 3))
v = sliding_window_view(image, kernel.shape, axis=(-1, -2, -3))
*not_used, a, b, c= v.shape
v = v.reshape(*not_used, -1)
kernel_s = kernel.reshape(-1)
result = v @ kernel_s
np.allclose(result, scipy.signal.fftconvolve(image, kernel, mode='valid'))
>>> True
Продолжаем
image = np.random.randn(10, 3, 7, 7)
kernel = np.ones((1, 3, 3, 3))
v = sliding_window_view(image, kernel.shape[1:], axis=(-1, -2, -3))
*not_used, a, b, c = v.shape
v = v.reshape(*not_used, -1)
kernel_s = kernel.reshape(-1)
result = v @ kernel_s
np.allclose(result, scipy.signal.fftconvolve(image, kernel, mode='valid'))
>>> True
И заключительная проверка с нашей реализацией из предыдущей статьи
image = np.random.randn(11, 1, 4, 7, 7)
kernel = np.random.randn(1, 5, 4, 3, 3)
i = image_.shape[-1]
f = kernel_.shape[-1]
padding = 0
step = 1
m = (i-f + 2*padding) // step +1
number_of_kernels = kernel_.shape[1]
number_of_images = image_.shape[0]
new_image = np.zeros((number_of_images, number_of_kernels, m, m))
for image_n in range(number_of_images):
for kernel_n in range(number_of_kernels):
for y in range(m):
for x in range(m):
start_x = x * step
end_x = start_x + f
start_y = y * step
end_y = start_y + f
new_image[image_n][kernel_n][y][x] = np.sum(image_[image_n, 0, :, start_y:end_y, start_x:end_x] * kernel_[0, kernel_n])
kernel = kernel.squeeze(axis=0)
image = image.squeeze(axis=1)
num_images, matrix_z, matrix_y, matrix_x = image.shape
num_kernels, kernel_z, kernel_y, kernel_x = kernel.shape
result_x, result_y = matrix_x - kernel_x + 1, matrix_y - kernel_y + 1
new_matrix = sliding_window_view(image, (1, kernel_z, kernel_y, kernel_x))
new_kernel = kernel.transpose(1, 2, 3, 0)
result = new_matrix.reshape(num_images, -1, kernel_z * kernel_y * kernel_x) @ new_kernel.reshape(-1, num_kernels)
result = result.transpose(0, 2, 1)
result = result.reshape(num_images, num_kernels, result_y, result_x)
np.allclose(result, new_image)
>>> True
Отлично! Мы научились проводить свёртку с помощью матричного перемножения, теперь добавим эту операцию в наш класс и определим для неё производную!
class Tensor:
@classmethod
def sliding_window_view(cls, matrix, kernel_z, kernel_y, kernel_x):
result = np.lib.stride_tricks.sliding_window_view(matrix.value, (1, kernel_z, kernel_y, kernel_x)).copy()
def multiply_by_locgrad(path_value):
temp = np.zeros(matrix.shape)
np.add.at(np.lib.stride_tricks.sliding_window_view(temp, (1, kernel_z, kernel_y, kernel_x), writeable=True), None, path_value)
return temp
local_gradients = (('slide', matrix, multiply_by_locgrad),)
return cls(result, local_gradients=local_gradients)
Используется метод
np.add.at
, который позволяет эффективно добавлять значения в массивtemp
на основеpath_value
.Для работы с «окнами» в массиве используется ещё одно представление
sliding_window_view
с параметромwriteable=True
, что позволяет модифицировать данные.
Как вы также могли увидеть, что мы несколько раз использовали операцию .transpose()
, но не определили её в классе, исправим!
def transpose(self, *args):
local_gradients = (('transpose', self, lambda x: x.transpose(*args)),)
return Tensor(self.value.transpose(*args), local_gradients=local_gradients)
Наконец переопределим класс Conv2d
с учётом новых знаний
class Conv2d:
def __init__(self, input_channels: int, output_channels: int, kernel_size: int, bias = True):
self.param = None
self.bias_flag = bias
self.input_channels = input_channels
self.kernel_size = (input_channels, kernel_size, kernel_size)
self.n_filters = output_channels
self.weight = Tensor.randn((self.n_filters, input_channels, kernel_size, kernel_size), )
self.bias = Tensor.randn((self.n_filters, 1, 1))
self.weight.value *= 1e-2 # уменьшаем для стабильности
self.bias.value *= 1e-2
Parameter([self, self.weight, self.bias])
def __call__(self, x):
matrix = x
kernel = self.weight
num_images, matrix_z, matrix_y, matrix_x = matrix.shape
num_kernels, kernel_z, kernel_y, kernel_x = kernel.shape
result_x, result_y = matrix_x - kernel_x + 1, matrix_y - kernel_y + 1
new_matrix = Tensor.sliding_window_view(matrix, kernel_z, kernel_y, kernel_x)
tranposed_kernel = kernel.transpose(1, 2, 3, 0)
result = new_matrix.reshape(num_images, -1, kernel_z * kernel_y * kernel_x) @ tranposed_kernel.reshape(-1, num_kernels)
result = result.transpose(0, 2, 1)
return result.reshape(num_images, num_kernels, result_y, result_x) + self.bias
Собираем модель для обучения на MNIST
. Код для подготовки данных возьмём из прошлой статьи.
А вот так примерно будет выглядеть вычисление градиентов!
mul child shape: (64, 10) obj shape: (64, 10)
mul child shape: (64, 10) obj shape: (64, 10)
add child shape: (64, 10) obj shape: (64, 10)
matmul child shape: (64, 50) obj shape: (64, 10)
div child shape: (64, 50) obj shape: (64, 50)
mul child shape: (64, 50) obj shape: (64, 50)
add child shape: (64, 50) obj shape: (64, 50)
matmul child shape: (64, 1296) obj shape: (64, 50)
reshape child shape: (64, 4, 18, 18) obj shape: (64, 1296)
div child shape: (64, 4, 18, 18) obj shape: (64, 4, 18, 18)
mul child shape: (64, 4, 18, 18) obj shape: (64, 4, 18, 18)
add child shape: (64, 4, 18, 18) obj shape: (64, 4, 18, 18)
reshape child shape: (64, 4, 324) obj shape: (64, 4, 18, 18)
transpose child shape: (64, 324, 4) obj shape: (64, 4, 324)
matmul child shape: (64, 324, 36) obj shape: (64, 324, 4)
reshape child shape: (64, 1, 18, 18, 1, 4, 3, 3) obj shape: (64, 324, 36)
slide child shape: (64, 4, 20, 20) obj shape: (64, 1, 18, 18, 1, 4, 3, 3)
div child shape: (64, 4, 20, 20) obj shape: (64, 4, 20, 20)
mul child shape: (64, 4, 20, 20) obj shape: (64, 4, 20, 20)
add child shape: (64, 4, 20, 20) obj shape: (64, 4, 20, 20)
reshape child shape: (64, 4, 400) obj shape: (64, 4, 20, 20)
transpose child shape: (64, 400, 4) obj shape: (64, 4, 400)
matmul child shape: (64, 400, 36) obj shape: (64, 400, 4)
reshape child shape: (64, 1, 20, 20, 1, 4, 3, 3) obj shape: (64, 400, 36)
slide child shape: (64, 4, 22, 22) obj shape: (64, 1, 20, 20, 1, 4, 3, 3)
class SimpleConvNet(Module):
def __init__(self):
super().__init__()
self.conv1 = Conv2d(input_channels = 1, output_channels = 5, kernel_size=5) #28 -> 24
self.conv2 = Conv2d(input_channels = 5, output_channels = 10, kernel_size=5) #24 -> 20
self.conv3 = Conv2d(input_channels = 10, output_channels = 20, kernel_size=5) #20 -> 16
self.conv4 = Conv2d(input_channels = 20, output_channels = 20, kernel_size=5) #16 -> 12
self.conv5 = Conv2d(input_channels = 20, output_channels = 20, kernel_size=5) #12 -> 8
self.conv6 = Conv2d(input_channels = 20, output_channels = 10, kernel_size=5) #8 -> 4
self.flatten = Flatten()
self.linear1 = Linear(input_channels= 4 * 4 * 10, output_channels=20, bias=True)
self.linear2 = Linear(input_channels= 20, output_channels=10, bias=True)
self.relu = ReLU()
def forward(self, x):
x = self.conv1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.relu(x)
x = self.conv3(x)
x = self.relu(x)
x = self.conv4(x)
x = self.relu(x)
x = self.conv5(x)
x = self.relu(x)
x = self.conv6(x)
x = self.relu(x)
x = self.flatten(x)
x = self.linear1(x)
x = self.relu(x)
x = self.linear2(x)
return x
model = SimpleConvNet()
loss_fn = CrossEntropyLoss()
optim = SGD(model.parameters(), lr=1e-3)
for i in range(5):
y_pred_list = []
y_true_list = []
for index, batch in enumerate(data_loader):
input_x, target = batch
input_x = input_x / 255
input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -> (64, 1, 28, 28)
input_tensor = Tensor(input_x)
target_tensor = Tensor(target)
output = model(input_tensor)
loss = loss_fn(output, target_tensor)
model.zero_grad()
loss.backward()
optim.step()
print(loss.loss.value.mean())
>>> 2.3434739196082752
2.3261346480555405
2.3450367034537822
2.328755621690293
2.290884864380055
2.3062695760361183
2.312287414927344
2.3049557593729144
2.2829010337160796
Не обучается
Но надеюсь вы хотя бы поняли идею!
Я опустил очень много моментов, например добавление __hash__
, __eq__
, работу с градиентами внутри оптимизатора, проверка совпадения размерностей тензоров, обработку broadcasting
для всех операций. Все они не несут большой идейной составляющей, но безусловно необходимы для корректной работы всех алгоритмов. Я не стал зацикливать на этом внимание и надеюсь вы поймете меня!
КУЛЬМИНАЦИЯ
Итак, вспоминаем самый первый блок кода из первой статьи!
# Создаем простой набор данных
X = torch.randn(100, 3) # 100 примеров с 3 признаками
y = torch.randint(0, 2, (100,)) # 100 меток классов (0 или 1)
# Определим простую нейронную сеть
class SimpleNN(nn.Module):
def __init__(self):
super(SimpleNN, self).__init__()
self.fc1 = nn.Linear(3, 5) # Первый слой: 3 входа, 5 выходов
self.fc2 = nn.Linear(5, 2) # Второй слой: 5 входов, 2 выхода (классы)
self.softmax = nn.Softmax(dim=1) # Для получения вероятностей классов
def forward(self, x):
x = torch.relu(self.fc1(x)) # Применяем активацию ReLU
x = self.fc2(x) # Второй слой
x = self.softmax(x) # Преобразуем в вероятности
return x
# Создаем модель
model = SimpleNN()
# Определяем функцию потерь и оптимизатор
criterion = nn.CrossEntropyLoss() # Кросс-энтропия для многоклассовой классификации
optimizer = optim.SGD(model.parameters(), lr=0.01) # Стохастический градиентный спуск
# Обучаем модель
num_epochs = 100
for epoch in range(num_epochs):
# Прямой проход
outputs = model(X)
# Вычисление потерь
loss = criterion(outputs, y)
# Обратный проход
optimizer.zero_grad() # Обнуляем градиенты
loss.backward() # Вычисляем градиенты
optimizer.step() # Обновляем параметры модели
Смотрим и понимаем: Мы с вами разобрались в каждой строчке этого кода. Как я и обещал, собрав знания я одну библиотеку, мы заменим наконец
import torch
import torch.nn as nn
на
import <наша библиотека>
import <наша библиотека>.nn as nn
А так я могу сделать например со своей библиотекой
import candle
import candle.nn as nn
Основная часть моего рассказа подошла к концу. Я надеюсь, что смог достаточно понятно пояснить за работу алгоритмов глубокого обучения и библиотеки PyTorch! Спасибо за внимание!
В следующей финальной статье, я хочу уже воспользовавшись собственной написанной библиотекой реализовать и запустить обученный GPT2, тем самым показав, что мы в достаточной степени овладели мастерством машинного обучения!
Первая версия библиотеки
Вторая версия библиотеки
GPT-2 на этой библиотеке