参考

召回(三)序列召回

  • 在前面的章节中,我们学习了协同过滤和向量召回的方法。这些方法通常将用户的历史行为汇总成一个静态的表示(比如一个向量),然后基于这个表示进行推荐。但是,用户的行为其实是有时间顺序的,而且这个顺序往往包含了重要的信息。
  • 比如,一个用户先浏览了跑鞋,然后看了运动服,接着又看了健身器材,这个顺序告诉我们这个用户可能对健身运动感兴趣。如果我们只是简单地把这些行为加起来或者平均,就丢失了这种时间顺序的信息。
  • 序列召回就是要利用用户行为的时间顺序信息来进行推荐。它的基本想法是:用户的当前兴趣不仅取决于他过去喜欢什么,还取决于他最近在做什么,以及这些行为的顺序。
  • 序列召回就是要利用用户行为的时间顺序信息来进行推荐。它的基本想法是:用户的当前兴趣不仅取决于他过去喜欢什么,还取决于他最近在做什么,以及这些行为的顺序。
  • 相比传统的静态表示方法,序列召回能够对用户的兴趣理解的更加的准确及全面。它可以理解行为间的因果关系,如用户买手机后通常会买手机壳;能够捕捉兴趣的演化过程,如从数码产品转向户外运动;还能更好地处理多元化兴趣,在不同时段识别用户对电影、运动等领域的偏好变化。
  • 在序列召回的研究中,学者们探索了多种不同的建模思路。本章将重点介绍其中两类具有代表性的方法:
  1. 多兴趣用户表示
    传统方法将用户压缩为单一向量,难以充分表达用户兴趣的多样性和时序性。多兴趣表示方法尝试通过多个向量或动态结构来更好地刻画用户的复杂兴趣模式。我们将介绍MIND模型如何使用多个向量分别捕捉用户的不同兴趣维度,以及SDM模型如何将用户兴趣分解为长期偏好和短期行为两个层次。
  2. 生成式序列预测
    另一类方法将推荐问题重新定义为序列生成任务,借鉴自然语言处理领域的成功经验。我们将探讨SASRec如何将Transformer架构引入推荐系统,通过自注意力机制建模序列依赖关系,以及后续的HSTU和TIGER模型在特征融合和物品表示方面的改进。
  • 通过这些具体案例,我们将深入理解不同建模策略如何挖掘时序信息的价值。

1、深化用户兴趣表示

  • 传统的向量召回方法,如双塔模型,倾向于将用户所有的历史行为“压扁”成一个单一的静态向量。这种“平均化”的处理方式虽然高效,但在两个关键方面存在明显局限:首先,它无法表达用户兴趣的多样性,例如一个用户可能既是需要购买专业书籍的程序员,又是一位需要选购婴儿用品的新手父亲,单一向量很难同时兼顾这两种截然不同的需求;其次,它忽略了兴趣的时效性,无法区分用户长期稳定的爱好(如对摄影的持续关注)和临时的即时需求(如今天突然搜索“感冒药”),而后者往往更能预示下一次的交互行为。
  • 为了构建一个更丰富、更立体的用户画像以实现更精准的召回,研究者们沿着“深化用户兴趣表示”的路径进行了持续探索。本章将介绍其中的两个代表性模型:MIND 和 SDM。我们将首先探讨如何使用多个向量来表示用户的多元兴趣,然后在此基础上,进一步融入时间维度,学习如何动态地捕捉用户兴趣的演化。

1.1 多向量捕捉多元兴趣MIND

  • 想象一下,你在淘宝上的购物历史:今天买了一本编程书,昨天买了运动鞋,上周买了咖啡豆。如果推荐系统只用一个数字向量来描述你,就像是用一个标签来概括一个人的全部——显然是不够的。
  • MIND (Multi-Interest Network with Dynamic Routing) (Li et al., 2019) 模型提出了一个更符合直觉的想法:既然用户有多种兴趣,为什么不用多个向量来分别表示呢?就像给每个兴趣爱好都分配一个专门的“代言人”。
  • 这个模型的巧妙之处在于借鉴了胶囊网络的动态路由思想。简单来说,它会自动把你的行为按照兴趣类型进行分组——编程相关的行为归为一类,运动相关的归为另一类,美食相关的又是一类。每一类都会生成一个专门的兴趣向量,这样推荐系统就能更精准地理解你在不同场景下的需求。
    MIND模型
  • 从整体架构来看,除了常规的Embedding层,MIND模型还包含了两个重要的组件:多兴趣提取层和Label-Aware注意力层。

多兴趣提取

  • MIND模型的多兴趣提取技术源于对胶囊网络动态路由机制的创新性改进。胶囊网络 (Sabour et al., 2017) 最初在计算机视觉领域被提出,其核心思想是用向量而非标量来表示特征,向量的方向编码属性信息,长度表示存在概率。动态路由则是确定不同层级胶囊之间连接强度的算法,它通过迭代优化的方式实现输入特征的软聚类。这种软聚类机制的优势在于,它不需要预先定义聚类数量或类别边界,而是让数据自然地分组,这正好契合了用户兴趣发现的需求。MIND模型引入了这一思想并提出了行为到兴趣(Behavior to Interest,B2I)动态路由:将用户的历史行为视为行为胶囊,将用户的多重兴趣视为兴趣胶囊,通过动态路由算法将相关的行为聚合到对应的兴趣维度中。MIND模型针对推荐系统的特点对原始动态路由算法进行了三个关键改进:
  1. 共享变换矩阵。与原始胶囊网络为每对胶囊使用独立变换矩阵不同,MIND采用共享的双线性映射矩阵 SRd×dS\in \mathcal{R}^{d\times d}。这种设计有两个重要考虑:首先,用户行为序列长度变化很大,从几十到几百不等,共享矩阵确保了算法的通用性 ;其次,共享变换保证所有兴趣向量位于同一表示空间,便于后续的相似度计算和检索操作。路由连接强度的计算公式为:

bij=ujTSei其中ei表示用户历史行为i的物品向量,uj表示第j个兴趣胶囊的向量,bij衡量行为i与兴趣j的关联程度。b_{ij} = u_{j}^T S e_i \\ 其中 e_i 表示用户历史行为 i 的物品向量,u_j 表示第 j 个兴趣胶囊的向量,b_{ij} 衡量行为 i 与兴趣 j 的关联程度。

  1. 随机初始化策略。为避免所有兴趣胶囊收敛到相同状态,算法采用高斯分布随机初始化路由系数 bijb_{ij}。这一策略类似于K-Means聚类中的随机中心初始化,确保不同兴趣胶囊能够捕捉用户兴趣的不同方面。
  2. 自适应兴趣数量。考虑到不同用户的兴趣复杂度差异很大,MIND引入了动态兴趣数量机制:

Ku=max(1,min(K,log2(Iu)))其中Iu表示用户u的历史行为数量,K是预设的最大兴趣数。K^{'}_u = \max ( 1, \min (K, \log_{2} (|\mathcal{I}_u|)) ) \\ 其中 |\mathcal{I}_u| 表示用户 u 的历史行为数量,K 是预设的最大兴趣数。

这种设计为行为较少的用户节省计算资源,同时为活跃用户提供更丰富的兴趣表示。

  • 改进后的动态路由过程通过迭代方式进行更新。 bijb_{ij} 在第一轮迭代时,初始化为0,在每轮迭代中更新路由系数和兴趣胶囊向量,直到收敛。 公式描述了路由系数 bijb_{ij} 的更新,但关键的兴趣胶囊向量 uju_j 是通过以下步骤计算的,这本质上是一个软聚类算法:
  1. 计算路由权重 : 对于每一个历史行为(低层胶囊 i),其分配到各个兴趣(高层胶囊 j)的权重 wijw_{ij} 通过对路由系数 bijb_{ij} 进行Softmax操作得到。

wij=expbijk=1Kuexpbikw_{ij} = \frac{\exp b_{ij}}{\sum_{k=1}^{K_u^{'}} \exp b_{ik}}

这里的 wijw{ij} 可以理解为行为 i 属于兴趣 j 的“软分配”概率。
2. 聚合行为以形成兴趣向量: 每一个兴趣胶囊的初步向量 zjz_j 是通过对所有行为向量 eie_i 进行加权求和得到的。每个行为向量在求和前会先经过共享变换矩阵 SS 的转换。

zj=iIuwijSeiz_j = \sum_{i\in \mathcal{I}_u} w_{ij} S e_i

这一步是聚类的核心:根据刚刚算出的权重,将相关的用户行为聚合起来,形成代表特定兴趣的向量。
3. 非线性压缩 : 为了将向量的模长(长度)约束在 [0, 1) 区间内,同时不改变其方向,模型使用了一个非线性的“squash”函数,从而得到本轮迭代的最终兴趣胶囊向量 uju_j。向量的长度可以被解释为该兴趣存在的概率,而其方向则编码了兴趣的具体属性。

uj=squash(zj)=zj21+zj2zjzju_j = squash(z_j) = \frac{ \| z_j^2\| }{1+\|z_j^2 \| } \frac{z_j}{\|z_j \|}

  1. 更新路由系数 (Updating Routing Logits): 最后,根据新生成的兴趣胶囊 uju_j 和行为向量 eie_i 之间的一致性(通过点积衡量),来更新下一轮迭代的路由系数 bijb_{ij}

bijbij+ujTSeib_{ij} \leftarrow b_{ij} + u_j^T S e_i

以上四个步骤会重复进行固定的次数(通常为3次),最终输出收敛后的兴趣胶囊向量集合 {uj,j=1,..,Ku}\{ u_j\quad,j=1,..,K_u^{'} \} 作为该用户的多兴趣表示。

标签感知的注意力机制

  • 多兴趣提取层生成了用户的多个兴趣向量,但在训练阶段,我们需要确定哪个兴趣向量与当前的目标商品最相关。因为在训练时,我们拥有‘正确答案’(即用户实际点击的下一个商品),所以可以利用这个‘标签’信息,来反向监督模型,让模型学会在多个兴趣向量中,挑出与正确答案最相关的那一个。这相当于在训练阶段给模型一个明确的指引。MIND模型设计了标签感知注意力层来解决这个问题。
  • 该注意力机制以目标商品向量作为查询,以用户的多个兴趣向量作为键和值。具体计算过程如下:

vu=VuSoftmax(pow(VuTei,p))其中Vu=(vu1,...,vuK)表示用户的兴趣胶囊矩阵,通过将兴趣胶囊向量u与用户画像Embedding进行拼接,再经过多层ReLU变换得到。ei是目标商品iEmbedding向量,p是控制注意力集中度的超参数。v_u = V_u \cdot Softmax(pow(V_u^T e_i, p)) \\ 其中V_u = (v_u^1, ..., v_u^K) 表示用户的兴趣胶囊矩阵,通过将兴趣胶囊向量 u 与用户画像Embedding进行拼接,再经过多层ReLU变换得到。 e_i是目标商品i的Embedding向量,p是控制注意力集中度的超参数。

  • 参数p控制注意力分布:当p接近0时,所有兴趣向量获得均等关注;随着p增大,注意力逐渐集中于与目标商品最相似的兴趣向量;当p趋于无穷时,机制退化为硬注意力,只选择相似度最高的兴趣向量。实验表明,使用较大的p值能够加快模型收敛速度。
  • 通过标签感知得到用户向量 vuv_u 后,MIND模型的训练目标就是让用户向量与其真实交互的商品尽可能“匹配”。具体来说,模型会最大化用户与正样本商品的相似度,同时最小化与负样本的相似度。由于商品库通常非常庞大,直接计算所有商品的概率分布在计算上不现实,因此MIND采用了和YouTubeDNN相同的策略——使用Sampled Softmax损失函数,通过随机采样一小部分负样本来近似全局的归一化计算。

代码

  • MIND的核心在于胶囊网络的动态路由实现。在每次迭代中,模型首先通过softmax计算路由权重,然后通过双线性变换聚合行为向量,最后使用squash函数进行非线性压缩。
  • 这里的squash函数实现了向量长度的非线性压缩,确保输出向量的模长在[0,1)区间内。
  • 标签感知注意力的实现比较直观,核心是使用目标商品向量作为查询,计算与各个兴趣向量的相似度。

mind.py 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import tensorflow as tf

from .utils import (
concat_group_embedding,
build_input_layer,
build_group_feature_embedding_table_dict,
pooling_group_embedding,
)
from .layers import DNNs, CapsuleLayer, LabelAwareAttention, SampledSoftmaxLayer


def dynamic_capsule_num(seq_len, k_max):
"""
动态计算胶囊数量

参数:
seq_len: 序列长度 [B, 1]
k_max: 最大胶囊数量

返回:
k_capsule: 动态胶囊数量
"""
seq_len = tf.squeeze(seq_len, axis=1) # [B,]
log_len = tf.math.log1p(tf.cast(seq_len, dtype="float32"))
# 最少一个胶囊
k_capsule = tf.cast(
tf.maximum(
1.0, tf.minimum(tf.cast(k_max, dtype="float32"), log_len / tf.math.log(2.0))
),
dtype="int32",
)
return k_capsule


def build_mind_model(feature_columns, model_config):
"""
构建MIND (Multi-Interest Network with Dynamic routing)模型

参数:
feature_columns: 特征列配置
model_config: 模型配置字典,包含:
- neg_samples: 负采样数量 (默认: 50)
- emb_dims: embedding维度 (默认: 16)
- max_capsulen_nums: 最大胶囊数量 (默认: 4)
- max_seq_len: 最大序列长度 (默认: 50)
- user_dnn_units: 用户DNN层单元数 (默认: [128, 64])
"""
# 从配置中提取参数并设置默认值
neg_samples = model_config.get("neg_samples", 50)
emb_dims = model_config.get("emb_dims", 16)
max_capsulen_nums = model_config.get("max_capsulen_nums", 4)
max_seq_len = model_config.get("max_seq_len", 50)
user_dnn_units = model_config.get("user_dnn_units", [128, 64])

# 从特征列中获取物品词汇表大小和标签名称
item_vocab_size = None
label_name = None
for fc in feature_columns:
if "target_item" in fc.group and fc.name == "movie_id":
item_vocab_size = fc.vocab_size
label_name = fc.name
break

if item_vocab_size is None or label_name is None:
raise ValueError("无法从特征列中找到物品词汇表大小或标签名称")

# 获取用户嵌入特征名称列表
user_emb_feature_name_list = []
for fc in feature_columns:
if any(group in ["user_dnn", "raw_hist_seq"] for group in fc.group):
user_emb_feature_name_list.append(fc.name)
user_emb_feature_name_list.append("hist_len") # 添加hist_len

# 构建输入层
input_layer_dict = build_input_layer(feature_columns)

# 构建特征embedding表
group_embedding_feature_dict, embedding_table_dict = (
build_group_feature_embedding_table_dict(
feature_columns,
input_layer_dict,
prefix="embedding/",
return_embedding_table=True,
)
)

user_dnn_inputs = concat_group_embedding(group_embedding_feature_dict, "user_dnn")
user_hist_seq_embedding = pooling_group_embedding(
group_embedding_feature_dict, "raw_hist_seq"
)
target_item_embedding = pooling_group_embedding(
group_embedding_feature_dict, "target_item"
)

# 转换成胶囊的数量
user_dnn_inputs = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=1))(
user_dnn_inputs
)
user_dnn_inputs = tf.keras.layers.Lambda(
lambda x: tf.tile(x, [1, max_capsulen_nums, 1])
)(
user_dnn_inputs
) # [B, k_max, feat_num x dim]

# 因为序列是左侧padding,所以mask需要反转
hist_len = input_layer_dict["hist_len"]
sequence_mask = tf.keras.layers.Lambda(
lambda x: tf.reverse(
tf.sequence_mask(tf.squeeze(x, axis=1), maxlen=max_seq_len), axis=[1]
)
)(
hist_len
) # [B, max_seq_len]

capsule_num = tf.keras.layers.Lambda(lambda x: dynamic_capsule_num(x[0], x[1]))(
[hist_len, max_capsulen_nums]
)

high_capsule = CapsuleLayer(
input_units=emb_dims,
out_units=emb_dims,
max_len=max_seq_len,
k_max=max_capsulen_nums,
)(
[user_hist_seq_embedding, sequence_mask, capsule_num]
) # [B,k_max,out_units]

# 将每个胶囊都拼接上用户的特征
user_embeddings = tf.keras.layers.Concatenate(axis=-1)(
[user_dnn_inputs, high_capsule]
)
# 再将拼接后的特征降维到胶囊的维度
user_embeddings = DNNs(user_dnn_units + [emb_dims])(user_embeddings)

# 计算LabelAwareAttention
user_embedding_final = LabelAwareAttention(k_max=max_capsulen_nums, pow_p=1.0)(
(user_embeddings, target_item_embedding)
)
user_embedding_final = tf.keras.layers.Lambda(lambda x: tf.nn.l2_normalize(x, -1))(
user_embedding_final
)

# 获取item emb权重
item_embedding_weight = embedding_table_dict[label_name].embeddings
# 获取item embedding
output_item_embedding = tf.keras.layers.Lambda(lambda x: tf.squeeze(x, axis=1))(
embedding_table_dict[label_name](input_layer_dict[label_name])
)
# 构建采样softmax层
sampled_softmax_layer = SampledSoftmaxLayer(
vocab_size=item_vocab_size, num_sampled=neg_samples, emb_dim=emb_dims
)
# 计算输出
output = sampled_softmax_layer(
[item_embedding_weight, user_embedding_final, input_layer_dict[label_name]]
)

# 构建模型
model = tf.keras.Model(inputs=list(input_layer_dict.values()), outputs=output)

# 构建用户模型和物品模型用于评估
user_inputs_list = [
input_layer_dict[feat_name] for feat_name in user_emb_feature_name_list
]
user_model = tf.keras.Model(inputs=user_inputs_list, outputs=user_embeddings)

item_model = tf.keras.Model(
inputs=input_layer_dict[label_name], outputs=output_item_embedding
)

return model, user_model, item_model

1.2 融合长短期兴趣,捕捉动态变化SDM

  • MIND解决了兴趣“广度”的问题,但新的问题随之而来:时间。 用户兴趣不仅是多元的,还是动态演化的。一个用户在一次购物会话(Session)中的行为,往往比他一个月前的行为更能预示他下一刻的需求。MIND虽然能捕捉多个兴趣,但并未在结构上显式地区分它们的时效性。序列深度匹配模型(SDM) (Lv et al., 2019) 正是为了解决这一问题而提出的。SDM模型 的核心思想是分别建模用户的短期即时兴趣和长期稳定偏好,然后智能地融合它们。
    SDM模型

捕捉短期兴趣

  • 为了精准捕捉短期兴趣,SDM设计了一个三层结构来处理用户的当前会话序列。
  • 首先,将短期会话中的商品序列输入LSTM网络,学习序列间的时序依赖关系。LSTM的标准计算过程为:

intu=σ(Win1eitu+Win2ht1u+bin)ftu=σ(Wf1eitu+Wf2ht1u+bf)otu=σ(Wo1eitu+Wo2ht1u+bo)ctu=ftuct1u+intutanh(Wc1eitu+Wc2ht1u+bc)htu=otutanh(ctu)这里eitu表示第t个时间步的商品Embeddingσ表示sigmoid激活函数,W表示权重矩阵,b表示偏置向量。\begin{aligned} in_t^u &= \sigma (W_{in}^1 e_{i_t^u} + W_{in}^2 h_{t-1}^u + b_{in} ) \\ f_t^u &= \sigma (W_{f}^1 e_{i_t^u} + W_{f}^2 h_{t-1}^u + b_{f} ) \\ o_t^u &= \sigma (W_{o}^1 e_{i_t^u} + W_{o}^2 h_{t-1}^u + b_{o} ) \\ c_t^u &= f_t^u c_{t-1}^u + in_{t}^u \tanh (W_c^1 e_{i_t^u} + W_c^2 h_{t-1}^u + b_c) \\ h_t^u &= o_t^u \tanh (c_t^u) \\ 这里e_{i_t^u}表示第t个时间步的商品Embedding,\sigma 表示sigmoid激活函数,W表示权重矩阵,b表示偏置向量。 \end{aligned}

LSTM采用多输入多输出模式,每个时间步都输出隐藏状态 htuRd×1h_t^u \in \mathcal{R}^{d\times 1},最终得到序列表示 Xu=[h1u,...,htu]X^u = [ h_1^u, ..., h_t^u]

  • LSTM的引入主要是为了处理在线购物中的一个常见问题:用户往往会产生一些随机点击,这些不相关的行为会干扰序列表示。通过LSTM的门控机制,模型能够更好地捕捉序列中的有效信息。然后,SDM采用多头自注意力机制来捕捉用户兴趣的多样性。
  • 多头自注意力机制。

headiu=Attention(WiQXu,WIKXu,WiVXu)head_i^u = Attention(W_i^Q X^u, W_I^K X^u, W_i^V X^u)

具体计算过程为:

f(Qiu,Kiu)=QiuTKiuAiu=softmax(f(Qiu,Kiu))headiu=ViuAiuT其中QiuKiuViu分别表示第i个头的查询、键、值矩阵,WiQWiKWiV是对应的权重矩阵。f(Q_i^u, K_i^u) = {Q_i^u}^T K_i^u \\ A_i^u = softmax ( f(Q_i^u, K_i^u) ) \\ head_i^u = V_i^u {A_i^u}^T \\ 其中Q_i^u、K_i^u、V_i^u分别表示第i个头的查询、键、值矩阵,W_i^Q、W_i^K、W_i^V是对应的权重矩阵。

多头注意力的最终输出为:

X^u=MultiHead(Xu)=WOconcat(head1u,...,headhu)其中h是头的数量,WO是输出权重矩阵。\hat{X}^u = MultiHead(X^u) = W^O concat(head_1^u,..., head_h^u) \\ 其中h是头的数量,W^O是输出权重矩阵。

每个头可以专注于不同的兴趣维度,通过多头机制实现对用户多重兴趣的并行建模。

最后,SDM加入个性化注意力层,使用用户画像向量 eue_u 作为查询,对多头注意力输出进行加权:

αk=exp(h^kuTeu)k=1texp(h^kuTeu)stu=k=1tαkh^ku\begin{aligned} \alpha_{k} &=\frac{\exp\left(\hat{\boldsymbol{h}}_{k}^{u T} \boldsymbol{e}_{u}\right)}{\sum_{k=1}^{t} \exp\left(\hat{\boldsymbol{h}}_{k}^{u T} \boldsymbol{e}_{u}\right)} \\ \boldsymbol{s}_{t}^{u} &=\sum_{k=1}^{t} \alpha_{k} \hat{\boldsymbol{h}}_{k}^{u} \end{aligned}

这里 h^ku\hat{\boldsymbol{h}}_{k}^{u} 是多头注意力输出 X^u\hat{X}^{u} 中第 kk个位置的隐藏状态,αk\alpha_{k}是对应的注意力权重。最终得到融合个性化信息的短期兴趣表示 stuRd×1\boldsymbol{s}_{t}^{u} \in \mathbb{R}^{d \times 1}

捕捉长期兴趣

  • 长期行为包含丰富的用户偏好信息,但与短期行为的建模方式不同。SDM从特征维度对长期行为进行聚合,将历史行为按不同特征分成多个子集。

Lu={LfufF}\mathcal{L}^{u}=\left\{\mathcal{L}_{f}^{u} \mid f \in \mathcal{F}\right\}

具体包括:交互过的商品ID集合 Lidu\mathcal{L}^{u}_{id},叶子类别集合 Lleafu\mathcal{L}^{u}_{leaf},一级类别集合 Lcateu\mathcal{L}^{u}_{cate},访问过的商店集合 Lshopu\mathcal{L}^{u}_{shop},交互过的品牌集合 Lbrandu\mathcal{L}^{u}_{brand}。这种特征维度的分离使模型能够从不同角度理解用户的长期偏好模式。

对每个特征子集,模型使用注意力机制计算用户在该维度上的偏好。将特征实体fkuLfuf^{u}_{k} \in \mathcal{L}^{u}_{f}通过嵌入矩阵转换为向量gku\boldsymbol{g}^{u}_{k},然后使用用户画像eu\boldsymbol{e}_u计算注意力权重:

αk=exp(gkuTeu)k=1Lfuexp(gkuTeu)zfu=k=1Lfuαkgku\begin{aligned} \alpha_{k} &=\frac{\exp \left(\boldsymbol{g}_{k}^{u T} \boldsymbol{e}_{u}\right)}{\sum_{k=1}^{\left|\mathcal{L}_{f}^{u}\right|} \exp \left(\boldsymbol{g}_{k}^{u T} \boldsymbol{e}_{u}\right)} \\ \boldsymbol{z}_{f}^{u} &=\sum_{k=1}^{\left|\mathcal{L}_{f}^{u}\right|} \alpha_{k} \boldsymbol{g}_{k}^{u} \end{aligned}

其中Lfu\left|\mathcal{L}_{f}^{u}\right|表示特征子集的大小。

最终将各特征维度的表示拼接,通过全连接网络得到长期兴趣表示:

zu=concat({zfufF})pu=tanh(Wpzu+b)\begin{aligned} \boldsymbol{z}^{u} &=\operatorname{concat}\left(\left\{\boldsymbol{z}_{f}^{u} \mid f \in \mathcal{F}\right\}\right) \\ \boldsymbol{p}^{u} &=\tanh \left(\boldsymbol{W}^{p} \boldsymbol{z}^{u}+\boldsymbol{b}\right) \end{aligned}

其中Wp\boldsymbol{W}^{p}是权重矩阵,b\boldsymbol{b}是偏置向量。

长短期兴趣融合

有了长短期兴趣表示后,关键问题是如何有效融合这两部分信息。用户的长期行为虽然丰富,但通常只有一小部分与当前决策相关。简单的拼接或加权求和难以准确提取相关信息。

SDM创新性地设计了门控融合机制,类似LSTM中的门控思想:

Gtu=sigmoid(W1eu+W2stu+W3pu+b)otu=(1Gtu)pu+Gtustu\begin{aligned} \boldsymbol{G}_{t}^{u} &= \operatorname{sigmoid}\left(\boldsymbol{W}^{1} \boldsymbol{e}_{u}+\boldsymbol{W}^{2} \boldsymbol{s}_{t}^{u}+\boldsymbol{W}^{3} \boldsymbol{p}^{u}+\boldsymbol{b}\right) \\ \boldsymbol{o}_{t}^{u} &= \left(1-\boldsymbol{G}_{t}^{u}\right) \odot \boldsymbol{p}^{u}+\boldsymbol{G}_{t}^{u} \odot \boldsymbol{s}_{t}^{u} \end{aligned}

这里GtuRd×1\boldsymbol{G}_{t}^{u} \in \mathbb{R}^{d \times 1}是门控向量,\odot表示逐元素乘法,W1\boldsymbol{W}^{1}W2\boldsymbol{W}^{2}W3\boldsymbol{W}^{3}是权重矩阵。

门控网络接收三个输入:用户画像eu\boldsymbol{e}_{u}、短期兴趣stu\boldsymbol{s}_{t}^{u}和长期兴趣pu\boldsymbol{p}^{u},输出的门控向量每个元素值介于0到1之间,决定了对应维度上长短期兴趣的贡献比例。这让模型能够在不同兴趣维度上分别选择保留长期或短期信息,避免简单平均可能带来的信息损失,使模型能够精确捕捉长期行为中与当前兴趣最相关的部分。

代码

  • SDM的短期兴趣建模采用了三层架构,逐步从原始序列中提取用户的即时兴趣。
  • 个性化注意力层的实现通过用户画像与序列特征的点积来计算注意力权重。
  • 长期兴趣的建模采用特征维度聚合的方式,对每个特征维度分别应用注意力机制。
  • 门控融合机制通过学习三个权重矩阵来决定如何融合长短期兴趣。
  • 整个SDM模型的最终实现将三个模块串联起来。

sdm.py 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import tensorflow as tf

from .utils import (
concat_group_embedding,
build_input_layer,
build_group_feature_embedding_table_dict,
)
from .layers import SampledSoftmaxLayer, UserAttention, GatedFusion


def build_sdm_model(feature_columns, model_config):
# 从model_config中提取参数
label_name = model_config.get("label_name", "movie_id")
dnn_activation = model_config.get("dnn_activation", "tanh")
emb_dim = model_config.get("emb_dim", 16)
num_heads = model_config.get("num_heads", 2)
num_sampled = model_config.get("num_sampled", 20)

# 从feature_columns中获取item_vocab_size
item_vocab_size = None
for fc in feature_columns:
if fc.name == label_name:
item_vocab_size = fc.vocab_size
break

if item_vocab_size is None:
raise ValueError(f"Could not find vocab_size for label feature '{label_name}'")

input_layer_dict = build_input_layer(feature_columns)
group_embedding_feature_dict, embedding_table_dict = (
build_group_feature_embedding_table_dict(
feature_columns,
input_layer_dict,
prefix="embedding/",
return_embedding_table=True,
)
)

# 用户表示
user_emb_concat = concat_group_embedding(
group_embedding_feature_dict, "user", flatten=False
) # [None, 1, dim * num_user_features]
user_embedding = tf.keras.layers.Dense(emb_dim, activation=dnn_activation)(
user_emb_concat
) # [None, 1, dim]

# ------------ 短期兴趣建模 ------------
# 1. 序列信息学习层(LSTM)
# 获取短期会话序列特征
short_history_features = group_embedding_feature_dict["raw_hist_seq"]
short_history_item_embs = []
for name, short_history_feature in short_history_features.items():
short_history_mask = tf.keras.layers.Lambda(
lambda x: tf.expand_dims(
tf.cast(tf.not_equal(x, 0), dtype=tf.float32), axis=-1
)
)(
input_layer_dict[name]
) # [None, max_len, 1]

# 使用mask
short_history_item_embs.append(
tf.keras.layers.Lambda(lambda x: x[0] * x[1])(
[short_history_feature, short_history_mask]
)
) # [None, max_len, dim]
short_history_item_emb_concat = tf.keras.layers.Concatenate(axis=-1)(
short_history_item_embs
) # [None, max_len, dim * num_short_history_features]
short_history_item_emb = tf.keras.layers.Dense(emb_dim, activation=dnn_activation)(
short_history_item_emb_concat
) # [None, max_len, dim]

# 使用LSTM处理序列
lstm_layer = tf.keras.layers.LSTM(
emb_dim, return_sequences=True, recurrent_initializer="glorot_uniform"
)
sequence_output = lstm_layer(short_history_item_emb)

# 2. 多兴趣提取层(多头自注意力)
# 对LSTM输出应用多头注意力

norm_sequence_output = tf.keras.layers.LayerNormalization()(sequence_output)
sequence_output = tf.keras.layers.MultiHeadAttention(
num_heads=num_heads,
key_dim=emb_dim // num_heads,
dropout=0.1,
use_bias=True,
kernel_initializer="glorot_uniform",
bias_initializer="zeros",
name=f"short_term_attention",
)(
norm_sequence_output, sequence_output, attention_mask=short_history_mask
) # [None, max_len, dim]

short_term_output = tf.keras.layers.LayerNormalization()(
sequence_output
) # [None, max_len, dim]

# 3. 用户个性化注意力层
user_attention = UserAttention(name="user_attention_short")
short_term_interest = user_attention(
user_embedding, short_term_output
) # [None, 1, dim]

# ------------ 长期兴趣建模 ------------
# 从不同特征维度对长期行为进行聚合
long_history_features = group_embedding_feature_dict["raw_hist_seq_long"]

long_term_interests = []
for name, long_history_feature in long_history_features.items():
long_history_mask = tf.keras.layers.Lambda(
lambda x: tf.expand_dims(
tf.cast(tf.not_equal(x, 0), dtype=tf.float32), axis=-1
)
)(
input_layer_dict[name]
) # [None, max_len_long, 1]
long_history_item_emb = tf.keras.layers.Lambda(lambda x: x[0] * x[1])(
[long_history_feature, long_history_mask]
) # [None, max_len_long, dim]

user_attention = UserAttention(name=f"user_attention_long_{name}")
long_term_interests.append(
user_attention(user_embedding, long_history_item_emb)
) # [None, 1, dim]

long_term_interests_concat = tf.keras.layers.Concatenate(axis=-1)(
long_term_interests
) # [None, 1, dim * len(long_history_features)]
long_term_interest = tf.keras.layers.Dense(emb_dim, activation=dnn_activation)(
long_term_interests_concat
) # [None, 1, dim]

# ------------ 长短期兴趣融合 ------------
gated_fusion = GatedFusion(name="gated_fusion")
final_interest = gated_fusion(
[user_embedding, short_term_interest, long_term_interest]
) # [None, 1, dim]
final_interest = tf.keras.layers.Lambda(lambda x: tf.squeeze(x, axis=1))(
final_interest
) # [None, dim]

# ------------ 预测层 ------------
# 获取item索引

# 获取item嵌入权重
item_embedding_weight = embedding_table_dict[label_name].embeddings
# 获取item嵌入
output_item_embedding = tf.keras.layers.Lambda(lambda x: tf.squeeze(x, axis=1))(
embedding_table_dict[label_name](input_layer_dict[label_name])
)
# 构建采样softmax层
sampled_softmax_layer = SampledSoftmaxLayer(item_vocab_size, num_sampled, emb_dim)
# 计算输出
output = sampled_softmax_layer(
[item_embedding_weight, final_interest, input_layer_dict[label_name]]
)

# 构建模型
model = tf.keras.Model(inputs=list(input_layer_dict.values()), outputs=output)

# 构建用户模型和物品模型用于评估
user_feature_names = [
fc.name
for fc in feature_columns
if any(g in fc.group for g in ["user", "raw_hist_seq", "raw_hist_seq_long"])
]
user_inputs_dict = {name: input_layer_dict[name] for name in user_feature_names}
user_model = tf.keras.Model(inputs=user_inputs_dict, outputs=final_interest)

item_inputs_dict = {label_name: input_layer_dict[label_name]}
item_model = tf.keras.Model(inputs=item_inputs_dict, outputs=output_item_embedding)

return model, user_model, item_model

2、生成式召回方法

上一小节,我们探讨了如MIND和SDM等序列召回方法,它们的核心目标是分析用户的历史行为,并将其“总结”成一个或多个能够代表用户兴趣的向量。这种方法好比是为用户拍摄一张静态的“兴趣快照”,用以在海量物品中进行检索。

本节将介绍一种截然不同的思路:生成式召回。它不再试图去“总结”用户,而是直接“预测”用户的下一个行为。其核心是直接对序列中物品与物品之间的动态依赖关系进行建模,关注“用户接下来会做什么”而非“用户是怎样的人”。

依循这条思路,我们将探讨三种代表性的生成式召回模型,它们清晰地展示了该领域的一条核心演进脉络。首先是SASRec,它开创性地将自然语言处理领域的Transformer模型直接应用于推荐序列预测,奠定了“预测下一个物品ID”这一基础范式。

在SASRec验证了该范式的巨大潜力之后,后续的探索主要沿着两个方向继续深化:

  1. 深化对“输入”的理解:既然模型可以处理序列,我们能否给它提供一个信息更丰富的序列?HSTU模型在这个方向上做了进一步的探索。它不再满足于简单的物品ID,而是将用户的属性、行为类型、时间等所有异构信息都融合成一个复杂的“事件流”作为输入,增强了模型对行为上下文的理解能力。
  2. 改进物品表示方式:传统推荐系统中,物品通常用简单的ID来表示,这种方式虽然直观,但缺乏语义信息。TIGER 模型尝试了一种不同的思路:能否用更有意义的方式来表示物品?它提出将物品表示为由多个“码本”组成的“语义ID”,这样的表示方式不仅适用于预测目标,也可以用于输入序列中的历史物品。

本节中,我们将通过 SASRec (基础框架) -> HSTU (丰富输入) -> TIGER (改进表示) 这几个典型模型,来了解生成式召回技术的一些代表性思路。

2.1 基于自注意力的序列推荐SASRec

在SASRec出现之前,主流的序列模型,如基于马尔可夫链或RNN的方法,存在各自的局限。马尔可夫链 通常只考虑最近的少数几个行为,视野有限;而RNN 虽然理论上能捕捉长期依赖,但其串行计算的特性导致训练效率低下。SASRec 的出发点便是:能否找到一种方法,既能像RNN一样看到完整的历史序列,又能高效地捕捉其中最重要的依赖关系?答案是借用在自然语言处理(NLP)领域大获成功的Transformer模型 。

SASRec的核心思想是将用户的行为序列视为一个句子来处理。对于序列中的每个商品,模型会自动学习它与序列中其他所有商品的相关性,然后基于这些相关性来预测下一个用户可能交互的商品。这样既保留了对长期行为的建模能力,又能根据数据特性灵活地调整对不同历史行为的关注度。

生成式召回模型的基本架构

类似于Transformer,SASRec的基本模块如 图 左 所示,主要包含自注意力层和前馈网络层两个组件。

自注意力层

对于序列中的每个商品,我们通过嵌入矩阵 MRI×d\bf{M}\in\mathbb{R}^{|\mathcal{I}|\times d} 将其映射为d维向量,其中 I|\mathcal{I}| 是商品总数。输入序列的嵌入矩阵记为 ERn×d\bf{E}\in\mathbb{R}^{n\times d},其中 Ei=Msi\bf{E}_i=\bf{M}_{s_i}

由于自注意力机制本身无法感知位置信息,我们引入可学习的位置嵌入 PRn×d\bf{P}\in\mathbb{R}^{n\times d}。最终的输入表示为:

E^=[Ms1+P1Ms2+P2Msn+Pn]\widehat{\bf{E}}=\left[ \begin{array}{c} \bf{M}_{s_1}+\bf{P}_{1} \\ \bf{M}_{s_2}+\bf{P}_{2} \\ \dots \\ \bf{M}_{s_n}+\bf{P}_{n} \end{array} \right]

在得到商品的embedding后,基于缩放点积注意力机制:

Attention(Q,K,V)=softmax(QKTd)V\text{Attention}(\bf{Q},\bf{K},\bf{V})=\text{softmax}\left(\frac{\bf{Q}\bf{K}^T}{\sqrt{d}}\right)\bf{V}

这里 Q\bf{Q}K\bf{K}V\bf{V} 分别表示查询、键、值矩阵。在自注意力中,这三个矩阵都由输入嵌入通过线性变换得到:

S=SA(E^)=Attention(E^WQ,E^WK,E^WV)\bf{S}=\text{SA}(\widehat{\bf{E}})=\text{Attention}(\widehat{\bf{E}}\bf{W}^Q,\widehat{\bf{E}}\bf{W}^K,\widehat{\bf{E}}\bf{W}^V)

其中 WQ\bf{W}^QWK\bf{W}^KWV\bf{W}^V 都是 d×dd\times d 的可学习权重矩阵,d\sqrt{d} 是缩放因子用于稳定训练。

值得注意的是,这里的自注意力机制需要采用因果掩码,确保在预测第 tt 个位置的下一个物品时,模型只能利用 11t1t-1 的历史信息,而不能‘偷看’未来的行为。

前馈网络层

得到自注意力层的输出后,前馈网络为模型引入了非线性变换能力:

Fi=FFN(Si)=ReLU(SiW(1)+b(1))W(2)+b(2)\bf{F}_i = \text{FFN}(\bf{S}_i)=\text{ReLU}(\bf{S}_i\bf{W}^{(1)}+\bf{b}^{(1)})\bf{W}^{(2)}+\bf{b}^{(2)}

其中 W(1)\bf{W}^{(1)}W(2)\bf{W}^{(2)} 是权重矩阵,b(1)\bf{b}^{(1)}b(2)\bf{b}^{(2)} 是偏置向量,Fi\bf{F}_i 是第ii层前馈网络的输出。

预测与训练

经过多层Transformer模块的加工后,模型会基于最后一个物品(第tt个)的输出表示Ft\bf{F}_t,来预测用户最可能交互的下一个物品ii。这个预测过程,本质上就是在整个物品库中,寻找与该输出表示向量最相似的物品向量。

商品ii的相关性分数为:

ri,t=FtMiTr_{i,t}=\bf{F}_t\bf{M}_i^T

这里复用了商品嵌入矩阵 M\bf{M},既减少了参数量又提高了性能。

类似于Decoder Only的Transformer模型,SASRec的训练目标也是预测用户下一个交互的物品。对于时间步t,期望输出 oto_t 定义为:

ot={<pad>如果 st 是填充项st+11t<nSSuut=no_t=\begin{cases} \texttt{<pad>} & \text{如果 } s_t \text{ 是填充项}\\ s_{t+1} & 1\leq t<n\\ \mathcal{S}^{u}_{|\mathcal{S}^u|} & t=n \end{cases}

其中<pad>\texttt{<pad>}表示填充项,Su\mathcal{S}^{u}表示用户uu的交互序列。模型训练时,输入为ss,期望输出为oo,采用二元交叉熵作为损失函数:

SuSt[1,2,,n][log(σ(rot,t))+j∉Sulog(1σ(rj,t))]-\sum_{\mathcal{S}^{u}\in\mathcal{S}}\sum_{t\in[1,2,\dots,n]}\left[\log(\sigma(r_{o_t,t}))+\sum_{j\not\in \mathcal{S}^{u}}\log(1-\sigma(r_{j,t}))\right]

其中 σ\sigma 是sigmoid函数,rot,tr_{o_t,t} 是正样本分数,rj,tr_{j,t} 是负样本分数。

代码

  • SASRec的核心在于将位置编码与序列嵌入相结合,然后通过多层Transformer模块处理。
  • 关键的设计点包括:(1) use_causal_mask=True 确保预测第t个位置时只能看到前t-1个位置;(2) 残差连接和层归一化稳定训练;(3) 前馈网络增强非线性表达能力。最后取序列最后一个位置的输出表示用户当前兴趣,用于预测下一个物品。
    sasrec.py文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import tensorflow as tf

from .utils import build_input_layer, build_embedding_table_dict, add_tensor_func
from .layers import PositionEncodingLayer, NegativeSampleEmbedding, DNNs


def build_sasrec_model(feature_columns, model_config):
"""
构建SASRec模型 (Self-Attentive Sequential Recommendation)

参数:
feature_columns: 特征列配置
model_config: 模型配置字典,包含:
- max_seq_len: 最大序列长度 (default: 200)
- mha_num: 多头注意力层数 (default: 2)
- nums_heads: 注意力头数 (default: 1)
- dropout: dropout率 (default: 0.2)
- activation: 激活函数 (default: 'relu')
- pos_emb_trainable: 位置编码是否可训练 (default: True)
- pos_initializer: 位置编码初始化器 (default: 'glorot_uniform')
"""
# 从配置中提取参数,设置默认值
max_seq_len = model_config.get("max_seq_len", 200)
mha_num = model_config.get("mha_num", 2)
nums_heads = model_config.get("nums_heads", 1)
dropout = model_config.get("dropout", 0.2)
activation = model_config.get("activation", "relu")
pos_emb_trainable = model_config.get("pos_emb_trainable", True)
pos_initializer = model_config.get("pos_initializer", "glorot_uniform")

input_layer_dict = build_input_layer(feature_columns)
embedding_table_dict = build_embedding_table_dict(feature_columns, prefix="sasrec/")
sequence_embedding = embedding_table_dict["item_id"](input_layer_dict["seq_ids"])
positive_embedding = embedding_table_dict["item_id"](input_layer_dict["pos_ids"])
# 评估时需要使用的embedding, 包括全量评估和采样评估
negative_embedding = embedding_table_dict["item_id"](
input_layer_dict["neg_sample_ids"]
)

# 对于全量物品评估,我们需要一个可以映射到物品嵌入的单一输入
# 为物品评估创建一个虚拟输入,在评估期间会被替换
all_item_input = tf.keras.layers.Input(
shape=(), name="all_item_input", dtype="int32"
)
all_item_embedding = embedding_table_dict["item_id"](all_item_input)

position_embedding = PositionEncodingLayer(
dims=feature_columns[0].emb_dim,
max_len=max_seq_len,
trainable=pos_emb_trainable,
initializer=pos_initializer,
)(sequence_embedding)
# 原始序列emb加上position embedding
sequence_embedding = add_tensor_func([sequence_embedding, position_embedding])

# 多头注意力
for i in range(mha_num):
sequence_embedding_norm = tf.keras.layers.LayerNormalization()(
sequence_embedding
)
sequence_embedding_output = tf.keras.layers.MultiHeadAttention(
num_heads=nums_heads,
key_dim=feature_columns[0].emb_dim,
dropout=dropout,
name=f"{i}_block",
)(sequence_embedding_norm, sequence_embedding, use_causal_mask=True)
# 残差连接
sequence_embedding = add_tensor_func(
[sequence_embedding, sequence_embedding_output]
)
sequence_embedding = tf.keras.layers.LayerNormalization()(sequence_embedding)
# FFN
sequence_embedding = DNNs(
units=[feature_columns[0].emb_dim, feature_columns[0].emb_dim],
dropout_rate=dropout,
activation=activation,
name=f"{i}_dnn",
)(sequence_embedding)
sequence_embedding = tf.keras.layers.LayerNormalization()(sequence_embedding)

# 序列的padding在左边,直接拿到序列的最后一个结果即可
last_sequence_embedding = tf.keras.layers.Lambda(
lambda x: x[:, -1, :], name="last_sequence_embedding"
)(
sequence_embedding
) # B, emb_dim

# 获取所有item的索引和权重
item_embedding_weight = embedding_table_dict["item_id"].embeddings

# 负采样算loss
negative_embeddings = NegativeSampleEmbedding(
vocab_size=feature_columns[0].vocab_size,
num_sampled=max_seq_len,
sampled_type="uniform",
)(positive_embedding, item_embedding_weight)

# 序列展开成[batch_size x max_len, emb_dim]
sequence_embedding = tf.keras.layers.Reshape((-1, feature_columns[0].emb_dim))(
sequence_embedding
)
pos_item_embedding = tf.keras.layers.Reshape((-1, feature_columns[0].emb_dim))(
positive_embedding
)
negative_embeddings = tf.keras.layers.Reshape((-1, feature_columns[0].emb_dim))(
negative_embeddings
)

pos_logits = tf.keras.layers.Lambda(lambda x: tf.reduce_sum(x[0] * x[1], axis=-1))(
[pos_item_embedding, sequence_embedding]
) # B,1
neg_logits = tf.keras.layers.Lambda(
lambda x: tf.reduce_sum(x[0] * tf.expand_dims(x[1], axis=1), axis=-1)
)(
[negative_embeddings, sequence_embedding]
) # B, max_len

# 创建目标掩码,忽略padding项目(ID=0),维度: [batch_size * maxlen]
# 这里的掩码是为了计算loss时忽略padding项
is_target = tf.keras.layers.Lambda(
lambda x: tf.reshape(tf.cast(tf.not_equal(x, 0), tf.float32), [-1, max_seq_len])
)(
input_layer_dict["pos_ids"]
) # B, max_len

main_loss = tf.reduce_sum(
-tf.math.log(tf.sigmoid(pos_logits) + 1e-24) * is_target
- tf.math.log(1 - tf.sigmoid(neg_logits) + 1e-24) * is_target
) / tf.reduce_sum(is_target)

# 构建模型
model = tf.keras.Model(inputs=list(input_layer_dict.values()), outputs=[main_loss])

# 推理时用户输入和item输入
user_inputs_list = [v for k, v in input_layer_dict.items() if k in ["seq_ids"]]
model.__setattr__("user_input", user_inputs_list)
model.__setattr__("user_embedding", last_sequence_embedding)

# 全量物品评估使用专用的物品输入
model.__setattr__("all_item_input", [all_item_input])
model.__setattr__("all_item_embedding", all_item_embedding)

sampling_item_inputs_list = [
v for k, v in input_layer_dict.items() if k in ["neg_sample_ids"]
]
model.__setattr__("sampling_item_input", sampling_item_inputs_list)
model.__setattr__("sampling_item_embedding", negative_embedding)

# 为评估创建独立的用户和物品模型
user_model = tf.keras.Model(
inputs=user_inputs_list, outputs=last_sequence_embedding
)
item_model = tf.keras.Model(inputs=[all_item_input], outputs=all_item_embedding)

return model, user_model, item_model

2.2 为复杂的“事件流”输入定制生成式架构HSTU

SASRec成功地验证了将推荐问题视为序列预测的可行性,但它将视线聚焦于一个相对简化的世界:一个仅由物品ID构成的序列。一个自然而然的问题是:用户的行为序列远比这丰富,能否将用户的属性、行为类型(点击、加购、购买)、上下文时间等所有信息都融入这个序列,让模型拥有一个更全面的“上帝视角”?

HSTU 模型正是对这个问题的一次深入探索。它代表了生成式范式的一个重要演进方向:不再满足于简单的物品ID序列,而是将所有异构特征统一编码为单一的、更复杂的“事件流”(Event Stream)作为模型输入。 HSTU的目标,就是学习这个复杂的“句子”,并预测下一个可能的事件。

这种统一化的架构设计虽然优雅,但在实现过程中面临着诸多技术挑战。HSTU的设计巧妙地解决了特征处理、模型架构和信号传递等关键问题,旨在用单一的模块替换传统推荐模型中特征提取、交互和预测等多个异构组件。

特征处理

HSTU的特征处理分为两个策略。对于类别特征,它将所有信息按时间戳“拉平”成一个统一的序列。想象一个用户的行为流:[用户年龄=30, 登录, 浏览商品A(类别:手机), 浏览商品B(类别:手机壳), 将B加入购物车, 退出]。SASRec可能只看到 [A, B]。而HSTU则试图理解整个事件流:[(特征:年龄,值:30), (行为:登录), (行为:浏览,物品:A), (行为:浏览,物品:B), (行为:加购,物品:B), (行为:退出)]

对于变化频繁的数值特征(如加权计数器、比率等),HSTU则采用隐式建模策略,让模型通过观察用户在序列中的实际行为模式来自动推断这些数值信息。

因此,HSTU的输入输出可以表示为:

  • 输入(xix_i):(Φ0,a0),(Φ1,a1),,(Φnc1,anc1)(\Phi_0,a_0), (\Phi_1,a_1), \ldots, (\Phi_{n_c-1},a_{n_c-1})
  • 输出(yiy_i):Φ1,Φ2,,Φnc1,\Phi_1',\Phi_2',\ldots,\Phi_{n_c-1}',\varnothing

其中 Φi\Phi_i 表示用户在时刻 ii 交互的内容,aia_i 表示用户在时刻 ii 的行为,Φi\Phi_i' 表示用户在时刻 ii 交互的内容,Φi=Φi\Phi_i' = \Phi_i (如果 aia_i 为正向行为),否则为 \varnothing

在推荐系统的召回阶段,模型学习一个条件分布p(Φi+1ui)p(\Phi_{i+1}|u_i),其中 uiu_i 是用户在当前时刻的表示向量,Φi+1\Phi_{i+1} 是候选物品。召回的目标是从物品库 Xc\mathbb{X}_c 中选择能够最大化用户满意度的物品,即 argmaxΦXcp(Φui)\text{argmax}_{\Phi \in \mathbb{X}_c} p(\Phi|u_i)。HSTU召回与标准的序列生成任务有两个不同的地方:首先用户可能对推荐的物品产生负面反应,因此监督信号不总是下一个物品;其次当序列中包含非交互相关的特征(如人口统计学信息)时,对应的输出标签是未定义的。通过这种方式,HSTU将复杂的推荐问题转化为序列到序列的学习问题,使得模型能够基于用户的历史行为序列预测其未来可能感兴趣的内容。

架构统一:HSTU单元

为了用一个模块就能替换DLRM中的异构组件,HSTU引入了层次序列转换单元(Hierarchical Sequential Transduction Unit,HSTU) 如 图右所示。HSTU由堆叠的相同层组成,每层包含三个关键子模块:

  • 点向投影(Pointwise Projection):使用单一线性变换后分割的方式同时产生注意力计算所需的查询、键、值以及门控权重:
    U(X),V(X),Q(X),K(X)=Split(ϕ1(f1(X)))U(X), V(X), Q(X), K(X) = \text{Split}(\phi_1(f_1(X)))

  • 点向聚合(Pointwise Aggregation):采用点向聚合而非传统softmax来保持推荐系统中用户偏好强度信息:
    A(X)V(X)=ϕ2(Q(X)K(X)T+rabp,t)V(X)A(X)V(X) = \phi_2\left(Q(X)K(X)^T + \text{rab}^{p,t}\right)V(X)

  • 点向转换(Pointwise Transformation):使用门控机制实现类似MoE的特征选择能力:
    Y(X)=f2(Norm(A(X)V(X))U(X))Y(X) = f_2\left(\text{Norm}\left(A(X)V(X)\right) \odot U(X)\right)

其中:fi(X)=WiX+bif_i(X) = W_i X + b_i表示线性变换,ϕ1\phi_1ϕ2\phi_2使用SiLU激活函数,Norm\text{Norm}表示Layer Normalization(层归一化),rabp,t\text{rab}^{p,t}表示包含位置和时间信息的相对注意力偏置,\odot表示元素级乘法。

这三个子模块巧妙地统一处理了传统DLRM的三个主要阶段:特征提取通过注意力机制实现目标感知的特征池化;特征交互通过门控机制实现从原始特征到交叉特征的动态结合,注意力部分(A(X)V(X)A(X)V(X))捕捉高阶关系,门控部分(U(X)U(X))保留低阶信息;表示转换中门控信号U(X)U(X)充当为每个特征维度定制的"路由器",实现类似MoE的条件计算效果。

替换softmax归一化机制

HSTU Pointwise Aggregation 步骤放弃了传统的softmax归一化,而使用了不一样的聚合方式。传统Transformer使用softmax注意力虽然在语言任务中表现优异,但在推荐场景下存在信息丢失的问题。在推荐场景中,用户兴趣的“强度”是一个至关重要的信号。传统的softmax注意力机制会将所有历史行为的注意力权重归一化,使其总和为1。这本质上是在计算一种相对重要性,而非绝对相关性,从而扭曲了真实的偏好强度。例如,假设目标是推荐一部科幻电影。用户A的历史记录中有10部科幻片和1部喜剧片,而用户B只有1部科幻片和1部喜剧片。对于用户A,其对科幻的强烈偏好是一个非常强的信号。然而,softmax归一化可能会迫使这10部科幻片瓜分注意力权重,从而削弱“大量观看科幻”这一整体信号的强度,使其与用户B的信号差异不再显著。相比之下,HSTU的点向聚合(通过SiLU激活)独立计算每个历史项目的相关性得分,不进行跨项归一化。这使得模型能够保留偏好的原始强度——用户A历史中的科幻片可以共同产生一个非常高的激活总分,这对于准确预估点击率、观看时长等具体数值至关重要,而不仅仅是进行排序。

训练与召回

HSTU的完整工作流程包含训练和召回两个阶段。在训练阶段,模型基于前面描述的输入输出格式进行标准的Transformer训练——通过预测用户行为序列中的下一个正向交互物品来学习用户偏好模式和物品间的依赖关系。在召回阶段,经过多层HSTU编码器处理后,模型将用户的完整行为事件序列转换为能够概括用户历史、捕捉当前兴趣的实时动态表示uiu_i。拥有了用户表示向量uiu_i后,召回过程就转变为一个标准的向量检索问题——将uiu_i与物品语料库中所有物品的嵌入向量进行高效的相似度计算,通过近似最近邻(ANN)搜索从海量物品池中快速检索出与用户当前兴趣最匹配的Top-K个物品,形成召回列表。

总而言之,如果说SASRec是将推荐序列视为一句由‘单词’(物品ID)组成的句子,那么HSTU则是将推荐场景中的所有信息(用户属性、行为、物品、上下文)都看作一种更复杂的‘语言’,并为此设计了一套全新的‘语法规则’(HSTU单元)来理解和生成这种语言。它的核心创新在于丰富了模型的输入,从而实现了对用户行为更深层次的理解。

代码

  • HSTU的核心在于其定制的注意力机制,它通过SiLU激活和门控机制实现了点向聚合,避免了传统Softmax的归一化。
    hstu.py 文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
import tensorflow as tf

from .utils import build_input_layer, build_embedding_table_dict, add_tensor_func
from .layers import PositionEncodingLayer, NegativeSampleEmbedding, DNNs, EmbeddingIndex


class HstuLayer(tf.keras.layers.Layer):
"""多头注意力机制的 Keras 实现

参数:
num_units: 注意力的维度大小,如果为None则使用输入的最后一维
num_heads: 注意力头的数量
dropout_rate: dropout 比率
attention_type: 注意力类型,支持 'dot_product', 'relative_position_bias', 'time_interval_bias'
causality: 是否使用因果掩码
linear_projection_and_dropout: 是否在输出后添加线性投影和dropout
args: 包含各种配置参数的对象
with_qk: 是否返回Q和K
"""

def __init__(
self,
num_units=None,
num_heads=8,
attention_type="dot_product",
dropout_rate=0,
causality=False,
linear_projection_and_dropout=False,
args=None,
with_qk=False,
**kwargs,
):
super(HstuLayer, self).__init__(**kwargs)
self.num_units = num_units
self.num_heads = num_heads
self.attention_type = attention_type
self.dropout_rate = dropout_rate
self.causality = causality
self.linear_projection_and_dropout = linear_projection_and_dropout
self.args = args
self.with_qk = with_qk

def build(self, input_shape):
# 设置注意力维度的默认值
if isinstance(input_shape, list):
queries_shape = input_shape[0]
keys_shape = input_shape[1]
else:
queries_shape = input_shape
keys_shape = input_shape

if self.num_units is None:
self.num_units = queries_shape[-1]

# 初始化器
qkv_initializer = None
if (
hasattr(self.args, "qkv_projection_initializer")
and self.args.qkv_projection_initializer == "normal"
):
print("Set qkv projection initializer to normal")
qkv_initializer = tf.keras.initializers.RandomNormal(mean=0, stddev=0.02)

# 线性投影层
use_bias = (
getattr(self.args, "qkv_projection_bias", False) if self.args else False
)
self.query_dense = tf.keras.layers.Dense(
self.num_units,
activation=None,
use_bias=use_bias,
kernel_initializer=qkv_initializer,
name="query_projection",
)
self.key_dense = tf.keras.layers.Dense(
self.num_units,
activation=None,
use_bias=use_bias,
kernel_initializer=qkv_initializer,
name="key_projection",
)

self.value_projection = (
getattr(self.args, "value_projection", True) if self.args else True
)
if self.value_projection:
self.value_dense = tf.keras.layers.Dense(
self.num_units,
activation=None,
use_bias=use_bias,
kernel_initializer=qkv_initializer,
name="value_projection",
)

# 相对位置偏置和时间间隔偏置
if "relative_position_bias" in self.attention_type:
self.rel_pos_bias = self.add_weight(
shape=(2 * getattr(self.args, "maxlen", 50) - 1, self.num_heads),
initializer="glorot_uniform",
trainable=True,
name="relative_position_bias",
)

if "time_interval_bias" in self.attention_type:
max_interval = getattr(
self.args, "time_interval_attention_max_interval", 1024
)
self.time_interval_bias = self.add_weight(
shape=(max_interval + 1, self.num_heads),
initializer="glorot_uniform",
trainable=True,
name="time_interval_bias",
)

# U投影
self.u_projection = (
getattr(self.args, "u_projection", False) if self.args else False
)
if self.u_projection:
u_initializer = None
if (
hasattr(self.args, "u_projection_initializer")
and self.args.u_projection_initializer == "normal"
):
u_initializer = tf.keras.initializers.RandomNormal(mean=0, stddev=0.02)

u_bias = (
getattr(self.args, "u_projection_bias", False) if self.args else False
)
self.u_dense = tf.keras.layers.Dense(
self.num_units,
activation=None,
use_bias=u_bias,
kernel_initializer=u_initializer,
name="u_projection",
)

# 输出投影
if self.linear_projection_and_dropout:
self.output_dense = tf.keras.layers.Dense(
self.num_units,
activation=None,
kernel_initializer=qkv_initializer,
name="output_projection",
)

# Dropout层
self.dropout = tf.keras.layers.Dropout(self.dropout_rate)
# 创建归一化层
self.query_norm = tf.keras.layers.LayerNormalization(name="query_normalization")

super(HstuLayer, self).build(input_shape)

def silu(self, x):
"""SiLU (Swish) 激活函数"""
return x * tf.sigmoid(x)

def normalize(self, norm_func, inputs):
"""归一化"""
return norm_func(inputs)

def relative_position_bias(self, batch_size, maxlen, num_heads):
"""相对位置偏置"""
seq_len = tf.shape(self.queries)[1]

# 计算位置索引
positions = tf.range(seq_len)
relative_positions = positions[:, None] - positions[None, :]
relative_positions = relative_positions + maxlen - 1 # 转换为非负索引

# 获取相应的偏置
bias = tf.gather(self.rel_pos_bias, relative_positions)
# 转换形状以适配多头注意力
bias = tf.transpose(bias, [2, 0, 1]) # (num_heads, seq_len, seq_len)
bias = tf.tile(
tf.expand_dims(bias, 0), [batch_size, 1, 1, 1]
) # (batch_size, num_heads, seq_len, seq_len)
bias = tf.reshape(
bias, [batch_size * num_heads, seq_len, seq_len]
) # (batch_size * num_heads, seq_len, seq_len)

return bias

def time_interval_bias(self, input_interval, maxlen, max_interval, num_heads):
"""时间间隔偏置"""
batch_size = tf.shape(self.queries)[0]
seq_len = tf.shape(self.queries)[1]

# 计算时间间隔
intervals = tf.abs(
input_interval[:, :, None] - input_interval[:, None, :]
) # (batch_size, seq_len, seq_len)
intervals = tf.minimum(intervals, max_interval) # 截断最大间隔

# 获取相应的偏置
bias = tf.gather(
self.time_interval_bias, intervals
) # (batch_size, seq_len, seq_len, num_heads)
bias = tf.transpose(
bias, [0, 3, 1, 2]
) # (batch_size, num_heads, seq_len, seq_len)
bias = tf.reshape(
bias, [batch_size * num_heads, seq_len, seq_len]
) # (batch_size * num_heads, seq_len, seq_len)

return bias

def apply_attention(
self,
K,
V,
outputs,
scale_attention=True,
attention_activation=None,
attention_normalization=None,
):
"""应用注意力机制"""
# 缩放
if scale_attention:
depth = tf.cast(tf.shape(K)[-1], tf.float32)
outputs = outputs / tf.sqrt(depth)

# 因果掩码
if self.causality:
# 创建下三角矩阵
diag_vals = tf.ones_like(outputs[0, :, :])
tril = tf.linalg.band_part(diag_vals, -1, 0) # 下三角为1,上三角为0
causality_mask = tf.tile(
tf.expand_dims(tril, 0), [tf.shape(outputs)[0], 1, 1]
)

# 将上三角部分设置为很小的负数
paddings = tf.ones_like(causality_mask) * (-(2**32) + 1)
outputs = tf.where(tf.equal(causality_mask, 0), paddings, outputs)

# Key掩码
key_masks = tf.sign(tf.reduce_sum(tf.abs(K), axis=-1)) # (h*N, T_k)
key_masks = tf.tile(
tf.expand_dims(key_masks, 1), [1, tf.shape(self.queries)[1], 1]
) # (h*N, T_q, T_k)

# 应用Key掩码
paddings = tf.ones_like(outputs) * (-(2**32) + 1)
outputs = tf.where(tf.equal(key_masks, 0), paddings, outputs)

# 应用激活函数
if attention_activation == "softmax":
weights = tf.nn.softmax(outputs)
else:
weights = outputs

# 归一化
if attention_normalization == "softmax":
weights = tf.nn.softmax(weights)

# 应用dropout
weights = self.dropout(weights, training=True)

# 加权求和
attention_output = tf.matmul(weights, V)

return attention_output

def call(self, inputs, input_interval=None, training=True):
# 处理输入
if isinstance(inputs, list):
self.queries, self.keys = inputs[:2]
else:
self.queries = self.keys = inputs

# 归一化查询
if hasattr(self.args, "normalize_query") and self.args.normalize_query:
self.queries = self.normalize(self.query_norm, self.queries)

# 如果设置了覆写key
if (
hasattr(self.args, "overwrite_key_with_query")
and self.args.overwrite_key_with_query
):
self.keys = self.queries

# 获取batch_size
batch_size = tf.shape(self.queries)[0]

# 线性投影
Q = self.query_dense(self.queries) # (N, T_q, C)
K = self.key_dense(self.keys) # (N, T_k, C)
if self.value_projection:
V = self.value_dense(self.keys) # (N, T_k, C)
else:
V = self.keys

# 分割并拼接,实现多头
Q_split = tf.reshape(
Q, [batch_size, -1, self.num_heads, self.num_units // self.num_heads]
)
Q_split = tf.transpose(Q_split, [0, 2, 1, 3])
Q_ = tf.reshape(
Q_split, [batch_size * self.num_heads, -1, self.num_units // self.num_heads]
)

K_split = tf.reshape(
K, [batch_size, -1, self.num_heads, self.num_units // self.num_heads]
)
K_split = tf.transpose(K_split, [0, 2, 1, 3])
K_ = tf.reshape(
K_split, [batch_size * self.num_heads, -1, self.num_units // self.num_heads]
)

V_split = tf.reshape(
V, [batch_size, -1, self.num_heads, self.num_units // self.num_heads]
)
V_split = tf.transpose(V_split, [0, 2, 1, 3])
V_ = tf.reshape(
V_split, [batch_size * self.num_heads, -1, self.num_units // self.num_heads]
)

# 应用SiLU激活
if (
hasattr(self.args, "qkv_projection_activation")
and self.args.qkv_projection_activation == "silu"
):
print("Use SiLU activation on qkv projection")
Q_, K_, V_ = self.silu(Q_), self.silu(K_), self.silu(V_)

new_values = 0
# 不同类型的注意力机制
if "dot_product" in self.attention_type:
outputs = tf.matmul(Q_, tf.transpose(K_, [0, 2, 1])) # (h*N, T_q, T_k)
outputs = self.apply_attention(K_, V_, outputs)
new_values += outputs

if "relative_position_bias" in self.attention_type:
print("Add relative position bias")
maxlen = getattr(self.args, "maxlen", 50)
attention_bias = self.relative_position_bias(
batch_size, maxlen, self.num_heads
)

if (
hasattr(self.args, "relative_position_bias_add_item_interaction")
and self.args.relative_position_bias_add_item_interaction
):
print("Relative position bias add item interaction")
outputs = tf.matmul(Q_, tf.transpose(K_, [0, 2, 1]))
outputs += attention_bias
else:
outputs = attention_bias

scale_attention = (
getattr(self.args, "scale_attention", True) if self.args else True
)
attention_activation = (
getattr(self.args, "attention_activation", None) if self.args else None
)
attention_normalization = (
getattr(self.args, "attention_normalization", None)
if self.args
else None
)

outputs = self.apply_attention(
K_,
V_,
outputs,
scale_attention=scale_attention,
attention_activation=attention_activation,
attention_normalization=attention_normalization,
)
new_values += outputs

if "time_interval_bias" in self.attention_type and input_interval is not None:
print("Add time interval bias attention")
maxlen = getattr(self.args, "maxlen", 50)
max_interval = getattr(
self.args, "time_interval_attention_max_interval", 1024
)
attention_bias = self.time_interval_bias(
input_interval, maxlen, max_interval, self.num_heads
)

if (
hasattr(self.args, "time_interval_bias_add_item_interaction")
and self.args.time_interval_bias_add_item_interaction
):
outputs = tf.matmul(Q_, tf.transpose(K_, [0, 2, 1]))
outputs += attention_bias
else:
outputs = attention_bias

scale_attention = (
getattr(self.args, "scale_attention", True) if self.args else True
)
attention_activation = (
getattr(self.args, "attention_activation", None) if self.args else None
)
attention_normalization = (
getattr(self.args, "attention_normalization", None)
if self.args
else None
)

outputs = self.apply_attention(
K_,
V_,
outputs,
scale_attention=scale_attention,
attention_activation=attention_activation,
attention_normalization=attention_normalization,
)
new_values += outputs

# 合并多头注意力结果
outputs_split = tf.reshape(
new_values,
[batch_size, self.num_heads, -1, self.num_units // self.num_heads],
)
outputs_split = tf.transpose(outputs_split, [0, 2, 1, 3])
outputs = tf.reshape(outputs_split, [batch_size, -1, self.num_units])

# U投影
if self.u_projection:
U = self.u_dense(self.queries)
U = self.silu(U)
outputs = U * self.normalize(outputs)

# 线性投影和dropout
if self.linear_projection_and_dropout:
dropout_before = (
getattr(self.args, "dropout_before_linear_projection", False)
if self.args
else False
)
if dropout_before:
outputs = self.dropout(outputs, training=training)
outputs = self.output_dense(outputs)
if not dropout_before:
outputs = self.dropout(outputs, training=training)

# 残差连接
outputs += self.queries

if self.with_qk:
return Q, K
else:
return outputs

def get_config(self):
config = super(HstuLayer, self).get_config()
config.update(
{
"num_units": self.num_units,
"num_heads": self.num_heads,
"attention_type": self.attention_type,
"dropout_rate": self.dropout_rate,
"causality": self.causality,
"linear_projection_and_dropout": self.linear_projection_and_dropout,
"with_qk": self.with_qk,
}
)
return config


def build_hstu_model(feature_columns, model_config):
"""
构建HSTU模型 (分层结构化Transformer单元)

参数:
feature_columns: 特征列配置
model_config: 模型配置字典,包含:
- max_seq_len: 最大序列长度 (默认: 50)
- mha_num: 多头注意力层数 (默认: 2)
- nums_heads: 注意力头数 (默认: 1)
- dropout: dropout率 (默认: 0.2)
- activation: 激活函数 (默认: 'relu')
- pos_emb_trainable: 位置编码是否可训练 (默认: True)
- pos_initializer: 位置编码初始化器 (默认: 'glorot_uniform')
- attention_type: 注意力类型 (默认: 'dot_product')
"""
# 从配置中提取参数并设置默认值
max_seq_len = model_config.get("max_seq_len", 50)
mha_num = model_config.get("mha_num", 2)
nums_heads = model_config.get("nums_heads", 1)
dropout = model_config.get("dropout", 0.2)
activation = model_config.get("activation", "relu")
pos_emb_trainable = model_config.get("pos_emb_trainable", True)
pos_initializer = model_config.get("pos_initializer", "glorot_uniform")
attention_type = model_config.get("attention_type", "dot_product")
input_layer_dict = build_input_layer(feature_columns)
filter_feature_columns = [x for x in feature_columns if x.name != "timestamps"]
embedding_table_dict = build_embedding_table_dict(
filter_feature_columns, prefix="hstu/"
)
sequence_embedding = embedding_table_dict["item_id"](input_layer_dict["seq_ids"])
positive_embedding = embedding_table_dict["item_id"](input_layer_dict["pos_ids"])
# 评估时需要使用的embedding, 包括全量评估和采样评估
negative_embedding = embedding_table_dict["item_id"](
input_layer_dict["neg_sample_ids"]
)

# 对于全量物品评估,我们需要一个可以映射到物品嵌入的单一输入
# 创建一个用于物品评估的虚拟输入,在评估时会被替换
all_item_input = tf.keras.layers.Input(
shape=(), name="all_item_input", dtype="int32"
)
all_item_embedding = embedding_table_dict["item_id"](all_item_input)

position_embedding = PositionEncodingLayer(
dims=feature_columns[0].emb_dim,
max_len=max_seq_len,
trainable=pos_emb_trainable,
initializer=pos_initializer,
)(sequence_embedding)
# 原始序列emb加上position embedding
sequence_embedding = add_tensor_func([sequence_embedding, position_embedding])

# 多头注意力
for i in range(mha_num):
sequence_embedding_norm = tf.keras.layers.LayerNormalization()(
sequence_embedding
)
sequence_embedding_output = HstuLayer(
num_units=feature_columns[0].emb_dim,
num_heads=nums_heads,
attention_type=attention_type,
dropout_rate=dropout,
causality=True,
linear_projection_and_dropout=False,
args=model_config,
name=f"{i}_block",
)(sequence_embedding_norm, input_interval=input_layer_dict.get("timestamps"))
# 残差连接
sequence_embedding = add_tensor_func(
[sequence_embedding, sequence_embedding_output]
)
sequence_embedding = tf.keras.layers.LayerNormalization()(sequence_embedding)
# 前馈神经网络
sequence_embedding = DNNs(
units=[feature_columns[0].emb_dim, feature_columns[0].emb_dim],
dropout_rate=dropout,
activation=activation,
name=f"{i}_dnn",
)(sequence_embedding)
sequence_embedding = tf.keras.layers.LayerNormalization()(sequence_embedding)

# 序列的padding在左边,直接拿到序列的最后一个结果即可
last_sequence_embedding = tf.keras.layers.Lambda(
lambda x: x[:, -1, :], name="last_sequence_embedding"
)(
sequence_embedding
) # B, emb_dim

# 获取所有item的索引和权重
item_embedding_weight = embedding_table_dict["item_id"].embeddings

# 负采样算loss
negative_embeddings = NegativeSampleEmbedding(
vocab_size=feature_columns[0].vocab_size,
num_sampled=max_seq_len,
sampled_type="uniform",
)(positive_embedding, item_embedding_weight)

# 序列展开成[batch_size x max_len, emb_dim]
sequence_embedding = tf.keras.layers.Reshape((-1, feature_columns[0].emb_dim))(
sequence_embedding
)
pos_item_embedding = tf.keras.layers.Reshape((-1, feature_columns[0].emb_dim))(
positive_embedding
)
negative_embeddings = tf.keras.layers.Reshape((-1, feature_columns[0].emb_dim))(
negative_embeddings
)

pos_logits = tf.keras.layers.Lambda(lambda x: tf.reduce_sum(x[0] * x[1], axis=-1))(
[pos_item_embedding, sequence_embedding]
) # B,1
neg_logits = tf.keras.layers.Lambda(
lambda x: tf.reduce_sum(x[0] * tf.expand_dims(x[1], axis=1), axis=-1)
)(
[negative_embeddings, sequence_embedding]
) # B, max_len

# 创建目标掩码,忽略padding项目(ID=0),维度: [batch_size * maxlen]
# 这里的掩码是为了计算loss时忽略padding项
is_target = tf.keras.layers.Lambda(
lambda x: tf.reshape(tf.cast(tf.not_equal(x, 0), tf.float32), [-1, max_seq_len])
)(
input_layer_dict["pos_ids"]
) # B, max_len

main_loss = tf.reduce_sum(
-tf.math.log(tf.sigmoid(pos_logits) + 1e-24) * is_target
- tf.math.log(1 - tf.sigmoid(neg_logits) + 1e-24) * is_target
) / tf.reduce_sum(is_target)

# 构建模型
model = tf.keras.Model(inputs=list(input_layer_dict.values()), outputs=[main_loss])

# 推理时用户输入和item输入
user_inputs_list = [
v for k, v in input_layer_dict.items() if k in ["seq_ids", "timestamps"]
]
model.__setattr__("user_input", user_inputs_list)
model.__setattr__("user_embedding", last_sequence_embedding)

# 全量物品评估使用专用的物品输入
model.__setattr__("all_item_input", [all_item_input])
model.__setattr__("all_item_embedding", all_item_embedding)

sampling_item_inputs_list = [
v for k, v in input_layer_dict.items() if k in ["neg_sample_ids"]
]
model.__setattr__("sampling_item_input", sampling_item_inputs_list)
model.__setattr__("sampling_item_embedding", negative_embedding)

# 为评估创建独立的用户和物品模型
user_model = tf.keras.Model(
inputs=user_inputs_list, outputs=last_sequence_embedding
)
item_model = tf.keras.Model(inputs=[all_item_input], outputs=all_item_embedding)

return model, user_model, item_model

2.3 用语义ID重新定义物品表示TIGER

HSTU通过极大地丰富输入信息,提升了模型对用户行为上下文的理解能力。然而,生成式召回的演进还有另一条同样重要的路径。这条路径将目光从输入侧转向了输出侧,它反思了一个根本问题:即使我们拥有了完美的输入,但我们预测的目标——一个孤立的、无语义的原子ID —— 本身是否就是最佳选择?

直接预测原子ID存在两个固有局限:

  1. 语义鸿沟:模型难以理解物品间的内在联系。它不知道“耐克篮球鞋A”和“耐克篮球鞋B”在本质上高度相似,这种相似性只能通过海量共现数据间接学习,效率低下。
  2. 泛化难题:对于从未在训练集中出现过的新物品(冷启动问题),模型完全无法进行推荐,因为它的词汇表中根本没有这个ID。

TIGER (Transformer Index for GEnerative Recommenders) 模型便是在这一思考下诞生的革命性工作。它提出,推荐任务的核心不应是预测一个无意义的ID,而是生成一个能够描述物品内在核心语义的、结构化的‘语义ID’

这种范式转换带来了多项优势:允许在语义相似的物品间共享知识以解决冷启动问题;通过代码字元组高效表示大型物料库;并最终简化了推荐系统架构。

TIGER框架由两个主要阶段组成:首先利用物品内容特征生成语义ID,然后在这些语义ID序列上训练一个生成式推荐模型。

语义ID的生成 (RQ-VAE)

语义ID的核心是为每个物品创建一个语义上有意义的代码字元组。内容特征相似的物品,其语义ID也应当相似(部分重叠),从而让模型能学习物品间的深层语义关系。

TIGER采用残差量化变分自编码器 (RQ-VAE) 来实现这一目标。RQ-VAE是一种多层向量量化器,它通过对残差进行迭代量化来生成代码字元组。

RQ-VAE量化方法

具体过程如下:

  1. 一个预训练的内容编码器(如Sentence-T5)先将物品内容特征(如标题、描述)处理成语义嵌入向量 x\mathbf{x}
  2. RQ-VAE的编码器 E\mathcal{E} 将输入向量 x\mathbf{x} 进一步编码为潜在表示 z:=E(x)\mathbf{z} := \mathcal{E}(\mathbf{x}),并将初始残差定义为 r0:=z\mathbf{r}_0 := \mathbf{z}
  3. 模型包含 mm 个层级,每个层级 d{0,1,...,m1}d \in \{0, 1, ..., m-1\} 都有一个独立的码本 Cd\mathcal{C}_d。在第 dd 层,通过在码本中寻找与当前残差 rd\mathbf{r}_d 最接近的码字来完成量化:

    cd=argminkrdek2其中 ekCdc_d = \arg\min_{k} \|\mathbf{r}_d - \mathbf{e}_k\|^2 \quad \text{其中 } \mathbf{e}_k \in \mathcal{C}_d

  4. 计算新的残差并送入下一层:

    rd+1:=rdecd\mathbf{r}_{d+1} := \mathbf{r}_d - \mathbf{e}_{c_d}

  5. 重复此过程 mm 次,最终得到一个由 mm 个码字组成的语义ID元组 (c0,c1,...,cm1)(c_0, c_1, ..., c_{m-1})

RQ-VAE通过一个联合损失函数进行端到端训练,该函数包含重建损失(确保解码器能从量化表示中恢复原始信息)和量化损失(用于更新码本),从而联合优化编码器、解码器和所有码本。

为确保唯一性,如果多个物品在量化后产生了完全相同的语义ID,TIGER会在ID末尾追加一个额外的索引位来区分它们,例如 (12, 24, 52, 0)(12, 24, 52, 1)

基于语义ID的生成式检索

拥有了每个物品的语义ID后,推荐任务就转变为一个标准的序列到序列生成问题。

基于语义ID的生成式检索

  1. 将用户的历史交互序列 (item1,item2,...,itemn)(\text{item}_1, \text{item}_2, ..., \text{item}_n) 转换为其对应的语义ID序列:

    ((c1,0,...,c1,m1),(c2,0,...,c2,m1),...,(cn,0,...,cn,m1))((c_{1,0}, ..., c_{1,m-1}), (c_{2,0}, ..., c_{2,m-1}), ..., (c_{n,0}, ..., c_{n,m-1}))

    其中 (ci,0,...,ci,m1)(c_{i,0}, ..., c_{i,m-1})itemi\text{item}_i 的语义ID
  2. 将每个物品的语义ID元组展平,形成一个长序列作为Transformer模型的输入:

    (c1,0,...,c1,m1,c2,0,...,c2,m1,...,cn,0,...,cn,m1)(c_{1,0}, ..., c_{1,m-1}, c_{2,0}, ..., c_{2,m-1}, ..., c_{n,0}, ..., c_{n,m-1})

  3. 训练一个标准的Encoder-Decoder Transformer模型。Encoder负责理解用户历史序列的上下文,Decoder则自回归地、逐个码字地生成下一个最可能被交互物品的语义ID (cn+1,0,...,cn+1,m1)(c_{n+1,0}, ..., c_{n+1,m-1})
  4. 在推理阶段,一旦生成了完整的预测语义ID,就可以通过查找表将其映射回具体的物品,完成推荐。

总结

简单性能对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+-----------+---------------+--------------+-----------+----------+----------------+---------------+
| model | hit_rate@10 | hit_rate@5 | ndcg@10 | ndcg@5 | precision@10 | precision@5 |
+===========+===============+==============+===========+==========+================+===============+
| mind | 0.0041 | 0.0012 | 0.0014 | 0.0005 | 0.0004 | 0.0002 |
+-----------+---------------+--------------+-----------+----------+----------------+---------------+
| sdm | 0.1031 | 0.0969 | 0.099 | 0.0969 | 0.0103 | 0.0194 |
+-----------+---------------+--------------+-----------+----------+----------------+---------------+

+-----------+---------------+--------------+-----------+----------+
| model | hit_rate@10 | hit_rate@5 | ndcg@10 | ndcg@5 |
+===========+===============+==============+===========+==========+
| sasrec | 0.0235 | 0.0119 | 0.0114 | 0.0078 |
+-----------+---------------+--------------+-----------+----------+
| hstu | 0.0142 | 0.0075 | 0.007 | 0.0048 |
+-----------+---------------+--------------+-----------+----------+

结束

  • 序列召回代表了推荐系统的一个重要发展方向:从基于静态用户画像的推荐,转向基于动态行为序列的推荐。这种方法不仅关注“用户是谁”,更重视“用户下一步可能做什么”。通过建模用户行为中的时序依赖、兴趣演化和多重意图,序列召回能够提供更精准、更及时的推荐结果。
  • 为了实现这一目标,序列召回主要沿着两条技术路径发展。
    • 第一条路径专注于改进用户表示,通过构建更丰富的用户画像来提升检索效果。例如,MIND模型使用多个向量表示用户的不同兴趣,解决了单一向量的表达局限;SDM模型则融合了用户的长短期偏好,使用户画像能够同时反映稳定兴趣和即时需求。
    • 第二条路径采用了不同的建模思路,将推荐问题转化为序列生成任务,直接预测行为序列的下一个物品。SASRec引入Transformer模型,建立了“预测下一个物品ID”的基本框架;HSTU将异构特征统一为事件流表示,扩展了模型的输入信息;TIGER通过语义ID改进了物品表示方法,使模型能够更好地理解物品间的语义关系。
  • 虽然技术路径有所不同,但所有序列召回模型都致力于通过挖掘时序规律来更好地预测用户意图,提升推荐的相关性和时效性。这一发展趋势也反映了推荐系统的演进方向,即从理解用户历史行为扩展到预测用户未来行为。如何在保持模型性能的同时满足在线服务的效率要求,以及如何与大语言模型等新兴技术相结合,是序列召回技术需要持续探索的重要问题。