参考
召回(二)向量召回
然而,矩阵分解仍然面临一些局限:它是一个相对简单的线性模型,通过用户向量和物品向量的内积来预测评分,表达能力有限;它主要依赖用户-物品交互矩阵,难以融入更多的特征信息(如用户画像、物品属性、上下文信息等);在面对数亿用户和数千万商品的工业级规模时,完整交互矩阵的处理和冷启动问题仍然是挑战。
在向量空间中,推荐问题得到了根本性的简化。原本需要遍历巨大交互矩阵的召回过程,转变为在高维向量空间中根据一个“查询”向量快速搜索出距离最近的K个物品向量。这种转变不仅大幅提升了计算效率,还通过向量的表示能力捕捉到了更深层次的语义相似性。
向量召回技术主要沿着两条路径发展。I2I(Item-to-Item)召回专注于计算物品与物品之间的相似性。U2I(User-to-Item)召回则直接匹配用户与物品。
1、I2I召回
在推荐系统中,I2I(Item-to-Item)召回是一个核心任务:给定一个物品,如何快速找出与之相似的其他物品?这个看似简单的问题,实际上蕴含着深刻的洞察——“相似性”并非仅仅由物品的内在属性决定,而是与用户的行为所共同定义的。如果两个商品经常被同一批用户购买,两部电影被同一群观众喜欢,那么它们之间就可能存在某种关联。
接下来,我们将看到所有I2I召回方法的本质都是在回答同一个问题:如何更好地定义和利用“序列”来学习物品之间的相似性。从最直接的用户行为序列,到融合属性信息的增强序列,再到面向业务目标的会话序列,每一种方法都是对“序列”概念的不同诠释和深化。
1.1 序列建模理论基础Word2Vec
Word2Vec (Mikolov et al., 2013) 的成功建立在一个简单而深刻的假设之上:在相似语境中出现的词语往往具有相似的含义。通过分析海量文本中词语的共现模式,我们可以为每个词学习一个稠密的向量表示,使得语义相近的词在向量空间中距离更近。
Word2Vec主要包含两种模型架构:Skip-Gram和CBOW(Continuous Bag of Words)。
Skip-Gram模型通过给定的中心词来预测其周围的上下文词,而CBOW模型则相反,通过上下文词来预测中心词。
在推荐系统中,Skip-Gram模型由于其更好的性能表现而被更广泛地采用。
Skip-Gram原理
在Skip-Gram模型中,给定文本序列中位置t的中心词 w t w_t w t ,模型的目标是最大化其上下文窗口内所有词语的出现概率。具体而言,对于窗口大小为m的情况,模型要预测 w t − m , w t − m + 1 , . . . , w t − 1 , w t + 1 , . . . , w t + m w_{t-m},w_{t-m+1},...,w_{t-1},\ w_{t+1},...,w_{t+m} w t − m , w t − m + 1 , ... , w t − 1 , w t + 1 , ... , w t + m 这些上下文词的概率。
中心词 w t w_t w t 预测上下文词 w t + j w_{t+j} w t + j 的条件概率定义为:
P ( w t + j ∣ w t ) = e v t + j T v t ∑ k = 1 ∣ V ∣ e v k T v t 其中 v w i 表示词 w i 的向量表示, V 是词汇表。 P(w_{t+j}|w_t) = \frac{
e^{ v_{t+j}^T v_t }
}{
\sum_{k=1}^{|V|} e^{ v_k^T v_t }
} \\
其中 v_{w_i} 表示词 w_i 的向量表示,V是词汇表。
P ( w t + j ∣ w t ) = ∑ k = 1 ∣ V ∣ e v k T v t e v t + j T v t 其中 v w i 表示词 w i 的向量表示, V 是词汇表。
这个softmax公式确保了所有词的概率之和为1,而分子中的内积 v t + j T v t v_{t+j}^T v_t v t + j T v t 衡量了中心词与上下文词的相似度。
负采样优化
直接计算上述softmax的分母需要遍历整个词汇表,在实际应用中计算代价过高。为了解决这个问题,Word2Vec采用了负采样(Negative Sampling)技术。这种方法将原本的多分类问题转化为多个二分类问题:
log σ ( v w t + j T v w t ) + ∑ i = 1 k E w i ∼ P n ( w ) log σ ( − v w i T v w t ) 其中 σ ( x ) = 1 1 + e − x 是 s i g m o i d 函数, k 是负样本数量, P n ( w ) 是负采样分布。 \log \sigma (v_{w_{t+j}}^T v_{w_t} ) +
\sum_{i=1}^k \mathbb{E}_{w_i \sim P_n(w)} \log \sigma (- v_{w_i}^T v_{w_t}) \\
其中 \sigma (x) = \frac{1}{1+e^{-x}} 是sigmoid函数,k是负样本数量,P_n(w)是负采样分布。
log σ ( v w t + j T v w t ) + i = 1 ∑ k E w i ∼ P n ( w ) log σ ( − v w i T v w t ) 其中 σ ( x ) = 1 + e − x 1 是 s i g m o i d 函数, k 是负样本数量, P n ( w ) 是负采样分布。
负采样的直观解释是:对于真实的词对,我们希望增加它们的相似度;对于随机采样的负样本词对,我们希望降低它们的相似度。
1.2 最直接的迁移Item2Vec
类似地,在推荐系统中,每个用户的交互历史可以看作一个“句子”,其中包含的物品就是“词语”。如果两个物品经常被同一个用户交互,那么它们之间就存在相似性。
这种映射关系可以表示为:
词语 → 物品
句子 → 用户交互序列
词语共现 → 物品共同被用户交互
原理
Item2Vec直接采用Word2Vec的Skip-Gram架构,但在序列构建上有所简化。
给定数据集 S = { s 1 , s 2 , . . . , s n } \mathcal{S} = \{ s_1,s_2,...,s_n \} S = { s 1 , s 2 , ... , s n } ,其中每个 s i s_i s i 包含用户 i i i 交互过的所有物品,Item2Vec将每个用户的交互历史视为一个集合而非序列,忽略了交互的时间顺序。
优化目标函数与Word2Vec保持一致:
L = ∑ s ∈ S ∑ l i ∈ s ∑ − m ≤ j ≤ m , j ≠ 0 log P ( l i + j ∣ l i ) 其中 l i 表示物品, m 是上下文窗口大小, P ( l i + j ∣ l i ) 采用与 W o r d 2 V e c 相同的 s o f t m a x 形式计算。 \mathcal{L} = \sum_{s \in \mathcal{S}} \sum_{l_i \in s} \sum_{-m \leq j \leq m, j \neq 0} \log P(l_{i+j}|l_i) \\
其中l_i 表示物品,m是上下文窗口大小,P(l_{i+j}|l_i)采用与Word2Vec相同的softmax形式计算。
L = s ∈ S ∑ l i ∈ s ∑ − m ≤ j ≤ m , j = 0 ∑ log P ( l i + j ∣ l i ) 其中 l i 表示物品, m 是上下文窗口大小, P ( l i + j ∣ l i ) 采用与 W or d 2 V ec 相同的 so f t ma x 形式计算。
代码
Item2Vec的实现可以直接调用gensim库的Word2Vec模型。核心在于将用户交互序列作为训练语料。
这里的 train_hist_movie_id_list 就是前面提到的数据集,其中每个用户的交互历史被视为一个“句子”,物品ID对应“词语”。训练完成后,每个物品都得到一个稠密的向量表示。
item2vec.py 文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import numpy as npimport loggingfrom gensim.models import Word2Veclogging.getLogger("gensim" ).setLevel(logging.WARNING) class Item2Vec : def __init__ (self, model_config ): self.model_config = model_config def build_model (self ): pass def fit (self, train_hist_movie_id_list ): self.model = Word2Vec( train_hist_movie_id_list, vector_size=self.model_config["EmbDim" ], window=self.model_config["Window" ], min_count=self.model_config["MinCount" ], workers=self.model_config["Workers" ], ) def get_user_embs (self, hist_movie_id_list ): user_embs = [] for hist_movie_ids in hist_movie_id_list: user_embs.append( self.model.wv[hist_movie_ids[np.where(hist_movie_ids != 0 )[0 ]]].mean( axis=0 ) ) return np.array(user_embs) def get_item_embs (self, item_id_list ): item_embs = [] for item_id in item_id_list: try : item_embs.append(self.model.wv[item_id]) except KeyError: item_embs.append(np.zeros(self.model_config["EmbDim" ])) return np.array(item_embs) def build_item2vec_model (model_config ): return Item2Vec(model_config)
1.3 用属性信息增强序列EGES
Item2Vec虽然验证了序列建模在推荐系统中的可行性,但其简单的设计也带来了明显的局限性。
首先,将用户交互历史简单视为无序集合,忽略了时序信息可能丢失重要的用户行为模式。其次,对于新上架的物品由于缺乏用户交互历史,Item2Vec无法生成有意义的向量表示。
EGES(Enhanced Graph Embedding with Side Information)正是为了解决这些核心挑战而提出的。
该方法通过两个关键创新来改进传统的序列建模:一是基于会话构建更精细的商品关系图来更好地反映用户行为模式,二是融合商品的辅助信息来解决冷启动问题。
构建商品关系图
EGES的第一个创新是将物品序列的概念从简单的用户交互扩展为更精细的会话级序列。考虑到用户行为的复杂性和计算效率,研究者设置了一小时的时间窗口,只选择窗口内的用户行为构建商品关系图。
具体构建过程如图所示:当两个商品在同一会话(时间窗口内)的用户行为序列中连续出现时,在它们之间建立一条有向边,边的权重等于这种商品转移模式在所有用户行为历史中出现的频率 。相比于传统方法将整个用户历史视为一个序列,这种基于会话的图构建方法能够更准确地捕捉用户在特定时间段内的连续兴趣转移模式。
在构建好的商品图上,EGES采用带权随机游走策略生成训练序列。从一个节点出发,转移概率由边权重决定:
P ( v j ∣ v i ) = { M i j ∑ j = 1 ∣ N + ( v i ) ∣ M i j , i f v j ∈ N + ( v i ) 0 , i f e i j ∈ E 其中 M i j 表示节点 i 到节点 j 的边权重, N + v i 表示节点 v i 的邻居集合。通过这种随机游走过程,可以生成大量的商品序列用于后续的 E m b e d d i n g 学习。 P(v_j | v_i) =
\begin{cases}
\frac{ M_{ij} }{ \sum_{j=1}^{|N_+(v_{i})|} M_{ij}} \quad ,if\quad v_j \in N_+(v_i) \\
0 \quad ,if\quad e_{ij} \in E
\end{cases} \\
其中M_{ij}表示节点i到节点j的边权重,N_+{v_i}表示节点v_i的邻居集合。通过这种随机游走过程,可以生成大量的商品序列用于后续的Embedding学习。
P ( v j ∣ v i ) = ⎩ ⎨ ⎧ ∑ j = 1 ∣ N + ( v i ) ∣ M ij M ij , i f v j ∈ N + ( v i ) 0 , i f e ij ∈ E 其中 M ij 表示节点 i 到节点 j 的边权重, N + v i 表示节点 v i 的邻居集合。通过这种随机游走过程,可以生成大量的商品序列用于后续的 E mb e dd in g 学习。
融合辅助信息解决稀疏性问题
基于上述商品图和随机游走策略,我们可以采用类似Word2Vec的方法学习商品的向量表示。然而,这种纯粹基于行为序列的方法面临一个关键挑战:对于用户交互稀少的商品,由于缺乏足够的共现信息,很难学习到高质量的Embedding表示。
为了解决这种稀疏性问题,EGES方法的第二个创新是引入商品的辅助信息 (如类别、品牌、价格区间等)来增强商品的向量表示。
GES的核心思想是将商品本身的Embedding与其各种属性的Embedding进行平均聚合:
H v = 1 n + 1 ∑ s = 0 n W v s 其中 W v s 表示商品 v 的第 s 种属性的向量表示, W v 0 表示商品 I D 的向量表示。 H_v = \frac{1}{n+1} \sum_{s=0}^{n} W_v^s \\
其中W_v^s表示商品v的第s种属性的向量表示,W_v^0表示商品ID的向量表示。
H v = n + 1 1 s = 0 ∑ n W v s 其中 W v s 表示商品 v 的第 s 种属性的向量表示, W v 0 表示商品 I D 的向量表示。
这种方法虽然有效缓解了稀疏性问题,但存在一个明显的局限:它假设所有类型的辅助信息对商品表示的贡献是相等的,这显然不符合实际情况。
EGES的核心创新在于认识到不同类型的辅助信息应该有不同的重要性 。对于手机,品牌可能比价格更重要;对于日用品,价格可能比品牌更关键。
对于具有n种辅助信息的商品v,EGES为其维护n+1个向量表示:一个商品ID的向量表示,以及n个属性的向量表示。商品的最终向量表示通过加权聚合得到:
H v = ∑ j = 0 n e a v j W v j ∑ j = 0 n e a v j 其中 a v j 是可学习的权重参数。 H_v = \frac{
\sum_{j=0}^n e^{a_v^j} W_v^j
}{
\sum_{j=0}^n e^{a_v^j}
} \\
其中a_v^j是可学习的权重参数。
H v = ∑ j = 0 n e a v j ∑ j = 0 n e a v j W v j 其中 a v j 是可学习的权重参数。
这种设计的精妙之处在于,不同类型的辅助信息对不同商品的重要性是不同的——对于手机,品牌可能比价格更重要;对于日用品,价格可能比品牌更关键。
冷启动商品的处理 :对于新上架且没有任何用户交互历史的商品,EGES提供了有效的冷启动解决方案。由于这类商品缺乏行为数据,无法通过随机游走生成训练序列,因此既不存在基于ID的向量表示,也没有经过训练的注意力权重参数 a v j a_v^j a v j 。
在这种情况下,系统采用简单而有效的mean pooling策略:直接对该商品的所有辅助信息向量(类别、品牌、价格区间等)进行平均聚合 来构建商品表示。虽然这种方法无法体现不同属性的差异化重要性,但能够有效利用商品的内容特征,从而支持基于向量相似度的商品召回(I2I召回)。
通过这种方式,即使是刚上架、没有任何用户交互的新商品,也能通过其属性信息获得有意义的向量表示,从而被纳入推荐候选集。
EGES在淘宝的实际部署效果显著:在包含十亿级训练样本的大规模数据集上,相比传统方法在推荐准确率上有了显著的提升,同时有效解决了新商品的冷启动问题。
EGES采用与Word2Vec类似的负采样策略,但损失函数经过了优化:
L ( v , u , y ) = − [ y log ( σ ( H v T Z u ) ) + ( 1 − y ) log ( 1 − σ ( H v T Z u ) ) ] 其中 y 是标签( 1 表示正样本, 0 表示负样本), H v 是商品 v 的向量表示, Z u 是上下文节点 u 的向量表示。 L(v,u,y) = -[ y \log ( \sigma (H_v^T Z_u) ) + (1-y) \log (1 - \sigma (H_v^T Z_u) ) ] \\
其中y是标签(1表示正样本,0表示负样本),H_v是商品v的向量表示,Z_u是上下文节点u的向量表示。
L ( v , u , y ) = − [ y log ( σ ( H v T Z u )) + ( 1 − y ) log ( 1 − σ ( H v T Z u ))] 其中 y 是标签( 1 表示正样本, 0 表示负样本), H v 是商品 v 的向量表示, Z u 是上下文节点 u 的向量表示。
代码
EGES的核心在于商品特定注意力层(ItemSpecificAttentionLayer),它为每个商品学习一组特征权重。
这里的attention_weights是一个形状为 ∣ V ∣ × ( n + 1 ) |V| \times (n+1) ∣ V ∣ × ( n + 1 ) 的参数矩阵,其中 ∣ V ∣ |V| ∣ V ∣ 是商品总数, n + 1 n+1 n + 1 是特征数量(商品ID + n种辅助信息)。对于每个商品,模型会学习到一组特定的权重,自动发现哪些特征对该商品更重要。这种商品特定的注意力机制是EGES相比简单平均聚合的关键优势。
eges.py文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 import osimport logginglogger = logging.getLogger(__name__) import tensorflow as tfimport networkx as nximport numpy as npfrom tqdm import tqdmclass SimpleWalker : """简化的随机游走器实现""" def build_graph (self, session_list ): logger.debug("构建图..." ) edges = [] for session in session_list: if len (session) > 1 : for i in range (len (session) - 1 ): u = int (session[i]) v = int (session[i + 1 ]) edges.append((u, v)) if len (edges) == 0 : return None , None G = nx.Graph() G.add_edges_from(edges) nodes = list (G.nodes()) node_map = {node: i for i, node in enumerate (nodes)} reverse_node_map = {i: node for i, node in enumerate (nodes)} logger.debug( f"图包含 {G.number_of_nodes()} 个节点和 {G.number_of_edges()} 条边" ) return G, (node_map, reverse_node_map) def generate_walks (self, G, num_walks, walk_length ): """生成随机游走""" logger.debug(f"生成随机游走..." ) walks = [] nodes = list (G.nodes()) for _ in range (num_walks): np.random.shuffle(nodes) for node in tqdm( nodes, desc=f"Walk {_+1 } /{num_walks} " , disable=not logger.isEnabledFor(logging.DEBUG), ): walk = [node] for _ in range (walk_length - 1 ): curr = walk[-1 ] neighbors = list (G.neighbors(curr)) if len (neighbors) == 0 : break walk.append(np.random.choice(neighbors)) walks.append(walk) logger.debug(f"生成的游走序列数量: {len (walks)} " ) return walks def get_graph_context_all_pairs (walks, window_size ): """ 从随机游走生成训练样本对 """ all_pairs = [] for walk in tqdm( walks, desc="生成训练样本对" , disable=not logger.isEnabledFor(logging.DEBUG) ): for i in range (len (walk)): for j in range ( max (0 , i - window_size), min (len (walk), i + window_size + 1 ) ): if i != j: all_pairs.append((walk[i], walk[j])) logger.debug(f"生成的样本对数量: {len (all_pairs)} " ) return all_pairs class ItemSpecificAttentionLayer (tf.keras.layers.Layer): """ 正确实现EGES论文中描述的加权池化注意力层 A ∈ R^{|V| × (n+1)} - 每个商品有自己的一组特征权重 """ def __init__ (self, num_items, **kwargs ): """ 参数: num_items: 商品总数 |V| """ self.num_items = num_items super (ItemSpecificAttentionLayer, self).__init__(**kwargs) def build (self, input_shape ): num_features = input_shape[1 ] self.attention_weights = self.add_weight( name="attention_weights" , shape=(self.num_items, num_features), initializer=tf.keras.initializers.RandomNormal(), trainable=True , regularizer=tf.keras.regularizers.l2(1e-5 ), ) super (ItemSpecificAttentionLayer, self).build(input_shape) def call (self, inputs, item_indices ): """ 参数: inputs: 特征嵌入 [batch_size, n+1, emb_dim], n 是特征数量,emb_dim 是嵌入维度 item_indices: 当前批次中每个样本对应的商品索引 [batch_size] """ batch_attention_weights = tf.gather(self.attention_weights, item_indices) exp_attention = tf.exp(batch_attention_weights) attention_sum = tf.reduce_sum(exp_attention, axis=1 , keepdims=True ) normalized_attention = exp_attention / attention_sum normalized_attention = tf.expand_dims(normalized_attention, axis=-1 ) weighted_embedding = inputs * normalized_attention output = tf.reduce_sum(weighted_embedding, axis=1 ) return output, normalized_attention def generate_negative_samples (train_sample_dict, num_negatives=2 ): """生成负样本""" negative_sample_dict = { "movie_id" : np.repeat(train_sample_dict["movie_id" ], num_negatives), "context_id" : np.repeat(train_sample_dict["context_id" ], num_negatives), "genre_id" : np.repeat(train_sample_dict["genre_id" ], num_negatives), } np.random.shuffle(negative_sample_dict["context_id" ]) return negative_sample_dict def build_eges_model (feature_columns, model_config ): """ 构建EGES模型(自包含版本,适配funrec训练/评估流程) 参数: feature_columns: 未使用(EGES使用自定义输入) model_config: 包含以下字段: - dataset_name: 数据集名称,用于解析字典大小 - item_feature_list: 物品侧特征列表(例如 ['movie_id', 'genre_id']) - emb_dim: 嵌入维度 - l2_reg: L2正则 - use_attention: 是否使用注意力聚合 返回: (main_model, None, item_model) """ from ..config.data_config import DATASET_CONFIG from ..data.data_utils import read_pkl_data item_feature_list = model_config.get("item_feature_list" , ["movie_id" , "genre_id" ]) emb_dim = int (model_config.get("emb_dim" , 16 )) l2_reg = float (model_config.get("l2_reg" , 1e-5 )) use_attention = model_config.get("use_attention" , True ) dataset_name = model_config.get("dataset_name" ) if dataset_name is None : raise ValueError("EGES requires 'dataset_name' in model_config" ) dataset_cfg = dict (DATASET_CONFIG.get(dataset_name, {})) feature_dict = read_pkl_data(dataset_cfg["dict_path" ]) item_vocab_size = feature_dict.get("movie_id" ) or feature_dict.get("movieId" ) or 0 if not isinstance (item_vocab_size, (int , np.integer)): item_vocab_size = len (item_vocab_size) + 1 genre_vocab_size = feature_dict.get("genres" , 0 ) if not isinstance (genre_vocab_size, (int , np.integer)): genre_vocab_size = len (genre_vocab_size) + 1 inputs = { "movie_id" : tf.keras.Input(shape=(), dtype="int32" , name="movie_id" ), "context_id" : tf.keras.Input(shape=(), dtype="int32" , name="context_id" ), "genre_id" : tf.keras.Input(shape=(), dtype="int32" , name="genre_id" ), } movie_emb_table = tf.keras.layers.Embedding( input_dim=item_vocab_size, output_dim=emb_dim, name="eges_movie_id" , embeddings_initializer=tf.keras.initializers.RandomNormal(), embeddings_regularizer=tf.keras.regularizers.l2(l2_reg), mask_zero=False , ) genre_emb_table = tf.keras.layers.Embedding( input_dim=genre_vocab_size, output_dim=emb_dim, name="eges_genre_id" , embeddings_initializer=tf.keras.initializers.RandomNormal(), embeddings_regularizer=tf.keras.regularizers.l2(l2_reg), mask_zero=False , ) context_emb_table = tf.keras.layers.Embedding( input_dim=item_vocab_size, output_dim=emb_dim, name="eges_context_id" , embeddings_initializer=tf.keras.initializers.RandomNormal(), embeddings_regularizer=tf.keras.regularizers.l2(l2_reg), mask_zero=False , ) feature_embedding_dict = { "movie_id" : tf.expand_dims(movie_emb_table(inputs["movie_id" ]), axis=1 ), "genre_id" : tf.expand_dims(genre_emb_table(inputs["genre_id" ]), axis=1 ), "context_id" : tf.expand_dims(context_emb_table(inputs["context_id" ]), axis=1 ), } all_feature_embeddings = [] for feat_name in item_feature_list: all_feature_embeddings.append(feature_embedding_dict[feat_name]) stacked_embeddings = tf.concat(all_feature_embeddings, axis=1 ) item_indices = tf.cast(inputs["movie_id" ], tf.int32) if use_attention: attention_layer = ItemSpecificAttentionLayer(num_items=item_vocab_size) final_embedding, _ = attention_layer(stacked_embeddings, item_indices) else : final_embedding = tf.reduce_mean(stacked_embeddings, axis=1 ) context_embedding = tf.squeeze(feature_embedding_dict["context_id" ], axis=1 ) output = tf.reduce_sum(final_embedding * context_embedding, axis=1 , keepdims=True ) output = tf.keras.layers.Activation("sigmoid" )(output) model = tf.keras.Model(inputs=inputs, outputs=output) item_inputs = {k: v for k, v in inputs.items() if k in item_feature_list} model.__setattr__("item_input" , item_inputs) model.__setattr__("item_embedding" , final_embedding) item_model = tf.keras.Model(inputs=item_inputs, outputs=final_embedding) return model, None , item_model
1.4 业务目标融入序列Airbnb
Airbnb作为全球最大的短租平台,面临着与传统电商不同的挑战。房源不是标准化商品,用户的预订行为远比点击浏览稀疏,而且地理位置成为了一个关键因素。更重要的是,Airbnb需要的不仅仅是相似性,而是能够真正促进最终预订转化的推荐。
面向业务的序列构建
Airbnb重新定义了“序列”的概念,采用基于会话的序列构建策略。具体而言:
会话切分机制 :系统不再简单地将用户交互过的所有房源串联,而是基于用户的点击会话(Click Sessions)构建序列。当用户连续点击间隔超过30分钟时,系统会自动开始一个新的会话。这种时间窗口的设计能够更准确地捕捉用户在特定搜索场景下的连贯意图。
行为权重差异化 :Airbnb引入了重要的业务洞察——用户行为的信号强度存在显著差异。最终的预订行为相比于简单的点击浏览,包含了更强烈的用户偏好信号,因此在模型训练中应当给予更高的权重。
全局上下文机制
为了强化模型对最终转化行为的学习,Airbnb设计了全局上下文机制。
在传统的Skip-Gram模型中,只有在滑动窗口内的物品才被视为上下文,但这种局部窗口无法充分利用最终预订这一强烈的正向信号。因此,Airbnb让用户最终预订的房源(booked listing)与序列中的每一个浏览房源都形成正样本对进行训练,无论它们在序列中的距离有多远。
针对有预订行为的会话(booked sessions),Airbnb修改了优化目标函数,增加了全局上下文项:
arg max θ ∑ ( l , c ) ∈ D p log 1 1 + e − v c T v l + ∑ ( l , c ) ∈ D n log 1 1 + e v c T v l + log 1 1 + e − v l b T v l \argmax_{\theta}
\sum_{(l,c) \in \mathcal{D}_p} \log \frac{1}{1+e^{-v_c^T v_l}} +
\sum_{(l,c) \in \mathcal{D}_n} \log \frac{1}{1+e^{v_c^T v_l}} + \log \frac{1}{1+ e^{-v_{l_b}^T v_l}}
θ arg max ( l , c ) ∈ D p ∑ log 1 + e − v c T v l 1 + ( l , c ) ∈ D n ∑ log 1 + e v c T v l 1 + log 1 + e − v l b T v l 1
在这个公式中,前两项是标准的Skip-Gram目标函数:
第一项最大化正样本对 ( l , c ) (l,c) ( l , c ) 的相似度,其中 l 是目标房源,c是滑动窗口内的上下文房源;
第二项最小化负样本对的相似度。
关键的创新在于第三项,这里 l b l_b l b 表示用户在该会话中最终预订的房源。
通过这种全局上下文机制,预订房源为序列中的每个房源都提供了额外的学习信号,使得模型能够更有效地捕捉“什么样的房源组合最终会导致预订 ”这一关键转化模式。
市场感知的负采样
Airbnb的另一个创新是改进了负采样策略。传统方法从整个物品库中随机选择负样本,但Airbnb观察到用户通常只会在同一个市场(城市或地区)内进行预订。如果负样本来自不同的地理位置,模型就容易学到地理位置这种“简单特征”,而忽略了房源本身的特点。
因此,Airbnb增加了“同市场负采样 ”策略,一部分负样本从与正样本相同的地理市场 中选择:
∑ ( l , l m − ) ∈ D m log 1 1 + e v l m − T v l 其中 l m − 表示来自相同市场的负样本。 \sum_{(l,l_m^-) \in \mathcal{D}_m} \log \frac{1}{1+e^{v_{l_m^-}^T v_l}} \\
其中l_m^-表示来自相同市场的负样本。
( l , l m − ) ∈ D m ∑ log 1 + e v l m − T v l 1 其中 l m − 表示来自相同市场的负样本。
这迫使模型学习同一地区内房源的细微差别,提升了推荐的精细度。
2、U2I召回
在完成了I2I召回的探讨后,我们转向另一条同样重要的技术路径:U2I(用户到物品)召回。如果说I2I召回解决的是“买了这个商品的人还会买什么 ”的问题,那么U2I召回直面的则是推荐系统的核心命题——“这个用户会喜欢什么商品 ”。
U2I召回的核心挑战在于如何在庞大的物品库中,快速找到与用户兴趣高度匹配的候选集。传统的协同过滤方法虽然有效,但在面对数亿用户和数千万商品时,计算复杂度成为不可逾越的障碍。U2I召回的演进历程,本质上是一个将复杂的“匹配”问题逐步简化为高效“搜索”问题的过程。
这一转变的关键突破来自于一个统一的架构思想:双塔模型 (Two-Tower Model)。无论是经典的因子分解机FM、深度结构化语义模型DSSM,还是YouTube的深度神经网络YouTubeDNN,它们在表面上看起来差异巨大,但在本质上都遵循着同一个设计哲学——将用户和物品分别编码为向量,然后通过向量间的相似度计算来衡量匹配程度。
双塔模型的核心思想是将推荐问题分解为两个相对独立的子问题。
用户塔 (User Tower)专注于理解用户——处理用户的历史行为、人口统计学特征、上下文信息等,最终输出一个代表用户兴趣的向量。
物品塔 (Item Tower)则专精于刻画物品——整合物品的ID、类别、属性、内容特征等,输出一个表征物品特性的向量。
这种“分而治之”的设计带来了巨大的工程优势。在训练完成后,所有物品的向量都可以离线预计算 并存储在高效的向量检索系统中(如Faiss、Annoy等)。当用户发起推荐请求时,系统只需实时计算用户向量,然后通过近似最近邻(ANN)搜索 快速找到最相似的物品向量。这种架构的优雅之处在于,它将原本需要 O ( U × I ) O(U \times I) O ( U × I ) 的用户-物品匹配复杂度,降低到了 O ( U + I ) O(U+I) O ( U + I ) 的向量计算复杂度。
用户与物品的匹配度通过两个向量的点积 或余弦相似度 来衡量:
s c o r e ( u , v ) = u ⋅ v = ∑ i = 1 d u i v i 其中 d 是向量维度 , u i 和 v i 是向量 u 和 v 的第 i 个分量。 score(u,v) = u\cdot v = \sum_{i=1}^d u_i v_i \\
其中d是向量维度, u_i和v_i是向量u和v的第i个分量。
score ( u , v ) = u ⋅ v = i = 1 ∑ d u i v i 其中 d 是向量维度 , u i 和 v i 是向量 u 和 v 的第 i 个分量。
这个简单的数学操作背后,蕴含着“语义相似性”的深刻含义——向量空间中的距离反映了用户兴趣与物品特性的匹配程度。接下来,我们将沿着双塔模型的演进轨迹,从经典的数学基础到现代的深度学习实现,逐一探讨这些里程碑式的工作。
2.1 因子分解机FM
双塔模型的雏形
FM的核心贡献在于,它首次将用户-物品的复杂交互,优雅地分解为两个低维向量的内积操作。
从交互矩阵到向量内积
y ^ ( x ) : = w 0 + ∑ i = 1 n w i x i + ∑ i = 1 n ∑ j = i + 1 n ⟨ v i , v j ⟩ x i x j \hat{y} (x) := w_0 + \sum_{i=1}^n w_i x_i + \sum_{i=1}^n \sum_{j=i+1}^n \langle v_i, v_j \rangle x_i x_j
y ^ ( x ) := w 0 + i = 1 ∑ n w i x i + i = 1 ∑ n j = i + 1 ∑ n ⟨ v i , v j ⟩ x i x j
这个公式看起来复杂,但其核心思想简单而深刻:每个特征i都对应一个k维的隐向量 v i v_i v i ,特征间的交互通过这些隐向量的内积 ⟨ v i , v j ⟩ = ∑ f = 1 k v i , f ⋅ v j , f \langle v_i, v_j\rangle = \sum_{f=1}^k v_{i,f} \cdot v_{j,f} ⟨ v i , v j ⟩ = ∑ f = 1 k v i , f ⋅ v j , f 来建模。
FM的真正巧妙之处在于一个数学变换技巧。原本 O ( n 2 ) O(n^2) O ( n 2 ) 复杂度的二阶交互项,可以通过代数运算重写为:
∑ i = 1 n ∑ j = i + 1 n ⟨ v i , v j ⟩ x i x j = 1 2 ∑ f = 1 k ( ( ∑ i = 1 n v i , f x i ) 2 − ∑ i = 1 n v i , f 2 x i 2 ) \sum_{i=1}^n \sum_{j=i+1}^n \langle v_i, v_j\rangle x_i x_j =
\frac{1}{2} \sum_{f=1}^k \left(
\left( \sum_{i=1}^n v_{i,f} x_i \right) ^2 -
\sum_{i=1}^n v_{i,f}^2 x_i^2
\right)
i = 1 ∑ n j = i + 1 ∑ n ⟨ v i , v j ⟩ x i x j = 2 1 f = 1 ∑ k ( i = 1 ∑ n v i , f x i ) 2 − i = 1 ∑ n v i , f 2 x i 2
这一变换将计算复杂度从 O ( k n 2 ) O(kn^2) O ( k n 2 ) 降低到 O ( k n ) O(kn) O ( kn ) ,使得FM能够处理大规模稀疏数据。
分解为双塔结构
虽然FM通过数学变换解决了计算复杂度问题,但在召回任务中,我们还面临另一个挑战:如何高效地为用户从海量候选物品中筛选出最相关的推荐结果?这时就需要考虑将FM分解为双塔结构。
在召回场景下,我们可以将所有特征自然地分为两类:用户侧特征集U(如用户年龄、性别、历史偏好等)和物品侧特征集I(如物品类别、价格、品牌等)。
这里有一个关键的发现:当我们为同一个用户推荐不同物品时,用户特征是固定不变的。因此,用户特征内部的交互得分(无论是一阶还是二阶)对所有候选物品都是相同的。既然这部分得分相同,在排序时就可以忽略,我们只需要关注: 1. 物品特征内部的交互得分 2. 用户特征与物品特征之间的交互得分。基于这个思路,我们可以将FM的二阶交互项重新组织:
1 2 ∑ f = 1 k ( ( ∑ i = 1 n v i , f x i ) 2 − ∑ i = 1 n v i , f 2 x i 2 ) = 1 2 ∑ f = 1 k ( ( ∑ u ∈ U v u , f x u + ∑ t ∈ I v t , f x t ) 2 − ∑ u ∈ U v u , f 2 x u 2 − ∑ t ∈ I v t , f 2 x u 2 ) = 1 2 ∑ f = 1 k ( ( ∑ u ∈ U v u , f x u ) 2 + ( ∑ t ∈ I v t , f x t ) 2 + 2 ∑ u ∈ U v u , f x u ∑ t ∈ I v t , f x t − ∑ u ∈ U v u , f 2 x u 2 − ∑ t ∈ I v t , f 2 x u 2 ) \frac{1}{2} \sum_{f=1}^k \left(
\left( \sum_{i=1}^n v_{i,f} x_i \right) ^2 -
\sum_{i=1}^n v_{i,f}^2 x_i^2
\right) \\
\begin{aligned}
&= \frac{1}{2} \sum_{f=1}^k \left(
\left( \sum_{u \in U} v_{u,f} x_u + \sum_{t \in I} v_{t,f} x_t \right) ^2 - \sum_{u \in U} v_{u,f}^2 x_u^2 - \sum_{t \in I} v_{t,f}^2 x_u^2
\right) \\
&= \frac{1}{2} \sum_{f=1}^k \left(
\left( \sum_{u \in U} v_{u,f} x_u \right) ^2 + \left( \sum_{t \in I} v_{t,f} x_t \right) ^2 + 2\sum_{u\in U} v_{u,f}x_u \sum_{t \in I} v_{t,f} x_t - \sum_{u \in U} v_{u,f}^2 x_u^2 - \sum_{t \in I} v_{t,f}^2 x_u^2
\right)
\end{aligned}
2 1 f = 1 ∑ k ( i = 1 ∑ n v i , f x i ) 2 − i = 1 ∑ n v i , f 2 x i 2 = 2 1 f = 1 ∑ k ( u ∈ U ∑ v u , f x u + t ∈ I ∑ v t , f x t ) 2 − u ∈ U ∑ v u , f 2 x u 2 − t ∈ I ∑ v t , f 2 x u 2 = 2 1 f = 1 ∑ k ( u ∈ U ∑ v u , f x u ) 2 + ( t ∈ I ∑ v t , f x t ) 2 + 2 u ∈ U ∑ v u , f x u t ∈ I ∑ v t , f x t − u ∈ U ∑ v u , f 2 x u 2 − t ∈ I ∑ v t , f 2 x u 2
基于前面的分析,我们发现用户特征内部的交互项对所有候选物品都相同,因此可以在召回阶段忽略。这样就可以将FM重新组织,只保留对排序有影响的部分:
s c o r e F M = ∑ t ∈ I w t x t + 1 2 ∑ f = 1 k ( ( ∑ t ∈ I v t , f x t ) 2 − ∑ t ∈ I v t , f 2 x u 2 ) + ∑ f = 1 k ( ∑ u ∈ U v u , f x u ∑ t ∈ I v t , f x t ) score_{FM} = \sum_{t \in I} w_t x_t + \frac{1}{2} \sum_{f=1}^k \left(
\left( \sum_{t \in I} v_{t,f} x_t \right) ^2 - \sum_{t \in I} v_{t,f}^2 x_u^2
\right) + \sum_{f=1}^k \left(
\sum_{u\in U} v_{u,f}x_u \sum_{t \in I} v_{t,f} x_t
\right)
scor e FM = t ∈ I ∑ w t x t + 2 1 f = 1 ∑ k ( t ∈ I ∑ v t , f x t ) 2 − t ∈ I ∑ v t , f 2 x u 2 + f = 1 ∑ k ( u ∈ U ∑ v u , f x u t ∈ I ∑ v t , f x t )
观察上面的公式,我们发现一个重要的数学结构:最后一项 ∑ u ∈ U v u , f x u ∑ t ∈ I v t , f x t \sum_{u\in U} v_{u,f}x_u \sum_{t \in I} v_{t,f} x_t ∑ u ∈ U v u , f x u ∑ t ∈ I v t , f x t 实际上是两个向量 ∑ u ∈ U v u x u \sum_{u\in U} v_{u}x_u ∑ u ∈ U v u x u 和 ∑ t ∈ I v t x t \sum_{t\in I} v_{t}x_t ∑ t ∈ I v t x t
的内积。这启发我们将整个匹配分数重新组织为双塔结构:
s c o r e F M = V i t e m ⋅ V u s e r T score_{FM} = V_{item} \cdot V_{user}^T
scor e FM = V i t e m ⋅ V u ser T
通过这种重新组织,我们得到了FM的双塔表示:
V u s e r = [ 1 ; ∑ u ∈ U v u x u ] V_{user} = [1 \quad ;\quad \sum_{u \in U} v_u x_u]
V u ser = [ 1 ; u ∈ U ∑ v u x u ]
V i t e m = [ ∑ ∈ I w t x t + 1 2 ∑ f = 1 k ( ( ∑ t ∈ I v t , f x t ) 2 − ∑ t ∈ I v t , f 2 x u 2 ) ; ∑ t ∈ I v t x t ] V_{item} = [\sum_{ \in I} w_t x_t + \frac{1}{2} \sum_{f=1}^k (
( \sum_{t \in I} v_{t,f} x_t ) ^2 - \sum_{t \in I} v_{t,f}^2 x_u^2
) \quad ;\quad \sum_{t \in I} v_t x_t]
V i t e m = [ ∈ I ∑ w t x t + 2 1 f = 1 ∑ k (( t ∈ I ∑ v t , f x t ) 2 − t ∈ I ∑ v t , f 2 x u 2 ) ; t ∈ I ∑ v t x t ]
这里的设计很巧妙:用户向量包含一个常数项1和用户特征的聚合表示,物品向量则包含物品的内部交互信息和物品特征的聚合表示。两个向量通过内积运算,既能捕捉用户-物品之间的交互,又保留了物品内部特征的复杂关系。
这样的分解揭示了一个重要原理:即使是复杂的特征交互模式,也可以通过合适的向量表示和简单的内积运算来实现。
代码
FM召回的双塔实现关键在于如何将数学推导转化为实际的向量表示。
用户塔 构建了包含常数项和特征聚合的向量。
物品塔 则更为复杂,需要计算一阶线性项和FM交互项。
最终通过内积计算匹配分数:fm_score = Dot(axes=1)([item_vector, user_vector])。这种设计使得物品向量可以离线预计算,用户向量实时计算,从而支持高效的召回检索。
fm_recall.py文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 """ 因子分解机(FM)推荐模型。 """ import tensorflow as tffrom tensorflow.keras.models import Modelfrom tensorflow.keras.layers import Dense, Dot, Add, Subtract, Concatenatefrom .utils import ( build_input_layer, build_group_feature_embedding_table_dict, concat_group_embedding, ) from .layers import PredictLayerclass SumPooling (tf.keras.layers.Layer): """对嵌入特征进行求和聚合的层""" def __init__ (self, name=None , **kwargs ): super (SumPooling, self).__init__(name=name, **kwargs) def call (self, inputs ): return tf.reduce_sum(inputs, axis=1 ) def get_config (self ): config = super (SumPooling, self).get_config() return config class OnesLayer (tf.keras.layers.Layer): """生成全1向量的层""" def __init__ (self, name=None , **kwargs ): super (OnesLayer, self).__init__(name=name, **kwargs) def call (self, inputs ): batch_size = tf.shape(inputs)[0 ] return tf.ones((batch_size, 1 ), dtype=tf.float32) def get_config (self ): config = super (OnesLayer, self).get_config() return config class SquareLayer (tf.keras.layers.Layer): """平方操作层""" def __init__ (self, name=None , **kwargs ): super (SquareLayer, self).__init__(name=name, **kwargs) def call (self, inputs ): return tf.square(inputs) def get_config (self ): config = super (SquareLayer, self).get_config() return config class SumScalarLayer (tf.keras.layers.Layer): """将向量求和为标量的层""" def __init__ (self, name=None , **kwargs ): super (SumScalarLayer, self).__init__(name=name, **kwargs) def call (self, inputs ): return tf.reduce_sum(inputs, axis=1 , keepdims=True ) def get_config (self ): config = super (SumScalarLayer, self).get_config() return config class ScaleLayer (tf.keras.layers.Layer): """按常数缩放的层""" def __init__ (self, scale_factor, name=None , **kwargs ): super (ScaleLayer, self).__init__(name=name, **kwargs) self.scale_factor = scale_factor def call (self, inputs ): return inputs * self.scale_factor def get_config (self ): config = super (ScaleLayer, self).get_config() config.update({"scale_factor" : self.scale_factor}) return config def build_fm_recall_model (feature_columns, model_config ): """ 构建因子分解机(FM)模型 - 双塔结构用于召回 基于FM的数学分解:MatchScore = V_item · V_user^T 根据FM的数学推导: - 用户向量:V_user = [1; ∑(v_u * x_u)] - 物品向量:V_item = [∑w_t*x_t + FM_interaction; ∑(v_t * x_t)] Args: feature_columns: 特征列配置 model_config: 模型配置字典,包含: - embedding_dim: 嵌入维度 (default: 8) Returns: Tuple of (training_model, user_model, item_model) """ embedding_dim = model_config.get("embedding_dim" , 8 ) input_layer_dict = build_input_layer(feature_columns) group_embedding_feature_dict = build_group_feature_embedding_table_dict( feature_columns, input_layer_dict, prefix="embedding/" ) user_inputs = [ input_layer_dict[fc.name] for fc in feature_columns if "user" in fc.group ] item_inputs = [ input_layer_dict[fc.name] for fc in feature_columns if "item" in fc.group ] def build_user_tower (): user_embeddings = group_embedding_feature_dict.get("user" , []) if not user_embeddings: raise ValueError("No user embeddings found" ) user_concat = Concatenate(axis=1 , name="user_concat" )( user_embeddings ) user_embedding_sum = SumPooling(name="user_embedding_sum" )( user_concat ) ones_vector = OnesLayer(name="ones_vector" )( user_embedding_sum ) user_vector = Concatenate(axis=1 , name="user_vector" )( [ones_vector, user_embedding_sum] ) return user_vector def build_item_tower (): item_embeddings = group_embedding_feature_dict.get("item" , []) if not item_embeddings: raise ValueError("No item embeddings found" ) item_concat = Concatenate(axis=1 , name="item_concat" )( item_embeddings ) item_embedding_sum = SumPooling(name="item_embedding_sum" )( item_concat ) item_linear_weights = Dense( 1 , activation="linear" , use_bias=False , name="item_linear_weights" )( item_embedding_sum ) sum_squared = SquareLayer(name="item_sum_squared" )( item_embedding_sum ) item_squared = SquareLayer(name="item_squared" )( item_concat ) squared_sum = SumPooling(name="item_squared_sum" )( item_squared ) fm_interaction_vector = Subtract(name="fm_subtract" )( [sum_squared, squared_sum] ) fm_interaction_half = ScaleLayer(0.5 , name="fm_half_scale" )( fm_interaction_vector ) fm_interaction_scalar = SumScalarLayer(name="fm_interaction_scalar" )( fm_interaction_half ) first_term = Add(name="item_first_term" )( [item_linear_weights, fm_interaction_scalar] ) item_vector = Concatenate(axis=1 , name="item_vector" )( [first_term, item_embedding_sum] ) return item_vector user_representation = build_user_tower() item_representation = build_item_tower() user_model = Model( inputs=user_inputs, outputs=user_representation, name="user_tower_model" ) item_model = Model( inputs=item_inputs, outputs=item_representation, name="item_tower_model" ) fm_score = Dot(axes=1 , name="fm_match_score" )( [item_representation, user_representation] ) output = Dense(1 , activation="sigmoid" , name="output" )(fm_score) all_inputs = user_inputs + item_inputs training_model = Model( inputs=all_inputs, outputs=output, name="fm_two_tower_training" ) return training_model, user_model, item_model
2.2 深度结构化语义模型DSSM
虽然FM在数学上优雅地实现了向量分解,但它本质上仍是线性模型 ,对于复杂的非线性用户-物品关系表达能力有限。深度结构化语义模型(Deep Structured Semantic Model, DSSM) (Huang et al., 2013) 的出现,将双塔模型的表达能力推向了新的高度——通过深度神经网络替代线性变换,实现了更强的特征学习和表示能力。其核心思想是通过深度神经网络 将用户和物品映射到共同的语义空间中,通过向量间的相似度计算来衡量匹配程度。
推荐中的双塔架构
在推荐系统中,DSSM的架构包括两个核心部分:用户塔和物品塔,每个塔都是独立的DNN结构。用户特征(如历史行为、人口统计学信息等)经过用户塔处理后输出用户Embedding,物品特征(如ID、类别、属性等)经过物品塔处理后输出物品Embedding。两个Embedding的维度必须保持一致 ,以便进行后续的相似度计算。
相比FM的线性组合,DSSM的深度结构能够使用户侧和物品侧的特征各自在塔内进行复杂的非线性变换 ,但两塔之间的交互仅在最终的内积计算时发生。这种设计带来了显著的工程优势——物品向量可以离线预计算 ,用户向量可以实时计算,然后通过高效的ANN检索完成召回。
多分类训练范式
DSSM将召回任务视为一个极端多分类问题,将物料库中的所有物品看作不同的类别。模型的目标是最大化用户对正样本物品的预测概率:
P ( y ∣ x , θ ) = e s ( x , y ) ∑ j ∈ M e s ( x , y j ) 这里 s ( x , y ) 表示用户 x 和物品 y 的相似度分数, P ( y ∣ x , θ ) 表示匹配概率, M 表示整个物料库。 P(y|x,\theta ) = \frac{e^{s(x,y)}}{ \sum_{j\in M} e^{s(x,y_j)} } \\
这里s(x,y)表示用户x和物品y的相似度分数,P(y|x,\theta )表示匹配概率,M表示整个物料库。
P ( y ∣ x , θ ) = ∑ j ∈ M e s ( x , y j ) e s ( x , y ) 这里 s ( x , y ) 表示用户 x 和物品 y 的相似度分数, P ( y ∣ x , θ ) 表示匹配概率, M 表示整个物料库。
由于物料库规模庞大,直接计算这个Softmax在计算上不可行,因此实际训练时采用负采样技术,为每个正样本采样一定数量的负样本来近似计算。
双塔模型的细节
除了相对简单的模型结构外,双塔模型在实际应用中的一些关键细节同样值得深入探讨。这些细节往往决定了模型的最终效果,(Yi et al., 2019) 等研究对此进行了分析。
向量归一化 :对用户塔和物品塔输出的Embedding进行L2归一化:
u ← u ∥ u ∥ 2 , v ← v ∥ v ∥ 2 u \leftarrow \frac{u}{\| u \|_2} \quad, \quad v \leftarrow \frac{v}{\| v \|_2}
u ← ∥ u ∥ 2 u , v ← ∥ v ∥ 2 v
归一化的核心作用是解决向量点积的非度量性问题。原始的向量点积不满足三角不等式,可能导致“距离”计算的不一致性。例如,对于三个点 A = ( 10 , 0 ) , B = ( 0 , 10 ) , C = ( 11 , 0 ) A=(10,0),B=(0,10),C = (11,0) A = ( 10 , 0 ) , B = ( 0 , 10 ) , C = ( 11 , 0 ) ,使用点积计算会得到 d i s t ( A , B ) < d i s t ( A , C ) dist(A,B) < dist(A,C) d i s t ( A , B ) < d i s t ( A , C ) ,但这与直观的几何距离不符。
通过归一化,向量点积被转化为欧式距离的度量形式。对于归一化向量u和v,它们的欧式距离为:
∥ u − v ∥ = 2 − 2 ⟨ u , v ⟩ \| u-v \| = \sqrt{2-2\langle u,v \rangle}
∥ u − v ∥ = 2 − 2 ⟨ u , v ⟩
这种转换的关键意义在于训练与检索的一致性 :模型训练时使用的相似度计算(归一化后的点积)与线上ANN检索系统使用的距离度量(欧式距离)本质上是等价的。这确保了离线训练学到的向量关系能够在线上检索中得到正确体现,避免了训练-服务不一致的问题。
温度系数调节 :在归一化后的向量计算内积后,除以温度系数:
s ( u , v ) = ⟨ u , v ⟩ τ s(u,v) = \frac{\langle u,v \rangle}{\tau}
s ( u , v ) = τ ⟨ u , v ⟩
这里的温度系数 τ \tau τ 看起来是个简单的除法操作,但实际上它对模型的训练效果有着深远的影响。从数学角度来看,温度系数本质上是在缩放logits,进而改变Softmax函数的输出分布形状。当我们设置 τ < 1 \tau <1 τ < 1 时,相似度的差异会被放大,这意味着模型会对高分样本给出更高的概率,预测变得更加“确定”;相反,当 τ > 1 \tau > 1 τ > 1 时,分布会变得更加平滑,模型的预测也更加保守。
代码
DSSM的实现核心在于构建独立的用户塔和物品塔,每个塔都是一个深度神经网络。
关键的向量归一化和相似度计算。
这种设计使得用户和物品的表示完全独立,支持离线预计算物品向量并存储在ANN索引中,实现毫秒级的召回响应。
dssm.py文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 import tensorflow as tffrom .utils import ( concat_group_embedding, build_input_layer, build_group_feature_embedding_table_dict, ) from .layers import DNNs, PredictLayerdef build_dssm_model (feature_columns, model_config ): """ 构建双塔模型 参数: feature_columns: 特征列配置 model_config: 模型配置字典,包含: - dnn_units: 物品和用户塔的层单元数 (default: [128, 64, 32]) - dropout_rate: 丢弃概率 (default: 0.2) """ dnn_units = model_config.get("dnn_units" , [128 , 64 , 32 ]) dropout_rate = model_config.get("dropout_rate" , 0.2 ) input_layer_dict = build_input_layer(feature_columns) group_embedding_feature_dict = build_group_feature_embedding_table_dict( feature_columns, input_layer_dict, prefix="embedding/" ) user_feature = concat_group_embedding( group_embedding_feature_dict, "user" , axis=1 , flatten=True ) item_feature = concat_group_embedding( group_embedding_feature_dict, "item" , axis=1 , flatten=True ) user_tower = DNNs( units=dnn_units, activation="tanh" , dropout_rate=dropout_rate, use_bn=True )(user_feature) item_tower = DNNs( units=dnn_units, activation="tanh" , dropout_rate=dropout_rate, use_bn=True )(item_feature) user_embedding = tf.keras.layers.Lambda(lambda x: tf.nn.l2_normalize(x, axis=1 ))( user_tower ) item_embedding = tf.keras.layers.Lambda(lambda x: tf.nn.l2_normalize(x, axis=1 ))( item_tower ) cosine_similarity = tf.keras.layers.Dot(axes=1 )([user_embedding, item_embedding]) user_input_layer_dict = { fc.name: input_layer_dict[fc.name] for fc in feature_columns if "user" in fc.group } user_model = tf.keras.Model(inputs=user_input_layer_dict, outputs=user_embedding) item_input_layer_dict = { fc.name: input_layer_dict[fc.name] for fc in feature_columns if "item" in fc.group } item_model = tf.keras.Model(inputs=item_input_layer_dict, outputs=item_embedding) output = PredictLayer(name="dssm_output" )(cosine_similarity) model = tf.keras.Model(inputs=input_layer_dict, outputs=output) return model, user_model, item_model
2.3 从匹配到预测用户下一行为YouTubeDNN
原理
YouTube深度神经网络推荐系统 (Covington et al., 2016) 代表了双塔模型演进的一个重要里程碑。YouTubeDNN在架构上延续了双塔设计,但引入了一个关键的思想转变:将召回任务重新定义为“预测用户下一个会观看的视频 ”。
YouTubeDNN采用了“非对称”的双塔架构:
用户塔集成了观看历史、搜索历史、人口统计学特征等多模态信息,用户观看的视频ID通过嵌入层映射后进行平均池化聚合,模型还引入了“Example Age”特征来建模内容新鲜度的影响;
物品塔则相对简化,本质上是一个巨大的嵌入矩阵,每个视频对应一个可学习的向量,避免了复杂的物品特征工程。
这种“预测下一个观看视频”的任务设定,本质上类似于NLP中的next token预测,可以自然地建模为一个极端多分类问题:
P ( w t = i ∣ U , C ) = e v i u ∑ j ∈ V e v j u 这里 w t 表示用户在时间 t 观看的视频, U 是用户特征, C 是上下文信息, V 是整个视频库。 P(w_t = i | U,C) = \frac{ e^{v_i u} }{ \sum_{j \in V} e^{v_j u} } \\
这里 w_t 表示用户在时间 t 观看的视频,U是用户特征,C是上下文信息,V是整个视频库。
P ( w t = i ∣ U , C ) = ∑ j ∈ V e v j u e v i u 这里 w t 表示用户在时间 t 观看的视频, U 是用户特征, C 是上下文信息, V 是整个视频库。
由于视频库规模庞大,直接计算全量 Softmax 不可行,因此采用 Sampled Softmax 进行高效训练。
关键的工程技巧
YouTubeDNN的成功不仅来自于模型设计,更来自于一系列精心设计的工程技巧:
非对称的时序分割 :传统协同过滤通常随机保留验证项目,但这种做法存在未来信息泄露问题。视频消费具有明显的不对称模式——剧集通常按顺序观看,用户往往从热门内容开始逐步深入小众领域。因此,YouTubeDNN采用时序分割策略:对于作为预测目标的用户观看记录,只使用该目标之前的历史行为作为输入特征。这种“回滚”机制更符合真实的推荐场景。
负采样策略 :为了高效处理数百万类别的Softmax,模型采用重要性采样技术,每次只对数千个负样本进行计算,将训练速度提升了100多倍。
用户样本均衡 :为每个用户生成固定数量的训练样本,避免高活跃用户主导模型学习。这个看似简单的技巧,对提升长尾用户的推荐效果至关重要。
YouTubeDNN的成功在于建立了一套可扩展、可工程化的推荐系统范式——训练时使用复杂的多分类目标和丰富的用户特征,服务时通过预计算物品向量和实时计算用户向量,配合高效的ANN检索完成召回。这种设计实现了训练复杂度和服务效率的有效平衡,至今仍被广泛借鉴。
代码
YouTubeDNN的用户塔设计体现了“非对称”的思想,它整合了多种用户特征和历史行为序列。
物品塔则采用简化设计,直接使用物品Embedding表。
训练时采用Sampled Softmax优化,将百万级的多分类问题转化为高效的采样学习。
这种设计的核心优势在于:用户塔可以根据业务需求灵活扩展特征和模型复杂度,而物品塔保持简洁高效,易于离线预计算和实时检索。
youtubednn.py文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 import tensorflow as tffrom .utils import ( concat_group_embedding, build_input_layer, build_group_feature_embedding_table_dict, ) from .layers import DNNs, SampledSoftmaxLayer, L2NormalizeLayer, SqueezeLayerdef build_youtubednn_model (feature_columns, model_config ): emb_dim = model_config.get("emb_dim" , 16 ) neg_sample = model_config.get("neg_sample" , 20 ) dnn_units = model_config.get("dnn_units" , [32 ]) label_name = model_config.get("label_name" , "movie_id" ) input_layer_dict = build_input_layer(feature_columns) group_embedding_feature_dict, embedding_table_dict = ( build_group_feature_embedding_table_dict( feature_columns, input_layer_dict, prefix="embedding/" , return_embedding_table=True , ) ) user_feature_embedding = concat_group_embedding( group_embedding_feature_dict, "user_dnn" ) if "raw_hist_seq" in group_embedding_feature_dict: hist_seq_embedding = concat_group_embedding( group_embedding_feature_dict, "raw_hist_seq" ) user_dnn_inputs = tf.concat( [user_feature_embedding, hist_seq_embedding], axis=1 ) else : user_dnn_inputs = user_feature_embedding item_embedding_table = embedding_table_dict[label_name] item_vocab_size = None for fc in feature_columns: if fc.name == label_name: item_vocab_size = fc.vocab_size break user_dnn_output = DNNs( units=dnn_units + [emb_dim], activation="relu" , use_bn=False )(user_dnn_inputs) user_dnn_output = L2NormalizeLayer(axis=-1 )(user_dnn_output) sampled_softmax_layer = SampledSoftmaxLayer(item_vocab_size, neg_sample, emb_dim) output = sampled_softmax_layer( [item_embedding_table.embeddings, user_dnn_output, input_layer_dict[label_name]] ) model = tf.keras.Model(inputs=input_layer_dict, outputs=output) output_item_embedding = SqueezeLayer(axis=1 )( embedding_table_dict[label_name](input_layer_dict[label_name]) ) output_item_embedding = L2NormalizeLayer(axis=-1 )(output_item_embedding) user_feature_names = [ fc.name for fc in feature_columns if "user_dnn" in fc.group or "raw_hist_seq" in fc.group ] user_inputs_dict = {name: input_layer_dict[name] for name in user_feature_names} user_model = tf.keras.Model(inputs=user_inputs_dict, outputs=user_dnn_output) item_inputs_dict = {label_name: input_layer_dict[label_name]} item_model = tf.keras.Model(inputs=item_inputs_dict, outputs=output_item_embedding) return model, user_model, item_model
总结
简单性能对比
1 2 3 4 5 6 7 8 9 10 11 12 13 14 +-------------+---------------+--------------+-----------+----------+----------------+---------------+ | model | hit_rate@10 | hit_rate@5 | ndcg@10 | ndcg@5 | precision@10 | precision@5 | +=============+===============+==============+===========+==========+================+===============+ | item2vec | 0.0098 | 0.0033 | 0.0047 | 0.0027 | 0.001 | 0.0007 | +-------------+---------------+--------------+-----------+----------+----------------+---------------+ | eges | 0.0179 | 0.0081 | 0.0074 | 0.0042 | 0.0018 | 0.0016 | +=============+===============+==============+===========+==========+================+===============+ | fm_recall | 0.001 | 0 | 0.0003 | 0 | 0.0001 | 0 | +-------------+---------------+--------------+-----------+----------+----------------+---------------+ | dssm | 0.0401 | 0.007 | 0.0137 | 0.0037 | 0.004 | 0.0014 | +-------------+---------------+--------------+-----------+----------+----------------+---------------+ | youtubednn | 0.023 | 0.0141 | 0.0118 | 0.0089 | 0.0023 | 0.0028 | +-------------+---------------+--------------+-----------+----------+----------------+---------------+
结束
向量召回技术的核心贡献在于将推荐系统从“匹配”转向“搜索”,通过向量化表示实现了从 O ( U × I ) O(U \times I) O ( U × I ) 到 O ( U + I ) O(U+I) O ( U + I ) 的复杂度优化。这一转变不仅解决了大规模推荐系统的计算瓶颈,更重要的是为推荐算法提供了统一的数学框架——将用户和物品都映射到同一个向量空间中,让“距离”代表“相似度”。
I2I召回的演进脉络 :从NLP领域的Word2Vec出发,I2I召回经历了三个关键发展阶段。Item2Vec实现了从“词语-句子”到“物品-用户序列”的直接迁移,验证了序列建模在推荐系统中的可行性;EGES通过构建商品关系图和融合辅助信息,解决了稀疏性和冷启动问题,展现了图结构和属性信息的价值;Airbnb则将业务目标深度融入序列构建,通过会话切分、全局上下文和市场感知负采样,实现了从相似性到转化率的跨越。这一演进过程本质上是对“序列”概念的不断深化——从简单的共现关系到复杂的业务逻辑。
U2I召回的技术路径 :U2I召回以双塔模型为核心架构,体现了“分而治之”的设计哲学。FM通过数学变换首次实现了用户-物品交互的向量分解,奠定了双塔模型的理论基础;DSSM用深度神经网络替代线性变换,将双塔模型的表达能力推向新高度,同时建立了多分类训练范式和工程化部署模式;YouTubeDNN则通过“预测下一个观看视频”的任务重定义,引入了时序分割、负采样优化等关键工程技巧,展现了双塔模型在超大规模场景下的实用价值。
核心技术洞察 :向量召回的成功建立在几个关键洞察之上。
首先是向量空间的语义性——通过合适的训练目标,向量间的距离能够反映真实的用户偏好和物品相似性;
其次是架构的可分解性——双塔设计实现了训练复杂度和服务效率的平衡,物品向量可离线预计算,用户向量可实时生成;
最后是序列的信息密度——用户行为序列蕴含着丰富的偏好信号,通过不同的序列构建和建模方式可以挖掘出不同层次的用户意图。
技术边界与挑战 :向量召回在解决效率问题的同时,也带来了固有的局限性。
双塔之间缺乏深度交互可能损失复杂的用户-物品关系,向量表示的有限维度难以完全捕捉丰富的语义信息,这些挑战推动了后续序列建模技术的发展。
向量召回的使命是在庞大的物品库中高效筛选出与用户兴趣匹配的候选集,为后续的精排模型提供高质量输入,它更像是推荐系统的“粗筛”阶段,追求的是召回率而非精确率 。