CS336(1): Basics
Source: Stanford CS336: Language Modeling from Scratch
Chinese video: Bilibili - (complete, with Chinese/English subtitles) Stanford CS336: Building Large Models from Scratch | 2025
This chapter covers Lectures 1 and 2.
1. Tokenization
Tokenizer: strings <-> tokens (indices)
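The tokenizer classes below all inherit from a `Tokenizer` base class that these notes never show; here is a minimal sketch of the interface they assume:

```python
from abc import ABC, abstractmethod

class Tokenizer(ABC):
    """Interface shared by all tokenizers below: strings <-> token indices."""

    @abstractmethod
    def encode(self, string: str) -> list[int]: ...

    @abstractmethod
    def decode(self, indices: list[int]) -> str: ...
```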
Character-based, byte-based, and word-based tokenization are all quite suboptimal.
BPE is an effective heuristic that looks at corpus statistics.
Tokenization is a necessary evil; maybe one day we will work directly from bytes...
1.1 Character-based tokenization
Simply split the string into its Unicode code points.
Problem 1: this gives a very large vocabulary, roughly 150K Unicode characters.
Problem 2: many characters are very rare (e.g. 🌍), which is an inefficient use of the vocabulary.
```python
class CharacterTokenizer(Tokenizer):
    """Represent a string as a sequence of Unicode code points."""
    def encode(self, string: str) -> list[int]:
        return list(map(ord, string))

    def decode(self, indices: list[int]) -> str:
        return "".join(map(chr, indices))


def character_tokenizer():
    assert ord("a") == 97
    assert ord("🌍") == 127757
    assert chr(97) == "a"
    assert chr(127757) == "🌍"

    tokenizer = CharacterTokenizer()
    string = "Hello, 🌍! 你好!"
    indices = tokenizer.encode(string)
    reconstructed_string = tokenizer.decode(indices)
    assert string == reconstructed_string
```
1.2 Byte-based tokenization
Convert each character to its UTF-8 bytes and tokenize at the byte level.
The vocabulary is nice and small: a byte can take 256 values.
The compression ratio is 1, which is terrible, since it means sequences become far too long. Given that a Transformer's context length is limited (attention is quadratic), this does not look good...
```python
class ByteTokenizer(Tokenizer):
    """Represent a string as a sequence of bytes."""
    def encode(self, string: str) -> list[int]:
        string_bytes = string.encode("utf-8")
        indices = list(map(int, string_bytes))
        return indices

    def decode(self, indices: list[int]) -> str:
        string_bytes = bytes(indices)
        string = string_bytes.decode("utf-8")
        return string


def byte_tokenizer():
    assert bytes("a", encoding="utf-8") == b"a"
    assert bytes("🌍", encoding="utf-8") == b"\xf0\x9f\x8c\x8d"

    tokenizer = ByteTokenizer()
    string = "Hello, 🌍! 你好!"
    indices = tokenizer.encode(string)
    reconstructed_string = tokenizer.decode(indices)
    assert string == reconstructed_string
```
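To make the "compression ratio = 1" claim concrete, we can measure UTF-8 bytes per token. `get_compression_ratio` below is a small helper sketch written for these notes, not part of the course code:

```python
def get_compression_ratio(string: str, indices: list[int]) -> float:
    """Number of UTF-8 bytes represented per token (higher is better)."""
    num_bytes = len(string.encode("utf-8"))
    num_tokens = len(indices)
    return num_bytes / num_tokens


tokenizer = ByteTokenizer()
string = "Hello, 🌍! 你好!"
indices = tokenizer.encode(string)
assert get_compression_ratio(string, indices) == 1.0  # exactly one token per byte
```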
1.3 Word-based tokenization
Another approach (closer to classical NLP) is to split the string into words, then build a mapping from each segment to an integer.
The number of words is huge (just like Unicode characters), and many words are rare, so the model does not learn much about them.
This also does not give a fixed vocabulary size: new words never seen during training get a special UNK token, which is ugly and can mess up perplexity calculations.
```python
import regex  # third-party `regex` package (supports \p{L}, \p{N})


def word_tokenizer():
    string = "I'll say supercalifragilisticexpialidocious!"
    # Naive split into words and single characters.
    segments = regex.findall(r"\w+|.", string)

    # The pre-tokenization pattern used by GPT-2.
    GPT2_TOKENIZER_REGEX = \
        r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
    pattern = GPT2_TOKENIZER_REGEX
    segments = regex.findall(pattern, string)
```
1.4 Byte Pair Encoding (BPE)
The BPE algorithm was introduced by Philip Gage in 1994 for data compression. It was later adopted in NLP for machine translation. (Before that, papers had been using word-based tokenization; GPT-2 then used BPE.)
Basic idea: train the tokenizer on raw text to automatically determine the vocabulary.
Intuition: common character sequences are represented by a single token, rare sequences by multiple tokens.
The GPT-2 paper used word-based tokenization to break the text into initial segments and ran the original BPE algorithm within each segment.
Start with each byte as its own token, then repeatedly merge the most frequent pair of adjacent tokens.
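The encoding and training code below calls a `merge` helper that replaces every occurrence of a given pair of token indices with a new index. It is not shown in these notes, so here is a minimal sketch consistent with how it is used:

```python
def merge(indices: list[int], pair: tuple[int, int], new_index: int) -> list[int]:
    """Return `indices` with every occurrence of `pair` replaced by `new_index`."""
    new_indices = []
    i = 0
    while i < len(indices):
        if i + 1 < len(indices) and (indices[i], indices[i + 1]) == pair:
            new_indices.append(new_index)
            i += 2  # skip both members of the merged pair
        else:
            new_indices.append(indices[i])
            i += 1
    return new_indices
```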
```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class BPETokenizerParams:
    """All you need to specify a BPETokenizer."""
    vocab: dict[int, bytes]              # index -> bytes
    merges: dict[tuple[int, int], int]   # (index1, index2) -> merged index


class BPETokenizer(Tokenizer):
    """BPE tokenizer given a set of merges and a vocabulary."""
    def __init__(self, params: BPETokenizerParams):
        self.params = params

    def encode(self, string: str) -> list[int]:
        indices = list(map(int, string.encode("utf-8")))
        # Apply the merges in the order they were learned.
        for pair, new_index in self.params.merges.items():
            indices = merge(indices, pair, new_index)
        return indices

    def decode(self, indices: list[int]) -> str:
        bytes_list = list(map(self.params.vocab.get, indices))
        string = b"".join(bytes_list).decode("utf-8")
        return string


def bpe_tokenizer():
    string = "the cat in the hat"
    params = train_bpe(string, num_merges=3)
    tokenizer = BPETokenizer(params)

    string = "the quick brown fox"
    indices = tokenizer.encode(string)
    reconstructed_string = tokenizer.decode(indices)
    assert string == reconstructed_string


def train_bpe(string: str, num_merges: int) -> BPETokenizerParams:
    # Start with the list of bytes of `string`.
    indices = list(map(int, string.encode("utf-8")))
    merges: dict[tuple[int, int], int] = {}
    vocab: dict[int, bytes] = {x: bytes([x]) for x in range(256)}

    for i in range(num_merges):
        # Count the number of occurrences of each pair of adjacent tokens.
        counts = defaultdict(int)
        for index1, index2 in zip(indices, indices[1:]):
            counts[(index1, index2)] += 1

        # Find the most common pair.
        pair = max(counts, key=counts.get)
        index1, index2 = pair

        # Merge that pair into a new token.
        new_index = 256 + i
        merges[pair] = new_index
        vocab[new_index] = vocab[index1] + vocab[index2]
        indices = merge(indices, pair, new_index)

    return BPETokenizerParams(vocab=vocab, merges=merges)
```
Appendix: Using PyTorch
1. Tensor basics
Tensors are the fundamental data structure in PyTorch, used to store parameters, gradients, optimizer state, data, and activations.
```python
import torch

x = torch.tensor([[1., 2, 3], [4, 5, 6]])  # from explicit data
x = torch.zeros(4, 8)   # all zeros
x = torch.ones(4, 8)    # all ones
x = torch.randn(4, 8)   # samples from a standard normal
x = torch.empty(4, 8)   # uninitialized memory
```
2. Tensor memory
Most data in deep learning is stored as floating-point numbers.
Main floating-point data types:
float32 (fp32): the default type, 4 bytes per element, the traditional baseline of scientific computing
float16 (fp16): 2 bytes per element, saves memory but has a small dynamic range and easily underflows
bfloat16 (bf16): 2 bytes per element, same dynamic range as float32, developed by Google Brain
fp8: a newer standard; the H100 GPU supports the E4M3 and E5M2 variants
```python
x = torch.zeros(4, 8)
assert x.dtype == torch.float32  # the default dtype

x = torch.zeros(4, 8, dtype=torch.float16)

x = torch.tensor([1e-8], dtype=torch.float16)
assert x == 0  # underflows: float16 cannot represent 1e-8

x = torch.tensor([1e-8], dtype=torch.bfloat16)
assert x != 0  # bfloat16 has the same dynamic range as float32
```
3. GPU tensors
Tensors are stored in CPU memory by default.
Use `.to("cuda:0")` or `device="cuda:0"` to move or create a tensor on the GPU.
The `torch.cuda` module can be used to query GPU information and memory usage.
```python
torch.cuda.is_available()

num_gpus = torch.cuda.device_count()
for i in range(num_gpus):
    properties = torch.cuda.get_device_properties(i)

x = torch.zeros(32, 32)
assert x.device == torch.device("cpu")

y = x.to("cuda:0")
assert y.device == torch.device("cuda", 0)

z = torch.zeros(32, 32, device="cuda:0")

memory_allocated = torch.cuda.memory_allocated()
```
4. Tensor operations
(1) Storage
A PyTorch tensor is a pointer into memory plus metadata describing how to access its elements.
The `stride` attribute gives the number of elements to skip when moving one step along each dimension.
```python
x = torch.tensor([
    [0., 1, 2, 3],
    [4, 5, 6, 7],
    [8, 9, 10, 11],
    [12, 13, 14, 15],
])
assert x.stride(0) == 4
assert x.stride(1) == 1

r, c = 1, 2
index = r * x.stride(0) + c * x.stride(1)
assert index == 6
```
(2) Slicing and views
Slicing gives different views of a tensor without copying data.
Modifying the original tensor affects all of its views.
A non-contiguous tensor must first be made contiguous with `.contiguous()` before certain view operations.
```python
x = torch.tensor([[1., 2, 3], [4, 5, 6]])
y = x[0]               # row 0
y = x[:, 1]            # column 1
y = x.view(3, 2)       # same storage, different shape
y = x.transpose(1, 0)  # transposed view

x[0][0] = 100  # mutating x is visible through all of its views

x = torch.tensor([[1., 2, 3], [4, 5, 6]])
y = x.transpose(1, 0)
# y.view(2, 3)  # error: y is not contiguous in memory
y = x.transpose(1, 0).contiguous().view(2, 3)
```
(3) Elementwise operations
Apply an operation to each element, returning a new tensor of the same shape.
Includes `pow()`, `sqrt()`, `rsqrt()`, arithmetic operations, etc.
`triu()` takes the upper-triangular part of a matrix, used for causal attention masks.
```python
x = torch.tensor([1., 4, 9])
y = x.pow(2)
y = x.sqrt()
y = x.rsqrt()  # 1 / sqrt(x)
y = x + x
y = x * 2
y = x / 0.5

x = torch.ones(3, 3).triu()  # upper-triangular part
assert torch.equal(x, torch.tensor([
    [1., 1, 1],
    [0, 1, 1],
    [0, 0, 1],
]))
```
(4) Matrix multiplication
The core operation of deep learning: `@` or `matmul()`.
Batched matrix multiplication is supported; extra leading dimensions are broadcast automatically.
```python
x = torch.ones(16, 32)
w = torch.ones(32, 2)
y = x @ w  # shape (16, 2)

x = torch.ones(4, 8, 16, 32)
w = torch.ones(32, 2)
y = x @ w  # batched: shape (4, 8, 16, 2)
```
5. Compute cost analysis
FLOPs
FLOPs: number of floating-point operations (a measure of compute)
FLOP/s: floating-point operations per second (a measure of hardware performance)
Reference numbers
GPT-3: 3.14e23 FLOPs
GPT-4 (rumored): 2e25 FLOPs
A100: 312 TFLOP/s peak (bfloat16, dense)
H100: ~990 TFLOP/s peak (bfloat16, without sparsity)
FLOPs of matrix multiplication
Linear model: 2 × B × D × K FLOPs (B = batch size, D = input dimension, K = output dimension)
Rule of thumb: forward-pass FLOPs ≈ 2 × (number of tokens) × (number of parameters)
Model FLOP utilization (MFU)
MFU = (actual FLOP/s) / (theoretical peak FLOP/s)
MFU ≥ 0.5 is generally considered good.
Performance differs significantly across data types (bfloat16 vs float32) and hardware (H100 vs A100); a rough way to measure MFU is sketched below.
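As an illustration, MFU can be estimated by timing one large matmul, counting its FLOPs with the 2 × B × D × K rule, and dividing by an assumed peak. This is a minimal sketch: the default peak is the A100 bfloat16 number quoted above, and a real measurement would warm up and average over many trials.

```python
import time

import torch


def estimate_mfu(B: int = 8192, D: int = 8192, K: int = 8192,
                 peak_flops_per_sec: float = 312e12) -> float:
    """Time one (B, D) @ (D, K) matmul and compare achieved FLOP/s to an assumed peak."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    x = torch.randn(B, D, device=device, dtype=torch.bfloat16)
    w = torch.randn(D, K, device=device, dtype=torch.bfloat16)

    if device == "cuda":
        torch.cuda.synchronize()
    start = time.time()
    y = x @ w
    if device == "cuda":
        torch.cuda.synchronize()  # wait for the kernel to finish before stopping the clock
    elapsed = time.time() - start

    actual_flops = 2 * B * D * K  # one multiply and one add per output element per inner index
    return (actual_flops / elapsed) / peak_flops_per_sec
```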
6. Gradient computation
Basic concepts
Forward pass: build the computation graph, compute predictions and the loss.
Backward pass: automatically compute gradients via the chain rule.
```python
import torch

x = torch.tensor([1., 2, 3])
w = torch.tensor([1., 1, 1], requires_grad=True)

pred_y = x @ w
loss = 0.5 * (pred_y - 5).pow(2)

loss.backward()  # compute d(loss)/dw via the chain rule
print("gradient of w:", w.grad)
```
FLOPs of the backward pass
input x → linear layer w1 → hidden h1 → linear layer w2 → output h2 → loss
```python
B, D, K = 16384, 32768, 8192

x = torch.ones(B, D)
w1 = torch.randn(D, D, requires_grad=True)
w2 = torch.randn(D, K, requires_grad=True)

h1 = x @ w1
h2 = h1 @ w2
loss = h2.pow(2).mean()

# Forward pass: one matmul per layer.
num_forward_flops = (2 * B * D * D) + (2 * B * D * K)

h1.retain_grad()  # keep gradients of non-leaf tensors for inspection
h2.retain_grad()
loss.backward()

num_backward_flops = 2 * B * D * K    # gradient w.r.t. h1
num_backward_flops += 2 * B * D * K   # gradient w.r.t. w2
num_backward_flops += 4 * B * D * D   # gradients for the first layer
```
FLOPs summary
General rules
Forward pass: 2 × (number of data points) × (number of parameters) FLOPs
Backward pass: 4 × (number of data points) × (number of parameters) FLOPs
Total: 6 × (number of data points) × (number of parameters) FLOPs
Practical implications
The backward pass costs roughly twice as much as the forward pass.
Total training FLOPs ≈ 6 × (number of training tokens) × (number of parameters).
This matters for estimating training time and hardware requirements; a back-of-the-envelope example follows.
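A quick sanity check using the numbers quoted earlier in these notes (GPT-3 at 3.14e23 FLOPs, A100 at 312 TFLOP/s bfloat16); the MFU and cluster size below are assumptions for illustration only:

```python
total_flops = 3.14e23          # GPT-3 training compute (quoted above)
peak_flops_per_sec = 312e12    # A100 bfloat16 peak (quoted above)
mfu = 0.5                      # assumed model FLOP utilization
num_gpus = 1024                # assumed cluster size

seconds = total_flops / (num_gpus * peak_flops_per_sec * mfu)
print(f"~{seconds / 86400:.1f} days")  # ≈ 22.7 days on this hypothetical setup
```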
7. Models
Model parameters
```python
import torch
import torch.nn as nn
import numpy as np

input_dim = 16384
output_dim = 32

# Naive initialization: the output scale grows with sqrt(input_dim).
w = nn.Parameter(torch.randn(input_dim, output_dim))
x = nn.Parameter(torch.randn(input_dim))
output = x @ w
print(f"output scale: {output[0]}")

# Scale by 1/sqrt(input_dim) to keep the output O(1).
w = nn.Parameter(torch.randn(input_dim, output_dim) / np.sqrt(input_dim))
output = x @ w
print(f"scaled output: {output[0]}")

# Truncated normal initialization avoids rare extreme values.
w = nn.Parameter(nn.init.trunc_normal_(
    torch.empty(input_dim, output_dim),
    std=1 / np.sqrt(input_dim),
    a=-3, b=3,
))
```
Linear layers
```python
class Linear(nn.Module):
    """A simple linear layer."""
    def __init__(self, input_dim: int, output_dim: int):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(input_dim, output_dim) / np.sqrt(input_dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x @ self.weight


class Cruncher(nn.Module):
    def __init__(self, dim: int, num_layers: int):
        super().__init__()
        self.layers = nn.ModuleList([
            Linear(dim, dim) for i in range(num_layers)
        ])
        self.final = Linear(dim, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        B, D = x.size()
        for layer in self.layers:
            x = layer(x)
        x = self.final(x)
        assert x.size() == torch.Size([B, 1])
        x = x.squeeze(-1)
        assert x.size() == torch.Size([B])
        return x


D = 64
num_layers = 2
model = Cruncher(dim=D, num_layers=num_layers)

param_sizes = [
    (name, param.numel())
    for name, param in model.state_dict().items()
]
print("parameter sizes:", param_sizes)

num_parameters = sum(param.numel() for param in model.parameters())
assert num_parameters == (D * D) + (D * D) + D

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

B = 8
x = torch.randn(B, D, device=device)
y = model(x)
print("output shape:", y.shape)
```
8. Managing randomness
Sources of randomness:
Parameter initialization
Dropout
Data ordering
Other random operations
Set random seeds to ensure reproducibility.
```python
import torch
import numpy as np
import random

seed = 0
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
```
9. Optimizers
Optimizer evolution
SGD + Momentum: exponential moving average of the gradient
AdaGrad: SGD + accumulated squared gradients (a from-scratch sketch follows this list)
RMSProp: AdaGrad + exponential moving average of squared gradients
Adam: RMSProp + Momentum
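As a concrete illustration of this family, here is a minimal from-scratch AdaGrad written against the `torch.optim.Optimizer` interface; it is a sketch for intuition, not the course's implementation:

```python
import torch


class AdaGrad(torch.optim.Optimizer):
    """Minimal AdaGrad: scale each update by the accumulated squared gradients."""
    def __init__(self, params, lr: float = 0.01, eps: float = 1e-10):
        super().__init__(params, dict(lr=lr, eps=eps))

    @torch.no_grad()
    def step(self):
        for group in self.param_groups:
            lr, eps = group["lr"], group["eps"]
            for p in group["params"]:
                if p.grad is None:
                    continue
                state = self.state[p]
                if "g2" not in state:
                    state["g2"] = torch.zeros_like(p)
                state["g2"] += p.grad ** 2                      # accumulate squared gradients
                p -= lr * p.grad / (state["g2"].sqrt() + eps)   # per-coordinate scaled update
```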
AdaGrad usage example (via torch.optim)
```python
import torch
import torch.nn.functional as F

model = Cruncher(dim=4, num_layers=2).to(get_device())
optimizer = torch.optim.Adagrad(model.parameters(), lr=0.01)

x = torch.randn(2, 4, device=get_device())
y = torch.tensor([4., 5.], device=get_device())

pred_y = model(x)
loss = F.mse_loss(pred_y, y)
loss.backward()

optimizer.step()
optimizer.zero_grad(set_to_none=True)
```
10. Memory accounting
```python
# Memory needed to train the Cruncher model above (float32: 4 bytes per value).
num_parameters = (D * D * num_layers) + D
num_activations = B * D * num_layers
num_gradients = num_parameters
num_optimizer_states = num_parameters

total_memory = 4 * (num_parameters + num_activations + num_gradients + num_optimizer_states)

# Compute for one training step.
flops = 6 * B * num_parameters
```
11. Training
Basic training loop
```python
def train(name: str, get_batch,
          D: int, num_layers: int, B: int,
          num_train_steps: int, lr: float):
    model = Cruncher(dim=D, num_layers=num_layers).to(get_device())
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for t in range(num_train_steps):
        x, y = get_batch(B=B)
        pred_y = model(x)
        loss = F.mse_loss(pred_y, y)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad(set_to_none=True)
```
Data generation example
```python
def get_batch(B: int) -> tuple[torch.Tensor, torch.Tensor]:
    D = 16
    true_w = torch.arange(D, dtype=torch.float32, device=get_device())
    x = torch.randn(B, D).to(get_device())
    true_y = x @ true_w
    return (x, true_y)
```
12. Checkpointing
Saving and loading checkpoints
```python
checkpoint = {
    "model": model.state_dict(),
    "optimizer": optimizer.state_dict(),
}
torch.save(checkpoint, "model_checkpoint.pt")

loaded_checkpoint = torch.load("model_checkpoint.pt")
model.load_state_dict(loaded_checkpoint["model"])
optimizer.load_state_dict(loaded_checkpoint["optimizer"])
```
13. Mixed-precision training
Precision trade-offs
High precision (float32): more accurate and stable, but higher memory and compute cost
Low precision (bfloat16/fp8): lower memory and compute cost, but accuracy and stability may suffer
Mixed-precision strategy
Run the forward pass (activations) in bfloat16/fp8
Keep parameters and gradients in float32
Use the PyTorch AMP (Automatic Mixed Precision) library; a minimal sketch follows.
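A minimal sketch of the AMP pattern with `torch.autocast`, reusing the `Cruncher` model defined above and assuming a CUDA device (for float16 one would typically also use a `GradScaler`; bfloat16 usually does not need it):

```python
import torch
import torch.nn.functional as F

model = Cruncher(dim=64, num_layers=2).to("cuda")   # parameters stay in float32
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

x = torch.randn(8, 64, device="cuda")
y = torch.randn(8, device="cuda")

# Run the forward pass (activations) in bfloat16.
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
    pred_y = model(x)
    loss = F.mse_loss(pred_y, y)

# Parameter gradients are accumulated in the parameters' dtype (float32).
loss.backward()
optimizer.step()
optimizer.zero_grad(set_to_none=True)
```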
14. Utility functions
```python
import timeit

import torch
import torch.nn as nn


def get_memory_usage(x: torch.Tensor):
    return x.numel() * x.element_size()


def same_storage(x: torch.Tensor, y: torch.Tensor):
    return x.untyped_storage().data_ptr() == y.untyped_storage().data_ptr()


def time_matmul(a: torch.Tensor, b: torch.Tensor) -> float:
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    num_trials = 5
    total_time = timeit.timeit(lambda: a @ b, number=num_trials)
    return total_time / num_trials


def get_num_parameters(model: nn.Module) -> int:
    return sum(param.numel() for param in model.parameters())


def get_device(index: int = 0) -> torch.device:
    if torch.cuda.is_available():
        return torch.device(f"cuda:{index}")
    else:
        return torch.device("cpu")
```