Пишем свой Transformer
Чтобы поупражняться я решила более детально разобраться и попробовать самостоятельно написать Transformer на PyTorch. Результатом захотелось поделиться здесь. Надеюсь, так же как и мне, это поможет доразобраться в данной архитектуре и ответить на какие-то вопросы.
Оставляю ссылку на свой канал: not_magic_neural_networks
0 Intro
Впервые архитектуру трансформер предложили использовать в 2017 году в статье Google «Attention is all you need» (именно для задачи машинного перевода). Вскоре, Google начал впервые в истории использовать нейросети для перевода (в google translate).
Архитектура оказалась настолько эффективна, что ее адаптировали и под другие задачи, а весь NLP начал так активно развиваться именно с 2017 года.
Итак, transformer — архитектура, основа которой механизм внимания attention. Он так же, как и RNN, состоит из encoder и decoder части.
Основная статья: «Attention is all you need»
Пост с разбором: Transformer
Еще можно почитать тут: 6.3. Трансформеры
Или посмотреть: Лекция. Архитектура Transformer. Decoder, QKV Attention
Далее, я буду полагать, что читатели немного знакомы с transformer и буду рассуждать больше в контексте реализации.

1 Multi-head Attention
An Explanation of Self-Attention mechanism in Transformer
What is Multi-Head Attention (MHA)
PyTorch: MultiheadAttention
Начнем с главной части трансформера — Multi-head Attention
.
Multi-head Attention присутствует в трех местах:
Self Multi-head Attention в энкодере
Masked Multi-head Attention в декодере
Multi-head Attention, соединяющий энкодер и декодер части (так же в декодере)

1.1. QKV — attention
Attention (или механизм внимания) несет информацию о связях слов друг с другом.
QKV-attention — один из способов вычисления связей:
Понятия Q(query)
, K(key)
и V(value)
пришли из поиска:
Q — запрос, K — краткая сводка по документу и V — сам документ. Задача состоит в том чтобы по Q и K понять насколько документ релевантен (какой V выбирать).
В attention, Q, K и V были придуманы для того чтобы отобразить вход x
в три линейный пространства со следующими смыслами: Q отобразит слово в пространство «откуда»; K отобразит в пространство «куда»; V отобразит в пространство в «что важно».
Пусть на вход нам приходит тензор x
.
Получаем матрицы запросов (Q), ключей (K) и значений (V), пропуская эмбеддинги x
через линейные слои W_q, W_k и W_v.q = x * W_q
k = x * W_k
v = x * W_v
Интуиция такая: query-вектор спрашивает у key-вектора, есть ли у него какая-то полезная информация чтобы обновить информацию о себе. То есть, когда мы делаем перемножение q ✕ k.T
, то мы фильтруем нужную информацию (выбираем наиболее релевантную) из k для q.
Считаем релевантность какrelevance = q @ k.T / math.sqrt(head_size)
, где math.sqrt(head_size)
— нормировочная константа.
Получаем вероятности: relevance = softmax(relevance)
И считаем выход: head = relevance @ v
1.2. Multi-head Attention
Одну операцию attention(Q, K, V)
принято называть «головой». Именно поэтому переменная выше была названа head
. Multi-head Attention подразумевает что таких голов будет несколько и их количество будет определяться гиперпараметром num_head
.
Подразумевается, что разные головы attention могли бы обращать внимание на разные типы связей между словами. Это и есть multi-head attention или attention с несколькими головами.
Multi-head attention — конкатенация результатов нескольких отдельных attention операций (), отображенная в заданную размерность с помощью еще одного линейного слоя.
1.3. Encoder Multi-Head Attention (или Self)
Self-attention работает только с вектором энкодера, то есть энкодер учится обращать внимание сам на себя (потому self). Его идея заключается в следующем: self-attention обновляет embedding-и каждого токена, добавляя в них полезную информацию на основе контекста (всех остальных токенов в предложении) в котором эти embedding-и находятся.

1.4 Encoder-Decoder Multi-Head Attention
Multi-head Attention соединяющий энкодер и декодер трансформера должен учитывать, что размерность Q (пришедшей из декодера) может отчитаться от V и K (пришедшей из энкодера).

