CS336(1): Basics

资料来源:斯坦福CS336: Language Modeling from Scratch

国内视频:B站 - (中英字幕完结)斯坦福CS336:从头开始构建大模型 | 2025年

本章包含课程:第 1、2 课

1. Tokenization

  • Tokenizer: strings <-> tokens (indexes)
  • 基于字符、基于字节、基于单词的标记化非常次优。
  • BPE是一个有效的启发式查看语料库统计。
    标记化是一种必要的弊端,也许有一天我们会从字节来做…

1.1 Character-based tokenization

直接按照 Unicode 字符编码分词即可。

  • 问题1:这是一个非常大的词汇表。大约有150K个Unicode字符。
  • 问题2:许多字符非常罕见(例如🌍),这是对词汇表的低效使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class CharacterTokenizer(Tokenizer):
"""Represent a string as a sequence of Unicode code points."""
def encode(self, string: str) -> list[int]:
return list(map(ord, string))

def decode(self, indices: list[int]) -> str:
return "".join(map(chr, indices))

def character_tokenizer():
assert ord("a") == 97
assert ord("🌍") == 127757
assert chr(97) == "a"
assert chr(127757) == "🌍"

tokenizer = CharacterTokenizer()
string = "Hello, 🌍! 你好!" # @inspect string
indices = tokenizer.encode(string) # @inspect indices
reconstructed_string = tokenizer.decode(indices) # @inspect reconstructed_string
assert string == reconstructed_string

1.2 Byte-based tokenization

每个字符都转为字节码,按照字节码分词。

  • 词汇表很好而且很小:一个字节可以表示256个值。
  • 压缩比=1,很糟糕,这意味着序列会太长。考虑到Transformer的上下文长度是有限的(因为注意力是二次的),这看起来不太好……
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ByteTokenizer(Tokenizer):
"""Represent a string as a sequence of bytes."""
def encode(self, string: str) -> list[int]:
string_bytes = string.encode("utf-8") # @inspect string_bytes
indices = list(map(int, string_bytes)) # @inspect indices
return indices

def decode(self, indices: list[int]) -> str:
string_bytes = bytes(indices) # @inspect string_bytes
string = string_bytes.decode("utf-8") # @inspect string
return string

def byte_tokenizer():
assert bytes("a", encoding="utf-8") == b"a"
assert bytes("🌍", encoding="utf-8") == b"\xf0\x9f\x8c\x8d"

tokenizer = ByteTokenizer()
string = "Hello, 🌍! 你好!" # @inspect string
indices = tokenizer.encode(string) # @inspect indices
reconstructed_string = tokenizer.decode(indices) # @inspect reconstructed_string
assert string == reconstructed_string

1.3 Word-based tokenization

另一种方法(更接近NLP中的经典方法)是将字符串拆分为单词。然后,我们可以建立一个从每个段到整数的映射。

  • 字数很大(就像Unicode字符一样)。很多单词都是罕见的,模型不会学到太多。
  • 这显然没有提供固定的词汇表大小。我们在训练中没有见过的新词会得到一个特殊的UNK标记,这很难看,可能会扰乱困惑度的计算。
1
2
3
4
5
6
7
8
9

def word_tokenizer():
string = "I'll say supercalifragilisticexpialidocious!"
segments = regex.findall(r"\w+|.", string) # @inspect segments

GPT2_TOKENIZER_REGEX = \
r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
pattern = GPT2_TOKENIZER_REGEX # @inspect pattern
segments = regex.findall(pattern, string) # @inspect segments

1.4 Byte Pair Encoding (BPE)

BPE算法是由Philip Gage在1994年提出的用于数据压缩的算法。它被用于ML翻译的NLP。(此前,论文一直在使用基于单词的标记化。然后GPT-2使用BPE。)

  • 基本思想:在原始文本上训练标记器以自动确定词汇表。
  • 直觉:常见的字符序列由单个标记表示,罕见的序列由多个标记表示。
  • GPT-2论文使用基于单词的标记法将文本分解为初始片段,并在每个片段上运行原始的BPE算法。
  • 从每个字节作为标记开始,并依次合并最常见的相邻标记对。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@dataclass(frozen=True)
