多目标建模:ESMM、MMOE

  • 场景:精排多任务学习
  • 模型:ESMM、MMOE
  • 数据:Ali-CCP数据集

ESMM 算法原理

MMOE 算法原理

代码实现

直接调用 torch-rechub 库

ESMM

# ESMM end-to-end example with torch-rechub on the Ali-CCP sample data:
# load the three splits, build user/item sparse features, train, and report test AUC.
import os

import pandas as pd
import torch
from torch_rechub.basic.features import DenseFeature, SparseFeature
from torch_rechub.models.multi_task import ESMM
from torch_rechub.trainers import MTLTrainer
from torch_rechub.utils.data import DataGenerator

# ---- Load data with pandas ----
data_path = '../examples/ranking/data/ali-ccp'  # folder holding the sample CSV files
df_train = pd.read_csv(os.path.join(data_path, 'ali_ccp_train_sample.csv'))  # training split
df_val = pd.read_csv(os.path.join(data_path, 'ali_ccp_val_sample.csv'))  # validation split
df_test = pd.read_csv(os.path.join(data_path, 'ali_ccp_test_sample.csv'))  # test split
print("train : val : test = %d %d %d" % (len(df_train), len(df_val), len(df_test)))
# Peek at the data: 'click' and 'purchase' are the label columns, columns starting with 'D'
# are dense features, and the rest are sparse features (see the dataset's official description).
print(df_train.head(5))

# Concatenate the splits so each sparse column's vocabulary size (max id + 1, computed below)
# covers ids from every split; the row offsets let us cut the splits apart again afterwards.
train_idx, val_idx = df_train.shape[0], df_train.shape[0] + df_val.shape[0]
data = pd.concat([df_train, df_val, df_test], axis=0)
# task 1 (as cvr): main task, purchase prediction
# task 2 (as ctr): auxiliary task, click prediction
data.rename(columns={'purchase': 'cvr_label', 'click': 'ctr_label'}, inplace=True)
# CTCVR label is the product of the two labels (1 only when both are 1, assuming 0/1 labels).
data["ctcvr_label"] = data['cvr_label'] * data['ctr_label']

# ---- Features ----
col_names = data.columns.values.tolist()
dense_cols = ['D109_14', 'D110_14', 'D127_14', 'D150_14', 'D508', 'D509', 'D702', 'D853']
# Every column that is neither dense nor a label is treated as a sparse feature.
non_sparse = set(dense_cols) | {'cvr_label', 'ctr_label', 'ctcvr_label'}
sparse_cols = [col for col in col_names if col not in non_sparse]
print("sparse cols:%d dense cols:%d" % (len(sparse_cols), len(dense_cols)))
# The three labels must stay in exactly this order (cvr, ctr, ctcvr) to line up with
# the model's output columns.
label_cols = ['cvr_label', 'ctr_label', "ctcvr_label"]
used_cols = sparse_cols  # the original ESMM paper uses sparse features only
item_cols = ['129', '205', '206', '207', '210', '216']  # assumed split of features into user/item sides
user_cols = [col for col in used_cols if col not in item_cols]
user_features = [SparseFeature(col, data[col].max() + 1, embed_dim=16) for col in user_cols]
item_features = [SparseFeature(col, data[col].max() + 1, embed_dim=16) for col in item_cols]

model = ESMM(user_features, item_features, cvr_params={"dims": [16, 8]}, ctr_params={"dims": [16, 8]})


# ---- Dataloaders ----
def _split_rows(frame, feature_cols, target_cols, start, stop=None):
    """Return ({feature name: values[start:stop]}, label matrix[start:stop]) for one split."""
    x = {name: frame[name].values[start:stop] for name in feature_cols}
    y = frame[target_cols].values[start:stop]
    return x, y


x_train, y_train = _split_rows(data, used_cols, label_cols, 0, train_idx)
x_val, y_val = _split_rows(data, used_cols, label_cols, train_idx, val_idx)
x_test, y_test = _split_rows(data, used_cols, label_cols, val_idx)
dg = DataGenerator(x_train, y_train)
train_dataloader, val_dataloader, test_dataloader = dg.generate_dataloader(
    x_val=x_val, y_val=y_val, x_test=x_test, y_test=y_test, batch_size=1024)

