《动手学深度学习》——注意力机制
一、注意力提示
生物学中的注意力提示
为解释注意力是如何在视觉世界中展开的,一个双组件(two-component)的框架应运而生,在这个框架中,人们基于非自主性提示和自主性提示有选择地引导注意力的焦点
查询、键和值
-
在注意力机制下,将自主性提示称为查询(Queries)。给定任何查询,注意力机制通过注意力池化(attention pooling)将选择偏向于感官输入(sensory inputs)。这些感官输入被称为值(Values)。每个值都与一个键(Keys)配对,可以看作感官输入的非自主提示。
-
我们可以设计注意力池化层,使得给定得查询(自主性提示)可以与键(非自主性提示)进行交互,使得有偏向性得选择某些值(感官输入)
二、注意力池化:Nadaraya-Watson 核回归
非参数注意力池化
- Nadaraya和Waston提出了一个想法,根据输入得位置对输出进行加权:
其中是核(Kernel)。上述估计器被称为Nadaraya-Wastson核回归
- 从注意力机制得角度重写一个更加通用得注意力池化公式:
其中是查询,(, )是键值对。将查询和键之间的关系建模为注意力权重,这个权重将被分配给每一个对应值。
- 例如使用一个高斯核(Gaussian kernel),其定义为:
将高斯核带入得到:
其中若一个键越接近给定的查询,那么分配给这个键对应值的注意力权重就会越大
带参数注意力池化
-
非参数的Nadaraya-Watson核回归具有一致性(consistency)的优点:如果有足够的数据,此模型会收敛到最优结果
-
在查询和键之间的距离乘以可学习参数:
- 定义模型:
class NWKernelRegression(nn.Module): def __init__(self, **kwargs): super().__init__(**kwargs) self.w = nn.Parameter(torch.rand((1,), requires_grad=True)) def forward(self, queries, keys, values): queries = queries.repeat_interleave(keys.shape[1]).reshape((-1, keys.shape[1])) self.attention_weights = nn.functional.softmax(-((queries - keys) * self.w)**2 / 2, dim=1) return torch.bmm(self.attention_weights.unsqueeze(1), values.unsqueeze(-1)).reshape(-1) #平铺为一行 - 训练模型:
X_tile = x_train.repeat((n_train, 1)) #沿着指定的维度重复tensor Y_tile = y_train.repeat((n_train, 1)) keys = X_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1)) #torch.eye生成对角线全1,其余全0的二维数组 values = Y_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1)) net = NWKernelRegression() loss = nn.MSELoss(reduction='none') trainer = torch.optim.SGD(net.parameters(), lr=0.5) epoch_list = [] loss_list = [] for epoch in range(5): trainer.zero_grad() l = loss(net(x_train, keys, values), y_train) / 2 l.sum().backward() trainer.step() print(f'epoch {epoch+1}, loss {float(l.sum()):.6f}') epoch_list.append(epoch+1) loss_list.append(l.sum()) plt.plot(epoch_list, loss_list) plt.xlim([1, 5]) plt.xlabel('epoch') plt.ylabel('loss') plt.show()

5. 预测结果:
keys = x_train.repeat((n_test, 1))
values = y_train.repeat((n_test, 1))
y_hat = net(x_test, keys, values).unsqueeze(1).detach()
plot_kernel_reg(y_hat)