class BPETokenizerParams:
"""All you need to specify a BPETokenizer."""
vocab: dict[int, bytes] # index -> bytes
merges: dict[tuple[int, int], int] # index1,index2 -> new_index

class BPETokenizer(Tokenizer):
"""BPE tokenizer given a set of merges and a vocabulary."""
def __init__(self, params: BPETokenizerParams):
self.params = params

def encode(self, string: str) -> list[int]:
indices = list(map(int, string.encode("utf-8"))) # @inspect indices
# Note: this is a very slow implementation
for pair, new_index in self.params.merges.items(): # @inspect pair, @inspect new_index
indices = merge(indices, pair, new_index)
return indices

def decode(self, indices: list[int]) -> str:
bytes_list = list(map(self.params.vocab.get, indices)) # @inspect bytes_list
string = b"".join(bytes_list).decode("utf-8") # @inspect string
return string

def bpe_tokenizer():
# Training the tokenizer
string = "the cat in the hat" # @inspect string
params = train_bpe(string, num_merges=3)

# Using the tokenizer
tokenizer = BPETokenizer(params)
string = "the quick brown fox" # @inspect string
indices = tokenizer.encode(string) # @inspect indices
reconstructed_string = tokenizer.decode(indices) # @inspect reconstructed_string
assert string == reconstructed_string

def train_bpe(string: str, num_merges: int) -> BPETokenizerParams: # @inspect string, @inspect num_merges
text("Start with the list of bytes of `string`.")
indices = list(map(int, string.encode("utf-8"))) # @inspect indices
merges: dict[tuple[int, int], int] = {} # index1, index2 => merged index
vocab: dict[int, bytes] = {x: bytes([x]) for x in range(256)} # index -> bytes

for i in range(num_merges):
text("Count the number of occurrences of each pair of tokens")
counts = defaultdict(int)
for index1, index2 in zip(indices, indices[1:]): # For each adjacent pair
counts[(index1, index2)] += 1 # @inspect counts

text("Find the most common pair.")
pair = max(counts, key=counts.get) # @inspect pair
index1, index2 = pair

text("Merge that pair.")
new_index = 256 + i # @inspect new_index
merges[pair] = new_index # @inspect merges
vocab[new_index] = vocab[index1] + vocab[index2] # @inspect vocab
indices = merge(indices, pair, new_index) # @inspect indices

return BPETokenizerParams(vocab=vocab, merges=merges)

附:PyTorch 使用

1. 张量基础

  • 张量是 PyTorch 中的基本数据结构,用于存储参数、梯度、优化器状态、数据和激活值
1
2
3
4
5
x = torch.tensor([[1., 2, 3], [4, 5, 6]])  # 从数据直接创建
x = torch.zeros(4, 8) # 4x8 matrix of all zeros 创建全零张量
x = torch.ones(4, 8) # 4x8 matrix of all ones 创建全一张量
x = torch.randn(4, 8) # 4x8 matrix of iid Normal(0, 1) samples 创建正态分布随机张量
x = torch.empty(4, 8) # 4x8 matrix of uninitialized values 分配未初始化内存,后续用自定义逻辑填充

2. 张量内存管理

  • 深度学习中的大多数数据都以浮点数形式存储
  • 主要浮点数据类型:
    • float32 (fp32):默认类型,4字节/元素,传统科学计算基准
    • float16 (fp16):2字节/元素,节省内存但动态范围小,容易下溢
    • bfloat16 (bf16):2字节/元素,与 float32 相同的动态范围,Google Brain 开发
    • fp8:更新的标准,H100 GPU 支持 E4M3 和 E5M2 两种变体
