Transformer大语言模型架构原理学习笔记
1. 模型架构与优化方法
在架构里面,我们做的就是把输入的x映射到输出的y,这个映射过程就是模型架构。但是如果模型里面的参数或者是这个模型是随机的,那么这个模型就没有意义,所以我们需要优化方法来优化模型。
就笔者的理解,模型内部实际上就是一堆矩阵(线性变换),与输入x进行矩阵乘法,得到输出y。通过训练,我们希望模型内部的矩阵能够尽量使得输入x映射到输出y。
但是完全的映射是不能的,因为输入和输出都是离散 的,所以我们需要一个函数 来衡量模型的预测结果与真实结果之间的差距,然后通过优化方法 来最小化这个函数 ,从而优化模型内部的矩阵。
这个函数就是所谓的损失函数,常见的损失函数有均方误差、交叉熵 等。(交叉熵是分类任务常用的损失函数,均方误差是回归任务常用的损失函数)
优化方法有梯度下降、牛顿法等。
这里Transformer模型架构的学习我采用了交叉熵 (原因参见下文)作为损失函数,采用梯度下降作为优化方法(这个算法成本最低,其他的都还不懂)。
2. 大语言模型架构
Transformer大语言模型是一种基于自注意力机制的深度神经网络模型,用于处理自然语言处理任务。Transformer模型的主要架构包括编码器和解码器,以及它们之间的连接。
Transformer大语言模型的主要特点包括:
自注意力机制 :Transformer模型使用自注意力机制来计算序列中每个元素与其他元素之间的关系,从而捕捉序列中的长距离依赖关系。自注意力机制通过计算查询、键和值向量之间的点积来计算注意力权重,然后使用这些权重对值向量进行加权求和,得到每个元素的上下文表示。
位置编码 :Transformer模型使用位置编码 来为序列中的每个元素添加位置信息,以便模型能够区分序列中的不同位置。位置编码通常使用正弦和余弦函数来生成。
前馈神经网络 :Transformer模型中的每个编码器和解码器都包含一个前馈神经网络,用于对输入进行非线性变换 。前馈神经网络由两个线性层和一个非线性激活函数组成。
多头注意力 :Transformer模型使用多头注意力机制来捕捉序列中每个元素与其他元素之间的关系 。多头注意力机制通过将输入序列分成多个子序列,并使用不同的查询、键和值向量 来计算注意力权重,从而捕捉序列中的不同特征。
残差连接 :Transformer模型使用残差连接来缓解深层神经网络中的梯度消失问题 。残差连接通过将输入直接添加到前馈神经网络的输出中,使得梯度能够直接传播到更深的层。
层归一化 :Transformer模型使用层归一化来稳定深层神经网络的训练过程 。层归一化通过对每个输入序列的每个元素进行归一化,使得输入的分布更加稳定。
3. 模型架构原理
模型架构可以先看下面的流程图
在整体记录前,我们需要知道这个输入 x 是啥。
首先x 不可能是用户输入的文本,因为文本无法映射到模型内部的矩阵,那么我们就需要分词,将一段连续的文本分割成一个个的词,然后通过词嵌入(Embedding )将词映射到模型内部的矩阵。
注意词很难独立映射到模型(Embedding )内部的矩阵,在分完词后,我们需要用一个 wordToId 和 IdToWord 来记录词和id之间的映射关系。这里的 id 是一个数字,所有的词会映射到独立的数字,我们用连续的数字映射不同的词,那样就可以用一个二维矩阵来表示所有的词(每一行向量都有自己的词相对应)。注意这里分完词后的词表大小,我们用 vocab_size 来表示,这个东西很重要。
然后我们就可以将词映射到模型内部的矩阵了,这个矩阵就是词嵌入矩阵,我们用 E 来表示。这个矩阵的维度是 [vocab_size, dim_model] ,其中 dim_model 是嵌入向量的维度(就是每个词所对应的向量的维度)。
我们拿到用户输入的文本后,先通过分词将文本转换成一个词的列表,然后通过 wordToId 将词列表转换成 id 列表(注意在原本创建词列表时需要多四个符号,分别是 unk 表示不知道,pad 表示填充,bos 表示文本开始,eos 表示文本结束),在id列表的前后加上 bos 和 eos ,id 列表的长度也为 seq_len 。
这里的id 列表就是我们的输入x 了。
到这里,你已经明白了怎么将输入的文本变成模型可以处理的输入了,那么,模型是如何预测的呢?
这里看到第一个架构
通过中间的Model,将输入x映射到输出y,这里的Model就是我们要设计和训练的东西了。
而输出y是啥呢,我学习到,这里应该会是一个logit数组(即通过逻辑回归得到的数组),通过 Softmax 函数将logit数组映射到概率分布,后通过概率分布取最大概率的id,通过 IdToWord 将id映射到词,这样就可以得到预测的文本了。
所以我们训练时的损失函数就应该是 CrossEntropy Loss,即交叉熵损失函数。公式如下
其中 N 是样本数量,M 是类别数量,y_{ij} 是样本 i 的第 j 类真实标签,y’_{ij} 是样本 i 的第 j 类预测概率。
我们训练数据实际上会是一大段一大段的文本,我们通过滑动窗口的方式,将文本分成很多个样本,每个样本的长度是 seq_len,然后通过上面的流程图,将样本映射到输出y,这里的y会是[window_size, vocab_size]形状的,然后通过交叉熵损失函数计算损失,然后通过梯度下降优化方法来优化模型内部的矩阵。
以上就是大语言模型的基础架构了。就可以拿回之前的流程图。
这样就清晰些了吧~
Transformer流程图
接下来是训练流程和输出流程
4. 训练流程
4.1 分词
当我们拿到一段文本时,我们需要将文本分词,将文本转换成词的列表。这里我们使用jieba分词。(jieba分词是Python中一个常用的中文分词库,它支持多种分词模式,包括精确模式、全模式和搜索引擎模式等。)
实际上,我们训练时会用一大段一大段的txt文件,我们要计划好在哪里分词,哪里插入对应的符号。
由于本人能力有限,只能在网上爬几十本小说来训练,我们就需要批量读取文本,这里先放代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 from torch.utils.data import DataLoaderimport osimport reimport jsonimport jiebafrom typing import List , Dict class DataProcessor : """文本处理与词表构建模块""" def __init__ (self, file_paths=None , max_vocab_size=None ): print (f"Initializing DataProcessor..." ) if file_paths is None : file_paths = [] self .pad = '<pad>' self .unk = '<unk>' self .bos = '<bos>' self .eos = '<eos>' self .special_tokens = [self .pad, self .unk, self .bos, self .eos] self .file_paths = file_paths self .texts: List [str ] = [] self .word_freq: Dict [str , int ] = {} for tok in self .special_tokens: self .word_freq[tok] = 0 self ._load_and_process_data() self .build_vocab(max_vocab_size) print (f"Loaded {len (self.texts)} paragraphs, vocab size={len (self.vocab)} " ) def _load_and_process_data (self ): """加载文件、清洗文本、分词并统计词频""" for file_path in self .file_paths: try : print (f"Loading file: {file_path} " ) with open (file_path, 'r' , encoding='utf-8' ) as f: content = f.read() paragraphs = [p.strip() for p in content.split("\n\n" ) if p.strip()] for idx, paragraph in enumerate (paragraphs): paragraph = self ._clean_text(paragraph) if not paragraph: continue self .texts.append(paragraph) words = jieba.lcut(paragraph) for w in words: self .word_freq[w] = self .word_freq.get(w, 0 ) + 1 self .word_freq[self .bos] += 1 self .word_freq[self .eos] += 1 if idx % 1000 == 0 : print (f"Processed {idx} paragraphs" ) except Exception as e: print (f"Error processing file {file_path} : {e} " ) def _clean_text (self, text: str ) -> str : """清理全角空格、零宽字符、多余空白等""" text = text.replace('\u3000' , ' ' ) text = text.replace('\u200b' , '' ).replace('\u200d' , '' ) text = re.sub(r'\s+' , ' ' , text) return text.strip() def build_vocab (self, max_vocab_size=None ): """按词频排序构建词表,可限制最大词表大小""" sorted_words = sorted ( self .word_freq.items(), key=lambda x: x[1 ], reverse=True ) words = [w for w, _ in sorted_words if w not in self .special_tokens] if max_vocab_size: words = words[:max_vocab_size - len (self .special_tokens)] self .vocab = self .special_tokens + words def get_vocab (self ): return self .vocab def get_texts (self ): return self .texts def get_word_frequency (self, word ): return self .word_freq.get(word, 0 ) def get_total_words (self ): return sum (self .word_freq.values()) def get_sorted_vocab_by_freq (self ): return sorted (self .vocab, key=lambda x: self .word_freq.get(x, 0 ), reverse=True ) def save_vocab (self, path="./vocab/vocab.txt" ): os.makedirs(os.path.dirname(path), exist_ok=True ) with open (path, 'w' , encoding='utf-8' ) as f: for w in self .vocab: f.write(w + "\n" ) print (f"Saved vocab to {path} " ) def save_texts (self, path="./vocab/processed_texts.txt" ): os.makedirs(os.path.dirname(path), exist_ok=True ) with open (path, 'w' , encoding='utf-8' ) as f: for t in self .texts: f.write(t + "\n" ) print (f"Saved processed texts to {path} " ) def save_word_freq (self, path="./vocab/vocab_freq.json" ): os.makedirs(os.path.dirname(path), exist_ok=True ) with open (path, 'w' , encoding='utf-8' ) as f: json.dump(self .word_freq, f, ensure_ascii=False , indent=4 ) print (f"Saved word freq to {path} " )
这个分词的工作流程
这里的分词很好看懂,接下来就是用这个分词创建数据集
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 import torchimport bisectfrom torch.utils.data import Datasetclass TextDataset (Dataset ): """ Transformer/GPT 语言模型数据集 支持: - 分词 -> 数字化 - 滑动窗口序列 - input 与 target 序列对 """ def __init__ (self, data_processor, seq_length=512 , path=None ): self .data_processor = data_processor self .seq_length = seq_length if path: self .load(path) return vocab = data_processor.get_vocab() self .id_to_vocab = {i + 1 : w for i, w in enumerate (vocab)} self .vocab_to_id = {w: i + 1 for i, w in enumerate (vocab)} self .id_to_vocab[0 ] = data_processor.pad self .vocab_to_id[data_processor.pad] = 0 self .vocab_size = len (self .vocab_to_id) self .tokens = self .build_token_stream(data_processor.get_texts()) self .size = len (self .tokens) - seq_length - 1 def build_token_stream (self, texts ): """将所有文本串联成一个长 token 序列""" all_tokens = [] for text in texts: words = [self .data_processor.bos] words.extend(jieba.lcut(text)) words.append(self .data_processor.eos) ids = [self .vocab_to_id.get(w, self .vocab_to_id[self .data_processor.unk]) for w in words] all_tokens.extend(ids) return all_tokens def __len__ (self ): return self .size def __getitem__ (self, idx ): x = self .tokens[idx : idx + self .seq_length] y = self .tokens[idx + 1 : idx + self .seq_length + 1 ] return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long) def save (self, path ): print ("Saving dataset..." ) torch.save(self , path) def load (self, path ): print ("Loading dataset..." ) data = torch.load(path) self .__dict__.update(data.__dict__)
这个数据集的工作流程
这样,就可以通过下面代码
1 2 3 4 5 6 7 8 9 10 11 12 file_paths = [] dir_path = "./data" for name in os.listdir(dir_path): file_paths.append(os.path.join(dir_path, name)) data_processor = DataProcessor(file_paths) text_dataset = TextDataset( data_processor=data_processor, seq_length=128 )
创建数据集结束,到这里我们获得 x 了
4.2 EMbedding
这个英语单词是嵌入的意思,就是将单词转换为向量,这里的向量实际上也算是模型的参数,是需要在训练中学习的,这个过程就是词向量的训练。
所以这个应该也需要设置一个模型,我们命名为 Embedding,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 class PositionalEncoding (nn.Module): def __init__ (self, d_model, max_len=512 ): """ 位置编码类的初始化函数 参数: d_model: 模型的维度 max_len: 序列的最大长度,默认为512 """ super (PositionalEncoding, self ).__init__() pe = torch.zeros(max_len, d_model) position = torch.arange(0 , max_len, dtype=torch.float ).unsqueeze(1 ) div_term = torch.exp(torch.arange(0 , d_model, 2 ).float () * (-math.log(10000.0 ) / d_model)) pe[:, 0 ::2 ] = torch.sin(position * div_term) pe[:, 1 ::2 ] = torch.cos(position * div_term) pe = pe.unsqueeze(0 ).transpose(0 , 1 ) self .register_buffer('pe' , pe) def forward (self, x ): """ 前向传播函数 参数: x: 输入张量,形状为(seq_len, batch_size, d_model) 返回: 添加了位置编码的输入张量 """ return x + self .pe[:x.size(1 ), :].transpose(0 , 1 ) class WordEmbeddingModel (nn.Module): def __init__ (self, vocab_size, embedding_dim, max_seq_length=512 , dropout=0.1 ): """ 初始化词嵌入模型 参数: vocab_size (int): 词汇表大小 embedding_dim (int): 词嵌入维度 max_seq_length (int): 最大序列长度,默认为512 dropout (float): dropout比率,默认为0.1 """ super (WordEmbeddingModel, self ).__init__() self .embedding_dim = embedding_dim self .embedding = nn.Embedding(vocab_size, embedding_dim) self .pos_encoding = PositionalEncoding(embedding_dim, max_seq_length) self .dropout = nn.Dropout(dropout) self .layer_norm = nn.LayerNorm(embedding_dim) def forward (self, x ): """ 前向传播过程 参数: x (torch.Tensor): 输入张量,形状为(batch_size, sequence_length) 返回: torch.Tensor: 经过嵌入、位置编码和归一化后的张量 """ embedded = self .embedding(x) * math.sqrt(self .embedding_dim) embedded = self .pos_encoding(embedded) embedded = self .layer_norm(embedded) embedded = self .dropout(embedded) return embedded
这里需要加上位置编码,需要记录每个词的位置因为每个词在不同的语境意思不同,就需要加一个位置编码来区分不同位置。这个有公式如下
最后出来的 X_ 就是词嵌入向量。又或者说 embedded, 形状是 (batch_size, sequence_length, embedding_dim)。
4.3 Attention
注意力层用于计算输入序列中每个元素对输出序列中每个元素的影响程度,从而生成更准确的输出。注意力机制的核心思想是,对于输出序列中的每个元素,模型都会计算输入序列中每个元素对该元素的影响程度,并根据这些影响程度对输入序列进行加权求和,得到该元素的输出。
简而言之就是找到上下文之间的关系,那么ATTENTION就被提出,专门用来找这个关系
单头注意力机制流程图,我们可以先从单头注意力机制来实现,然后再扩展到多头注意力机制。
有单头注意力机制代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class AttentionLayer (nn.Module): """单头因果自注意力层(GPT 风格)""" def __init__ (self, k_dim, v_dim, emb_dim ): super (AttentionLayer, self ).__init__() self .k_dim = k_dim self .v_dim = v_dim self .emb_dim = emb_dim self .query = nn.Linear(emb_dim, k_dim) self .key = nn.Linear(emb_dim, k_dim) self .value = nn.Linear(emb_dim, v_dim) def forward (self, x ): """ x shape: (batch_size, seq_len, emb_dim) """ Q = self .query(x) K = self .key(x) V = self .value(x) attention_scores = torch.matmul(Q, K.transpose(-2 , -1 )) attention_scores = attention_scores / math.sqrt(self .k_dim) mask = torch.tril(torch.ones_like(attention_scores)).bool () attention_scores = attention_scores.masked_fill(mask == 0 , float ('-inf' )) attention_weights = torch.softmax(attention_scores, dim=-1 ) attention_output = torch.matmul(attention_weights, V) return attention_output
由流程图看,先通过 K, Q, V 的线性变换得到 k, q, v,然后计算得分矩阵 S,然后应用因果 Mask,然后缩放,然后 softmax 得到注意力权重,最后加权求和得到输出。
但是单头注意力往往不够,一句话里面的意思具有多重性,事物也有不同的特征,故需要不同的注意力头来捕获不同的特征才能得到更好的效果,所以需要多头注意力机制。
多头注意力机制流程图
多头注意力不必要把多个单独的 Attention 串联起来,而是可以并行计算,最后再拼接起来,所以速度会快很多。我们通过定义三个矩阵来表示总的 K Q V,将三个矩阵分割开就变成了多个 K Q V,然后分别计算,最后拼接起来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 class MultiHeadAttentionLayer (nn.Module): """多头自注意力(Multi-Head Self-Attention)层""" def __init__ (self, num_heads, emb_dim, dropout=0.1 ): """ 多头注意力机制 Args: num_heads: 注意力头数量 h emb_dim: 输入/输出 embedding 维度 d_model """ super (MultiHeadAttentionLayer, self ).__init__() self .num_heads = num_heads self .emb_dim = emb_dim self .head_dim = emb_dim // num_heads assert emb_dim % num_heads == 0 , "emb_dim 必须能被 num_heads 整除" self .W_q = nn.Linear(emb_dim, emb_dim) self .W_k = nn.Linear(emb_dim, emb_dim) self .W_v = nn.Linear(emb_dim, emb_dim) self .W_o = nn.Linear(emb_dim, emb_dim) self .dropout = nn.Dropout(dropout) self .layer_norm = nn.LayerNorm(emb_dim) def forward (self, x, mask=None ): """ Args: x: (batch_size, seq_len, d_model) mask: 可选的掩码 (seq_len, seq_len) 或 (batch, 1, seq_len, seq_len) Returns: output: (batch_size, seq_len, d_model) """ batch_size, seq_len, _ = x.shape residual = x Q = self .W_q(x).view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) K = self .W_k(x).view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) V = self .W_v(x).view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) attention_scores = torch.matmul(Q, K.transpose(-2 , -1 )) / math.sqrt(self .head_dim) if mask is not None : attention_scores = attention_scores.masked_fill(mask == 0 , float ('-inf' )) else : causal_mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device)).bool () attention_scores = attention_scores.masked_fill(~causal_mask, float ('-inf' )) attention_weights = torch.softmax(attention_scores, dim=-1 ) attention_weights = self .dropout(attention_weights) attention_output = torch.matmul(attention_weights, V) attention_output = attention_output.transpose(1 , 2 ).contiguous().view( batch_size, seq_len, self .emb_dim ) attention_output = self .W_o(attention_output) attention_output = self .dropout(attention_output) output = self .layer_norm(attention_output + residual) return output
前馈神经网络(Feed Forward Neural Network)由两个线性层和一个 ReLU 激活函数组成。第一个线性层将输入映射到更高维度,第二个线性层将映射后的结果映射回原始维度。中间的 ReLU 激活函数引入非线性,使模型能够学习更复杂的模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class FeedForwardNetwork (nn.Module): """Transformer 前馈全连接网络(Position-wise Feed Forward Network, FFN)""" def __init__ (self, emb_dim, expansion_factor=4 , dropout=0.1 ): super (FeedForwardNetwork, self ).__init__() self .emb_dim = emb_dim self .hidden_dim = emb_dim * expansion_factor self .network = nn.Sequential( nn.Linear(emb_dim, self .hidden_dim), nn.GELU(), nn.Dropout(dropout), nn.Linear(self .hidden_dim, emb_dim), nn.Dropout(dropout) ) self .layer_norm = nn.LayerNorm(emb_dim) def forward (self, x ): residual = x output = self .network(x) output = self .layer_norm(output + residual) return output
这个前馈神经网络(Feed Forward Neural Network)比较简单,主要是要看使用了 残差连接避免梯度消失
将前面组合成完整的 Transformer 层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class TransformerBlock (nn.Module): """完整的Transformer块""" def __init__ (self, num_heads, emb_dim, dropout=0.1 ): super (TransformerBlock, self ).__init__() self .attention = MultiHeadAttentionLayer(num_heads, emb_dim, dropout) self .feed_forward = FeedForwardNetwork(emb_dim, dropout=dropout) def forward (self, x, mask=None ): x = self .attention(x, mask) x = self .feed_forward(x) return x
4.4 输出预测层
通过前面的铺垫,我们获得了完整语义,完整语境,现在就要来说接下来是啥话了。我们需要通过一个全连接层来输出预测 logit,通过 SoftMax 就可以拿到预测概率然后进行选词。
不过前面的参数量较大,我们可以选择用一个新的 FFN 来表示,也可以共享第一层里面的输出层的参数。
这里我使用第二种,不过给出第一种的代码。
第一种
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class OutputLayer (nn.Module): """输出预测层""" def __init__ (self, emb_dim, vocab_size, dropout=0.1 ): super (OutputLayer, self ).__init__() self .linear = nn.Linear(emb_dim, vocab_size) self .dropout = nn.Dropout(dropout) def forward (self, x ): x = self .dropout(x) output = self .linear(x) return output
第二种
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class LinearLayer (nn.Module): """权重共享输出层 - 大幅减少参数""" def __init__ (self, emb_dim, vocab_size, dropout=0.1 ): super (LinearLayer, self ).__init__() self .output = nn.Linear(emb_dim, vocab_size, bias=False ) self .dropout = nn.Dropout(dropout) def forward (self, x ): x = self .dropout(x) return self .output(x) def tie_weights (self, embedding_layer ): """与词嵌入层共享权重""" self .output.weight = embedding_layer.weight
这个是普通的线性层,没有那么多讲究,维度正确就行
跟着流程图搭建model
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 class Model (nn.Module): """ 完整的 Transformer 语言模型(GPT 类架构,Decoder-only)。 主要组成: - 词嵌入(含位置编码) - 多个 Transformer Block(自注意力 + 前馈网络) - 输出线性层 + 权重共享(tie weights) 输入: x: (batch_size, seq_length) —— 输入 token id 序列 mask: (batch_size, 1, 1, seq_length) 或 None —— 可选的注意力掩码 输出: logits: (batch_size, seq_length, vocab_size) """ def __init__ (self, num_heads, num_transformer_blocks, emb_dim, seq_length, vocab_size, dropout=0.1 ): super (Model, self ).__init__() self .emb_dim = emb_dim self .seq_length = seq_length self .vocab_size = vocab_size self .num_heads = num_heads self .num_transformer_blocks = num_transformer_blocks self .embedding = WordEmbeddingModel( vocab_size, emb_dim, max_seq_length=seq_length, dropout=dropout ) self .transformer_blocks = nn.ModuleList( [TransformerBlock(num_heads, emb_dim, dropout) for _ in range (num_transformer_blocks)] ) self .linear = LinearLayer(emb_dim, vocab_size, dropout) self .linear.tie_weights(self .embedding.embedding) def _init_weights (self, module ): """遵循 GPT/Transformer 标准初始化方式""" if isinstance (module, nn.Linear): torch.nn.init.normal_(module.weight, mean=0.0 , std=0.02 ) if module.bias is not None : torch.nn.init.zeros_(module.bias) elif isinstance (module, nn.Embedding): torch.nn.init.normal_(module.weight, mean=0.0 , std=0.02 ) elif isinstance (module, nn.LayerNorm): torch.nn.init.zeros_(module.bias) torch.nn.init.ones_(module.weight) def forward (self, x, mask=None ): """ x: 输入 token id 序列,形状 (batch_size, seq_length) mask: 注意力掩码(可选),用于遮盖未来 token(因果掩码) 返回: logits: (batch_size, seq_length, vocab_size) """ x = self .embedding(x) for block in self .transformer_blocks: x = block(x, mask) logits = self .linear(x) return logits def save (self, path ): """保存模型参数到文件""" torch.save(self .state_dict(), path) def load (self, path ): """从文件中加载模型参数""" if os.path.exists(path): self .load_state_dict(torch.load(path))
到这里,模型搭建完毕
4.6 损失函数
由于是预测模型,我们用 交叉熵 损失函数
1 2 3 4 5 crossLoss = nn.CrossEntropyLoss(ignore_index=0 ) def Loss (y_pred, y_true ): y_pred = y_pred.transpose(1 , 2 ) return crossLoss(y_pred, y_true)
注意要调整维度,设置ignore_index(忽略掉pad)
4.7 训练
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 device = "cuda" if torch.cuda.is_available() else "cpu" file_paths = [] dir_path = "./data" for name in os.listdir(dir_path): file_paths.append(os.path.join(dir_path, name)) data_processor = DataProcessor(file_paths) text_dataset = TextDataset( data_processor=data_processor, seq_length=128 ) test_data_processor = DataProcessor() test_data_processor.loadbutnottextWithdataprocessor(data_processor) test_data_processor.process_text("test_data\神明将世,看见血条的我杀疯了.txt" ) test_dataset = TextDataset( data_processor=test_data_processor, seq_length=128 ) net = Model( num_heads=8 , num_transformer_blocks=6 , emb_dim=256 , seq_length=128 , vocab_size=text_dataset.vocab_size, dropout=0.1 ) net.apply(net._init_weights) net.load("./model/model.pt" ) all_indices = np.arange(len (text_dataset)) test_indices = np.arange(len (test_dataset)) num_samples_per_epoch = 8000 def train_step (X, y, loss_fn, optimizer, net ): """ 一个训练 step: - 前向传播 - 反向传播 - 更新模型参数 """ X = X.to(device) y = y.to(device) optimizer.zero_grad() y_pred = net(X) loss = loss_fn(y_pred, y) loss.backward() optimizer.step() return loss.item() def test_step (X, y, loss_fn, net ): """ 评估步骤: - 仅前向传播 - 计算 loss 和准确率 """ X = X.to(device) y = y.to(device) y_pred = net(X) loss = loss_fn(y_pred, y) preds = y_pred.argmax(dim=-1 ) mask = (y != 0 ) correct = (preds == y) & mask acc = correct.sum ().item() total = mask.sum ().item() return loss.item(), acc, total if __name__ == "__main__" : best_accuracy = 0 optimizer = torch.optim.Adam(net.parameters(), lr=0.001 ) scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( optimizer, mode='min' , factor=0.1 , patience=2 ) net = net.to(device) for epoch in range (10 ): random_indices = np.random.choice( all_indices, size=num_samples_per_epoch, replace=False ) train_loader = DataLoader( text_dataset, batch_size=8 , sampler=SubsetRandomSampler(random_indices), shuffle=False ) loss = 0 net.train() idx = 0 for X, y in train_loader: loss += train_step(X, y, Loss, optimizer, net) if idx % 5 == 0 : print (f"Epoch {epoch} , Batch {idx} , Loss: {loss/(idx+1 ):.5 f} " ) idx += 1 print (f"Epoch {epoch} , Train Loss: {loss/len (train_loader):.5 f} " ) test_random_indices = np.random.choice(test_indices, size=1000 , replace=False ) test_loader = DataLoader( test_dataset, batch_size=8 , sampler=SubsetRandomSampler(test_random_indices), shuffle=False ) net.eval () with torch.no_grad(): test_loss = 0 total_correct = 0 total_tokens = 0 idx = 0 for X, y in test_loader: batch_loss, batch_correct, batch_tokens = test_step(X, y, Loss, net) test_loss += batch_loss total_correct += batch_correct total_tokens += batch_tokens if idx % 5 == 0 : print (f"Epoch {epoch} , Batch {idx} , Test Loss: {test_loss/(idx+1 ):.5 f} , " f"Test Accuracy: {total_correct/total_tokens:.5 f} " ) idx += 1 print (f"Epoch {epoch} , Test Loss: {test_loss/len (test_loader):.5 f} , " f"Test Accuracy: {total_correct/total_tokens:.5 f} " ) if total_correct/total_tokens > best_accuracy: best_accuracy = total_correct/total_tokens torch.save(net.state_dict(), "model/model.pt" ) print ("Model improved. Saved." ) else : print ("No improvement. Skipping save." ) scheduler.step(loss)
5. 生成流程
生成流程比较简单,就拿到用户输入,然后分解成token,然后传入模型跑出 logit,通过SoftMax拿到概率生成即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 class Generate : """ 文本生成类(基础版本:贪婪搜索) 功能: - 加载模型 - 加载词表 - 对文本分词并转换为 token id - 使用贪婪搜索逐 token 生成 """ def __init__ (self, device ): self .model = Model( num_heads=CONFIG["num_heads" ], num_transformer_blocks=CONFIG["num_transformer_blocks" ], emb_dim=CONFIG["emb_dim" ], seq_length=CONFIG["seq_length" ], vocab_size=CONFIG["vocab_size" ], dropout=CONFIG["dropout" ] ) self .model.load("./model/model.pt" ) self .device = device self .model = self .model.to(self .device) self .model.eval () self .load_id_to_vocab_and_vocab_to_id() self .unk_id = self .vocab_to_id["<unk>" ] self .eos_id = self .vocab_to_id["<eos>" ] self .pad_id = self .vocab_to_id["<pad>" ] def load_id_to_vocab_and_vocab_to_id (self ): """ 加载词表映射: - id_to_vocab: id -> 词 - vocab_to_id: 词 -> id """ import json with open ("./model/id_to_vocab.json" , "r" , encoding="utf-8" ) as f: self .id_to_vocab = json.load(f) self .id_to_vocab = {int (k): v for k, v in self .id_to_vocab.items()} self .vocab_to_id = {v: k for k, v in self .id_to_vocab.items()} def get_output (self, input ): """ 使用模型预测下一个 token(贪婪策略:取最大概率项) input: (1, seq_length) return: 最后一个位置的预测 token id """ with torch.no_grad(): self .model.eval () output = self .model(input ) output = output[:, -1 , :] output_id = output.argmax(dim=-1 ) return output_id.item() def tokenize_text (self, text ): """ 将输入文本分词 → 转 token id """ if isinstance (text, str ): import jieba words = jieba.lcut(text) else : words = text token_ids = [self .vocab_to_id.get(word, self .unk_id) for word in words] return token_ids def generate_greedy (self, text, max_length=100 ): """ 贪婪搜索文本生成: 每一步都选概率最大的 token """ print ("question:" , text) input_tokens = self .tokenize_text(text) if len (input_tokens) > CONFIG["seq_length" ]: input_tokens = input_tokens[-CONFIG["seq_length" ]:] else : padding = [self .pad_id] * (CONFIG["seq_length" ] - len (input_tokens)) input_tokens = padding + input_tokens input_tensor = torch.tensor([input_tokens], dtype=torch.long).to(self .device) print ("<answer>" , "我理解你的问题:" , end="" ) generated_text = "" for step in range (max_length): next_token_id = self .get_output(input_tensor) next_token = self .id_to_vocab.get(next_token_id, "<unk>" ) if next_token_id == self .eos_id: break print (next_token, end="" ) generated_text += next_token new_input = input_tensor[0 , 1 :].tolist() + [next_token_id] input_tensor = torch.tensor([new_input], dtype=torch.long).to(self .device) print ("<answer>" ) return generated_text class AdvancedGenerate (Generate ): """ 基于采样的文本生成类: - 温度 temperature 控制随机性 - top-k 限制采样范围,使文本更合理 """ def __init__ (self, device ): super ().__init__(device) def get_output_with_sampling (self, input_tensor, temperature=1.0 , top_k=50 ): """ 使用温度采样 + Top-k 选择下一 token """ with torch.no_grad(): self .model.eval () input_tensor = input_tensor.to(self .device) output = self .model(input_tensor) next_token_logits = output[:, -1 , :] next_token_logits = next_token_logits / temperature if top_k is not None : threshold = torch.topk(next_token_logits, top_k)[0 ][..., -1 , None ] indices_to_remove = next_token_logits < threshold next_token_logits[indices_to_remove] = -float ('Inf' ) probs = torch.softmax(next_token_logits, dim=-1 ) next_token_id = torch.multinomial(probs, num_samples=1 ) return next_token_id.item() def generate_with_sampling (self, text, max_length=100 , temperature=0.8 , top_k=50 ): """ 使用 top-k + 温度采样生成文本 """ input_tokens = self .tokenize_text(text) if len (input_tokens) > CONFIG["seq_length" ]: input_tokens = input_tokens[-CONFIG["seq_length" ]:] else : padding = [self .pad_id] * (CONFIG["seq_length" ] - len (input_tokens)) input_tokens = padding + input_tokens input_tensor = torch.tensor([input_tokens], dtype=torch.long).to(self .device) print ("<answer>" , "我理解你的问题:" , end="" ) generated_text = "" for step in range (max_length): next_token_id = self .get_output_with_sampling( input_tensor, temperature=temperature, top_k=top_k ) next_token = self .id_to_vocab.get(next_token_id, "<unk>" ) if next_token_id == self .eos_id: break print (next_token, end="" ) generated_text += next_token new_input = input_tensor[0 , 1 :].tolist() + [next_token_id] input_tensor = torch.tensor([new_input], dtype=torch.long).to(self .device) print ("<answer>" ) return generated_text
结束
本文章为作者的学习笔记,仅供参考,知识来自论文,B站讲解,Deepseek,ChatGpt和豆包。感谢指正。