transformer架构(带公式与代码)

本文最后更新于 2025年3月20日 晚上

NLP的救星:transformer架构
当然不只是NLP,如今transformer的注意力机制几乎可以做任何事。


一,传统RNN的缺陷

  • 只能串行,对于较长序列的问题效率较低。
  • 在处理长序列时难以捕捉到长期依赖关系,只能有效利用较短的上下文信息。
  • 反向传播时,由于参数共享和多次连乘的特性,容易出现梯度消失或梯度爆炸的问题,导致模型难以训练或无法收敛。

二,transformer(编码器)的核心——(self-attention,自注意力机制)

  1. 假设输入序列是 $ n $ 个词,每个词被编码为维度 $ d_{model} $(如 512)的向量,输入矩阵为:
    $$
    X \in \mathbb{R}^{n \times d_{\text {model }}}
    $$

  2. 自注意力通过三个可学习的权重矩阵,将输入映射为 $ Query(Q)、Key(K)、Value(V)$:
    $$
    Q=X \cdot W^{Q}, \quad K=X \cdot W^{K}, \quad V=X \cdot W^{V}
    $$
    $ W^{Q},W^{K},W^{V} \in \mathbb{R}^{d_{\text {model}} \times d_{k}} ,d_{k} $是每个头的维度(如 64)。

    • 每个词生成对应的 Q、K、V 向量,用于后续计算。
    • 在自注意力机制中,求的是序列内各词的关联度。
    • 在普通注意力机制中,计算一个序列(如源序列)与另一个序列(如目标序列)之间的关系。
  3. 对每个 Query 向量$Q_i$,计算它与所有 key 向量$K_j$的相似度:
    $$ \text{Attention Score}(Q_i,K_j)=\frac{Q_i\cdot K_j^T}{\sqrt{d_k}} $$

    • 点积:衡量两个向量的相似度(值越大越相关)。
    • 缩放因子$\sqrt{d_k}$:防止点积结果过大导致梯度消失(尤其在维度较高时)。
  4. 最后对注意力分数进行 Softmax 即可得到权重分布。

  5. 用注意力权重对 Value 向量加权求和,得到当前词的最终表示:
    $$
    \mathrm{Output}=\text{Attention Weights}\cdot V
    $$

  6. 多头注意力(Multi-Head Attention)
    为了捕捉不同类型的依赖关系,Transformer 使用 多个独立的注意力头:

    • 将 Q、K、V 拆分为 h 个头(如 8 个头),每个头维度为均分总维度。
    • 每个头独立计算注意力,得到 h 个输出矩阵。
    • 拼接所有头的输出,并通过线性变换合并为最终结果。

三,总体结构

  • Add:残差连接
  • Norm:层归一化

输入序列 → 词嵌入 + 位置编码 → 编码器(多头自注意力 + FFN) → 编码器输出

解码器输入 → 词嵌入 + 位置编码 → 掩码自注意力 → 编码器-解码器注意力 → FFN → 输出概率

3.1 整体架构

  • 编码器(Encoder)解码器(Decoder) 组成,堆叠多层(原始论文中为 6 层)。
  • 完全依赖自注意力机制,无需循环(RNN)或卷积(CNN)。
  • Q,K,V的权重矩阵通过解码器的输出进行反向传播。

3.2 编码器(Encoder)

作用:将输入序列转换为一系列富含上下文信息的表示。
每层编码器结构:

  1. 多头自注意力层(Multi-Head Self-Attention)
    • 计算输入序列内部关系,拆分多个头并行处理。
  2. 前馈神经网络(FFN)
    • 每个位置独立处理,维度扩展后压缩(如 512 → 2048 → 512)。

每小层附加操作

  • 残差连接:每个子层输出与输入相加。
  • 层归一化:对残差结果进行归一化。

3.3 解码器(Decoder)

作用:根据编码器的输出和已生成的部分输出序列,逐步生成目标序列。
每层解码器结构:

  1. 掩码多头自注意力层(Masked Multi-Head Self-Attention)
    • 通过掩码避免模型看到未来信息。就是给注意力分数加上值,后面的变量减无穷大使得分数被抑制,否则加0。
    • 同时还可以调整模型动态输入不同长度的序列。
  2. 编码器-解码器注意力层(Cross-Attention)
    • Key 和 Value 来自编码器输出,Query 来自解码器输入。
  3. 前馈神经网络(FFN):与编码器结构相同。