1
2
3
4
5
6
7
8
9
10
x = torch.zeros(4, 8)  # @inspect x
assert x.dtype == torch.float32 # Default type

x = torch.zeros(4, 8, dtype=torch.float16) # @inspect x

x = torch.tensor([1e-8], dtype=torch.float16) # @inspect x
assert x == 0 # Underflow!

x = torch.tensor([1e-8], dtype=torch.bfloat16) # @inspect x
assert x != 0 # No underflow!

3. GPU 张量

  • 默认张量存储在 CPU 内存中
  • 使用 .to("cuda:0")device="cuda:0" 将张量移动到 GPU
  • 可以使用 torch.cuda 模块查询 GPU 信息和内存使用情况
1
2
3
4
5
6
7
8
9
10
11
12
torch.cuda.is_available()
num_gpus = torch.cuda.device_count() # @inspect num_gpus
for i in range(num_gpus):
properties = torch.cuda.get_device_properties(i) # @inspect properties

x = torch.zeros(32, 32)
assert x.device == torch.device("cpu")
y = x.to("cuda:0")
assert y.device == torch.device("cuda", 0)
z = torch.zeros(32, 32, device="cuda:0")

memory_allocated = torch.cuda.memory_allocated() # @inspect memory_allocated

4. 张量操作

(1)存储结构

  • PyTorch 张量是指向内存的指针,带有描述如何访问元素的元数据
  • stride 属性表示在每个维度上移动时需要跳过的元素数量
1
2
3
4
5
6
7
8
9
10
11
x = torch.tensor([
[0., 1, 2, 3],
[4, 5, 6, 7],
[8, 9, 10, 11],
[12, 13, 14, 15],
])
assert x.stride(0) == 4
assert x.stride(1) == 1
r, c = 1, 2
index = r * x.stride(0) + c * x.stride(1) # @inspect index
assert index == 6

(2)切片操作

  • 切片操作提供张量的不同视图,不复制数据
  • 修改原始张量会影响所有相关视图
  • 非连续张量需要先调用 .contiguous() 才能进行某些视图操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
x = torch.tensor([[1., 2, 3], [4, 5, 6]])  # @inspect x
y = x[0] # torch.tensor([1., 2, 3])
y = x[:, 1] # torch.tensor([2, 5])
y = x.view(3, 2) # torch.tensor([[1, 2], [3, 4], [5, 6]])
y = x.transpose(1, 0) # torch.tensor([[1, 4], [2, 5], [3, 6]])
x[0][0] = 100 # 同时也会变 y[0][0] == 100

# 创建原始张量
x = torch.tensor([[1., 2, 3], [4, 5, 6]]) # 内存布局:[1, 2, 3, 4, 5, 6] (连续)
# 转置操作 - 创建视图,不复制数据。通过stride机制实现视图:y.stride() = (1, 2)
y = x.transpose(1, 0) # 转置后:[[1, 4], [2, 5], [3, 6]] 内存布局仍然是:[1, 2, 3, 4, 5, 6]
y.view(2, 3) # 会失败因为 not y.is_contiguous()

# 可以先强制重排( contiguous 会创建新存储,复制数据)
y = x.transpose(1, 0).contiguous().view(2, 3)

(3)逐元素操作

  • 对每个元素应用操作,返回相同形状的新张量
  • 包括:pow(), sqrt(), rsqrt(), 算术运算等
  • triu() 获取上三角矩阵,用于因果注意力掩码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
x = torch.tensor([1, 4, 9])
y = x.pow(2) # torch.tensor([1, 16, 81]))
y = x.sqrt() # torch.tensor([1, 2, 3]))
y = x.rsqrt() # torch.tensor([1, 1 / 2, 1 / 3])) # i -> 1/sqrt(x_i)
y = x + x # torch.tensor([2, 8, 18]))
y = x * 2 # torch.tensor([2, 8, 18]))
y = x / 0.5 # torch.tensor([2, 8, 18]))