1.5. Decoder Multi-head Attention (или Masked)
Последнее место, где используется Multi-head Attention:

Изначально, Transformer применялся к задачу перевода текста, в которой выходы (собственно перевод) должны генерироваться авторегрессионно, слово за словом.
Однако, Masked Multi-Head Attention реализует более хитрый вариант: к attention (матрице, которая отвечает «кто на кого смотрит») применяется авторегрессивная маска, обращаюая в −∞ веса до softmax для токенов из будущего, чтобы после softmax их вероятности стали нулевыми. Эта маска имеет нижнетреугольный вид (левый нижний треугольник включая главную диагональ).
Таким образом, каждый элемент смотрит только на те элементы, которые были в прошлом и не заглядывает в будущее. Это позволяет подавать данные не авторегрессионно, а все сразу.
При этом, на инференсе Masked Multi-Head Attention работает как обычный Multi-Head Attention.
В функции Multi-Head Attention применим к relevance маску, если она задана, до функции softmax: в тех местах, где маска содержит True, ставим -inf, чтобы после применения softmax значение в этой ячейке занулилось (так как экспонента возведенная в степень -inf будет равняться нулю).
1.6. Пишем Multi-head Attention
class MultiHeadAttention(nn.Module):
def __init__(self, input_size, head_size, num_heads, out_size, query_input_size=None):
super(MultiHeadAttention, self).__init__()
self.input_size = input_size
self.head_size = head_size
self.num_heads = num_heads
self.out_size = out_size
# для attention, который смешивает информацию энкодера и декодера
self.query_input_size = self.input_size if query_input_size is None else query_input_size
self.W_Q = nn.Linear(self.query_input_size, self.num_heads * self.head_size, bias=False)
self.W_K = nn.Linear(self.input_size, self.num_heads * self.head_size, bias=False)
self.W_V = nn.Linear(self.input_size, self.num_heads * self.head_size, bias=False)
# последний линейный слой
self.out = nn.Linear(self.head_size * self.num_heads, self.out_size)
# создает маску для Masked Multi-Head Attention
def make_mask(self, emb):
batch_size, emb_len, _ = emb.shape
mask = torch.tril(torch.ones((emb_len, emb_len))).expand(batch_size, 1, emb_len, emb_len).bool()
return mask
def forward(self, query, key, value, masked=False):
# batch_size, emb_len, input_size
batch_size = key.size(0)
emb_len = key.size(1)
query_emb_len = query.size(1)
q = self.W_Q(query)
k = self.W_K(key)
v = self.W_V(value)
# batch_size, emb_len, self.num_heads * self.head_size
# разделяем головы
q = q.view(batch_size, query_emb_len, self.num_heads, self.head_size)
k = k.view(batch_size, emb_len, self.num_heads, self.head_size)
v = v.view(batch_size, emb_len, self.num_heads, self.head_size)
# batch_size, emb_len, num_heads, head_size
q = q.transpose(1,2)
k = k.transpose(1,2)
v = v.transpose(1,2)
# batch_size, num_heads, emb_len, head_size
k_T = k.transpose(2, 3)
# batch_size, num_heads, head_size, emb_len
relevance = q @ k_T / math.sqrt(self.head_size)
# batch_size, num_heads, query_emb_len, emb_len
if masked:
mask = self.make_mask(key)
relevance = relevance.masked_fill(mask, -torch.inf)
# softmax вдоль последней размерности
relevance = F.softmax(relevance, dim=-1)
# batch_size, num_heads, emb_len, emb_len
heads = relevance @ v
# batch_size, num_heads, query_emb_len, head_size
heads = heads.transpose(1, 2)
# batch_size, query_emb_len, num_heads, head_size
concat = heads.reshape(batch_size, query_emb_len, self.head_size * self.num_heads)
# batch_size, query_emb_len, num_heads * head_size
out = self.out(concat)
# batch_size, query_emb_len, out_size
return out
2 Positional Encoding
Главное преимущество RNN моделей — это то, что они сохраняют временнУю связь между словами, читая текст слово за словом.
Positional Encoding добавляет к изначальным эмбеддингам информацию о позиции эмбеддинга в предложении (позиционный вектор такого же размера, что и сам эмбеддинг):


