MultiHeadAttention实现详解

Transformer自2017年推出之后,已经横扫NLP领域,成为当之无愧的state-of-the-art。原始paper “Attention is All you Need”中对attention提出了通用的query/key/value抽象,开始时觉得很难理解,后来随着读的文献更多,慢慢体会到了其中的意思。关于Transformer和attention的各种解释类文章有很多,不再赘述,本文仅就其中的核心,MultiHeadAttention的实现进行源码剖析。

Transformer的实现有很多,但我看到实现得最clean的还是 # The Annotated Transformer。它的实现是一个最基本的版本,但五脏俱全,理解原理再合适不过。

MultiHeadAttention

如图所示,所谓Multi-Head Attention其实是把QKV的计算并行化,原始attention计算d_model维的向量,而Multi-Head Attention则是将d_model维向量先经过一个Linear Layer,再分解为h个Head计算attention,最终将这些attention向量连在一起后再经过一层Linear Layer输出。所以在整个过程中需要4个输入和输出维度都是d_model的Linear Layer,而整个Model的输入是(batch_size, seq_length, d_model),输出也是(batch_size, seq_length, d_model)。

先上原始代码:

class MultiHeadedAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        "Take in model size and number of heads."
        super(MultiHeadedAttention, self).__init__()
        assert d_model % h == 0
        # We assume d_v always equals d_k
        self.d_k = d_model // h
        self.h = h
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)
        
    def forward(self, query, key, value, mask=None):
        "Implements Figure 2"
        if mask is not None:
            # Same mask applied to all h heads.
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)
        
        # 1) Do all the linear projections in batch from d_model => h x d_k 
        query, key, value = \
            [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
             for l, x in zip(self.linears, (query, key, value))]
        
        # 2) Apply attention on all the projected vectors in batch. 
        x, self.attn = attention(query, key, value, mask=mask, 
                                 dropout=self.dropout)
        
        # 3) "Concat" using a view and apply a final linear. 
        x = x.transpose(1, 2).contiguous() \
             .view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](x)

这段代码中最费解的地方:

# 1) Do all the linear projections in batch from d_model => h x d_k 
query, key, value = \
    [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
     for l, x in zip(self.linears, (query, key, value))]

前面提到MultiHeadAttention需要4个Linear Layer,而上面这段代码用到了其中前三个,最后一个用在最后一行self.linears[-1]。重写下这段代码:

query, key, value = [l(x) for l, x in zip(self.linears, (query, key, value))]
query, key, value = [x.view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
                     for x in (query, key, value)]

第一行把QKV分别经过一层Linear变换,tensor size不变,第二行将QKV的d_model维向量分解为h * d_k。

跑一个self-attention的实例,作为输入,query/key/value的shape为(batch_size, seq_lengh, d_model):

h = 8
d_model = 512
batch_size = 1
seq_length = 10
model = MultiHeadAttention(h, d_model)

query = torch.randn([batch_size, seq_length, d_model])
key = query
value = query

print ('Input size: ' + str(query.size()))

将代码中的tensor变换维度加上注释,类重命名为MultiHeadAttention,可运行的完整代码如下:

import torch
import torch.nn as nn
import torch.nn.functional as F
import math, copy

def clones(module, N):
    "Produce N identical layers."
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

def attention(query, key, value, mask=None, dropout=None):
    "Compute 'Scaled Dot Product Attention'"
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    p_attn = F.softmax(scores, dim = -1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn

class MultiHeadAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        "Take in model size and number of heads."
        super(MultiHeadAttention, self).__init__()
        assert d_model % h == 0
        # We assume d_v always equals d_k
        self.d_k = d_model // h
        self.h = h
        self.linears = clones(nn.Linear(d_model, d_model), 4) # create 4 linear layers
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)
        
    def forward(self, query, key, value, mask=None):
        if mask is not None:
            # Same mask applied to all h heads.
            mask = mask.unsqueeze(1)
        batch_size = query.size(0)
        
        print ('Before transform query: ' + str(query.size())) # (batch_size, seq_length, d_model)        

        query, key, value = [l(x) for l, x in zip(self.linears, (query, key, value))] # (batch_size, seq_length, d_model), use first 3 self.linears
        query, key, value = [x.view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
                             for x in (query, key, value)] # (batch_size, h, seq_length, d_k)
                
        print ('After transform query: ' + str(query.size()))
        
        # 2) Apply attention on all the projected vectors in batch. 
        x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)
        
        # 3) "Concat" using a view and apply a final linear. 
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k)
        return self.linears[-1](x)

    
h = 8
d_model = 512
batch_size = 1
seq_length = 10
model = MultiHeadAttention(h, d_model)

query = torch.randn([batch_size, seq_length, d_model])
key = query
value = query

print ('Input size: ' + str(query.size()))

m = model(query, key, value)

print ('Output size: ' + str(m.size()))

运行结果:

Input size: torch.Size([1, 10, 512])
Before transform query: torch.Size([1, 10, 512])
After transform query: torch.Size([1, 8, 10, 64])
Output size: torch.Size([1, 10, 512])