Пишем свой Transformer

Чтобы поупражняться я решила более детально разобраться и попробовать самостоятельно написать Transformer на PyTorch. Результатом захотелось поделиться здесь. Надеюсь, так же как и мне, это поможет доразобраться в данной архитектуре и ответить на какие-то вопросы.

Оставляю ссылку на свой канал: not_magic_neural_networks

0 Intro

Впервые архитектуру трансформер предложили использовать в 2017 году в статье Google «Attention is all you need» (именно для задачи машинного перевода). Вскоре, Google начал впервые в истории использовать нейросети для перевода (в google translate).

Архитектура оказалась настолько эффективна, что ее адаптировали и под другие задачи, а весь NLP начал так активно развиваться именно с 2017 года.

Итак, transformer — архитектура, основа которой механизм внимания attention. Он так же, как и RNN, состоит из encoder и decoder части.

Основная статья: «Attention is all you need»
Пост с разбором: Transformer
Еще можно почитать тут: 6.3. Трансформеры
Или посмотреть: Лекция. Архитектура Transformer. Decoder, QKV Attention

Далее, я буду полагать, что читатели немного знакомы с transformer и буду рассуждать больше в контексте реализации.

Transformer
Transformer

1 Multi-head Attention

An Explanation of Self-Attention mechanism in Transformer
What is Multi-Head Attention (MHA)
PyTorch: MultiheadAttention

Начнем с главной части трансформера — Multi-head Attention.

Multi-head Attention присутствует в трех местах:

  1. Self Multi-head Attention в энкодере

  2. Masked Multi-head Attention в декодере

  3. Multi-head Attention, соединяющий энкодер и декодер части (так же в декодере)

Multi-head Attention
Multi-head Attention

1.1. QKV — attention

Attention (или механизм внимания) несет информацию о связях слов друг с другом.

QKV-attention — один из способов вычисления связей:

Attention(Q, K, V) = softmax\Bigg(\frac{QK^T}{\sqrt{d_k}}\Bigg) \cdot V

Понятия Q(query), K(key) и V(value) пришли из поиска:
Q — запрос, K — краткая сводка по документу и V — сам документ. Задача состоит в том чтобы по Q и K понять насколько документ релевантен (какой V выбирать).

В attention, Q, K и V были придуманы для того чтобы отобразить вход x в три линейный пространства со следующими смыслами: Q отобразит слово в пространство «откуда»; K отобразит в пространство «куда»; V отобразит в пространство в «что важно».

Пусть на вход нам приходит тензор x.

Получаем матрицы запросов (Q), ключей (K) и значений (V), пропуская эмбеддинги x через линейные слои W_q, W_k и W_v.
q = x * W_q
k = x * W_k
v = x * W_v

Интуиция такая: query-вектор спрашивает у key-вектора, есть ли у него какая-то полезная информация чтобы обновить информацию о себе. То есть, когда мы делаем перемножение q ✕ k.T, то мы фильтруем нужную информацию (выбираем наиболее релевантную) из k для q.

Считаем релевантность какrelevance = q @ k.T / math.sqrt(head_size), где math.sqrt(head_size) — нормировочная константа.

Получаем вероятности: relevance = softmax(relevance)

И считаем выход: head = relevance @ v

1.2. Multi-head Attention

Одну операцию attention(Q, K, V) принято называть «головой». Именно поэтому переменная выше была названа head. Multi-head Attention подразумевает что таких голов будет несколько и их количество будет определяться гиперпараметром num_head.

Подразумевается, что разные головы attention могли бы обращать внимание на разные типы связей между словами. Это и есть multi-head attention или attention с несколькими головами.

MultiHead(Q, K, V) = Concat(head_1, ..., head_H) \cdot W^O

head_i = Attention(Q_i, K_i, V_i)

Multi-head attention — конкатенация результатов нескольких отдельных attention операций (head_i), отображенная в заданную размерность с помощью еще одного линейного слоя.

1.3. Encoder Multi-Head Attention (или Self)

Self-attention работает только с вектором энкодера, то есть энкодер учится обращать внимание сам на себя (потому self). Его идея заключается в следующем: self-attention обновляет embedding-и каждого токена, добавляя в них полезную информацию на основе контекста (всех остальных токенов в предложении) в котором эти embedding-и находятся.

Self Multi-Head Attention
Self Multi-Head Attention

1.4 Encoder-Decoder Multi-Head Attention

Multi-head Attention соединяющий энкодер и декодер трансформера должен учитывать, что размерность Q (пришедшей из декодера) может отчитаться от V и K (пришедшей из энкодера).

