使用pytorch从0构建Transformer
Transformer是一种序列到序列的模型。它通过自注意力机制并行处理整个序列。这种机制使模型能够同时考虑序列中的所有元素,并学习上下文之间的关系。
Transformer的架构包括编码器和解码器部分,每部分都由多个相同的层组成。这些层包含自注意力机制和前馈神经网络,加上归一化和Dropout步骤。
1 2 3 4 5 6 7 8 9 import math import torchimport torch.nn as nnfrom labml_helpers.module import Module from labml_nn.utils import clone_module_list from typing import Optional ,List from torch.utils.data import DataLoader,TensorDataset from torch import optimimport torch.nn.functional as F
1. 自注意力计算
核心公式: $$ Attention(Q,K,V) =softmax(\frac{QK^T}{\sqrt{d_K}})V $$ 其中,Q,K,V 分别是查询(Query)、键(Key)和值(Value)矩阵,dk是键的维度。
多头注意力:将输入分割为多个头,分别计算注意力,然后将结果拼接起来。
位置编码:由于Transformer不使用循环结构,因此引入位置编码来保留序列中的位置信息。
2.前馈网络(FeedForward) 前馈网络属于Encoder的一部分,
定义一个前馈网络类,用于Transformer中的前馈层。
全连接前馈网络子层:这由两个线性变换构成,中间加了一个ReLU的激活函数: $$ FFN(x) = max(0,xW_1+b_1)W_2+b_2 $$ 其中$W_1, W_2$和$b_1,b_2$是线性变换的权重和偏置。此外,这两个子层都有残差连接。每个子层的输Attention 出是它的输入与子层自身的输出之和,然后经过层归一化: $$ LayerNorm(x+Sublayer(x)) $$ 其中$Sublayer(x)$是子层自身的输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 class FeedForward (Module ): def __init__ (self,d_model: int , d_ff:int , dropout:float =0.1 , activation=nn.ReLU( ), is_gated: bool = False , bias1:bool =True , bias2: bool =True , bias_gate: bool =True ): super ()._init_() #初始化第一层线性变换,输入维度为 d_model,输出维度为 d_ff self .layer1 = nn.Linear(d_model,d_ff,bias=bias1) # 初始化第二层线性变换,输入维度为 d_ff,输出维度为 d_model self .layer2 = nn.Linear(d_ff,d_model,bias=bias2) #初始化 dropout 层,用于在训练过程中随机关闭部分神经元 self .dropout =nn.Dropout(dropout) #激活函数,用于引入非线性 self .activation =activation #是否启用门控机制 self .is_gated = is_gated if is_gated:#如果启用门控机制,初始化另一层线性变换,用于门控 self .linear_v = nn.Linear(d_model,d_ff,bias=bias_gate) def forward (self, x: torch.Tensor ): #应用第一层线性变换和激活函数 g= self .activation(self .layer1(x)) #如果启用了门控机制 if self .is_gated: #应用门控操作 x=g*self .linear_v(x) else : x=g # 应用 dropout x=self .dropout(x) #应用第二层线性变换 return self .layer2(x)
3. 多头注意力(MultiHeadAttention)
多头自注意力机制子层:在这个机制中,会有多个头(通常为8或更多)并行处理信息。每个头对查询(Query)、键(Key)和值(Value)进行不同的线性变换,然后并行产生输出,最终这些输出被组合以生成最终结果。
多头注意力的公式可以表示为: $$ MultiHead(Q,K,V) =Concat(head1,…,head_h)W^O $$
其中
·$Q,K,V$ 分别是查询(Query)、键(Key)和值(Value)。
·$W^Q_i,W^K_i,W^V_i$:是线性变换的权重矩阵。
·$head_i$是第i个注意力头的输出。
·$W^O$是将所有头的输出合并后的线性变换权重矩阵。
每个注意力头 head;计算的是缩放点积注意力: $$ Attention(Q,K,V) =softmax(\frac{QK^T}{\sqrt{d_K}})V $$ 这里$\sqrt{d_K}$是缩放因子,用于控制内积的大小。多头注意力通过这种方式使模型能够在不同的表示子空间中捕捉序列的不同特征,增强了模型的表达能力。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 class PrepareForMultiHeadAttention (nn.Module): def _init_ (self, d_model: int , heads: int , d_k: int , bias: bool ): super ()._init_() #初始化线性变换层,用于将输入转换为多头注意力所需的形状 self .linear= nn.Linear(d_model,heads * d_k,bias=bias) #注意力头的数量 self .heads=heads #每个头中向量的维度 self .d_k=d_k def forward (self, x: torch.Tensor ): #获取输入的初始形状,用于之后的变形操作 head_shape = x.shape[:-1 ] #应用线性变换 x=self .linear(x) #将最后一个维度分割成多个头,并为每个头分配 d_k维度 x=x.view(*head_shape, self .heads, self .d_k) #返回的形状是[seq_len ,batch_size,heads,d_k]或 [batch_size,heads, d_model] return x class MultiHeadAttention (nn.Module): def _init__ (self,heads: int , d_model: int , dropout_prob: float = 0.1 , bias: bool =True ): super ()._init_() #计算每个注意力头的维度 self .d_k=d_model // heads #注意力头的数量 self .heads=heads #分别为query、key和 value 初始化线性层 self .query = PrepareForMultiHeadAttention(d_model,heads,self .d_k, bias=bias) self .key = PrepareForMultiHeadAttention(d_model,heads,self .d_k, bias=bias) self .value = PrepareForMultiHeadAttention(d_model,heads,self .d_k, bias=True ) #初始化 softmax 层,用于注意力权重的计算 self .softmax=nn.Softmax(dim=1 ) #输出层,将多头注意力的输出合并为一个张量 self .output = nn.Linear(d_model,d_model) # Dropout 层 self .dropout = nn.Dropout(dropout_prob) #注意力计算前的缩放因子 self .scale=1 / math.sqrt(self .d_k) #存储注意力权重,用于调试或其他计算 self .attn=None def get_scores (self,query: torch.Tensor, key: torch.Tensor ): # 计算 query 和key的点积,用于注意力权重 return torch.einsum('ibhd,jbhd->ijbh' ,query,key) def prepare_mask (self, mask: torch.Tensor, query_shape: List [int ], key_shape: List [int ] ): # 调整掩码的形状以匹配 query 和 key的维度 if mask.dim() == 2 : mask = mask.unsqueeze(0 ) # 增加 batch_size 维度 if mask.size(1 ) == query_shape[0 ] and mask.size(2 ) == key_shape[0 ]: mask=mask.unsqueeze(1 ) #增加 head 维度 else : raise ValueError("Mask shape does not match with query and key shapes in MultiHeadAttention" ) return mask def forward (self, *, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, mask: Optional [torch.Tensor]= None ): #获取序列长度和批次大小 seq_len,batch_size,_= query.shape #如果提供掩码,则进行调整 if mask is not None : mask= self .prepare_mask(mask, query.shape, key.shape) #准备 query、key 和 value query=self .query(query) key=self .key(key) value= self .value(value) #计算注意力得分 scores = self .get_scores(query,key) #应用缩放因子 scores *= self .scale if mask is not None : scores = scores.masked_fill(mask == 0 , float ('-inf' )) attn = self .softmax(scores) attn = self .dropout(attn) x = torch.einsum("ijbh,jbhd->ibhd" , attn,value) self .attn = attn.detach() x = x.reshape(seq_len, batch_size, -1 ) return self .output(x)
4. 位置编码(PositionalEncoding)和嵌入 • 实现位置编码和结合位置编码的嵌入层。
位置编码(Positional Encoding)是用来给模型提供关于局部特征在序列中位置的信息。由于 Transformer 没有递归或卷积结构,它本身无法区分序列中不同位置的单词。位置编码通过为每个位置的单词添加唯一的编码来解决这个问题。 位置编码是通过正弦和余弦函数生成的: $$ PE_{(pos,2i)} = sin(\frac{pos}{1000^{\frac{2i}{d_{model}}}}) $$
$$ PE_{(pos,2i+1)} = cos(\frac{pos}{1000^{\frac{2i}{d_{model}}}}) $$
其中, $pos$是位置索引,$i$是维度索引, $d_{model}$是模型的维度。这些编码对于每个维度的索引是交替的,其中偶函数索引使用的是正弦函数,奇函数索引使用的是余弦函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 class PositionalEncoding (nn.Module): def __init__ (self, d_model: int , dropout_prob: float , max_len: int = 5000 ): super ().__init__() self .dropout = nn.Dropout(dropout_prob) self .register_buffer ('positional_encodings' , get_positional_encoding(d_model, max_len), False ) def forward (self , x: torch.Tensor): pe = self .positional_encodings[:x.shape[0 ]].detach().requires_grad_(False ) x = x + pe x = self .dropout (x) return x def get_positional_encoding (d_model: int , max_len: int = 5000 ): encodings = torch.zeros(max_len, d_model) position = torch.arange(0 , max_len, dtype=torch.float32).unsqueeze (1 ) two_i = torch.arange(0 , d_model, 2 , dtype=torch.float32) div_term= torch.exp(two_i (math.log(10000.0 ) / d_model)) encodings[:, 0 ::2 ] = torch.sin(position * div_term) encodings[:, 1 ::2 ] = torch.cos (position div_term) encodings encodings.unsqueeze (1 ).requires_grad_(False ) return encodings class EmbeddingsWithPositionalEncoding (nn.Module): def __init__ (self, d_model: int , n_vocab: int , max_len: int = 5000 ): super ().__init__() self .linear = nn. Embedding (n_vocab, d_model) self .d_model = d_model self .register_buffer ('positional_encodings' , get_positional_encoding(d_model, max_len)) def forward (self , x: torch. Tensor): pe = self .positional_encodings[:x.shape[0 ]].requires_grad_(False ) return self .linear(x) math.sqrt(self .d_model) + pe class EmbeddingsWithLearned PositionalEncoding (nn. Module): def __init__ (self, d_model: int , n_vocab: int , max_len: int = 5000 ): super ().__init__() self .linear = nn. Embedding (n_vocab, d_model) self .d_model = d_model self .positional_encodings = nn.Parameter(torch.zeros(max_len, 1 ,d_model), requires_grad=True ) def forward (self, x: torch.Tensor ): pe = self .positional_encodings[:x.shape[0 ]] return self .linear(x) * math.sqrt(self .d_model) + pe
• 定义一个 Transformer层,包括自注意力和前馈网络。 • 实现编码器(Encoder)和解码器(Decoder) • 输出生成器,用于从解码器输出生成最终预测 • 最后组合编码器、解码器、嵌入层和生成器来构建完整的 Transformer 模型 在 Transformer 模型中,编码器(Encoder)和 解码器(Decoder)是构成序列到序列学习框架 的两个主要部分。
编码器 负责处理输入序列。
它由多个相同的层组成,每层包含两个核心子层:多头自注意力机制和前馈神经网络。
解码器 负责生成输出序列。 它的结构类似于编码器,但每层中有三个子层: 解码器自身的多头自注意力机制、编码器-解码器注意力机制和前馈神经网络。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 class TransformerLayer (nn.Module): def __init__ (self, *, d_model: int , self_attn: MultiHeadAttention, src_attn: MultiHeadAttention = None , feed_forward: FeedForward, dropout_prob: float ): super ().__init__() self .size = d_model self .self_attn = self_attn self .src_attn = src_attn self .feed_forward = feed_forward self .dropout = nn.Dropout(dropout_prob) self .norm_self_attn = nn.LayerNorm([d_model]) if self .src_attn is not None : self .norm_src_attn = nn.LayerNorm([d_model]) self .norm_ff = nn.LayerNorm([d_model]) self .is_save_ff_input = False def forward (self, *, x: torch.Tensor, mask: torch.Tensor, src: torch.Tensor = None , src_mask: torch.Tensor = None ): z = self .norm_self_attn(x) self_attn = self .self_attn(query=z, key=z, value=z, mask=mask) x = x + self .dropout(self_attn) if src is not None : z = self .norm_src_attn(x) attn_src = self .src_attn(query=z, key=src, value=src,mask=src_mask) x = x + self .dropout(attn_src) z = self .norm_ff(x) ff = self .feed_forward(z) x = x + self .dropout(ff) return x
Encoder 主要组成部分: **1.多头自注意力机制子层:**在这个机制中,会有多个头(通常为8或更多)并行处理信息。每个头对查询(Query)、键(Key)和值(Value) 进行不同的线性变换,然后并行产生输出,最终这些输出被组合以生成最终结果。
**2.全连接前馈网络子层:**这个子层由两个线性变换构成,中间加入了 ReLU 激活函数: $FFN(x) = max(0, xW_1 +b_1)W_2 + b_2$,其中$W_1, W_2$ 和 $b_1, b_2$是线性变换的权重和偏置。
每层使用不同的权重$(W)$和偏置$(b)$参数。此外,这两个子层都有残差连接。每个子层的输出是它的输入与子层自身的输出之和,然后经过层归一化: $$ LayerNorm(x+Sublayer(x)) $$
其中 $Sublayer(x)$是子层自身的输出。 由于 Transformer 架构本身无法捕捉序列中词的相对位置信息(因为它不使用递归结构),所以 必须通过在输入嵌入中引入位置编码来注入这种 信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class Encoder (nn.Module): def __init__ (self, layer: TransformerLayer, n_layers: int ): super ().__init__() self .layers = clone_module_list(layer, n_layers) self .norm = nn.LayerNorm([layer.size]) def forward (self, x: torch.Tensor, mask: torch.Tensor ): for layer in self .layers: x = layer(x=x, mask=mask) return self .norm(x)
Decoder 主要组成部分: 解码器同样由6层相同的层组成,每层包含三个子层:1.第一子层——>多头自注意力 :这个子层处理来自解码器前一层的输出,并加入位置信息。它实现了多头自注意力机制。与编码器能关注输入序列中全局信息不同,解码器修改为只关注前面的信息。在多头注意力机制中,通过引入掩码来实现这一点,掩码会抑制在矩阵乘法 $QKT$ 中对非法连接对应的值。2.第二子层——>编码器-解码器注意力 :这个子层 与编码器中第一个子层的多头自注意力机制类似。这使得解码器能够关注输入的全局信息。
3.第三子层——>全连接前馈网络 :与编码器中第二个子层类似的全连接前馈网络。
此外,解码器的这三个子层也都具有残差连接, 并由归一化层跟随。
与编码器一样,解码器的输入嵌入也加入了位置 编码,以同样的方式注入位置信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class Decoder (nn.Module): def __init__ (self, layer: TransformerLayer, n_layers: int ): super ().__init__() self .layers = clone_module_list(layer, n_layers) self .norm = nn.LayerNorm([layer.size]) def forward (self, x: torch.Tensor, memory: torch.Tensor, src_mask:torch.Tensor, tgt_mask: torch.Tensor ): for layer in self .layers: x = layer(x=x, mask=tgt_mask, src=memory, src_mask=src_mask) return self .norm(x)
组合encoder和decoder: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 class Generator (nn.Module): def __init__ (self, n_vocab: int , d_model: int ): super ().__init__() self .projection = nn.Linear(d_model, n_vocab) def forward (self , x): return self .projection(x) class EncoderDecoder (nn.Module): def __init__ (self, encoder: Encoder, decoder: Decoder, src_embed: nn.Module, tgt_embed: nn.Module, generator: nn.Module ): super ().__init__() self .encoder = encoder self .decoder = decoder self .src_embed src_embed self .tgt_embed= tgt_embed self .generator = generator for p in self .parameters(): if p.dim() > 1 : nn.init.xavier_uniform_(p) def forward (self, src: torch.Tensor, tgt: torch.Tensor, src_mask: torch.Tensor, tgt_mask: torch.Tensor ): enc = self .encode(src, src_mask) return self .decode (enc, src_mask, tgt, tgt_mask) def encode (self, src: torch. Tensor, src_mask: torch. Tensor ): return self .encoder (self .src_embed (src), src_mask) def decode (self, memory: torch.Tensor, src_mask: torch.Tensor, tgt: torch.Tensor, tgt_mask: torch.Tensor ): return self .decoder (self .tgt_embed (tgt), memory, src_mask, tgt_mask)
6.参数设置和设备检查 • 设置模型参数,如词汇量大小、模型维度等,并检查是否有GPU 可用
1 2 3 4 5 6 7 8 9 10 11 12 n_vocab = 100 d_model = 512 n_layers = 3 heads = 8 d_ff = 2048 dropout 0.1 device = torch.device ("cuda" if torch.cuda.is_available() else "cpu" ) print (f"Using device: {device} " )
7. 数据生成函数 • 定义一个函数来生成随机数据,用于模型训练和测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def generate_random_data (seq_len, batch_size, n_vocab, dataset_size, device): src = torch.randint(0 , n_vocab, (dataset_size, seq_len), device-device) tgt = torch.randint(0 , n_vocab, (dataset_size, seq_len), device=device) src_mask = torch.ones(dataset_size, 1 , seq_len, device-device) tgt_mask( torch.tril( torch.ones (seq_len, seq_len, device-device) ) == 1 ).unsqueeze(0 ).repeat(dataset_size, 1 , 1 ) return src, tgt, src_mask, tgt_mask seq_len = 10 batch_size = 32 dataset_size = 1000 src, tgt, src_mask, tgt_mask = generate_random_data(seq_len, batch_size, n_vocab, dataset_size, device) test_src, test_tgt, test_src_mask, test_tgt_mask = generate_random_data(seq_len,batch_size,n_vocab,dataset_size, device)
8.数据加载器和训练准备 • 使用 TensorDataset 和 DataLoader 准备训练和测试数据。 • 构建相关的 encoder 和 decoder,优化器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 train_dataset = TensorDataset(src, tgt, src_mask, tgt_mask) train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True ) test_dataset = TensorDataset(test_src, test_tgt, test_src_mask, test_tgt_mask) test_dataloader = DataLoader(test_dataset, batch_size-batch_size, shuffle=True ) self_attn = MultiHeadAttention(heads, d_model) feed_forward = FeedForward(d_model, d_ff) transformer_layer = TransformerLayer(d_model=d_model, self_attn=self_attn, feed_forward-feed_forward, dropout_prob=dropout) encoder Encoder(transformer_layer, n_layers) decoder = Decoder(transformer_layer, n_layers) src_embed= EmbeddingsWithPositionalEncoding(d_model, n_vocab) tgt_embed= EmbeddingsWithPositionalEncoding(d_model, n_vocab) generator = Generator(n_vocab, d_model) model = EncoderDecoder(encoder, decoder, src_embed, tgt_embed, generator) device = torch.device ("cuda" if torch.cuda.is_available() else "cpu" ) model.to(device) optimizer = optim.Adam (model.parameters(), lr=0.0001 )
9.模型训练 ● 定义训练循环,包括前向传播、损失计算、反向传播和参数更新。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 epochs=3 model.train() for epoch in range (epochs): total_loss = 0 for src, tgt, src_mask, tgt_mask in train_dataloader: optimizer.zero_grad() output = model (src, tgt, src_mask, tgt_mask) loss = F.cross_entropy (output.view(-1 , n_vocab), tgt.view(-1 ),ignore_index=0 ) loss.backward () optimizer.step() total_loss += loss.item() print (f"Epoch {epoch+1 } , Loss: {total_loss/len (train_dataloader)} " )
10.模型测试 • 定义测试循环,计算模型在测试数据上的准确率。
1 2 3 4 5 6 7 8 9 10 11 model.eval () total_accuracy = 0 with torch.no_grad(): for src, tgt, src_mask, tgt_mask in test_dataloader: output= model (src, tgt, src_mask, tgt_mask) output_max = output.argmax (dim=-1 ) correct (output_max == tgt).sum ().item() total_accuracy += correct / (tgt.size(0 ) * tgt.size(1 )) print (f"Accuracy: {total_accuracy/len (test_dataloader)} " )