训练完带参数的注意力汇聚模型后,在尝试拟合带噪声的训练数据时,预测结果绘制的线不如之前非参数模型的线平滑
三、注意力评分函数
注意力池化
假设有一个查询和个"键-值"对,其中, 。注意力池化函数被表示成值得加权和:
其中查询和键的注意力权重(标量)是通过注意力评分函数将两个向量映射成标量,再经过softmax运算得到的:
遮蔽softmax操作
为将有意义的次元作为值去获取注意力池化,可以指定一个有效序列长度(即词元的个数),以便在计算softmax时过滤掉查出指定范围的位置
def masked_softmax(X, valid_lens):
if valid_lens is None:
return nn.functional.softmax(X, dim=-1)
else:
shape = X.shape
if valid_lens.dim() == 1:
valid_lens = torch.repeat_interleave(valid_lens, shape[1])
else:
valid_lens = valid_lens.reshape(-1)
X = sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
return nn.functional.softmax(X.reshape(shape), dim=-1)
演示效果:
加性注意力
一般当查询和键长度不同时,可以使用加性注意力作为评分函数。给定查询和键, 加性注意力(additive attention)的评分函数为:
其中可学习的参数是、和。将查询和键来连接起来后输入到一个多层感知机中,感知机隐藏单元是一个超参数,通过使用作为激活函数,并且禁用偏置项
class AdditiveAttention(nn.Module):
def __init__(self, key_size, query_size, num_hiddens, dropout, **kwargs):
super(AdditiveAttention, self).__init__(**kwargs)
self.W_k = nn.Linear(key_size, num_hiddens, bias=False)
self.W_q = nn.Linear(query_size, num_hiddens, bias=False)
self.w_v = nn.Linear(num_hiddens, 1, bias=False)
self.dropout = nn.Dropout(dropout)
def forward(self, queries, keys, values, valid_lens):
queries, keys = self.W_q(queries), self.W_k(keys)
features = queries.unsqueeze(2) + keys.unsqueeze(1)
features = torch.tanh(features)
scores = self.w_v(features).squeeze(-1)
self.attention_weights = masked_softmax(scores, valid_lens)
return torch.bmm(self.dropout(self.attention_weights), values)

缩放点积注意力
点积可以得到计算效率更高的评分函数,但要求查询和键具有相同的长度。为确保点积的方差在不考虑向量长度的情况下仍是1,则可使用缩放点积注意力(scaled dot-product attention)评分函数:
在实践中,通常从小批量角度来考虑提高效率,例如基于个查询和个键—值对计算注意力,其中查询和键的长度为,值的长度为。查询、键和值的缩放点积注意力是:
class DotProductAttention(nn.Module):
def __init__(self, dropout, **kwargs):
super(DotProductAttention, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
def forward(self, queries, keys, values, valid_lens=None):
d = queries.shape[-1] # queries的倒数第一维大小
scores = torch.bmm(queries, keys.transpose(1,2)) / math.sqrt(d) # keys的1,2维度交换(0维开始)
self.attention_weights = masked_softmax(scores, valid_lens)
return torch.bmm(self.dropout(self.attention_weights), values)
演示效果:
五、多头注意力
定义
为了模型可以基于相同的注意力机制学习到不同的行为,然后将不同的行为作为知识组合起来,我们可以独立学习得到的组不同的线性投影
模型
给定查询、键和值,每个注意力头()的计算方法为:
其中,可学习的参数包括、和,以及注意力池化函数。多头注意力的输出需要经过另一个线性转换,对应个头连结后的结果,其可学习参数是:
这样每个头都可能会关注输入的不同部分,可表示比简单加权平均值更复杂的函数
模型实现:
def transpose_qkv(X, num_heads):
X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)
X = X.permute(0, 2, 1, 3) # 任意交换维度
return X.reshape(-1, X.shape[2], X.shape[3])
class MultiHeadAttention(nn.Module):
def __init__(self, key_size, query_size, value_size, num_hiddens, num_heads, dropout, bias=False, **kwargs):
super(MultiHeadAttention, self).__init__(**kwargs)
self.num_heads = num_heads
self.attention = DotProductAttention(dropout)
self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)
def forward(self, queries, keys, values, valid_lens):
queries = transpose_qkv(self.W_q(queries), self.num_heads)
keys = transpose_qkv(self.W_k(keys), self.num_heads)
values = transpose_qkv(self.W_v(values), self.num_heads)
if valid_lens is not None:
valid_lens = torch.repeat_interleave(valid_lens, repeats=self.num_heads, dim=0)
output = self.attention(queries, keys, values, valid_lens)
output_concat = transpose_output(output, self.num_heads)
return self.W_o(output_concat)