每小层附加操作

  • 残差连接 + 层归一化(每个子层后应用)。

3.4 输入处理

  1. 词嵌入(Embedding)
    • 将词转换为固定维度向量(如 512 维)。
  2. 位置编码(Positional Encoding)
    • 使用正弦/余弦函数或可学习参数生成位置信息。
    • 公式示例(正弦编码):$ PE_{(pos, 2i)} = \sin\left(\frac{pos}{10000^{2i/d}}\right), \quad PE_{(pos, 2i+1)} = \cos\left(\frac{pos}{10000^{2i/d}}\right)$

3.5 注意力机制

单头注意力公式

$$
\text{f}(Q) = \text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V
$$

多头注意力

  • 将 (Q, K, V) 拆分到多个头(如 8 头),独立计算后拼接结果。

  • Query(Q):表示当前词希望“查询”其他词的信息。
  • Key(K):表示其他词提供的“索引标签”,用于与 Query 匹配。
  • Value(V):表示其他词的实际内容,最终通过权重聚合到输出中。
场景输入来源不同值的原因
自注意力同一层的同一输入序列投影矩阵不同
交叉注意力Q:解码器输入;K/V:编码器输出来源不同层

3.6 输出生成

  • 解码器最后一层输出通过线性层 + Softmax 生成概率分布。
  • 训练:Teacher Forcing(输入真实历史词)。
  • 推理:自回归生成(逐步预测词)。

3.7 任务类型

  1. 编码器-解码器架构(如机器翻译)
  2. 仅编码器架构(如文本分类)
  3. 仅解码器架构(如文本生成)
  4. 对于不同长度的输入序列,分别生成 Q、K、V 矩阵,并计算自注意力。或使用“填充”与“掩码”。

四,具体结构问题

  • 编码器和解码器是独立的模块,但解码器的每一层都会通过 交叉注意力(Cross-Attention) 访问编码器的最终输出
    • 输入序列 → [编码器层1 → 编码器层2 → … → 编码器层N] → 编码器最终输出 ↓↓↓
      解码器层1 → 解码器层2 → … → 解码器层N → 输出概率
  • 编码器与解码器宏观上都只有一个,但是它们里面都可以添加多层,例如编码器中有多个(多头自注意力+FFN),解码器中有多个(掩码多头自注意力+交叉注意力+FFN)

五,利用编码器对中文句子进行分类

  • 这里的代码只用到了编码器,并不牵扯解码器。因为两者都用一般用于语言翻译、对话等任务,这里只进行分类,因此没必要。
  • 用解码器时输出会变得很难,本人技术还不到位。有待提高。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pickle
import numpy as np
from tqdm import tqdm

# 加载 npz 文件
npz = np.load(".\data\embedding_Tencent.npz")
#print(len(npz['embeddings'])) # 输出文件中包含的数组名称
# 打开文件并读取词汇表
with open(".\data\\vocab.pkl", "rb") as file:
vocab = pickle.load(file)

# 数据预加载
train_dataset = []
with open('./data/train.txt','r', encoding="utf-8") as file:
while True:
line = file.readline() # 使用readline()逐行读取
if not line: # 如果读到文件末尾,line为空字符串,退出循环
break
columns = line.strip().split("\t") # 使用strip()去除首尾空白字符,然后按Tab分割
train_dataset.append(columns)
test_dataset = []
with open('./data/dev.txt','r', encoding="utf-8") as file:
while True:
line = file.readline() # 使用readline()逐行读取
if not line: # 如果读到文件末尾,line为空字符串,退出循环
break
columns = line.strip().split("\t") # 使用strip()去除首尾空白字符,然后按Tab分割
test_dataset.append(columns)

for i in range(len(train_dataset)):
ind1 = []
for j in range(20):
try:
ind1.append(vocab[train_dataset[i][0][j]])
except KeyError:
ind1.append(vocab['<UNK>'])
except IndexError:
ind1.append(vocab['<PAD>'])
train_dataset[i][0] = ind1