Encoder-Decoder Multi-Head Attention
Encoder-Decoder Multi-Head Attention

1.5. Decoder Multi-head Attention (или Masked)

Последнее место, где используется Multi-head Attention:

Masked Multi-head Attention
Masked Multi-head Attention

Изначально, Transformer применялся к задачу перевода текста, в которой выходы (собственно перевод) должны генерироваться авторегрессионно, слово за словом.

Однако, Masked Multi-Head Attention реализует более хитрый вариант: к attention (матрице, которая отвечает «кто на кого смотрит») применяется авторегрессивная маска, обращаюая в −∞ веса до softmax для токенов из будущего, чтобы после softmax их вероятности стали нулевыми. Эта маска имеет нижнетреугольный вид (левый нижний треугольник включая главную диагональ).

Таким образом, каждый элемент смотрит только на те элементы, которые были в прошлом и не заглядывает в будущее. Это позволяет подавать данные не авторегрессионно, а все сразу.

При этом, на инференсе Masked Multi-Head Attention работает как обычный Multi-Head Attention.

В функции Multi-Head Attention применим к relevance маску, если она задана, до функции softmax: в тех местах, где маска содержит True, ставим -inf, чтобы после применения softmax значение в этой ячейке занулилось (так как экспонента возведенная в степень -inf будет равняться нулю).

1.6. Пишем Multi-head Attention

class MultiHeadAttention(nn.Module):
    def __init__(self, input_size, head_size, num_heads, out_size, query_input_size=None):
        super(MultiHeadAttention, self).__init__()
        
        self.input_size = input_size
        self.head_size = head_size
        self.num_heads = num_heads
        self.out_size = out_size
        # для attention, который смешивает информацию энкодера и декодера
        self.query_input_size = self.input_size if query_input_size is None else query_input_size
        
        self.W_Q = nn.Linear(self.query_input_size, self.num_heads * self.head_size, bias=False)
        self.W_K = nn.Linear(self.input_size, self.num_heads * self.head_size, bias=False)
        self.W_V = nn.Linear(self.input_size, self.num_heads * self.head_size, bias=False)
        
        # последний линейный слой
        self.out = nn.Linear(self.head_size * self.num_heads, self.out_size) 

    # создает маску для Masked Multi-Head Attention
    def make_mask(self, emb):
        batch_size, emb_len, _ = emb.shape
        mask = torch.tril(torch.ones((emb_len, emb_len))).expand(batch_size, 1, emb_len, emb_len).bool()
        return mask
    
    def forward(self, query, key, value, masked=False):
        # batch_size, emb_len, input_size
        
        batch_size = key.size(0)
        emb_len = key.size(1)
        query_emb_len = query.size(1)
        
        q = self.W_Q(query) 
        k = self.W_K(key)
        v = self.W_V(value)
        # batch_size, emb_len, self.num_heads * self.head_size
        
        # разделяем головы
        q = q.view(batch_size, query_emb_len, self.num_heads, self.head_size)
        k = k.view(batch_size, emb_len, self.num_heads, self.head_size)
        v = v.view(batch_size, emb_len, self.num_heads, self.head_size)
        # batch_size, emb_len, num_heads, head_size
        
        q = q.transpose(1,2) 
        k = k.transpose(1,2) 
        v = v.transpose(1,2) 
        # batch_size, num_heads, emb_len, head_size

        k_T = k.transpose(2, 3) 
        # batch_size, num_heads, head_size, emb_len 
        
        relevance = q @ k_T / math.sqrt(self.head_size)  
        # batch_size, num_heads, query_emb_len, emb_len
        
        if masked:
            mask = self.make_mask(key)
            relevance = relevance.masked_fill(mask, -torch.inf)
        
        # softmax вдоль последней размерности
        relevance = F.softmax(relevance, dim=-1)
        # batch_size, num_heads, emb_len, emb_len
        
        heads = relevance @ v
        # batch_size, num_heads, query_emb_len, head_size
        
        heads = heads.transpose(1, 2)
        # batch_size, query_emb_len, num_heads, head_size
        
        concat = heads.reshape(batch_size, query_emb_len, self.head_size * self.num_heads)  
        # batch_size, query_emb_len, num_heads * head_size
                
        out = self.out(concat)
        # batch_size, query_emb_len, out_size
        
        return out

2 Positional Encoding

Главное преимущество RNN моделей — это то, что они сохраняют временнУю связь между словами, читая текст слово за словом.