# ---- Train and evaluate ----
device = 'cuda' if torch.cuda.is_available() else 'cpu'
learning_rate = 1e-3
epoch = 1  # e.g. 10 for a longer run
weight_decay = 1e-5
save_dir = '../examples/ranking/data/ali-ccp/saved'
os.makedirs(save_dir, exist_ok=True)  # no-op if the folder already exists
# CTR and CVR are both binary classification tasks. Only two task types are passed even though
# ESMM emits three columns; presumably MTLTrainer special-cases ESMM (verify in torch_rechub).
task_types = ["classification", "classification"]
mtl_trainer = MTLTrainer(model, task_types=task_types,
                         optimizer_params={"lr": learning_rate, "weight_decay": weight_decay},
                         n_epoch=epoch, earlystop_patience=1, device=device, model_path=save_dir)
mtl_trainer.fit(train_dataloader, val_dataloader)
auc = mtl_trainer.evaluate(mtl_trainer.model, test_dataloader)
print(f'test auc: {auc}')

MMOE

# MMOE on the same Ali-CCP data. This section reuses state from the ESMM example above:
# `data`, `sparse_cols`, `dense_cols`, `train_idx`, `val_idx`, `task_types`, `learning_rate`,
# `weight_decay`, `epoch`, `device`, `save_dir`, and the torch_rechub imports
# (SparseFeature, DenseFeature, DataGenerator, MTLTrainer).
from torch_rechub.models.multi_task import MMOE

# Define the model: unlike ESMM, MMOE consumes both sparse and dense features.
used_cols = sparse_cols + dense_cols
features = ([SparseFeature(col, data[col].max() + 1, embed_dim=4) for col in sparse_cols]
            + [DenseFeature(col) for col in dense_cols])
model = MMOE(features, task_types, n_expert=8, expert_params={"dims": [16]},
             tower_params_list=[{"dims": [8]}, {"dims": [8]}])

# Build the dataloaders: one label column per task (cvr, ctr), matching len(task_types).
label_cols = ['cvr_label', 'ctr_label']
x_train, y_train = {name: data[name].values[:train_idx] for name in used_cols}, data[label_cols].values[:train_idx]
x_val, y_val = {name: data[name].values[train_idx:val_idx] for name in used_cols}, data[label_cols].values[train_idx:val_idx]
x_test, y_test = {name: data[name].values[val_idx:] for name in used_cols}, data[label_cols].values[val_idx:]
dg = DataGenerator(x_train, y_train)
train_dataloader, val_dataloader, test_dataloader = dg.generate_dataloader(
    x_val=x_val, y_val=y_val, x_test=x_test, y_test=y_test, batch_size=1024)

# Train, then evaluate on the test split and report the result.
mtl_trainer = MTLTrainer(model, task_types=task_types,
                         optimizer_params={"lr": learning_rate, "weight_decay": weight_decay},
                         n_epoch=epoch, earlystop_patience=30, device=device, model_path=save_dir)
mtl_trainer.fit(train_dataloader, val_dataloader)
auc = mtl_trainer.evaluate(mtl_trainer.model, test_dataloader)
print(f'test auc: {auc}')

自定义模型

ESMM

import torch
import torch.nn as nn
from ...basic.layers import MLP, EmbeddingLayer

class ESMM(nn.Module):
    """Entire Space Multi-task Model with one CVR tower and one CTR tower.

    Both towers read the same input: the embeddings of all user-side and
    item-side features concatenated together (no field-wise pooling).

    Args:
        user_features: features treated as the user side. They are embedded
            with ``squeeze_dim=False`` and flattened, so presumably they share
            one ``embed_dim`` (TODO confirm against EmbeddingLayer).
        item_features: features treated as the item side; same caveat.
        cvr_params: keyword arguments forwarded to the CVR tower ``MLP``.
        ctr_params: keyword arguments forwarded to the CTR tower ``MLP``.
    """

    def __init__(self, user_features, item_features, cvr_params, ctr_params):
        super().__init__()
        self.user_features = user_features
        self.item_features = item_features
        self.embedding = EmbeddingLayer(user_features + item_features)
        # Width of the concatenated tower input: one embed_dim-wide slot per feature.
        self.tower_dims = sum(fea.embed_dim for fea in user_features + item_features)
        self.tower_cvr = MLP(self.tower_dims, **cvr_params)
        self.tower_ctr = MLP(self.tower_dims, **ctr_params)

    def forward(self, x):
        """Return the CVR, CTR and CTCVR predictions as columns of one tensor.

        Args:
            x: input passed straight to ``EmbeddingLayer`` (presumably a
                mapping of feature name to id tensor; verify against callers).

        Returns:
            ``torch.cat([cvr_pred, ctr_pred, ctcvr_pred], dim=1)``: sigmoid
            outputs of the two towers plus their product. That is
            [batch_size, 3] assuming each tower emits one logit per row.
        """
        # Embed each side exactly once and flatten:
        # [batch_size, num_features, embed_dim] -> [batch_size, num_features * embed_dim]
        embed_user = self.embedding(x, self.user_features, squeeze_dim=False).flatten(start_dim=1)
        embed_item = self.embedding(x, self.item_features, squeeze_dim=False).flatten(start_dim=1)

        input_tower = torch.cat((embed_user, embed_item), dim=1)
        cvr_pred = torch.sigmoid(self.tower_cvr(input_tower))
        ctr_pred = torch.sigmoid(self.tower_ctr(input_tower))
        # ESMM models pCTCVR as the product pCTR * pCVR.
        ctcvr_pred = torch.mul(ctr_pred, cvr_pred)

        # Column order (cvr, ctr, ctcvr) must match the label column order used for training.
        return torch.cat([cvr_pred, ctr_pred, ctcvr_pred], dim=1)