# 取矩阵的上三角部分,计算因果注意掩码很有用
x = torch.ones(3, 3).triu()
assert torch.equal(x, torch.tensor([
[1, 1, 1],
[0, 1, 1],
[0, 0, 1]],
))

(4)矩阵乘法

  • 深度学习的核心操作:@matmul()
  • 支持批量矩阵乘法,自动广播处理额外维度
1
2
3
4
5
6
7
x = torch.ones(16, 32)
w = torch.ones(32, 2)
y = x @ w # y.size() == torch.Size([16, 2])

x = torch.ones(4, 8, 16, 32)
w = torch.ones(32, 2)
y = x @ w # y.size() == torch.Size([4, 8, 16, 2])

5. 计算性能分析

  1. FLOP 概念
  • FLOPs:浮点操作次数(计算量度量)
  • FLOP/s:每秒浮点操作次数(硬件性能度量)
  1. 性能基准
  • GPT-3:3.14e23 FLOPs
  • GPT-4(推测):2e25 FLOPs
  • A100:312 TFLOPS(float32)
  • H100:∼990 TFLOPS(float32,无稀疏性)
  1. 矩阵乘法 FLOP 计算
  • 线性模型:2 × B × D × K FLOPs(B=批量大小,D=输入维度,K=输出维度)
  • 一般规律:前向传播 FLOPs ≈ 2 × (token数量) × (参数数量)
  1. 模型 FLOP 利用率 (MFU)
    • MFU = (实际 FLOP/s) / (理论峰值 FLOP/s)
    • MFU ≥ 0.5 通常被认为是良好的
    • 不同数据类型(bfloat16 vs float32)和硬件(H100 vs A100)性能差异显著

6. 梯度计算

  1. 基本概念
    • 前向传播:构建计算图,计算预测值和损失
    • 反向传播:自动计算梯度(通过链式法则)
1
2
3
4
5
6
7
8
9
10
11
12
13
import torch

# 前向传播:计算损失
x = torch.tensor([1., 2, 3])
w = torch.tensor([1., 1, 1], requires_grad=True) # 需要计算梯度
pred_y = x @ w # 预测值
loss = 0.5 * (pred_y - 5).pow(2) # 损失函数

# 反向传播:计算梯度
loss.backward()

# 检查梯度
print("w的梯度:", w.grad) # tensor([1., 2., 3.])
  1. 梯度计算的FLOPs分析
1
输入 x → 线性层 w1 → 隐藏层 h1 → 线性层 w2 → 输出 h2 → 损失 loss
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# (1)前向传播FLOPs计算
B, D, K = 16384, 32768, 8192 # 批量大小, 输入维度, 输出维度
x = torch.ones(B, D)
w1 = torch.randn(D, D, requires_grad=True)
w2 = torch.randn(D, K, requires_grad=True) # 梯度存储在参数的 `.grad` 属性中
h1 = x @ w1 # FLOPs: 2 * B * D * D
h2 = h1 @ w2 # FLOPs: 2 * B * D * K
loss = h2.pow(2).mean()
num_forward_flops = (2 * B * D * D) + (2 * B * D * K)

# (2)反向传播FLOPs计算
# 保留中间梯度用于调试
h1.retain_grad()
h2.retain_grad()
# 反向传播
loss.backward()
# 计算w2的梯度 FLOPs
# w2.grad[j,k] = sum_i h1[i,j] * h2.grad[i,k]
num_backward_flops = 2 * B * D * K
# 计算h1的梯度 FLOPs
# h1.grad[i,j] = sum_k w2[j,k] * h2.grad[i,k]
num_backward_flops += 2 * B * D * K
# 计算w1的梯度 FLOPs(类似过程)
num_backward_flops += 4 * B * D * D # 简化为4倍