Positional Encoding добавляет к изначальным эмбеддингам информацию о позиции эмбеддинга в предложении (позиционный вектор такого же размера, что и сам эмбеддинг):

1e73ff48bd4136ee71a0e333a5a5af2f.pngfa0965c508bcfe404f2d412e644d956f.png

Существуют разные подходы получения позициональных векторов. Один из самых распространенных — через тригонометрические функции. Он и был предложен в статье «Attention is all you need»:

Positional Encoding
Positional Encoding

emb_dim — размер каждого эмбеддинга, i — номер элемента позиционного вектора, t — позиция токена.

class PositionalEncoding(nn.Module):
    def __init__(self, max_emb_len, emb_dim):
        super(PositionalEncoding, self).__init__()
        
        self.max_emb_len = max_emb_len
        self.emb_dim = emb_dim
        
        t = torch.arange(max_emb_len)[:, None]
        i = torch.arange(emb_dim)[None, ::2]

        pe = torch.zeros(self.max_emb_len, self.emb_dim)
        # max_emb_len, emb_dim
        
        pe[:, ::2] = torch.sin(t / (10000 ** ((2 * i) / self.emb_dim)))
        pe[:, 1::2] = torch.cos(t / (10000 ** ((2 * i) / self.emb_dim)))
        
        pe = pe.unsqueeze(0)
        # 1, max_emb_len, emb_dim
        
        # Данный тензор будем хранить в stage dict этого модуля
        # Optimizer его не будет оптимизировать
        self.register_buffer('PositionalEncoding', pe)

    def forward(self, emb):
        # batch_size, emb_len, input_size
        emb_len = emb.size(1)
        return emb + self.PositionalEncoding[:, :emb_len] # batch_size, emb_len, input_size

3. Собираем Encoder

Encoder
Encoder

Enсoder Block повторяется N раз и состоит из состоит :

  1. Self Multi-Head Attention

  2. Add & Norm(Add — прибавляет к embedding-ам self-attention слой; Norm — нормализует значения embedding-ов по слоям)

  3. Feed Forward(два полносвязных слоя, с функцией активацией ReLU между ними)

  4. Add & Norm

class EncoderBlock(nn.Module):
    def __init__(self, input_size, head_size, num_heads, out_size, ff_hidden_size, query_input_size=None):
        super(EncoderBlock, self).__init__()
        
        self.input_size = input_size
        self.head_size = head_size
        self.num_heads = num_heads
        self.out_size = out_size
        
        self.ff_hidden_size = ff_hidden_size
        self.query_input_size = input_size if query_input_size is None else query_input_size
        
        self.attention = MultiHeadAttention(self.input_size, self.head_size, self.num_heads, self.out_size, self.query_input_size)
        
        if self.query_input_size != self.out_size:
            self.adapt = nn.Linear(self.query_input_size, self.out_size) 
        else:
            self.adapt = nn.Identity() # возвращает входные данные без изменений
        
        self.norm_1 = nn.LayerNorm(self.out_size) 
        
        self.feed_forward = nn.Sequential(OrderedDict([
            ("lin_1", nn.Linear(self.out_size, self.ff_hidden_size)),
            ("act", nn.ReLU()),
            ("lin_2", nn.Linear(self.ff_hidden_size, self.out_size)),
        ]))
        
        self.norm_2 = nn.LayerNorm(self.out_size)

    def forward(self, query, key, value):
        # batch_size, seq_len, in_size
        
        # Self Multi-Head Attention
        attention_out = self.attention(query, key, value)  
        # batch_size, seq_len, out_size
        
        # Add + Norm
        # add_1_out = attention_out + query
        # out_size - гиперпараметр, потому in_size (из query) может отличаться от out_size (из attention_out)
        # Потому воспользуемся линейным слоем adapt чтобы привести к одной размерности out_size
        add_1_out = attention_out + self.adapt(query)
        norm_1_out = self.norm_1(add_1_out)
        # batch_size, seq_len, out_size
        
        # Feed Foerward
        feed_forward_out = self.feed_forward(norm_1_out)
        # batch_size, seq_len, out_size
        
        # Add + Norm
        add_out = feed_forward_out + norm_1_out
        norm_2_out = self.norm_2(add_out)
        
        return norm_2_out

Повторяем EncoderBlock N раз:

  1. На вход подается embedding размером batch_size, seq_len, input_size.

  2. Применяем слой позиционного кодирования PositionalEncoding

  3. Прогоняем информацию N раз EncoderBlock