for i in range(len(test_dataset)):
ind2 = []
for j in range(20):
try:
ind2.append(vocab[test_dataset[i][0][j]])
except KeyError:
ind2.append(vocab['<UNK>'])
except IndexError:
ind2.append(vocab['<PAD>'])
test_dataset[i][0] = ind2

class CustomDataset(Dataset):
def __init__(self, data):
"""
初始化数据集
data: 原始数据集,格式为 [[features, label], ...]
"""
self.data = data
def __len__(self):
#返回数据集的大小
return len(self.data)
def __getitem__(self, idx):
"""
根据索引返回一个样本及其标签
idx: 样本索引
return: (features, label)
"""
features, label = self.data[idx]
# 将 features 转换为 Tensor
features = torch.tensor(features, dtype=torch.long)
# 将标签转换为 Tensor(假设标签是整数)
label = torch.tensor(int(label), dtype=torch.long)
return features, label
train_dataset = CustomDataset(train_dataset)
test_dataset = CustomDataset(test_dataset)
# 创建DataLoader
train_loader = DataLoader(
train_dataset,
batch_size=8,
shuffle=False)
test_loader = DataLoader(
test_dataset,
batch_size=8,
shuffle=False)

# 词嵌入二维列表
embedding_matrix = torch.tensor(npz['embeddings'], dtype=torch.float)
# 检查词嵌入的维度
vocab_size, embedding_dim = embedding_matrix.shape
print(f"词汇表大小: {vocab_size}, 词嵌入维度: {embedding_dim}")

class TransformerModel(nn.Module):
def __init__(self, input_dim, model_dim, num_heads, num_encoder_layers, dim_feedforward, dropout=0.1, embedding_matrix=None):
super(TransformerModel, self).__init__()
# 嵌入层
self.embedding = nn.Embedding(input_dim, model_dim)
if embedding_matrix is not None:
self.embedding.weight = nn.Parameter(embedding_matrix, requires_grad=True) # 使用预训练的词嵌入
# Transformer Encoder
encoder_layer = nn.TransformerEncoderLayer(d_model=model_dim, nhead=num_heads, dim_feedforward=dim_feedforward, dropout=dropout)
self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
# 输出层
self.fc_out = nn.Linear(model_dim, 10) # num_classes 是标签的数量
def forward(self, src):
# 嵌入输入
src_embedded = self.embedding(src) # 输出(batch_size, seq_len, model_dim)
# 输入(seq_len, batch_size, model_dim)
transformer_output = self.transformer_encoder(src_embedded.permute(1, 0, 2))
# 恢复维度
transformer_output = transformer_output.permute(1, 0, 2)
# 取编码器的最后一个时间步或全局池化
# 这里假设取最后一个时间步的隐藏状态
pooled_output = transformer_output[:, -1, :]
# 输出层
output = self.fc_out(pooled_output)
return output

# 参数设置
input_dim = 4762 # 词汇表大小
model_dim = 200 # 模型维度,将每个单词或标记转换为多少维的密集向量
num_heads = 4 # 编码器头数,确保 model_dim 能被 num_heads 整除
num_encoder_layers = 2 # 编码器层数,包含多头注意力层和前馈网络层
dim_feedforward = 400 # 前馈网络的维度,通常设置为 model_dim 的 2-4 倍
dropout = 0.1 # Dropout概率
num_classes = 10 # 假设有 10 个类别

# 初始化模型
model = TransformerModel(input_dim, model_dim, num_heads, num_encoder_layers, dim_feedforward, dropout, embedding_matrix)
criterion = nn.CrossEntropyLoss() # 分类任务使用交叉熵损失,包含softmax
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

# 训练函数
def train(model, train_loader, criterion, optimizer, device):
model.train()
total_loss = 0
for src, tgt in train_loader:
src, tgt = src.to(device), tgt.to(device)
# 前向传播
optimizer.zero_grad()
output = model(src) # 只传入 src
# 计算损失
loss = criterion(output, tgt) # tgt 的形状是 [batch_size]
# 反向传播和优化
loss.backward()
optimizer.step()
total_loss += loss.item()
return total_loss / len(train_loader)