FLOPs总结公式

  1. 通用规则
    • 前向传播2 × (数据点数量) × (参数数量) FLOPs
    • 反向传播4 × (数据点数量) × (参数数量) FLOPs
    • 总计算量6 × (数据点数量) × (参数数量) FLOPs
  2. 实际意义
    • 反向传播的计算成本大约是前向传播的2倍
    • 训练总FLOPs ≈ 6 × 批量大小 × 参数数量
    • 这对于估算训练时间和硬件需求很重要

7. 模型

模型参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import torch
import torch.nn as nn
import numpy as np

# 创建模型参数
input_dim = 16384
output_dim = 32
w = nn.Parameter(torch.randn(input_dim, output_dim)) # Parameter 是特殊的 Tensor

# 参数w无初始化,直接参与计算
x = nn.Parameter(torch.randn(input_dim))
output = x @ w # 输出值会随 input_dim 增大而变大
print(f"输出值规模: {output[0]}") # 大约为 sqrt(input_dim)

# Xavier 初始化(标准缩放)
# 通过 1/sqrt(input_dim) 缩放,使输出值稳定
w = nn.Parameter(torch.randn(input_dim, output_dim) / np.sqrt(input_dim))
output = x @ w
print(f"缩放后输出值: {output[0]}") # 保持恒定大小

# 截断正态初始化(更安全)
# 避免异常值,截断到 [-3, 3] 范围
w = nn.Parameter(nn.init.trunc_normal_(
torch.empty(input_dim, output_dim),
std=1 / np.sqrt(input_dim),
a=-3, b=3
))

线性层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class Linear(nn.Module):
"""简单的线性层"""
def __init__(self, input_dim: int, output_dim: int):
super().__init__()
# 使用 Xavier 初始化
self.weight = nn.Parameter(torch.randn(input_dim, output_dim) / np.sqrt(input_dim))

def forward(self, x: torch.Tensor) -> torch.Tensor:
return x @ self.weight

class Cruncher(nn.Module):
def __init__(self, dim: int, num_layers: int):
super().__init__()
# 使用 ModuleList 存储多个线性层
self.layers = nn.ModuleList([
Linear(dim, dim) for i in range(num_layers)
])
self.final = Linear(dim, 1) # 最终输出层

def forward(self, x: torch.Tensor) -> torch.Tensor:
B, D = x.size() # 批量大小, 维度

# 应用所有线性层
for layer in self.layers:
x = layer(x)

# 最终输出层
x = self.final(x)
assert x.size() == torch.Size([B, 1])

# 移除最后一个维度
x = x.squeeze(-1)
assert x.size() == torch.Size([B])

return x

# 创建模型
D = 64 # 维度
num_layers = 2
model = Cruncher(dim=D, num_layers=num_layers)

# 检查参数数量和大小
param_sizes = [
(name, param.numel())
for name, param in model.state_dict().items()
]
print("参数大小:", param_sizes)

# 计算总参数数量
num_parameters = sum(param.numel() for param in model.parameters())
assert num_parameters == (D * D) + (D * D) + D

# 移动到GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# 运行模型
B = 8 # 批量大小
x = torch.randn(B, D, device=device)
y = model(x)
print("输出形状:", y.shape) # torch.Size([8])

8. 随机性管理

  1. 随机性的来源
  • 参数初始化
  • Dropout
  • 数据顺序
  • 其他随机操作

设置随机种子确保可重现性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import torch
import numpy as np
import random

seed = 0 # 设置随机种子

# 设置PyTorch随机种子
torch.manual_seed(seed)

# 设置NumPy随机种子
np.random.seed(seed)

# 设置Python内置随机种子
random.seed(seed)

9. optimizer

  1. 优化器演化
    • SGD + Momentum:梯度指数平均
    • AdaGrad:SGD + 梯度平方平均
    • RMSProp:AdaGrad + 梯度平方指数平均
    • Adam:RMSProp + Momentum