六、自注意力和位置编码
自注意力
- 给定一个词元组成的输入序列,其中任意()。该序列的自注意力输出为一个长度相同的序列,其中:
基于多头注意力对一个张量完成自注意力的计算,张量的形状为(批量大小,时间步的数目或词元序列的长度,)。输出与输入的张量形状相同
num_hiddens, num_heads = 100, 5
attention = MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens, num_hiddens, num_heads, 0.5)
attention.eval()

batch_size, num_queries, valid_lens = 2, 4, torch.tensor([3, 2])
X = torch.ones((batch_size, num_queries, num_hiddens))
attention(X, X, X, valid_lens).shape

比较CNN、RNN和Self-Attention
- 目标:将由个词元组成的序列映射到另一个长度相等的序列,其中的每个输入词元或输出词元都由维向量表示,比较三种架构的计算复杂性、顺序操作和最大路径长度。
- 对于一个卷积核大小为的卷积层,序列长度是,输入和输出的通道数量都是,所以卷积层的计算复杂度为,有个顺序操作,最大路径长度为。
- 对于循环神经网络,权重矩阵的维隐藏状态的乘法计算复杂度为。序列长度是,所以RNN的计算复杂度为。有个顺序操作无法并行化,最大路径长度也是。
- 对于自注意力,查询、键和值都是矩阵。考虑缩放的“点-积”注意力,其中矩阵乘以矩阵,输出的矩阵乘以矩阵。所以自注意力有计算复杂性。每个词元都通过自注意力直接来连接到其他词元,因此有个顺序操作可以并行计算,最大路径长度也是。
- 综上,CNN和自注意力都拥有并行计算的优势,而自注意力的最大路径长度最短,但因其计算复杂度是关于序列长度的二次方,所以在很长的序列中计算会很慢
位置编码
在处理词元序列时,自注意力因并行计算而放弃了顺序操作,为使用序列的顺序信息,通过在输入中添加位置编码(positional encoding)来注入绝对的或相对的位置信息。位置编码可以通过学习得到也可以直接固定得到。
例如:基于正弦函数和余弦函数的固定位置编码:
假设输入表示包含一个序列中个词元的维嵌入表示。位置编码使用相同形状的位置嵌入矩阵输出,矩阵第行、第列上的元素为:
class PositionalEncoding(nn.Module):
def __init__(self, num_hiddens, dropout, max_len=1000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(dropout)
self.P = torch.zeros((1, max_len, num_hiddens))
X = torch.arange(max_len, dtype=torch.float32).reshape(-1, 1) / torch.pow(10000, torch.arange(0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
self.P[:, :, 0::2] = torch.sin(X) #切片,从下标0开始,间隔2
self.P[:, :, 1::2] = torch.cos(X)
def forward(self, X):
X = X + self.P[:, :X.shape[1], :].to(X.device) #取第2维的前X.shape[1]个元素
return self.dropout(X)
encoding_dim, num_steps = 32, 60
pos_encoding = PositionalEncoding(encoding_dim, 0)
pos_encoding.eval()
X = pos_encoding(torch.zeros((1, num_steps, encoding_dim)))
P = pos_encoding.P[:, :X.shape[1], :]
plt.figure(figsize=(6, 2.5))
plt.plot(torch.arange(num_steps), P[0, :, 6].T, label='Col 6')
plt.plot(torch.arange(num_steps), P[0, :, 7].T, label='Col 7')
plt.plot(torch.arange(num_steps), P[0, :, 8].T, label='Col 8')
plt.plot(torch.arange(num_steps), P[0, :, 9].T, label='Col 9')
plt.legend(loc='best')
plt.xlabel('Raw(position)')
plt.show()

可与看出第6,7列的频率高于第8,9列
- 绝对位置信息
在二进制表示中,较高比特位的交替频率低于较低比特位。位置编码通过使用三角函数在编码维度上降低频率。由于输出是浮点数,因此此类连续表示比二进制表示法更节省空间 - 相对位置信息
除了获得绝对位置信息外,上述位置编码还允许模型学习得到输入序列中相对位置信息。对于任何确定的位置偏移,位置处的位置编码可以线性投影位置处的编码来表示
七、Transformer
模型
- Transformer由编码器和解码器组成,其中编码器和解码器是基于自注意力的模块叠加而成,源序列和目标序列的嵌入表示将加上位置编码再分别输入到编码器和解码器中
- Transformer的编码器由多个相同的层组成,每个层有两个子层。第一个子层是多头自注意力池化(multi-head self-attention);第二个子层是基于位置的前馈网络(positionwise feed-forward network)。每个子层都采用残差连接。之后使用层归一化(layer normalization)
- Transformer的解码器也有多个相同的层构成。除了编码器中的两个子层外,解码器还在两层之间插入了第三个子层:编码器-解码器注意力(encoder-decoder attention)层。它的查询来自前一个解码器层的输出,键和值来自整个编码器的输出。在解码器自注意力中,查询、键和值都来自上一个解码器层的输出。解码器中的每个位置只能考虑该位置之前的所有位置,即遮蔽(masked)注意力,这保留了自回归(auto-regressive)属性,详细介绍参考来源于极市平台的Vision Transformer综述
基于位置的前馈网络
- 基于位置的前馈网络是基于位置的原因是:对序列中所有位置的表示进行变换时使用的是同一个MLP。
class PositionWiseFFN(nn.Module): def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs, **kwargs): super(PositionWiseFFN, self).__init__(**kwargs) self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens) self.relu = nn.ReLU() self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs) def forward(self, X): return self.dense2(self.relu(self.dense1(X)))
因为用同一个MLP对所有位置上的输入进行变换,所以当这些位置的而输入相同时,输出也相同
残差连接和层归一化
- 引用深度学习中的五种归一化(BN, LN, IN, GN, SN)简介中的对比,层归一化是基于特征维度进行归一化的。在NLP任务中(输入通常是变长序列)BN通常不如LN的效果好
- 对比LN与BN:

- 实现
AddNorm类:
Normalized_shape是输入第一维之后几维的大小:class AddNorm(nn.Module): def __init__(self, normalized_shape, dropout, **kwargs): super(AddNorm, self).__init__(**kwargs) self.dropout = nn.Dropout(dropout) self.ln = nn.LayerNorm(normalized_shape) def forward(self, X, Y): return self.ln(self.dropout(Y) + X)

编码器
- 编码器块实现:
Transformer编码器中的任何层不会改变输入的形状:class EncoderBlock(nn.Module): def __init__(self, key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias=False, **kwargs): super(EncoderBlock, self).__init__(**kwargs) self.attention = d2l.MultiHeadAttention(key_size, query_size, value_size, num_hiddens, num_heads, dropout, use_bias) self.addnorm1 = AddNorm(norm_shape, dropout) self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens, num_hiddens) self.addnorm2 = AddNorm(norm_shape, dropout) def forward(self, X, valid_lens): Y = self.addnorm1(X, self.attention(X, X, X, valid_lens)) return self.addnorm2(Y, self.ffn(Y))

- 编码器实现:
Transformer编码器输出的形状是batch_size、序列长度和num_hiddens:class TransformerEncoder(d2l.Encoder): def __init__(self, vocab_size, key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout, use_bias=False, **kwargs): super(TransformerEncoder, self).__init__(**kwargs) self.num_hiddens = num_hiddens self.embedding = nn.Embedding(vocab_size, num_hiddens) # 生成对应的词嵌入,每个符号用num_hiddens维表示 self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout) self.blks = nn.Sequential() for i in range(num_layers): self.blks.add_module("block"+str(i), EncoderBlock(key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias)) def forward(self, X, valid_lens, *args): # 因使用范围在-1和1之间的固定位置编码,用嵌入维度的平方和进行缩放 X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens)) self.attention_weights = [None] * len(self.blks) for i, blk in enumerate(self.blks): X = blk(X, valid_lens) self.attention_weights[i] = blk.attention.attention.attention_weights # 用于权重可视化 return X