MMOE

import torch
import torch.nn as nn
from torch_rechub.basic.layers import MLP, EmbeddingLayer, PredictionLayer


class MMOE(nn.Module):
    """Multi-gate Mixture-of-Experts model.

    A pool of shared expert MLPs processes the embedded input. Each task owns
    three modules:

    - a gate that outputs softmax weights over the experts;
    - a tower MLP applied to the gate-weighted sum of expert outputs;
    - a ``PredictionLayer``.

    Args:
        features: feature definitions handed to ``EmbeddingLayer``; the model
            input width is the sum of their ``embed_dim``.
        task_types: one entry per task, each forwarded to ``PredictionLayer``.
        n_expert: number of shared experts (and width of each gate's output).
        expert_params: keyword arguments for every expert ``MLP``; the last
            entry of its ``"dims"`` is the expert output width.
        tower_params_list: per-task keyword arguments for the tower ``MLP``s,
            indexed by task position (needs at least ``len(task_types)`` entries).
    """

    def __init__(self, features, task_types, n_expert, expert_params, tower_params_list):
        super().__init__()
        self.features = features
        self.task_types = task_types
        self.n_task = len(task_types)
        self.n_expert = n_expert
        self.embedding = EmbeddingLayer(features)
        self.input_dims = sum(fea.embed_dim for fea in features)

        expert_out_dim = expert_params["dims"][-1]
        # Shared experts.
        self.experts = nn.ModuleList(
            [MLP(self.input_dims, output_layer=False, **expert_params) for _ in range(self.n_expert)]
        )
        # One gate per task, each producing softmax weights over the n_expert experts.
        self.gates = nn.ModuleList(
            [MLP(self.input_dims, output_layer=False, dims=[self.n_expert], activation="softmax")
             for _ in range(self.n_task)]
        )
        # One tower per task, fed the expert mixture (width expert_out_dim).
        self.towers = nn.ModuleList(
            [MLP(expert_out_dim, **tower_params_list[task_idx]) for task_idx in range(self.n_task)]
        )
        self.predict_layers = nn.ModuleList([PredictionLayer(kind) for kind in task_types])

    def forward(self, x):
        """Return the per-task predictions concatenated along dim 1 (one column per task)."""
        # [batch_size, input_dims]
        embed_x = self.embedding(x, self.features, squeeze_dim=True)
        # Expert outputs stacked on a new expert axis: [batch_size, n_expert, expert_dims[-1]]
        expert_stack = torch.stack([expert(embed_x) for expert in self.experts], dim=1)
        # Gate weights per task, [batch_size, n_expert, 1], so they broadcast over the expert width.
        gate_weights = [gate(embed_x).unsqueeze(-1) for gate in self.gates]

        task_preds = []
        for weights, tower, predict_layer in zip(gate_weights, self.towers, self.predict_layers):
            # Gate-weighted sum over experts: [batch_size, expert_dims[-1]]
            mixture = (weights * expert_stack).sum(dim=1)
            # tower -> logit, predict_layer -> probability for classification tasks
            task_preds.append(predict_layer(tower(mixture)))
        return torch.cat(task_preds, dim=1)