# 测试函数
def evaluate(model, test_loader, criterion, device):
model.eval()
correct = 0
total = 0
total_loss = 0
with torch.no_grad():
for src, tgt in test_loader:
src, tgt = src.to(device), tgt.to(device)
# 前向传播
output = model(src)
_, predicted = torch.max(output.data, 1)
total += tgt.size(0)
correct += (predicted == tgt).sum().item()
# 计算损失
loss = criterion(output, tgt)
total_loss += loss.item()
print(f'Accuracy of the network on the 10000 test: {100 * correct / total:.2f}%')
return total_loss / len(test_loader)
# Accuracy of the network on the 10000 test: 85.40%
# 设备设置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# 训练和测试循环
num_epochs = 1
for epoch in tqdm(range(num_epochs)):
train_loss = train(model, train_loader, criterion, optimizer, device)
test_loss = evaluate(model, test_loader, criterion, device)
print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

# 保存模型
torch.save(model.state_dict(), "transformer_classify_model.pth")

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import torch
import torch.nn as nn
import pickle
import numpy as np

# 加载 npz 文件
npz = np.load(".\data\embedding_Tencent.npz")
# 打开文件并读取词汇表
with open(".\data\\vocab.pkl", "rb") as file:
vocab = pickle.load(file)

class TransformerModel(nn.Module):
def __init__(self, input_dim, model_dim, num_heads, num_encoder_layers, dim_feedforward, dropout=0.1, embedding_matrix=None):
super(TransformerModel, self).__init__()
# 嵌入层
self.embedding = nn.Embedding(input_dim, model_dim)
if embedding_matrix is not None:
self.embedding.weight = nn.Parameter(embedding_matrix, requires_grad=False) # 使用预训练的词嵌入
# Transformer Encoder
encoder_layer = nn.TransformerEncoderLayer(d_model=model_dim, nhead=num_heads, dim_feedforward=dim_feedforward, dropout=dropout)
self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
# 输出层
self.fc_out = nn.Linear(model_dim, 10) # num_classes 是标签的数量
def forward(self, src):
# 嵌入输入
src_embedded = self.embedding(src) # 输出(batch_size, seq_len, model_dim)
# 输入(seq_len, batch_size, model_dim)
transformer_output = self.transformer_encoder(src_embedded.permute(1, 0, 2))
# 恢复维度
transformer_output = transformer_output.permute(1, 0, 2)
# 取编码器的最后一个时间步或全局池化
# 这里假设取最后一个时间步的隐藏状态
pooled_output = transformer_output[:, -1, :]
# 输出层
output = self.fc_out(pooled_output)
return output

# 参数设置
input_dim = 4762 # 词汇表大小
model_dim = 200 # 模型维度,将每个单词或标记转换为多少维的密集向量
num_heads = 4 # 编码器头数,确保 model_dim 能被 num_heads 整除
num_encoder_layers = 2 # 编码器层数,包含多头注意力层和前馈网络层
dim_feedforward = 400 # 前馈网络的维度,通常设置为 model_dim 的 2-4 倍
dropout = 0.1 # Dropout概率
num_classes = 10 # 假设有 10 个类别

# 词嵌入二维列表
embedding_matrix = torch.tensor(npz['embeddings'], dtype=torch.float)
# 检查词嵌入的维度
vocab_size, embedding_dim = embedding_matrix.shape
print(f"词汇表大小: {vocab_size}, 词嵌入维度: {embedding_dim}")

# 初始化模型
model = TransformerModel(input_dim, model_dim, num_heads, num_encoder_layers, dim_feedforward, dropout, embedding_matrix)
model.load_state_dict(torch.load('transformer_classify_model.pth'))
# 将模型设置为评估模式
model.eval()
xx = '中国和美国关系不是很好'
categories = [
"finance",
"realty",
"stocks",
"education",
"science",
"society",
"politics",
"sports",
"game",
"entertainment"]
x = []
for j in range(20):
try:
x.append(vocab[xx[j]])
except KeyError:
x.append(vocab['<UNK>'])
except IndexError:
x.append(vocab['<PAD>'])

# 预测
with torch.no_grad():
output = model(torch.tensor(x).unsqueeze(0))
print(output)
_, predicted = torch.max(output, 1)
print(f"Predicted class: {categories[predicted.item()]}")

transformer架构(带公式与代码)
https://jimes.cn/2025/01/22/transformer/
作者
Jimes
发布于
2025年1月22日
许可协议