AdaGrad 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import torch
import torch.nn.functional as F

# 创建模型
model = Cruncher(dim=4, num_layers=2).to(get_device())

# 定义AdaGrad优化器
optimizer = torch.optim.Adagrad(model.parameters(), lr=0.01)

# 计算梯度
x = torch.randn(2, 4, device=get_device())
y = torch.tensor([4., 5.], device=get_device())
pred_y = model(x)
loss = F.mse_loss(pred_y, y)
loss.backward()

# 更新参数
optimizer.step()

# 清空梯度(内存优化)
optimizer.zero_grad(set_to_none=True)

10. 内存计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 参数数量
num_parameters = (D * D * num_layers) + D

# 激活值数量
num_activations = B * D * num_layers

# 梯度数量(与参数相同)
num_gradients = num_parameters

# 优化器状态数量
num_optimizer_states = num_parameters

# 总内存估算(float32,4字节/元素)
total_memory = 4 * (num_parameters + num_activations + num_gradients + num_optimizer_states)

# 单步训练FLOPs
flops = 6 * B * num_parameters # 前向2倍 + 反向4倍

11. 训练

  1. 基础训练流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def train(name: str, get_batch, D: int, num_layers: int, 
B: int, num_train_steps: int, lr: float):

model = Cruncher(dim=D, num_layers=num_layers).to(get_device())
optimizer = torch.optim.SGD(model.parameters(), lr=lr)

for t in range(num_train_steps):
# 获取数据
x, y = get_batch(B=B)

# 前向传播
pred_y = model(x)
loss = F.mse_loss(pred_y, y)

# 反向传播
loss.backward()

# 参数更新
optimizer.step()
optimizer.zero_grad(set_to_none=True)
  1. 数据生成示例
1
2
3
4
5
6
def get_batch(B: int) -> tuple[torch.Tensor, torch.Tensor]:
D = 16
true_w = torch.arange(D, dtype=torch.float32, device=get_device())
x = torch.randn(B, D).to(get_device())
true_y = x @ true_w
return (x, true_y)

12. checkpointing

保存和加载检查点

1
2
3
4
5
6
7
8
9
10
11
# 保存检查点
checkpoint = {
"model": model.state_dict(),
"optimizer": optimizer.state_dict(),
}
torch.save(checkpoint, "model_checkpoint.pt")

# 加载检查点
loaded_checkpoint = torch.load("model_checkpoint.pt")
model.load_state_dict(loaded_checkpoint["model"])
optimizer.load_state_dict(loaded_checkpoint["optimizer"])

13. 混合精度训练

  1. 精度权衡
    • 高精度(float32):更准确稳定,但内存和计算成本高
    • 低精度(bfloat16/fp8):内存和计算成本低,但精度可能受影响
  2. 混合精度策略
    • 前向传播使用 bfloat16/fp8(激活值)
    • 参数和梯度使用 float32
    • 使用 PyTorch AMP(Automatic Mixed Precision)库

14. 实用工具函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 内存使用计算
def get_memory_usage(x: torch.Tensor):
return x.numel() * x.element_size()

# 存储共享检查
def same_storage(x: torch.Tensor, y: torch.Tensor):
return x.untyped_storage().data_ptr() == y.untyped_storage().data_ptr()

# 性能计时
def time_matmul(a: torch.Tensor, b: torch.Tensor) -> float:
if torch.cuda.is_available():
torch.cuda.synchronize()

num_trials = 5
total_time = timeit.timeit(lambda: a @ b, number=num_trials)
return total_time / num_trials

# 参数统计
def get_num_parameters(model: nn.Module) -> int:
return sum(param.numel() for param in model.parameters())

# 设备选择
def get_device(index: int = 0) -> torch.device:
if torch.cuda.is_available():
return torch.device(f"cuda:{index}")
else:
return torch.device("cpu")