解码器
- 对于序列到序列模型(sequence-to-squence model),在训练阶段其输出序列的所有位置的词元都是已知的,在测试阶段其输出序列的词元是逐个生成的。
- 解码器块实现:
class DecoderBlock(nn.Module): def __init__(self, key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, dropout, i, **kwargs): super(DecoderBlock, self).__init__(**kwargs) self.i = i self.attention1 = d2l.MultiHeadAttention(key_size, query_size, value_size, num_hiddens, num_heads, dropout) self.addnorm1 = AddNorm(norm_shape, dropout) self.attention2 = d2l.MultiHeadAttention(key_size, query_size, value_size, num_hiddens, num_heads, dropout) self.addnorm2 = AddNorm(norm_shape, dropout) self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens, num_hiddens) self.addnorm3 = AddNorm(norm_shape, dropout) def forward(self, X, state): enc_outputs, enc_valid_lens = state[0], state[1] # 训练阶段,输出序列的所有词元已知,所以`state[2][self.i]`初始化为`None` # 预测阶段,输出序列词元是逐个生成,所以`state[2][self.i]`包含从开始到当前位置的解码输出表示 if state[2][self.i] is None: key_values = X else: key_values = torch.cat((state[2][self.i], X), axis=1) state[2][self.i] = key_values if self.training: batch_size, num_steps, _ = X.shape # 动态调整valid_lens dec_valid_lens = torch.arange(1, num_steps + 1, device=X.device).repeat(batch_size, 1) else: dec_valid_lens = None X2 = self.attention1(X, key_values, key_values, dec_valid_lens) Y = self.addnorm1(X, X2) Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens) Z = self.addnorm2(Y, Y2) return self.addnorm3(Z, self.ffn(Z)), state
- 解码器实现:
class TransformerDecoder(d2l.AttentionDecoder): def __init__(self, vocab_size, key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout, **kwargs): super(TransformerDecoder, self).__init__(**kwargs) self.num_hiddens = num_hiddens self.num_layers = num_layers self.embedding = nn.Embedding(vocab_size, num_hiddens) self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout) self.blks = nn.Sequential() for i in range(num_layers): self.blks.add_module("block"+str(i), DecoderBlock(key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, dropout, i)) self.dense = nn.Linear(num_hiddens, vocab_size) def init_state(self, enc_outputs, enc_valid_lens, *args): return [enc_outputs, enc_valid_lens, [None] * self.num_layers] def forward(self, X, state): X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens)) self._attention_weights = [[None] * len(self.blks) for _ in range(2)] for i, blk in enumerate(self.blks): X, state = blk(X, state) # 解码器自注意力权重 self._attention_weights[0][i] = blk.attention1.attention.attention_weights # "编码器-解码器"自注意力权重 self._attention_weights[1][i] = blk.attention2.attention.attention_weights return self.dense(X), state @property # 修饰方法,使方法可以像属性一样访问 def attention_weights(self): return self._attention_weights
训练
- 指定Transformer的编码器和解码器都是2层,使用4头注意力。在“英-法”机器翻译数据集上训练:
num_hiddens, num_layers, dropout, batch_size, num_steps = 32, 2, 0.1, 64, 10 lr, num_epochs, device = 0.005, 200, d2l.try_gpu() ffn_num_input, ffn_num_hiddens, num_heads = 32, 64, 4 key_size, query_size, value_size = 32, 32, 32 norm_shape = [32] train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps) encoder = TransformerEncoder(len(src_vocab), key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout) decoder = TransformerDecoder(len(tgt_vocab), key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout) net = d2l.EncoderDecoder(encoder, decoder) d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
- 使用训练出的模型进行翻译,计算BLEU分数:
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .'] fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .'] for eng, fra in zip(engs, fras): translation, dec_attention_weight_seq = d2l.predict_seq2seq(net, eng, src_vocab, tgt_vocab, num_steps, device, True) print(f'{eng} => {translation}, ', f'bleu {d2l.bleu(translation, fra, k=2):.3f}')