class TransformerEncoder(nn.Module):
    def __init__(self, max_seq_len, vocab_size, emb_size, N, att_out_size, att_head_size, num_heads, ff_hidden_size):
        super(TransformerEncoder, self).__init__()
        
        self.max_seq_len = max_seq_len
        self.vocab_size = vocab_size
        self.emb_size = emb_size
        self.N = N
        self.att_out_size = att_out_size
        self.att_head_size = att_head_size
        self.num_heads = num_heads
        self.ff_hidden_size = ff_hidden_size
        
        self.embedding_layer = nn.Embedding(self.vocab_size, self.emb_size)
        self.positional_encoder = PositionalEncoding(self.max_seq_len, self.emb_size)

        self.encoder_blocks = nn.ModuleDict({
            f"encoder_block_{i}": EncoderBlock(
                input_size=self.emb_size if i==0 else self.att_out_size,
                head_size=self.att_head_size,
                num_heads=self.num_heads,
                out_size=self.att_out_size,
                ff_hidden_size=self.ff_hidden_size,
            ) for i in range(self.N)
        })
    
    def forward(self, encoder_input):
        # batch_size x seq_len
        
        encoder_emb = self.embedding_layer(encoder_input)  
        # batch_size, seq_len, emb_size
        
        out = self.positional_encoder(encoder_emb)
        # batch_size, seq_len, emb_size
        
        for block in self.encoder_blocks.values():
            out = block(out, out, out)  
        # batch_size, seq_len, att_out_size

        return out

4. Собираем Decoder

Сначала, аналогично энкодеру, соберем декодер блок, который будем повторять N раз:

0d49a55ce85b0b97d69c9d31971128db.png
class DecoderBlock(nn.Module):
    def __init__(self, input_size, head_size, num_heads, out_size, ff_hidden_size, encoder_out_size=None):
        super(DecoderBlock, self).__init__()
        
        self.input_size = input_size
        self.head_size = head_size
        self.num_heads = num_heads
        self.out_size = out_size
        self.ff_hidden_size = ff_hidden_size
        self.encoder_out_size = input_size if encoder_out_size is None else encoder_out_size
        
        self.masked_attention = MultiHeadAttention(
            input_size=self.input_size, 
            head_size=self.head_size, 
            num_heads=self.num_heads, 
            out_size=self.out_size
        )
        
        if self.input_size != self.out_size:
            self.adapt = nn.Linear(self.input_size, self.out_size)  
        else:
            self.adapt = nn.Identity()
            
        self.norm_1 = nn.LayerNorm(self.out_size)
        
        self.attention = MultiHeadAttention(
            input_size=self.encoder_out_size, 
            head_size=self.head_size, 
            num_heads=self.num_heads, 
            out_size=self.out_size, 
            query_input_size=self.out_size
        )
            
        self.norm_2 = nn.LayerNorm(self.out_size)
              
        self.feed_forward = nn.Sequential(OrderedDict([
            ("lin_1", nn.Linear(self.out_size, self.ff_hidden_size)),
            ("act", nn.ReLU()),
            ("lin_2", nn.Linear(self.ff_hidden_size, self.out_size)),
        ]))
        
        self.norm_3 = nn.LayerNorm(self.out_size)
    
    def forward(self, decoder_emb, encoder_output):
        # decoder_emb.size() = batch_size, seq_len, input_size
        # encoder_output.size() = batch_size, encoder_seq_len, encoder_out_size
        
        masked_attention = self.masked_attention(decoder_emb, decoder_emb, decoder_emb, masked=True)
        out_add_1 = masked_attention + self.adapt(decoder_emb)
        out_norm_1 = self.norm_1(out_add_1)
        # batch_size, emb_len, out_size
        
        attention = self.attention(out_norm_1, encoder_output, encoder_output)
        out_add_2 = attention + out_norm_1
        out_norm_2 = self.norm_2(out_add_2)
        # batch_size, emb_len, out_size
        
        out_feed_forward = self.feed_forward(out_norm_2)
        out_add_3 = masked_attention + out_norm_2
        out_norm_3 = self.norm_3(out_add_2)
        # batch_size, seq_len, out_size
        
        return out_norm_3  
  1. На вход декодеру приходят эмбеддинги энкодера

  2. Добавляем к ним PositionalEnсoding

  3. Проходим N раз через декодер-блок

  4. Завершаем последним линейным слоем и softmax-ом