Существуют разные подходы получения позициональных векторов. Один из самых распространенных — через тригонометрические функции. Он и был предложен в статье «Attention is all you need»:

emb_dim
— размер каждого эмбеддинга, i
— номер элемента позиционного вектора, t
— позиция токена.
class PositionalEncoding(nn.Module):
def __init__(self, max_emb_len, emb_dim):
super(PositionalEncoding, self).__init__()
self.max_emb_len = max_emb_len
self.emb_dim = emb_dim
t = torch.arange(max_emb_len)[:, None]
i = torch.arange(emb_dim)[None, ::2]
pe = torch.zeros(self.max_emb_len, self.emb_dim)
# max_emb_len, emb_dim
pe[:, ::2] = torch.sin(t / (10000 ** ((2 * i) / self.emb_dim)))
pe[:, 1::2] = torch.cos(t / (10000 ** ((2 * i) / self.emb_dim)))
pe = pe.unsqueeze(0)
# 1, max_emb_len, emb_dim
# Данный тензор будем хранить в stage dict этого модуля
# Optimizer его не будет оптимизировать
self.register_buffer('PositionalEncoding', pe)
def forward(self, emb):
# batch_size, emb_len, input_size
emb_len = emb.size(1)
return emb + self.PositionalEncoding[:, :emb_len] # batch_size, emb_len, input_size
3. Собираем Encoder

Enсoder Block повторяется N раз и состоит из состоит :
Self Multi-Head Attention
Add & Norm
(Add
— прибавляет к embedding-ам self-attention слой;Norm
— нормализует значения embedding-ов по слоям)Feed Forward
(два полносвязных слоя, с функцией активациейReLU
между ними)Add & Norm
class EncoderBlock(nn.Module):
def __init__(self, input_size, head_size, num_heads, out_size, ff_hidden_size, query_input_size=None):
super(EncoderBlock, self).__init__()
self.input_size = input_size
self.head_size = head_size
self.num_heads = num_heads
self.out_size = out_size
self.ff_hidden_size = ff_hidden_size
self.query_input_size = input_size if query_input_size is None else query_input_size
self.attention = MultiHeadAttention(self.input_size, self.head_size, self.num_heads, self.out_size, self.query_input_size)
if self.query_input_size != self.out_size:
self.adapt = nn.Linear(self.query_input_size, self.out_size)
else:
self.adapt = nn.Identity() # возвращает входные данные без изменений
self.norm_1 = nn.LayerNorm(self.out_size)
self.feed_forward = nn.Sequential(OrderedDict([
("lin_1", nn.Linear(self.out_size, self.ff_hidden_size)),
("act", nn.ReLU()),
("lin_2", nn.Linear(self.ff_hidden_size, self.out_size)),
]))
self.norm_2 = nn.LayerNorm(self.out_size)
def forward(self, query, key, value):
# batch_size, seq_len, in_size
# Self Multi-Head Attention
attention_out = self.attention(query, key, value)
# batch_size, seq_len, out_size
# Add + Norm
# add_1_out = attention_out + query
# out_size - гиперпараметр, потому in_size (из query) может отличаться от out_size (из attention_out)
# Потому воспользуемся линейным слоем adapt чтобы привести к одной размерности out_size
add_1_out = attention_out + self.adapt(query)
norm_1_out = self.norm_1(add_1_out)
# batch_size, seq_len, out_size
# Feed Foerward
feed_forward_out = self.feed_forward(norm_1_out)
# batch_size, seq_len, out_size
# Add + Norm
add_out = feed_forward_out + norm_1_out
norm_2_out = self.norm_2(add_out)
return norm_2_out
Повторяем EncoderBlock N раз:
На вход подается embedding размером
batch_size, seq_len, input_size
.Применяем слой позиционного кодирования
PositionalEncoding
Прогоняем информацию N раз EncoderBlock
class TransformerEncoder(nn.Module):
def __init__(self, max_seq_len, vocab_size, emb_size, N, att_out_size, att_head_size, num_heads, ff_hidden_size):
super(TransformerEncoder, self).__init__()
self.max_seq_len = max_seq_len
self.vocab_size = vocab_size
self.emb_size = emb_size
self.N = N
self.att_out_size = att_out_size
self.att_head_size = att_head_size
self.num_heads = num_heads
self.ff_hidden_size = ff_hidden_size
self.embedding_layer = nn.Embedding(self.vocab_size, self.emb_size)
self.positional_encoder = PositionalEncoding(self.max_seq_len, self.emb_size)
self.encoder_blocks = nn.ModuleDict({
f"encoder_block_{i}": EncoderBlock(
input_size=self.emb_size if i==0 else self.att_out_size,
head_size=self.att_head_size,
num_heads=self.num_heads,
out_size=self.att_out_size,
ff_hidden_size=self.ff_hidden_size,
) for i in range(self.N)
})
def forward(self, encoder_input):
# batch_size x seq_len
encoder_emb = self.embedding_layer(encoder_input)
# batch_size, seq_len, emb_size
out = self.positional_encoder(encoder_emb)
# batch_size, seq_len, emb_size
for block in self.encoder_blocks.values():
out = block(out, out, out)
# batch_size, seq_len, att_out_size
return out
4. Собираем Decoder
Сначала, аналогично энкодеру, соберем декодер блок, который будем повторять N раз:

class DecoderBlock(nn.Module):
def __init__(self, input_size, head_size, num_heads, out_size, ff_hidden_size, encoder_out_size=None):
super(DecoderBlock, self).__init__()
self.input_size = input_size
self.head_size = head_size
self.num_heads = num_heads
self.out_size = out_size
self.ff_hidden_size = ff_hidden_size
self.encoder_out_size = input_size if encoder_out_size is None else encoder_out_size
self.masked_attention = MultiHeadAttention(
input_size=self.input_size,
head_size=self.head_size,
num_heads=self.num_heads,
out_size=self.out_size
)
if self.input_size != self.out_size:
self.adapt = nn.Linear(self.input_size, self.out_size)
else:
self.adapt = nn.Identity()
self.norm_1 = nn.LayerNorm(self.out_size)
self.attention = MultiHeadAttention(
input_size=self.encoder_out_size,
head_size=self.head_size,
num_heads=self.num_heads,
out_size=self.out_size,
query_input_size=self.out_size
)
self.norm_2 = nn.LayerNorm(self.out_size)
self.feed_forward = nn.Sequential(OrderedDict([
("lin_1", nn.Linear(self.out_size, self.ff_hidden_size)),
("act", nn.ReLU()),
("lin_2", nn.Linear(self.ff_hidden_size, self.out_size)),
]))
self.norm_3 = nn.LayerNorm(self.out_size)
def forward(self, decoder_emb, encoder_output):
# decoder_emb.size() = batch_size, seq_len, input_size
# encoder_output.size() = batch_size, encoder_seq_len, encoder_out_size
masked_attention = self.masked_attention(decoder_emb, decoder_emb, decoder_emb, masked=True)
out_add_1 = masked_attention + self.adapt(decoder_emb)
out_norm_1 = self.norm_1(out_add_1)
# batch_size, emb_len, out_size
attention = self.attention(out_norm_1, encoder_output, encoder_output)
out_add_2 = attention + out_norm_1
out_norm_2 = self.norm_2(out_add_2)
# batch_size, emb_len, out_size
out_feed_forward = self.feed_forward(out_norm_2)
out_add_3 = masked_attention + out_norm_2
out_norm_3 = self.norm_3(out_add_2)
# batch_size, seq_len, out_size
return out_norm_3
На вход декодеру приходят эмбеддинги энкодера
Добавляем к ним
PositionalEnсoding
Проходим N раз через декодер-блок
Завершаем последним линейным слоем и softmax-ом
class TransformerDecoder(nn.Module):
def __init__(self, max_seq_len, vocab_size, emb_size, num_layers, att_out_size, att_head_size, num_heads, ff_hidden_size, encoder_out_size=None):
super(TransformerDecoder, self).__init__()
self.max_seq_len = max_seq_len
self.vocab_size = vocab_size
self.emb_size = emb_size
self.num_layers = num_layers
self.att_out_size = att_out_size
self.att_head_size = att_head_size
self.num_heads = num_heads
self.ff_hidden_size = ff_hidden_size
self.encoder_out_size = input_size if encoder_out_size is None else encoder_out_size
self.embedding_layer = nn.Embedding(self.vocab_size, self.emb_size)
self.positional_encoder = PositionalEncoding(self.max_seq_len, self.emb_size)
self.decoder_blocks = nn.ModuleDict({
f"decoder_block_{i}": DecoderBlock(
input_size=self.emb_size if i==0 else self.att_out_size,
head_size=self.att_head_size,
num_heads=self.num_heads,
out_size=self.att_out_size,
ff_hidden_size=self.ff_hidden_size,
encoder_out_size=self.encoder_out_size,
) for i in range(self.num_layers)
})
self.fc = nn.Linear(self.att_out_size, self.vocab_size)
def forward(self, decoder_input, encoder_output):
# decoder_input.size() = batch_size, seq_len
# encoder_output.size() = batch_size, encoder_seq_le, encoder_out_size
decoder_emb = self.embedding_layer(decoder_input)
# batch_size, seq_len, emb_size
decoder_emb = self.positional_encoder(decoder_emb)
# batch_size, seq_len, emb_size
out = decoder_emb
for block in self.decoder_blocks.values():
out = block(out, encoder_output)
# batch_size, seq_len, att_out_size
out = self.fc(out)
return out
# batch_size x vocab_size
5. Собираем Transformer
class Transformer(nn.Module):
def __init__(self, max_seq_len, vocab_size, emb_size, num_encoder_layers, enc_att_out_size, enc_att_head_size, enc_num_heads, enc_ff_hidden_size, num_decoder_layers, dec_att_out_size, dec_att_head_size, dec_num_heads, dec_ff_hidden_size,):
super(Transformer, self).__init__()
self.max_seq_len = max_seq_len
self.vocab_size = vocab_size
self.emb_size = emb_size
self.num_encoder_layers = num_encoder_layers
self.enc_att_out_size = enc_att_out_size
self.enc_att_head_size = enc_att_head_size
self.enc_num_heads = enc_num_heads
self.enc_ff_hidden_size = enc_ff_hidden_size
self.num_decoder_layers = num_decoder_layers
self.dec_att_out_size = dec_att_out_size
self.dec_att_head_size = dec_att_out_size
self.dec_num_heads = dec_num_heads
self.dec_ff_hidden_size = dec_ff_hidden_size
# Encoder
self.encoder = TransformerEncoder(
max_seq_len=self.max_seq_len,
vocab_size=self.vocab_size,
emb_size=self.emb_size,
num_layers=self.num_encoder_layers,
att_head_size=self.enc_att_head_size,
num_heads=self.enc_num_heads,
att_out_size=self.enc_att_out_size,
ff_hidden_size=self.enc_ff_hidden_size,
)
# Decoder
self.decoder = TransformerDecoder(
max_seq_len=self.max_seq_len,
vocab_size=self.vocab_size,
emb_size=self.emb_size,
num_layers=self.num_decoder_layers,
att_head_size=self.dec_att_head_size,
num_heads=self.dec_num_heads,
att_out_size=self.dec_att_out_size,
ff_hidden_size=self.dec_ff_hidden_size,
encoder_out_size=self.enc_att_out_size,
)
def forward(self, encoder_input, decoder_input):
# encoder_input.size() = batch_size, encoder_seq_len
# decoder_input.size() = batch_size, decoder_seq_len
encoder_output = self.encoder(encoder_input)
# batch_size, enc_seq_len, enc_att_out_size
decoder_output = self.decoder(decoder_input, encoder_output)
# batch_size, dec_seq_len, vocab_size
return decoder_output
Замечания
На самом деле в декодере можно бы было использовать часть энкодера, а не дублировать слои. Для этого надо добавить query_input_size в энкодер и аккуратно создать слой энкодера в декодере и правильно передать параметры.

2.Нет DropOut слоев