class TransformerDecoder(nn.Module):
    def __init__(self, max_seq_len, vocab_size, emb_size, num_layers, att_out_size, att_head_size, num_heads, ff_hidden_size, encoder_out_size=None):
        super(TransformerDecoder, self).__init__()
        
        self.max_seq_len = max_seq_len
        self.vocab_size = vocab_size
        self.emb_size = emb_size
        self.num_layers = num_layers
        self.att_out_size = att_out_size
        self.att_head_size = att_head_size
        self.num_heads = num_heads
        self.ff_hidden_size = ff_hidden_size
        self.encoder_out_size = input_size if encoder_out_size is None else encoder_out_size
        
        self.embedding_layer = nn.Embedding(self.vocab_size, self.emb_size)
        self.positional_encoder = PositionalEncoding(self.max_seq_len, self.emb_size)

        self.decoder_blocks = nn.ModuleDict({
            f"decoder_block_{i}": DecoderBlock(
                input_size=self.emb_size if i==0 else self.att_out_size,
                head_size=self.att_head_size,
                num_heads=self.num_heads,
                out_size=self.att_out_size,
                ff_hidden_size=self.ff_hidden_size,
                encoder_out_size=self.encoder_out_size,
            ) for i in range(self.num_layers)
        })
        
        self.fc = nn.Linear(self.att_out_size, self.vocab_size)

    def forward(self, decoder_input, encoder_output):
        # decoder_input.size() = batch_size, seq_len
        # encoder_output.size() = batch_size, encoder_seq_le, encoder_out_size
        
        decoder_emb = self.embedding_layer(decoder_input)  
        # batch_size, seq_len, emb_size
        
        decoder_emb = self.positional_encoder(decoder_emb)
        # batch_size, seq_len, emb_size
        
        out = decoder_emb
        for block in self.decoder_blocks.values():
            out = block(out, encoder_output)  
        # batch_size, seq_len, att_out_size
            
        out = self.fc(out)
        return out
        # batch_size x vocab_size

5. Собираем Transformer

class Transformer(nn.Module):
    def __init__(self, max_seq_len, vocab_size, emb_size, num_encoder_layers, enc_att_out_size, enc_att_head_size, enc_num_heads, enc_ff_hidden_size, num_decoder_layers, dec_att_out_size, dec_att_head_size, dec_num_heads, dec_ff_hidden_size,):
        super(Transformer, self).__init__()
        
        self.max_seq_len = max_seq_len
        self.vocab_size = vocab_size
        self.emb_size = emb_size
        
        self.num_encoder_layers = num_encoder_layers
        self.enc_att_out_size = enc_att_out_size
        self.enc_att_head_size = enc_att_head_size
        self.enc_num_heads = enc_num_heads
        self.enc_ff_hidden_size = enc_ff_hidden_size
        
        self.num_decoder_layers = num_decoder_layers
        self.dec_att_out_size = dec_att_out_size
        self.dec_att_head_size = dec_att_out_size
        self.dec_num_heads = dec_num_heads
        self.dec_ff_hidden_size = dec_ff_hidden_size

        # Encoder
        self.encoder = TransformerEncoder(
            max_seq_len=self.max_seq_len,
            vocab_size=self.vocab_size,
            emb_size=self.emb_size,
            num_layers=self.num_encoder_layers,
            att_head_size=self.enc_att_head_size,
            num_heads=self.enc_num_heads,
            att_out_size=self.enc_att_out_size,
            ff_hidden_size=self.enc_ff_hidden_size,
        )
        
        # Decoder
        self.decoder = TransformerDecoder(
            max_seq_len=self.max_seq_len,
            vocab_size=self.vocab_size,
            emb_size=self.emb_size,
            num_layers=self.num_decoder_layers,
            att_head_size=self.dec_att_head_size,
            num_heads=self.dec_num_heads,
            att_out_size=self.dec_att_out_size,
            ff_hidden_size=self.dec_ff_hidden_size,
            encoder_out_size=self.enc_att_out_size,
        )
    
    def forward(self, encoder_input, decoder_input):
        # encoder_input.size() = batch_size, encoder_seq_len
        # decoder_input.size() = batch_size, decoder_seq_len
        
        encoder_output = self.encoder(encoder_input)  
        # batch_size, enc_seq_len, enc_att_out_size
        decoder_output = self.decoder(decoder_input, encoder_output)
        # batch_size, dec_seq_len, vocab_size
        return decoder_output

Замечания

  1. На самом деле в декодере можно бы было использовать часть энкодера, а не дублировать слои. Для этого надо добавить query_input_size в энкодер и аккуратно создать слой энкодера в декодере и правильно передать параметры.

Часть декодера, где можно переиспользовать энкодер
Часть декодера, где можно переиспользовать энкодер

2.Нет DropOut слоев

© Habrahabr.ru