参考

赛题项目:阿里天池新闻推荐

在掌握推荐系统的核心方法后,本章将通过一个完整的项目实践,展示如何将理论知识应用于实践。我们将从需求理解与数据分析入手,建立评测指标与基线;然后逐步构建多路召回与冷启动策略,开展特征工程,并训练排序模型;最后对结果进行验证与融合。本章旨在将前述章节的算法与技术串联起来,覆盖从数据处理、模型构建到离线评测的完整流程,帮助读者建立系统化的实战能力。

一、赛题理解

赛题理解是切入一道赛题的基础,会影响后续特征工程和模型构建等各种工作,也影响着后续发展工作的方向,正确了解赛题背后的思想以及赛题业务逻辑的清晰,有利于花费更少时间构建更为有效的特征模型, 在各种比赛中, 赛题理解都是极其重要且必须走好的第一步, 今天我们就从赛题的理解出发, 首先了解一下这次赛题的概况和数据,从中分析赛题以及大致的处理方式, 其次我们了解模型评测的指标,最后对赛题的理解整理一些经验。

此次比赛是新闻推荐场景下的用户行为预测挑战赛, 该赛题是以新闻APP中的新闻推荐为背景, 目的是要求我们根据用户历史浏览点击新闻文章的数据信息预测用户未来的点击行为, 即用户的最后一次点击的新闻文章, 这道赛题的设计初衷是引导大家了解推荐系统中的一些业务背景, 解决实际问题。

1、数据概况

该数据来自某新闻APP平台的用户交互数据,包括30万用户,近300万次点击,共36万多篇不同的新闻文章,同时每篇新闻文章有对应的embedding向量表示。为了保证比赛的公平性,从中抽取20万用户的点击日志数据作为训练集,5万用户的点击日志数据作为测试集A,5万用户的点击日志数据作为测试集B。具体数据表和参数, 大家可以参考赛题说明。下面说一下拿到这样的数据如何进行理解, 来有效的开展下一步的工作。

2、评价方式理解

理解评价方式, 我们需要结合着最后的提交文件来看, 根据sample.submit.csv, 我们最后提交的格式是针对每个用户, 我们都会给出五篇文章的推荐结果,按照点击概率从前往后排序。 而真实的每个用户最后一次点击的文章只会有一篇的真实答案, 所以我们就看我们推荐的这五篇里面是否有命中真实答案的。比如对于user1来说, 我们的提交会是:user1, article1, article2, article3, article4, article5$。

评价指标的公式如下:

score(user)=k=15s(user,k)kscore(\text{user}) = \sum_{k=1}^5 \frac{s(\text{user}, k)}{k}

假如article1就是真实的用户点击文章,也就是article1命中, 则s(user1,1)=1,s(user1,24)s(\text{user1},1)=1, s(\text{user1},2-4)都是0, 如果article2是用户点击的文章, 则s(user,2)=1/2,s(user,1,3,4,5)s(\text{user},2)=1/2,s(\text{user},1,3,4,5)都是0。也就是score(user)score(\text{user})命中第几条的倒数。如果都没中, 则score(user1)=0score(\text{user1})=0。 这个是合理的, 因为我们希望的就是命中的结果尽量靠前, 而此时分数正好比较高。

3、问题分析

根据赛题简介,我们首先要明确我们此次比赛的目标: 根据用户历史浏览点击新闻的数据信息预测用户最后一次点击的新闻文章。从这个目标上看, 会发现此次比赛和我们之前遇到的普通的结构化比赛不太一样, 主要有两点:

  • 首先是目标上, 要预测最后一次点击的新闻文章,也就是我们给用户推荐的是新闻文章, 并不是像之前那种预测一个数或者预测数据哪一类那样的问题
  • 数据上, 通过给出的数据我们会发现, 这种数据也不是我们之前遇到的那种特征+标签的数据,而是基于了真实的业务场景, 拿到的用户的点击日志

所以拿到这个题目,我们的思考方向就是结合我们的目标,把该预测问题转成一个监督学习的问题(特征+标签),然后我们才能进行ML,DL等建模预测。那么我们自然而然的就应该在心里会有这么几个问题:如何转成一个监督学习问题呢? 转成一个什么样的监督学习问题呢? 我们能利用的特征又有哪些呢? 又有哪些模型可以尝试呢? 此次面对数万级别的文章推荐,我们又有哪些策略呢?

当然这些问题不会在我们刚看到赛题之后就一下出来答案, 但是只要有了问题之后, 我们就能想办法解决问题了, 比如上面的第二个问题,转成一个什么样的监督学习问题? 由于我们是预测用户最后一次点击的新闻文章,从36万篇文章中预测某一篇的话我们首先可能会想到这可能是一个多分类的问题(36万类里面选1), 但是如此庞大的分类问题, 我们做起来可能比较困难, 那么能不能转化一下? 既然是要预测最后一次点击的文章, 那么如果我们能预测出某个用户最后一次对于某一篇文章会进行点击的概率, 是不是就间接性的解决了这个问题呢?概率最大的那篇文章不就是用户最后一次可能点击的新闻文章吗? 这样就把原问题变成了一个点击率预测的问题(用户, 文章) --> 点击的概率(软分类), 而这个问题, 就是我们所熟悉的监督学习领域分类问题了, 这样我们后面建模的时候, 对于模型的选择就基本上有大致方向了,比如最简单的逻辑回归模型。

这样, 我们对于该赛题的解决方案应该有了一个大致的解决思路,要先转成一个分类问题来做, 而分类的标签就是用户是否会点击某篇文章,分类问题的特征中会有用户和文章,我们要训练一个分类模型, 对某用户最后一次点击某篇文章的概率进行预测。 那么又会有几个问题:如何转成监督学习问题? 训练集和测试集怎么制作? 我们又能利用哪些特征? 我们又可以尝试哪些模型? 面对36万篇文章, 20多万用户的推荐, 我们又有哪些策略来缩减问题的规模?如何进行最后的预测?

二、Baseline

本baseline将重点实现ItemCF(基于物品的协同过滤)算法作为召回策略,这是工业界广泛使用的经典方法,具有可解释性强、效果稳定的特点。

代码

requirements.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
tensorflow==2.13.0
pandas==2.0.3
scikit-learn>=1.3.2
python-dotenv==1.0.1
pyyaml==6.0.2
gensim==4.3.3
tabulate==0.9.0
tqdm==4.67.1
networkx==3.1
# 新闻推荐需要
seaborn==0.13.2
faiss-cpu==1.7.4
lightgbm==4.6.0

Baseline.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import warnings
import os
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from collections import defaultdict
from utils import load_env_with_fallback, reduce_mem, get_all_click_df, get_user_item_time,\
get_item_topk_click, submit

from model_itemcf import itemcf_sim, item_based_recommend


warnings.filterwarnings('ignore')
load_env_with_fallback()
RAW_DATA_PATH = Path(os.getenv('FUNREC_RAW_DATA_PATH'))
PROCESSED_DATA_PATH = Path(os.getenv('FUNREC_PROCESSED_DATA_PATH'))

data_path = RAW_DATA_PATH / "news_recommendation"
save_path = PROCESSED_DATA_PATH / "projects/news_recommendation"
if not os.path.exists(save_path):
os.makedirs(save_path)

all_click_df = get_all_click_df(data_path, offline=False)
all_click_df = reduce_mem(all_click_df)
print(all_click_df.head())
# print(all_click_df[all_click_df['click_article_id'] == 87100])
# print(all_click_df[all_click_df['user_id'] == 249971])

i2i_sim = itemcf_sim(all_click_df, save_path)

# itemcf 召回
user_recall_item_dict = defaultdict(dict)
# 准备参数
user_item_time_dict = get_user_item_time(all_click_df)
sim_item_topk = 10
recall_item_num = 10
item_topk_click = get_item_topk_click(all_click_df, k=50)

# 只需要 testA 的用户情况
tst_click = pd.read_csv(data_path / 'testA_click_log.csv')
# tst_click = pd.read_csv(data_path / 'testA_click_log.csv')[:10000]
tst_users = tst_click['user_id'].unique()
for user in tqdm(tst_users, desc='recall'):
user_recall_item_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim,
sim_item_topk=sim_item_topk,
recall_item_num=recall_item_num,
item_topk_click=item_topk_click)

# 转df
recall_list = []
for user, items in tqdm(user_recall_item_dict.items(), desc='to_df'):
for i, s in items:
recall_list.append([user, i, s])
recall_df = pd.DataFrame(recall_list, columns=['user_id', 'click_article_id', 'pred_score'])
print(recall_df.head())


# 只需要 testA 的用户情况
tst_click = pd.read_csv(data_path / 'testA_click_log.csv')
# tst_click = pd.read_csv(data_path / 'testA_click_log.csv')[:10000]
tst_users = tst_click['user_id'].unique()
tst_recall = recall_df[recall_df['user_id'].isin(tst_users)]
print(tst_recall.head())

# 生成提交文件
submit(tst_recall, save_path, topk=5, model_name='itemcf')

utils.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os
import time
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
from dotenv import find_dotenv, load_dotenv

# 提交函数
def submit(recall_df, save_path, topk=5, model_name=None):
# 生成排名
recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
recall_df['rank'] = recall_df.groupby('user_id')['pred_score'].rank(ascending=False, method='first')
# 保证都有5篇以上
tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())
assert tmp.min() > topk
# 把排名改为按要求的1-5横向摆放
del recall_df['pred_score']
submit = recall_df[recall_df['rank']<=topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()
submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]
submit = submit.rename(columns = {'':'user_id', 1:'article_1', 2:'article_2', 3:'article_3',
4:'article_4', 5:'article_5'})
save_name = save_path / f"{model_name}_{datetime.now().strftime('%m-%d %H:%M')}.csv"
submit.to_csv(save_name, index=False, header=True)




# 加载环境函数
def load_env_with_fallback() -> None:
envpath = find_dotenv(usecwd=True)
if envpath:
load_dotenv(envpath)
print("已加载 .env 文件:", envpath)
else:
print("未找到 .env 文件")
if not os.getenv('FUNREC_RAW_DATA_PATH'):
os.environ['FUNREC_RAW_DATA_PATH'] = str(Path.cwd()) / "dataset"
if not os.getenv('FUNREC_PROCESSED_DATA_PATH'):
os.environ['FUNREC_PROCESSED_DATA_PATH'] = str(Path.cwd()) / "dataset_processed"

# df 节省内存
def reduce_mem(df):
start_time = time.time()
start_mem = df.memory_usage().sum() / 1024**2
numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
for col in df.columns:
col_type = df[col].dtypes
if col_type in numerics:
dmin = df[col].min()
dmax = df[col].max()
if pd.isnull(dmin) or pd.isnull(dmax):
continue
else:
if str(col_type)[:3] == 'int':
if dmin > np.iinfo(np.int8).min and dmax < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif dmin > np.iinfo(np.int16).min and dmax < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif dmin > np.iinfo(np.int32).min and dmax < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
else:
df[col] = df[col].astype(np.int64)
else:
if dmin > np.iinfo(np.float16).min and dmax < np.iinfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif dmin > np.iinfo(np.float32).min and dmax < np.iinfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
end_mem = df.memory_usage().sum() / 1024**2
end_time = time.time()
print("Memory usage decreased to {:5.2f}Mb({:.2f}% redcution), time spend {:.2f} min".format(
end_mem, 100*(start_mem-end_mem)/start_mem, (start_time-end_time)/60))
return df

# 采样部分数据进行 调试、线下开发、线上提交
def get_all_click_sample(data_path, sample_num=20000):
all_click = pd.read_csv(data_path / "train_click_log.csv")
all_user_ids = all_click['user_id'].unique()

sample_user_ids = np.random.choice(all_user_ids ,size=sample_num, replace=False)
all_click = all_click[all_click['user_id'].isin(sample_user_ids)]

all_click = all_click.drop_duplicates(['user_id', 'click_article_id', 'click_timestamp']).reset_index(drop=True)
return all_click

def get_all_click_df(data_path, offline=True):
if offline:
all_click = pd.read_csv(data_path / "train_click_log.csv")[:20000]
else:
tst_click = pd.read_csv(data_path / 'testA_click_log.csv')
trn_click = pd.read_csv(data_path / "train_click_log.csv")
# trn_click = pd.read_csv(data_path / "train_click_log.csv")[:10000]
# tst_click = pd.read_csv(data_path / 'testA_click_log.csv')[:10000]
all_click = pd.concat([trn_click, tst_click])

all_click = all_click.drop_duplicates(['user_id', 'click_article_id', 'click_timestamp']).reset_index(drop=True)
return all_click

# 用户 - 文章,点击
# {user1:[(item,time),(item,time)], ..., user2:[...]}
def get_user_item_time(click_df):
click_df = click_df.sort_values('click_timestamp')
def make_item_time_list(df):
return list(zip(df['click_article_id'], df['click_timestamp']))
click_df = click_df.groupby('user_id')[['click_article_id', 'click_timestamp']] \
.apply(make_item_time_list) \
.reset_index() \
.rename(columns={0:'item_time_list'})
user_item_time_dict = dict(zip(click_df['user_id'], click_df['item_time_list']))
return user_item_time_dict

# 最热门的K个新闻(返回新闻id)
def get_item_topk_click(click_df, k):
return click_df['click_article_id'].value_counts().index[:k]

model_itemcf.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import pickle
import math
from tqdm import tqdm
from collections import defaultdict
from utils import get_user_item_time


# itemcf 相似度矩阵计算
# Sim = 共现权重W(i,j) / sqrt(Ni用户数, Nj用户数)
# 其中共现权重不是直接次数,而是这个用户是否活跃 1/log(活跃数+1)是权值
def itemcf_sim(df, save_path):
user_item_time_dict = get_user_item_time(df)

# 计算共现权重W、每个物品的用户数Ni
Wij = defaultdict(lambda: defaultdict(float))
Ni = defaultdict(int)
for user, item_list in tqdm(user_item_time_dict.items()):
w = 1 / math.log(len(item_list)+1)
for i, i_time in item_list:
Ni[i] += 1
for j, j_time in item_list:
if i == j:
continue
Wij[i][j] += w

# 计算 Sim
Sim = defaultdict(lambda: defaultdict(float))
for i, wj in Wij.items():
for j, w in wj.items():
Sim[i][j] = w / math.sqrt(Ni[i] * Ni[j])

# 转为普通字典
i2i_Sim = dict(Sim)
# 存储相似度矩阵
pickle.dump(i2i_Sim, open(save_path / 'itemcf_i2i_sim.pkl', 'wb'))
print("itemcf相似度矩阵保存文件到本地")
return i2i_Sim

# itemcf 召回
def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click):
user_history = user_item_time_dict[user_id]
user_history = [item for item, _ in user_history]

item_rank = defaultdict(float)
# 每个历史物品都找到 k 个相似物品(此处k不是最终数量)
for i in user_history:
if i not in i2i_sim: # 没有相似的物品,不在相似度矩阵中
continue
items = sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]
for j, w in items:
if j in user_history:
continue
item_rank[j] += w

# 数量应该很多了,不足则用热门补全
if len(item_rank) < recall_item_num:
for idx, i in enumerate(item_topk_click):
if i in item_rank:
continue
item_rank[i] = -idx-100
if len(item_rank) == recall_item_num:
break

# 排序输出最终所需的K个(这里是最终的召回K)
res = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
return res

三、数据分析

数据分析的价值主要在于熟悉了解整个数据集的基本情况包括每个文件里有哪些数据,具体的文件中的每个字段表示什么实际含义,以及数据集中特征之间的相关性,在推荐场景下主要就是分析用户本身的基本属性,文章基本属性,以及用户和文章交互的一些分布,这些都有利于后面的召回策略的选择,以及特征工程。

建议:当特征工程和模型调参已经很难继续上分了,可以回来在重新从新的角度去分析这些数据,或许可以找到上分的灵感

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
%matplotlib inline
import warnings
import os
import matplotlib.pyplot as plt
from pathlib import Path
from utils import load_env_with_fallback

plt.rc('font', size=13)
warnings.filterwarnings("ignore")
load_env_with_fallback()
RAW_DATA_PATH = Path(os.getenv('FUNREC_RAW_DATA_PATH'))
PROCESSED_DATA_PATH = Path(os.getenv('FUNREC_PROCESSED_DATA_PATH'))

import pandas as pd

data_path = RAW_DATA_PATH / 'news_recommendation/'

# 训练集
trn_click = pd.read_csv(data_path / 'train_click_log.csv')
item_df = pd.read_csv(data_path / 'articles.csv')
item_df = item_df.rename(columns={'article_id':'click_article_id'})
item_emb_df = pd.read_csv(data_path / 'articles_emb.csv')

# 测试集
tst_click = pd.read_csv(data_path / 'testA_click_log.csv')

1、数据查看分析

用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 对每个用户的点击时间戳进行排序
trn_click['rank'] = trn_click.groupby(['user_id'])['click_timestamp'].rank(ascending=False).astype(int)
tst_click['rank'] = tst_click.groupby(['user_id'])['click_timestamp'].rank(ascending=False).astype(int)

#计算用户点击文章的次数,并添加新的一列count
trn_click['click_cnts'] = trn_click.groupby(['user_id'])['click_timestamp'].transform('count')
tst_click['click_cnts'] = tst_click.groupby(['user_id'])['click_timestamp'].transform('count')

trn_click = trn_click.merge(item_df, how='left', on=['click_article_id'])
trn_click.describe()
trn_click.head()

#用户点击日志信息
trn_click.info()

train_click_log.csv文件数据中每个字段的含义:

  1. user_id: 用户的唯一标识
  2. click_article_id: 用户点击的文章唯一标识
  3. click_timestamp: 用户点击文章时的时间戳
  4. click_environment: 用户点击文章的环境
  5. click_deviceGroup: 用户点击文章的设备组
  6. click_os: 用户点击文章时的操作系统
  7. click_country: 用户点击文章时的所在的国家
  8. click_region: 用户点击文章时所在的区域
  9. click_referrer_type: 用户点击文章时,文章的来源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1112623 entries, 0 to 1112622
Data columns (total 14 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 user_id 1112623 non-null int64
1 click_article_id 1112623 non-null int64
2 click_timestamp 1112623 non-null int64
3 click_environment 1112623 non-null int64
4 click_deviceGroup 1112623 non-null int64
5 click_os 1112623 non-null int64
6 click_country 1112623 non-null int64
7 click_region 1112623 non-null int64
8 click_referrer_type 1112623 non-null int64
9 rank 1112623 non-null int64
10 click_cnts 1112623 non-null int64
11 category_id 1112623 non-null int64
12 created_at_ts 1112623 non-null int64
13 words_count 1112623 non-null int64
dtypes: int64(14)
memory usage: 118.8 MB
1
2
3
#训练集中的用户数量为20w
trn_click.user_id.nunique() # 200000
trn_click.groupby('user_id')['click_article_id'].count().min() # 训练集里面每个用户至少点击了2篇文章
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import seaborn as sns

plt.figure(figsize=(15, 20))
i = 1
for col in ['click_article_id', 'click_timestamp', 'click_environment', 'click_deviceGroup', 'click_os', 'click_country',
'click_region', 'click_referrer_type', 'rank', 'click_cnts']:
plot_envs = plt.subplot(5, 2, i)
i += 1
v = trn_click[col].value_counts().reset_index()[:10]
# Use iloc to access columns by position to avoid column name issues
fig = sns.barplot(x=v.iloc[:, 0], y=v.iloc[:, 1])
for item in fig.get_xticklabels():
item.set_rotation(90)
plt.title(col)
plt.tight_layout()
plt.show()

直方图查看10个用户情况

测试集用户点击日志

类似,此处不赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
tst_click = tst_click.merge(item_df, how='left', on=['click_article_id'])
tst_click.head()
tst_click.describe()

#测试集中的用户数量为5w
tst_click.user_id.nunique()
tst_click.groupby('user_id')['click_article_id'].count().min() # 注意测试集里面有只点击过一次文章的用户
#新闻文章数据集浏览
item_df.head()
item_df['words_count'].value_counts()
print(item_df['category_id'].nunique()) # 461个文章主题
item_df['category_id'].hist(figsize=(5, 4), grid=False)
item_df.shape # 364047篇文章
item_emb_df.head()
item_emb_df.shape

用户分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
user_click_merge = pd.concat([trn_click, tst_click])
#用户重复点击
user_click_count = user_click_merge.groupby(['user_id', 'click_article_id'])['click_timestamp'].agg({'count'}).reset_index()
user_click_count[:5]
user_click_count[user_click_count['count']>7]
user_click_count['count'].unique()
#用户点击新闻次数
user_click_count['count'].value_counts()

# 用户点击环境变化
def plot_envs(df, cols, r, c, figsize=(8, 4)):
plt.figure(figsize=figsize)
i = 1
for col in cols:
plt.subplot(r, c, i)
i += 1
v = df[col].value_counts().reset_index()
fig = sns.barplot(x=v.iloc[:, 0], y=v.iloc[:, 1])
for item in fig.get_xticklabels():
item.set_rotation(90)
plt.title(col)
plt.tight_layout()
plt.show()

import numpy as np

# 分析用户点击环境变化是否明显,这里随机采样10个用户分析这些用户的点击环境分布
sample_user_ids = np.random.choice(tst_click['user_id'].unique(), size=5, replace=False)
sample_users = user_click_merge[user_click_merge['user_id'].isin(sample_user_ids)]
cols = ['click_environment','click_deviceGroup', 'click_os', 'click_country', 'click_region','click_referrer_type']
for _, user_df in sample_users.groupby('user_id'):
plot_envs(user_df, cols, 2, 3, figsize=(8, 4))

新闻分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 用户点击新闻数量分布
user_click_item_count = sorted(user_click_merge.groupby('user_id')['click_article_id'].count().values, reverse=True)
plt.figure(figsize=(5, 3))
plt.plot(user_click_item_count)

#点击次数在前50的用户
plt.figure(figsize=(5, 3))
plt.plot(user_click_item_count[:50])
_ii = pd.DataFrame(user_click_item_count)
_ii.value_counts()
#点击次数排名在[25000:50000]之间
plt.figure(figsize=(5, 3))
plt.plot(user_click_item_count[25000:50000])


# 新闻点击次数分析
item_click_count = sorted(user_click_merge.groupby('click_article_id')['user_id'].count(), reverse=True)
plt.figure(figsize=(5, 3))
plt.plot(item_click_count)
plt.figure(figsize=(5, 3))
plt.plot(item_click_count[:100])
plt.figure(figsize=(5, 3))
_ = plt.plot(item_click_count[:20])
print(item_click_count[:20])
plt.figure(figsize=(5, 3))
plt.plot(item_click_count[3500:])


# 新闻共现频次:两篇新闻连续出现的次数
tmp = user_click_merge.sort_values('click_timestamp')
tmp['next_item'] = tmp.groupby(['user_id'])['click_article_id'].transform(lambda x:x.shift(-1))
union_item = tmp.groupby(['click_article_id','next_item'])['click_timestamp'].agg({'count'}).reset_index().sort_values('count', ascending=False)
union_item[['count']].describe()

#画个图直观地看一看
x = union_item['click_article_id']
y = union_item['count']
plt.figure(figsize=(5, 3))
_ = plt.scatter(x, y)

plt.figure(figsize=(5, 3))
_ = plt.plot(union_item['count'].values[40000:])


# 文章信息
#不同类型的新闻出现的次数
plt.figure(figsize=(5, 3))
_ = plt.plot(user_click_merge['category_id'].value_counts().values)
#出现次数比较少的新闻类型, 有些新闻类型,基本上就出现过几次
plt.figure(figsize=(5, 3))
_ = plt.plot(user_click_merge['category_id'].value_counts().values[150:])

#新闻字数的描述性统计
user_click_merge['words_count'].describe()
plt.figure(figsize=(5, 3))
_ = plt.plot(user_click_merge['words_count'].values)

用户新闻交互分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# 用户点击新闻类型偏好
plt.figure(figsize=(5, 3))
_ = plt.plot(sorted(user_click_merge.groupby('user_id')['category_id'].nunique(), reverse=True))
user_click_merge.groupby('user_id')['category_id'].nunique().reset_index().describe()

# 用户查看文章长度分布
plt.figure(figsize=(5, 3))
_ =plt.plot(sorted(user_click_merge.groupby('user_id')['words_count'].mean(), reverse=True))
#挑出大多数人的区间仔细看看
plt.figure(figsize=(5, 3))
_ = plt.plot(sorted(user_click_merge.groupby('user_id')['words_count'].mean(), reverse=True)[1000:45000])
#更加详细的参数
user_click_merge.groupby('user_id')['words_count'].mean().reset_index().describe()

# 用户点击新闻的时间分析
#为了更好的可视化,这里把时间进行归一化操作
from sklearn.preprocessing import MinMaxScaler

mm = MinMaxScaler()
user_click_merge['click_timestamp'] = mm.fit_transform(user_click_merge[['click_timestamp']])
user_click_merge['created_at_ts'] = mm.fit_transform(user_click_merge[['created_at_ts']])

user_click_merge = user_click_merge.sort_values('click_timestamp')
user_click_merge.head()

def mean_diff_time_func(df, col):
df = pd.DataFrame(df, columns=[col])
df['time_shift1'] = df[col].shift(1).fillna(0)
df['diff_time'] = abs(df[col] - df['time_shift1'])
return df['diff_time'].mean()

# 点击时间差的平均值
mean_diff_click_time = user_click_merge.groupby('user_id')[['click_timestamp', 'created_at_ts']].apply(lambda x: mean_diff_time_func(x, 'click_timestamp'))

plt.figure(figsize=(5, 3))
_ = plt.plot(sorted(mean_diff_click_time.values, reverse=True))

# 前后点击文章的创建时间差的平均值
mean_diff_created_time = user_click_merge.groupby('user_id')[['click_timestamp', 'created_at_ts']].apply(lambda x: mean_diff_time_func(x, 'created_at_ts'))
plt.figure(figsize=(5, 3))
_ = plt.plot(sorted(mean_diff_created_time.values, reverse=True))

# 用户前后点击文章的相似性分布
item_idx_2_rawid_dict = dict(zip(item_emb_df['article_id'], item_emb_df.index))
del item_emb_df['article_id']
item_emb_np = np.ascontiguousarray(item_emb_df.values, dtype=np.float32)

# 随机选择5个用户,查看这些用户前后查看文章的相似性
sub_user_ids = np.random.choice(user_click_merge.user_id.unique(), size=15, replace=False)
sub_user_info = user_click_merge[user_click_merge['user_id'].isin(sub_user_ids)]

sub_user_info.head()

def get_item_sim_list(df):
sim_list = []
item_list = df['click_article_id'].values
for i in range(0, len(item_list)-1):
emb1 = item_emb_np[item_idx_2_rawid_dict[item_list[i]]]
emb2 = item_emb_np[item_idx_2_rawid_dict[item_list[i+1]]]
sim_list.append(np.dot(emb1,emb2)/(np.linalg.norm(emb1)*(np.linalg.norm(emb2))))
sim_list.append(0)
return sim_list

plt.figure(figsize=(5, 3))
for _, user_df in sub_user_info.groupby('user_id'):
item_sim_list = get_item_sim_list(user_df)
plt.plot(item_sim_list)

用户前后点击新闻相似度变化

从图中可以看出有些用户前后看的商品的相似度波动比较大,有些波动比较小,也是有一定的区分度的。

2、数据总结

通过数据分析的过程, 我们目前可以得到以下几点重要的信息, 这个对于我们进行后面的特征制作和分析非常有帮助:

  1. 训练集和测试集的用户id没有重复,也就是测试集里面的用户没有模型是没有见过的
  2. 训练集中用户最少的点击文章数是2, 而测试集里面用户最少的点击文章数是1
  3. 用户对于文章存在重复点击的情况, 但这个都存在于训练集里面
  4. 同一用户的点击环境存在不唯一的情况,后面做这部分特征的时候可以采用统计特征
  5. 用户点击文章的次数有很大的区分度,后面可以根据这个制作衡量用户活跃度的特征
  6. 文章被用户点击的次数也有很大的区分度,后面可以根据这个制作衡量文章热度的特征
  7. 用户看的新闻,相关性是比较强的,所以往往我们判断用户是否对某篇文章感兴趣的时候, 在很大程度上会和他历史点击过的文章有关
  8. 用户点击的文章字数有比较大的区别, 这个可以反映用户对于文章字数的区别
  9. 用户点击过的文章主题也有很大的区别, 这个可以反映用户的主题偏好
  10. 不同用户点击文章的时间差也会有所区别, 这个可以反映用户对于文章时效性的偏好

所以根据上面的一些分析,可以更好的帮助我们后面做好特征工程, 充分挖掘数据的隐含信息。

四、多路召回

所谓的“多路召回”策略,就是指采用不同的策略、特征或简单模型,分别召回一部分候选集,然后把候选集混合在一起供后续排序模型使用,可以明显的看出,“多路召回策略”是在“计算速度”和“召回率”之间进行权衡的结果。其中,各种简单策略保证候选集的快速召回,从不同角度设计的策略保证召回率接近理想的状态,不至于损伤排序效果。如下图是多路召回的一个示意图,在多路召回中,每个策略之间毫不相关,所以一般可以写并发多线程同时进行,这样可以更加高效。

多路召回

上图只是一个多路召回的例子,也就是说可以使用多种不同的策略来获取用户排序的候选商品集合,而具体使用哪些召回策略其实是与业务强相关的 ,针对不同的任务就会有对于该业务真实场景下需要考虑的召回规则。例如新闻推荐,召回规则可以是“热门视频”、“导演召回”、“演员召回”、“最近上映“、”流行趋势“、”类型召回“等等。

三种模式, 不同的模式对应的不同的数据集:

  1. Debug模式: 从海量数据的训练集中随机抽取一部分样本来进行调试(train_click_log_sample), 先跑通一个baseline。
  2. 线下验证模式: 加载整个训练集(train_click_log), 然后把整个训练集再分成训练集和验证集。训练集是模型的训练数据, 验证集部分帮助我们调整模型的参数和其他的一些超参数。
  3. 线上模式: 使用的训练数据集是全量的数据集(train_click_log+test_click_log)

1、相似度矩阵

  1. i2i_sim: 借鉴KDD2020的去偏商品推荐,在计算item2item相似性矩阵时,使用关联规则,使得计算的文章的相似性还考虑到了: 1. 用户点击的时间权重 2. 用户点击的顺序权重 3. 文章创建的时间权重
  2. u2u_sim: 在计算用户之间的相似度的时候,也可以使用一些简单的关联规则,比如用户活跃度权重,这里将用户的点击次数作为用户活跃度的指标。
  3. item_emb_sim: 使用Embedding计算item之间的相似度是为了后续冷启动的时候可以获取未出现在点击数据中的文章。

faiss是Facebook的AI团队开源的一套用于做聚类或者相似性搜索的软件库,底层是用C++实现。Faiss因为超级优越的性能,被广泛应用于推荐相关的业务当中.

faiss工具包一般使用在推荐系统中的向量召回部分。在做向量召回的时候要么是u2u,u2i或者i2i,这里的u和i指的是user和item.我们知道在实际的场景中user和item的数量都是海量的,我们最容易想到的基于向量相似度的召回就是使用两层循环遍历user列表或者item列表计算两个向量的相似度,但是这样做在面对海量数据是不切实际的,faiss就是用来加速计算某个查询向量最相似的topk个索引向量。

faiss查询的原理:

faiss使用了PCA和PQ(Product quantization乘积量化)两种技术进行向量压缩和编码,当然还使用了其他的技术进行优化,但是PCA和PQ是其中最核心部分。

  1. PCA降维算法细节参考:主成分分析(PCA)原理总结
  2. PQ编码的细节参考:实例理解product quantization算法

faiss使用faiss官方教程

2、召回

召回常用的策略:

  • Youtube DNN 召回
  • 基于文章的召回
    • 文章的协同过滤
    • 基于文章embedding的召回
  • 基于用户的召回
    • 用户的协同过滤
    • 用户embedding

上面的各种召回方式一部分在基于用户已经看得文章的基础上去召回与这些文章相似的一些文章,而这个相似性的计算方式不同,就得到了不同的召回方式,比如文章的协同过滤,文章内容的embedding等。还有一部分是根据用户的相似性进行推荐,对于某用户推荐与其相似的其他用户看过的文章,比如用户的协同过滤和用户embedding。 还有一种思路是类似矩阵分解的思路,先计算出用户和文章的embedding之后,就可以直接算用户和文章的相似度,根据这个相似度进行推荐,比如YouTube DNN。我们下面详细来看一下每一个召回方法:

YoutubeDNN召回

(这一步是直接获取用户召回的候选文章列表)

论文下载地址

Youtubednn召回架构

YoutubeDNN召回架构

关于YoutubeDNN原理和应用推荐看王喆的两篇博客:

  1. 重读Youtube深度学习推荐系统论文,字字珠玑,惊为神文
  2. YouTube深度学习推荐系统的十大工程问题

参考文献:

  1. YouTubeDNN原理
  2. word2vec放到排序中的w2v的介绍部分

ItemCF召回

已经通过协同过滤,Embedding检索的方式得到了文章的相似度矩阵,下面使用协同过滤的思想,给用户召回与其历史文章相似的文章。 这里在召回的时候,也是用了关联规则的方式:

  1. 考虑相似文章与历史点击文章顺序的权重
  2. 考虑文章创建时间的权重,也就是考虑相似文章与历史点击文章创建时间差的权重
  3. 考虑文章内容相似度权重(使用Embedding计算相似文章相似度,但是这里需要注意,在Embedding的时候并没有计算所有商品两两之间的相似度,所以相似的文章与历史点击文章不存在相似度,需要做特殊处理)

UserCF召回

基于用户协同过滤,核心思想是给用户推荐与其相似的用户历史点击文章,因为这里涉及到了相似用户的历史文章,这里仍然可以加上一些关联规则来给用户可能点击的文章进行加权,这里使用的关联规则主要是考虑相似用户的历史点击文章与被推荐用户历史点击商品的关系权重,而这里的关系就可以直接借鉴基于物品的协同过滤相似的做法,只不过这里是对被推荐物品关系的一个累加的过程。

(1)UserCF(emb):使用usercf的u2u用户相似度和emb的物品相似度,进行u2u2i召回。

(2)UserCF(emb+youtubednnuser) :使用youtubednn的u2u用户相似度和emb的物品相似度,进行u2u2i召回。

运行效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
已加载 .env 文件: /Users/seymour/GitHub/LLM4Rec/FunRec/.env
[INFO] 11:31:02 Start...

[1] 采样数据...
[1] 新闻数据...
[2] 相似矩阵...
ItemCF_sim: 100%|███████████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 2290467.45it/s]
itemcf相似度矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/itemcf_i2i_sim.pkl
UserCF_sim: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 9513/9513 [00:05<00:00, 1703.22it/s]
usercf相似度矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/usercf_u2u_sim.pkl
faiss 正在处理(等待时间较长)...
faiss 处理完毕.
embedding_sim: 364047it [00:03, 102177.76it/s]
content-based相似度矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/emb_i2i_sim.pkl

[INFO] 1. YoutubeDNN recall ==============
gen_data_set: 100%|███████████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 48303.33it/s]
YoutubeDNN训练后分离user向量矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/user_youtube_emb.pkl
YoutubeDNN训练后分离item向量矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/item_youtube_emb.pkl
FAISS开始处理...
FAISS处理完毕.
YoutubeDNN_相似度矩阵: 20000it [00:00, 67522.37it/s]
YoutubeDNN_u2i矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/youtube_u2i_dict.pkl
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/YoutubeDNN_recall_12-21 11:33.pkl
Hit@5 2.79% (557/20000)
Hit@10 4.82% (964/20000)
Hit@15 6.69% (1338/20000)
Hit@20 8.04% (1608/20000)

[INFO] 2. ItemCF(itemcf) recall ===============
ItemCF i2i recall: 100%|██████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 75085.73it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/ItemCF(itemcf)_recall_12-21 11:33.pkl
Hit@5 6.46% (1291/20000)
Hit@10 10.36% (2072/20000)

[INFO] 3. ItemCF(emb) recall ===============
ItemCF i2i recall: 100%|██████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 64512.62it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/ItemCF(emb)_recall_12-21 11:33.pkl
Hit@5 1.37% (274/20000)
Hit@10 1.59% (318/20000)

[INFO] 4. UserCF(emb+usercf) recall ===============
UserCF u2u2i recall: 100%|█████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:02<00:00, 6786.53it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/UserCF(emb+usercf)_recall_12-21 11:33.pkl
Hit@5 12.98% (2597/20000)
Hit@10 18.14% (3628/20000)

[INFO] 5. UserCF(emb+youtubeuser) recall ===============
UserCF u2u_sim: 20000it [00:00, 122271.86it/s]
UserCF(YoutubeUser) recall: 100%|█████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 65092.71it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/UserCF(emb+youtubeuser)_recall_12-21 11:33.pkl
Hit@5 1.25% (249/20000)
Hit@10 4.65% (930/20000)

[INFO] 11:33:16 Finished. (2.22 mins)

3、冷启动问题

冷启动问题可以分成三类:文章冷启动,用户冷启动,系统冷启动

  1. 文章冷启动:对于一个平台系统新加入的文章,该文章没有任何的交互记录,如何推荐给用户的问题。

    对于我们场景可以认为是,日志数据中没有出现过的文章都可以认为是冷启动的文章。

  2. 用户冷启动:对于一个平台系统新来的用户,该用户还没有文章的交互信息,如何给该用户进行推荐。

    对于我们场景就是,测试集中的用户是否在测试集对应的log数据中出现过,如果没有出现过,那么可以认为该用户是冷启动用户。但是有时候并没有这么严格,我们也可以自己设定某些指标来判别哪些用户是冷启动用户,比如通过使用时长,点击率,留存率等等。

  3. 系统冷启动:就是对于一个平台刚上线,还没有任何的相关历史数据,此时就是系统冷启动,其实也就是前面两种的一个综合。

当前场景下冷启动问题的分析

对当前的数据进行分析会发现,日志中所有出现过的点击文章只有3w多个,而整个文章库中却有30多万,那么测试集中的用户最后一次点击是否会点击没有出现在日志中的文章呢?如果存在这种情况,说明用户点击的文章之前没有任何的交互信息,这也就是我们所说的文章冷启动。通过数据分析还可以发现,测试集用户只有一次点击的数据占得比例还不少,其实仅仅通过用户的一次点击就给用户推荐文章使用模型的方式也是比较难的,这里其实也可以考虑用户冷启动的问题,但是这里只给出物品冷启动的一些解决方案及代码,关于用户冷启动的话提一些可行性的做法。

  1. 文章冷启动(没有冷启动的探索问题):其实我们这里不是为了做文章的冷启动而做冷启动,而是猜测用户可能会点击一些没有在log数据中出现的文章,我们要做的就是如何从将近27万的文章中选择一些文章作为用户冷启动的文章,这里其实也可以看成是一种召回策略,我们这里就采用简单的比较好理解的基于规则的召回策略来获取用户可能点击的未出现在log数据中的文章。 现在的问题变成了:如何给每个用户考虑从27万个商品中获取一小部分商品?随机选一些可能是一种方案。下面给出一些参考的方案。
    • 首先基于Embedding召回一部分与用户历史相似的文章
    • 从基于Embedding召回的文章中通过一些规则过滤掉一些文章,使得留下的文章用户更可能点击。我们这里的规则,可以是,留下那些与用户历史点击文章主题相同的文章,或者字数相差不大的文章。并且留下的文章尽量是与测试集用户最后一次点击时间更接近的文章,或者是当天的文章也行。
  2. 用户冷启动:这里对测试集中的用户点击数据进行分析会发现,测试集中有百分之20的用户只有一次点击,那么这些点击特别少的用户的召回是不是可以单独做一些策略上的补充呢?或者是在排序后直接基于规则加上一些文章呢?这些都可以去尝试,这里没有提供具体的做法。

注意:这里看似和基于embedding计算的item之间相似度然后做itemcf是一致的,但是现在我们的目的不一样,我们这里的目的是找到相似的向量,并且还没有出现在log日志中的商品,再加上一些其他的冷启动的策略,这里需要找回的数量会偏多一点,不然被筛选完之后可能都没有文章了

4、多路召回合并

多路召回合并就是将前面所有的召回策略得到的用户文章列表合并起来,下面是对前面所有召回结果的汇总

  1. 基于itemcf计算的item之间的相似度sim进行的召回
  2. 基于embedding搜索得到的item之间的相似度进行的召回
  3. 基于usercf计算的用户相似度u2u2i召回
  4. YoutubeDNN召回
  5. YoutubeDNN得到的user之间的相似度进行的u2u2i召回
  6. 基于冷启动策略的召回

注意:
在做召回评估的时候就会发现有些召回的效果不错有些召回的效果很差,所以对每一路召回的结果,我们可以认为的定义一些权重,来做最终的相似度融合

对于上述实现的召回策略其实都不是最优的结果,我们只是做了个简单的尝试,其中还有很多地方可以优化,包括已经实现的这些召回策略的参数或者新加一些,修改一些关联规则都可以。当然还可以尝试更多的召回策略,比如对新闻进行热度召回等等。

五、特征工程

特征工程和数据清洗转换是比赛中至关重要的一块, 因为数据和特征决定了机器学习的上限,而算法和模型只是逼近这个上限而已,所以特征工程的好坏往往决定着最后的结果,特征工程可以一步增强数据的表达能力,通过构造新特征,我们可以挖掘出数据的更多信息,使得数据的表达能力进一步放大。 在本节内容中,我们主要是先通过制作特征和标签把预测问题转成了监督学习问题,然后围绕着用户画像和文章画像进行一系列特征的制作, 此外,为了保证正负样本的数据均衡,我们还学习了负采样就技术等。当然本节内容只是对构造特征提供了一些思路,也请学习者们在学习过程中开启头脑风暴,尝试更多的构造特征的方法,也欢迎我们一块探讨和交流。

不多赘述,可以查看原文:6.5特征工程

六、排序模型

通过召回的操作, 我们已经进行了问题规模的缩减, 对于每个用户,选择出了N篇文章作为了候选集,并基于召回的候选集构建了与用户历史相关的特征,以及用户本身的属性特征,文章本省的属性特征,以及用户与文章之间的特征,下面就是使用机器学习模型来对构造好的特征进行学习,然后对测试集进行预测,得到测试集中的每个候选集用户点击的概率,返回点击概率最大的topk个文章,作为最终的结果。

排序阶段选择了三个比较有代表性的排序模型,它们分别是:

  1. LGB的排序模型
  2. LGB的分类模型
  3. 深度学习的分类模型DIN

得到了最终的排序模型输出的结果之后,还选择了两种比较经典的模型集成的方法:

  1. 输出结果加权融合
  2. Staking(将模型的输出结果再使用一个简单模型进行预测)

1、排序

LGB排序

1
2
3
4
5
6
7
8
9
10
11
# 定义模型
lgb_ranker = lgb.LGBMRanker(boosting_type='gbdt', num_leaves=31, reg_alpha=0.0, reg_lambda=1,
max_depth=-1, n_estimators=100, subsample=0.7, colsample_bytree=0.7, subsample_freq=1,
learning_rate=0.01, min_child_weight=50, random_state=2018, n_jobs= 16)
# 训练模型
lgb_ranker.fit(train_idx[lgb_cols], train_idx['label'], group=g_train,
eval_set=[(valid_idx[lgb_cols], valid_idx['label'])], eval_group= [g_val],
eval_at=[1, 2, 3, 4, 5], eval_metric=['ndcg', ])

# 预测验证集结果
valid_idx['pred_score'] = lgb_ranker.predict(valid_idx[lgb_cols], num_iteration=lgb_ranker.best_iteration_)

LGB分类

1
2
3
4
5
6
7
8
9
10
11
# 模型及参数的定义
lgb_Classfication = lgb.LGBMClassifier(boosting_type='gbdt', num_leaves=31, reg_alpha=0.0, reg_lambda=1,
max_depth=-1, n_estimators=100, subsample=0.7, colsample_bytree=0.7, subsample_freq=1,
learning_rate=0.01, min_child_weight=50, random_state=2018, n_jobs= 16, verbose=10)
# 训练模型
lgb_Classfication.fit(train_idx[lgb_cols], train_idx['label'],eval_set=[(valid_idx[lgb_cols], valid_idx['label'])],
eval_metric=['auc', ])

# 预测验证集结果
valid_idx['pred_score'] = lgb_Classfication.predict_proba(valid_idx[lgb_cols],
num_iteration=lgb_Classfication.best_iteration_)[:,1]

DIN模型简介

我们下面尝试使用DIN模型, DIN的全称是Deep Interest Network, 这是阿里2018年基于前面的深度学习模型无法表达用户多样化的兴趣而提出的一个模型, 它可以通过考虑【给定的候选广告】和【用户的历史行为】的相关性,来计算用户兴趣的表示向量。具体来说就是通过引入局部激活单元,通过软搜索历史行为的相关部分来关注相关的用户兴趣,并采用加权和来获得有关候选广告的用户兴趣的表示。与候选广告相关性较高的行为会获得较高的激活权重,并支配着用户兴趣。该表示向量在不同广告上有所不同,大大提高了模型的表达能力。所以该模型对于此次新闻推荐的任务也比较适合, 我们在这里通过当前的候选文章与用户历史点击文章的相关性来计算用户对于文章的兴趣。 该模型的结构如下:

DIN

DIN模型

我们这里直接调包来使用这个模型, 关于这个模型的详细细节部分我们会在下一期的推荐系统组队学习中给出。下面说一下该模型如何具体使用:deepctr的函数原型如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def DIN(dnn_feature_columns, history_feature_list, dnn_use_bn=False,
dnn_hidden_units=(200, 80), dnn_activation='relu', att_hidden_size=(80, 40), att_activation="dice",
att_weight_normalization=False, l2_reg_dnn=0, l2_reg_embedding=1e-6, dnn_dropout=0, seed=1024,
task='binary'):
pass
# * dnn_feature_columns: 特征列, 包含数据所有特征的列表
# * history_feature_list: 用户历史行为列, 反应用户历史行为的特征的列表
# * dnn_use_bn: 是否使用BatchNormalization
# * dnn_hidden_units: 全连接层网络的层数和每一层神经元的个数, 一个列表或者元组
# * dnn_activation_relu: 全连接网络的激活单元类型
# * att_hidden_size: 注意力层的全连接网络的层数和每一层神经元的个数
# * att_activation: 注意力层的激活单元类型
# * att_weight_normalization: 是否归一化注意力得分
# * l2_reg_dnn: 全连接网络的正则化系数
# * l2_reg_embedding: embedding向量的正则化稀疏
# * dnn_dropout: 全连接网络的神经元的失活概率
# * task: 任务, 可以是分类, 也可是是回归

在具体使用的时候, 我们必须要传入特征列和历史行为列, 但是再传入之前, 我们需要进行一下特征列的预处理。具体如下:

  1. 首先,我们要处理数据集, 得到数据, 由于我们是基于用户过去的行为去预测用户是否点击当前文章, 所以我们需要把数据的特征列划分成数值型特征, 离散型特征和历史行为特征列三部分, 对于每一部分, DIN模型的处理会有不同:
    1. 对于离散型特征, 在我们的数据集中就是那些类别型的特征, 比如user_id这种, 这种类别型特征, 我们首先要经过embedding处理得到每个特征的低维稠密型表示, 既然要经过embedding, 那么我们就需要为每一列的类别特征的取值建立一个字典,并指明embedding维度, 所以在使用deepctr的DIN模型准备数据的时候, 我们需要通过SparseFeat函数指明这些类别型特征, 这个函数的传入参数就是列名, 列的唯一取值(建立字典用)和embedding维度。
    2. 对于用户历史行为特征列, 比如文章id, 文章的类别等这种, 同样的我们需要先经过embedding处理, 只不过和上面不一样的地方是,对于这种特征, 我们在得到每个特征的embedding表示之后, 还需要通过一个Attention_layer计算用户的历史行为和当前候选文章的相关性以此得到当前用户的embedding向量, 这个向量就可以基于当前的候选文章与用户过去点击过得历史文章的相似性的程度来反应用户的兴趣, 并且随着用户的不同的历史点击来变化,去动态的模拟用户兴趣的变化过程。这类特征对于每个用户都是一个历史行为序列, 对于每个用户, 历史行为序列长度会不一样, 可能有的用户点击的历史文章多,有的点击的历史文章少, 所以我们还需要把这个长度统一起来, 在为DIN模型准备数据的时候, 我们首先要通过SparseFeat函数指明这些类别型特征, 然后还需要通过VarLenSparseFeat函数再进行序列填充, 使得每个用户的历史序列一样长, 所以这个函数参数中会有个maxlen,来指明序列的最大长度是多少。
    3. 对于连续型特征列, 我们只需要用DenseFeat函数来指明列名和维度即可。
  2. 处理完特征列之后, 我们把相应的数据与列进行对应,就得到了最后的数据。

2、排序融合

加权融合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 读取多个模型的排序结果文件
lgb_ranker = pd.read_csv(save_path / 'lgb_ranker_score.csv')
lgb_cls = pd.read_csv(save_path / 'lgb_cls_score.csv')
din_ranker = pd.read_csv(save_path / 'din_rank_score.csv')

rank_model = {'lgb_ranker': lgb_ranker,
'lgb_cls': lgb_cls,
'din_ranker': din_ranker}

def get_ensumble_predict_topk(rank_model, topk=5):
# final_recall = rank_model['lgb_cls'].append(rank_model['din_ranker'])
final_recall = pd.concat([rank_model['lgb_cls'], rank_model['din_ranker']]).reset_index(drop=True)
rank_model['lgb_ranker']['pred_score'] = rank_model['lgb_ranker']['pred_score'].transform(lambda x: norm_sim(x))

# final_recall = final_recall.append(rank_model['lgb_ranker'])
final_recall = pd.concat([final_recall, rank_model['lgb_ranker']]).reset_index(drop=True)
final_recall = final_recall.groupby(['user_id', 'click_article_id'])['pred_score'].sum().reset_index()

submit(final_recall, topk=topk, model_name='ensemble_fuse')

get_ensumble_predict_topk(rank_model)

Staking

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 读取多个模型的交叉验证生成的结果文件
# 训练集
trn_lgb_ranker_feats = pd.read_csv(save_path / 'trn_lgb_ranker_feats.csv')
trn_lgb_cls_feats = pd.read_csv(save_path / 'trn_lgb_cls_feats.csv')
trn_din_cls_feats = pd.read_csv(save_path / 'trn_din_cls_feats.csv')

# 测试集
tst_lgb_ranker_feats = pd.read_csv(save_path / 'tst_lgb_ranker_feats.csv')
tst_lgb_cls_feats = pd.read_csv(save_path / 'tst_lgb_cls_feats.csv')
tst_din_cls_feats = pd.read_csv(save_path / 'tst_din_cls_feats.csv')

# 将多个模型输出的特征进行拼接
finall_trn_ranker_feats = trn_lgb_ranker_feats[['user_id', 'click_article_id', 'label']]
finall_tst_ranker_feats = tst_lgb_ranker_feats[['user_id', 'click_article_id']]

for idx, trn_model in enumerate([trn_lgb_ranker_feats, trn_lgb_cls_feats, trn_din_cls_feats]):
for feat in [ 'pred_score', 'pred_rank']:
col_name = feat + '_' + str(idx)
finall_trn_ranker_feats[col_name] = trn_model[feat]

for idx, tst_model in enumerate([tst_lgb_ranker_feats, tst_lgb_cls_feats, tst_din_cls_feats]):
for feat in [ 'pred_score', 'pred_rank']:
col_name = feat + '_' + str(idx)
finall_tst_ranker_feats[col_name] = tst_model[feat]


# 定义一个逻辑回归模型再次拟合交叉验证产生的特征对测试集进行预测
# 这里需要注意的是,在做交叉验证的时候可以构造多一些与输出预测值相关的特征,来丰富这里简单模型的特征
from sklearn.linear_model import LogisticRegression

feat_cols = ['pred_score_0', 'pred_rank_0', 'pred_score_1', 'pred_rank_1', 'pred_score_2', 'pred_rank_2']

trn_x = finall_trn_ranker_feats[feat_cols]
trn_y = finall_trn_ranker_feats['label']

tst_x = finall_tst_ranker_feats[feat_cols]

# 采样50000行数据 因为全量数据太大了
sample_indices = trn_x.sample(n=50000, random_state=42).index
trn_x_sample = trn_x.loc[sample_indices]
trn_y_sample = trn_y.loc[sample_indices]

print(f"Original training data shape: {trn_x.shape}")
print(f"Sampled training data shape: {trn_x_sample.shape}")

# 定义模型
lr = LogisticRegression()

# 模型训练
lr.fit(trn_x_sample, trn_y_sample)

# 模型预测
test_score = []
test_batch_size = 10000
for i in tqdm(range(0, len(tst_x), test_batch_size), total=len(tst_x)//test_batch_size, desc="Predicting test score"):
test_score.append(lr.predict_proba(tst_x.iloc[i:i+test_batch_size])[:, 1])

finall_tst_ranker_feats['pred_score'] = np.concatenate(test_score)

附:代码

recall.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import os, warnings, pickle
import numpy as np
import pandas as pd
from collections import defaultdict
from pathlib import Path
from tqdm import tqdm
from datetime import datetime
from model_itemcf import itemcf_sim, itemcf_recall, cold_start_prepare, cold_start_recall
from model_usercf import usercf_sim, u2u_embdding_sim, usercf_recall, usercf_recall_YoutubeUser
from model_youtubednn import YoutubeDNN_u2i_dict
from utils import load_dotenv, get_all_click_sample, get_all_click_df, get_item_info_df, get_item_emb_dict \
,get_item_info_dict, get_hist_last_click, get_user_active_degree_dict, get_item_emb_df\
,embedding_sim, metrics_recall, get_user_item_time, get_item_topk_click, save_recall,\
get_user_hist_item_info_dict, combine_recall_results, submit

START_TIME = datetime.now()
print(f'[INFO] {START_TIME.strftime("%H:%M:%S")} Start...\n')
os.environ['OMP_NUM_THREADS'] = '1'
warnings.filterwarnings('ignore')


load_dotenv()
RAW_DATA_PATH = Path(os.getenv('FUNREC_RAW_DATA_PATH'))
PROCESSED_DATA_PATH = Path(os.getenv('FUNREC_PROCESSED_DATA_PATH'))
data_path = RAW_DATA_PATH / 'news_recommendation'
save_path = PROCESSED_DATA_PATH / 'projects/news_recommendation'
metric_recall = True # 做召回评估的一个标志, 如果不进行评估就是直接使用全量数据进行召回

max_min_scaler = lambda x: (x-np.min(x)) / (np.max(x)-np.min(x))

# 采样数据
print('[1] 采样数据...')
if metric_recall:
all_click_df = get_all_click_sample(data_path)
# all_click_df = get_all_click_df(data_path, offline=True)
else:
all_click_df = get_all_click_df(data_path, offline=False)
tst_click = pd.read_csv(data_path / 'testA_click_log.csv')

# all_click_df = get_all_click_df(data_path, offline=False)
# 时间戳归一化,用于规则权重(注意后面必须是两个[],否则apply的使用会不对)
all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler)

# 新闻
print('[1] 新闻数据...')
item_info_df = get_item_info_df(data_path)
item_emb_df = get_item_emb_df(data_path)
item_emb_dict = get_item_emb_dict(data_path, save_path)
# 获取文章的属性信息,保存成字典的形式方便查询
item_creat_dict, item_category_dict, item_words_dict = get_item_info_dict(item_info_df)
# 用户活跃度
user_active_dict = get_user_active_degree_dict(all_click_df)


# 多路召回字典
user_multi_recall_dict = {
'YoutubeDNN_recall': {},
'ItemCF(itemcf)_recall': {},
'ItemCF(emb)_recall': {},
'UserCF(emb+usercf)_recall': {},
'UserCF(emb+youtubeuser)_recall': {},
'Coldstart_recall': {}
}
# 多路召回的权重
weight_dict = {
'YoutubeDNN_recall':5.0,
'ItemCF(itemcf)_recall': 10.0,
'ItemCF(emb)_recall': 2.0,
'UserCF(emb+usercf)_recall': 20.0,
'UserCF(emb+youtubeuser)_recall': 5.0,
'Coldstart_recall': 1.0
}

print('[2] 相似矩阵...')
trn_hist_click_df, trn_last_click_df = get_hist_last_click(all_click_df)
i2i_sim = itemcf_sim(trn_hist_click_df, save_path, item_creat_dict)
u2u_sim = usercf_sim(all_click_df, save_path, user_active_dict) # 太耗时了
emb_i2i_sim = embedding_sim(all_click_df, item_emb_df, save_path, topk=10)


# 线下测试:召回+评估
if metric_recall:
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
print("\n[INFO] 1. YoutubeDNN recall ==============")
user_multi_recall_dict['YoutubeDNN_recall'] = YoutubeDNN_u2i_dict(trn_hist_click_df, save_path, topk=20)
save_recall(user_multi_recall_dict['YoutubeDNN_recall'], save_path, "YoutubeDNN")
metrics_recall(user_multi_recall_dict['YoutubeDNN_recall'], trn_last_click_df, topk=20)


print("\n[INFO] 2. ItemCF(itemcf) recall ===============")
user_multi_recall_dict['ItemCF(itemcf)_recall'] = itemcf_recall(trn_hist_click_df, item_topk_click, \
i2i_sim, save_path, \
sim_item_topk=20, recall_item_num=10, \
emb_i2i_sim=emb_i2i_sim, item_creat_dict=item_creat_dict)
save_recall(user_multi_recall_dict['ItemCF(itemcf)_recall'], save_path, "ItemCF(itemcf)")
metrics_recall(user_multi_recall_dict['ItemCF(itemcf)_recall'], trn_last_click_df, topk=10)

print("\n[INFO] 3. ItemCF(emb) recall ===============")
i2i_sim_3 = emb_i2i_sim
user_multi_recall_dict['ItemCF(emb)_recall'] = itemcf_recall(trn_hist_click_df, item_topk_click, \
i2i_sim_3, save_path, \
sim_item_topk=20, recall_item_num=10, \
emb_i2i_sim=emb_i2i_sim, item_creat_dict=item_creat_dict)
save_recall(user_multi_recall_dict['ItemCF(emb)_recall'], save_path, "ItemCF(emb)")
metrics_recall(user_multi_recall_dict['ItemCF(emb)_recall'], trn_last_click_df, topk=10)

print("\n[INFO] 4. UserCF(emb+usercf) recall ===============")
user_multi_recall_dict['UserCF(emb+usercf)_recall'] = usercf_recall(trn_hist_click_df, item_topk_click, \
u2u_sim, item_creat_dict, emb_i2i_sim,\
sim_user_topk=20, recall_item_num=10)
save_recall(user_multi_recall_dict['UserCF(emb+usercf)_recall'], save_path, "UserCF(emb+usercf)")
metrics_recall(user_multi_recall_dict['UserCF(emb+usercf)_recall'], trn_last_click_df, topk=10)

print("\n[INFO] 5. UserCF(emb+youtubeuser) recall ===============")
user_emb_dict = pickle.load(open(save_path / 'user_youtube_emb.pkl', 'rb'))
u2u_sim_dict = u2u_embdding_sim(trn_hist_click_df, user_emb_dict, save_path, topk=10)
u2u_sim_5 = pickle.load(open(save_path / 'youtube_u2u_sim.pkl', 'rb'))
user_multi_recall_dict['UserCF(emb+youtubeuser)_recall'] = usercf_recall_YoutubeUser(trn_hist_click_df, item_topk_click,\
u2u_sim_5, item_creat_dict,\
emb_i2i_sim, save_path,\
sim_user_topk=20, recall_item_num=10)
save_recall(user_multi_recall_dict['UserCF(emb+youtubeuser)_recall'], save_path, "UserCF(emb+youtubeuser)")
metrics_recall(user_multi_recall_dict['UserCF(emb+youtubeuser)_recall'], trn_last_click_df, topk=10)

print("\n[INFO] 6. Coldstart recall ===============")
cold_click_df_ = trn_hist_click_df.copy()
cold_click_df_ = cold_click_df_.merge(item_info_df, how='left', on='click_article_id')
user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(cold_click_df_)
click_article_ids_set = set(cold_click_df_['click_article_id'].values)
# 注意:这里使用了很多规则来筛选冷启动的文章,所以前面再召回的阶段就应该尽可能的多召回一些文章,否则很容易被删掉
user_recall_items_dict = cold_start_prepare(cold_click_df_, i2i_sim, item_topk_click, \
save_path, item_creat_dict, emb_i2i_sim)
cold_start_user_items_dict = cold_start_recall(user_recall_items_dict, user_hist_item_typs_dict, \
user_hist_item_words_dict, user_last_item_created_time_dict, \
item_category_dict, item_words_dict, item_creat_dict, \
click_article_ids_set, recall_item_num=10)
user_multi_recall_dict['Coldstart_recall'] = cold_start_user_items_dict
save_recall(user_multi_recall_dict['Coldstart_recall'], save_path, "Coldstart")
metrics_recall(user_multi_recall_dict['Coldstart_recall'], trn_last_click_df, topk=10)



# 线上提交:只召回
else:
item_topk_click = get_item_topk_click(all_click_df, k=50)
# 1
user_multi_recall_dict['YoutubeDNN_recall'] = YoutubeDNN_u2i_dict(all_click_df, save_path, topk=20)
save_recall(user_multi_recall_dict['YoutubeDNN_recall'], save_path, "YoutubeDNN")

# 2
user_multi_recall_dict['ItemCF(itemcf)_recall'] = itemcf_recall(all_click_df, item_topk_click, \
i2i_sim, save_path, \
sim_item_topk=20, recall_item_num=10, \
emb_i2i_sim=None, item_creat_dict=None)
save_recall(user_multi_recall_dict['ItemCF(itemcf)_recall'], save_path, "ItemCF(itemcf)")

# 3
i2i_sim_3 = emb_i2i_sim
user_multi_recall_dict['ItemCF(emb)_recall'] = itemcf_recall(all_click_df, item_topk_click, \
i2i_sim_3, save_path, \
sim_item_topk=20, recall_item_num=10, \
emb_i2i_sim=None, item_creat_dict=None)
save_recall(user_multi_recall_dict['ItemCF(emb)_recall'], save_path, "ItemCF(emb)")

# 4
user_multi_recall_dict['UserCF(emb+usercf)_recall'] = usercf_recall(all_click_df, item_topk_click, \
u2u_sim, item_creat_dict, emb_i2i_sim,\
sim_user_topk=20, recall_item_num=10)
save_recall(user_multi_recall_dict['UserCF(emb+usercf)_recall'], save_path, "UserCF(emb+usercf)")

# 5
user_emb_dict = pickle.load(open(save_path / 'user_youtube_emb.pkl', 'rb'))
u2u_sim_dict = u2u_embdding_sim(all_click_df, user_emb_dict, save_path, topk=10)
u2u_sim_5 = pickle.load(open(save_path / 'youtube_u2u_sim.pkl', 'rb'))
user_multi_recall_dict['UserCF(emb+youtubeuser)_recall'] = usercf_recall_YoutubeUser(all_click_df, item_topk_click,\
u2u_sim_5, item_creat_dict,\
emb_i2i_sim, save_path,\
sim_user_topk=20, recall_item_num=10)
save_recall(user_multi_recall_dict['UserCF(emb+youtubeuser)_recall'], save_path, "UserCF(emb+youtubeuser)")

# 6
cold_click_df_ = all_click_df.copy()
cold_click_df_ = cold_click_df_.merge(item_info_df, how='left', on='click_article_id')
user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(cold_click_df_)
click_article_ids_set = set(cold_click_df_['click_article_id'].values)
user_recall_items_dict = cold_start_prepare(cold_click_df_, i2i_sim, item_topk_click, \
save_path, item_creat_dict, emb_i2i_sim)
cold_start_user_items_dict = cold_start_recall(user_recall_items_dict, user_hist_item_typs_dict, \
user_hist_item_words_dict, user_last_item_created_time_dict, \
item_category_dict, item_words_dict, item_creat_dict, \
click_article_ids_set, recall_item_num=10)
user_multi_recall_dict['Coldstart_recall'] = cold_start_user_items_dict
save_recall(user_multi_recall_dict['Coldstart_recall'], save_path, "Coldstart")

# 最终合并之后每个用户召回150个商品进行排序
print("\n[INFO] Combine recall results ===============")
final_recall_items_dict_rank = combine_recall_results(user_multi_recall_dict, weight_dict, topk=30)
save_recall(final_recall_items_dict_rank, save_path, "Final")
if metric_recall:
metrics_recall(final_recall_items_dict_rank, trn_last_click_df, topk=30)
else:
tst_user = tst_click['user_id'].unique()
final_recall_items_dict_rank = final_recall_items_dict_rank[final_recall_items_dict_rank['user_id'].isin(tst_user)]
submit(final_recall_items_dict_rank, save_path, topk=5, model_name='multi-recall')

END_TIME = datetime.now()
print(f'\n[INFO] {END_TIME.strftime("%H:%M:%S")} Finished. ({(END_TIME-START_TIME).total_seconds()/60:.2f} mins)')

utils.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import os, time, pickle, faiss
import numpy as np
import pandas as pd
from tqdm import tqdm
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from dotenv import find_dotenv, load_dotenv
from sklearn.preprocessing import MinMaxScaler

# 提交函数
def submit(recall_df, save_path, topk=5, model_name=None):
# 生成排名
recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
recall_df['rank'] = recall_df.groupby('user_id')['pred_score'].rank(ascending=False, method='first')
# 保证都有5篇以上
tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())
assert tmp.min() > topk
# 把排名改为按要求的1-5横向摆放
del recall_df['pred_score']
submit = recall_df[recall_df['rank']<=topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()
submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]
submit = submit.rename(columns = {'':'user_id', 1:'article_1', 2:'article_2', 3:'article_3',
4:'article_4', 5:'article_5'})
save_name = save_path / f"{model_name}_{datetime.now().strftime('%m-%d %H:%M')}.csv"
submit.to_csv(save_name, index=False, header=True)

def save_recall(user_recall_items_dict, save_path, recall_name):
tt = datetime.now().strftime("%m-%d %H:%M")
save_name = save_path / f'{recall_name}_recall_{tt}.pkl'
pickle.dump(user_recall_items_dict, open(save_name, 'wb'))
print(f'itemcf recall结果保存:{save_name}')

# 评估函数 (Hit@5、10/15/20/25/30...)
def metrics_recall(user_recall_item_dict, trn_last_item_df, topk=30):
last_item_dict = dict(zip(trn_last_item_df['user_id'], trn_last_item_df['click_article_id']))
total_num = len(last_item_dict)

for k in range(5, topk+1, 5):
hit_num = 0
for user, items in user_recall_item_dict.items():
items_set = set([x[0] for x in items[:k]])
if last_item_dict[user] in items_set:
hit_num += 1
print(f'Hit@{k} \t {100*hit_num/total_num :5.2f}% ({hit_num}/{total_num})')




# 加载环境函数
def load_env_with_fallback() -> None:
envpath = find_dotenv(usecwd=True)
if envpath:
load_dotenv(envpath)
print("已加载 .env 文件:", envpath)
else:
print("未找到 .env 文件")
if not os.getenv('FUNREC_RAW_DATA_PATH'):
os.environ['FUNREC_RAW_DATA_PATH'] = str(Path.cwd()) / "dataset"
if not os.getenv('FUNREC_PROCESSED_DATA_PATH'):
os.environ['FUNREC_PROCESSED_DATA_PATH'] = str(Path.cwd()) / "dataset_processed"

# df 节省内存
def reduce_mem(df):
start_time = time.time()
start_mem = df.memory_usage().sum() / 1024**2
numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
for col in df.columns:
col_type = df[col].dtypes
if col_type in numerics:
dmin = df[col].min()
dmax = df[col].max()
if pd.isnull(dmin) or pd.isnull(dmax):
continue
else:
if str(col_type)[:3] == 'int':
if dmin > np.iinfo(np.int8).min and dmax < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif dmin > np.iinfo(np.int16).min and dmax < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif dmin > np.iinfo(np.int32).min and dmax < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
else:
df[col] = df[col].astype(np.int64)
else:
if dmin > np.iinfo(np.float16).min and dmax < np.iinfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif dmin > np.iinfo(np.float32).min and dmax < np.iinfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
end_mem = df.memory_usage().sum() / 1024**2
end_time = time.time()
print("Memory usage decreased to {:5.2f}Mb({:.2f}% redcution), time spend {:.2f} min".format(
end_mem, 100*(start_mem-end_mem)/start_mem, (start_time-end_time)/60))
return df

# 采样部分数据进行 调试、线下开发、线上提交
def get_all_click_sample(data_path, sample_num=20000):
all_click = pd.read_csv(data_path / "train_click_log.csv")
all_user_ids = all_click['user_id'].unique()

sample_user_ids = np.random.choice(all_user_ids ,size=sample_num, replace=False)
all_click = all_click[all_click['user_id'].isin(sample_user_ids)]

all_click = all_click.drop_duplicates(['user_id', 'click_article_id', 'click_timestamp']).reset_index(drop=True)
return all_click

def get_all_click_df(data_path, offline=True):
if offline:
all_click = pd.read_csv(data_path / "train_click_log.csv")[:20000]
else:
tst_click = pd.read_csv(data_path / 'testA_click_log.csv')
trn_click = pd.read_csv(data_path / "train_click_log.csv")
# trn_click = pd.read_csv(data_path / "train_click_log.csv")[:10000]
# tst_click = pd.read_csv(data_path / 'testA_click_log.csv')[:10000]
all_click = pd.concat([trn_click, tst_click])

all_click = all_click.drop_duplicates(['user_id', 'click_article_id', 'click_timestamp']).reset_index(drop=True)
return all_click


# 获取历史和最后一次点击
def get_hist_last_click(all_click):
all_click = all_click.sort_values(by=['user_id', 'click_timestamp'])
last_click_df = all_click.groupby('user_id').tail(1)

def hist_func(df):
if len(df) == 1:
return df
else:
return df[:-1]
hist_click_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True)

return last_click_df, hist_click_df


# 用户 - 文章,点击
# {user1:[(item,time),(item,time)], ..., user2:[...]}
def get_user_item_time(click_df):
click_df = click_df.sort_values('click_timestamp')
def make_item_time_list(df):
return list(zip(df['click_article_id'], df['click_timestamp']))
click_df = click_df.groupby('user_id')[['click_article_id', 'click_timestamp']] \
.apply(make_item_time_list) \
.reset_index() \
.rename(columns={0:'item_time_list'})
user_item_time_dict = dict(zip(click_df['user_id'], click_df['item_time_list']))
return user_item_time_dict


# 文章 - 用户,点击
# {item1: [(user1, time), (user2, time)], ... , item2:[...]}
def get_item_user_time(click_df):
click_df = click_df.sort_values('click_timestamp')
def make_user_time_list(df):
return list(zip(df['user_id'], df['click_timestamp']))
item_user_df = click_df.groupby('click_article_id')[['user_id','click_timestamp']] \
.apply(make_user_time_list) \
.reset_index() \
.rename(columns={0:'user_time_list'})
item_user_time_dict = dict(zip(item_user_df['click_article_id'], item_user_df['user_time_list']))
return item_user_time_dict


# 最热门的K个新闻(返回新闻id)
def get_item_topk_click(click_df, k):
return click_df['click_article_id'].value_counts().index[:k]


# 获取文章信息
def get_item_info_df(data_path):
item_info_df = pd.read_csv(data_path / 'articles.csv')
item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})
return item_info_df


# 文章属性字典
def get_item_info_dict(item_info_df):
max_min_scaler = lambda x: (x-np.min(x)) / (np.max(x)-np.min(x))
item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler)

item_creat_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts']))
item_category_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id']))
item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count']))

return item_creat_dict, item_category_dict, item_words_dict

# 文章embedding_df
def get_item_emb_df(data_path):
item_emb_df = pd.read_csv(data_path / 'articles_emb.csv')
return item_emb_df

# 获取文章的 embedding 字典
def get_item_emb_dict(data_path, save_path):
item_emb_df = pd.read_csv(data_path / 'articles_emb.csv')
cols = [x for x in item_emb_df.columns if 'emb' in x]
emb_np = np.ascontiguousarray(item_emb_df[cols])

# L2归一化:每个 item 的 embedding 向量做 L2 归一化,使其模长为 1
# 从而在后续计算中可以用点积直接表示余弦相似度,常用于向量召回和内容推荐。
emb_np = emb_np / np.linalg.norm(emb_np, axis=1, keepdims=True)

emb_dict = dict(zip(item_emb_df['article_id'], emb_np))
pickle.dump(emb_dict, open(save_path / "item_content_emb.pkl", 'wb'))
return emb_dict

# 用户-历史文章信息 字典
def get_user_hist_item_info_dict(all_click):
# 历史文章id
item_ids = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index()
item_ids_dict = dict(zip(item_ids['user_id'], item_ids['click_article_id']))

# 历史文章类型
item_cats = all_click.groupby('user_id')['category_id'].agg(set).reset_index()
item_cats_dict = dict(zip(item_cats['user_id'], item_cats['category_id']))

# 历史文章平均字数
item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index()
item_words_dict = dict(zip(item_words['user_id'], item_words['words_count']))

# 历史文章最近相对新鲜度
all_click = all_click.sort_values('click_timestamp')
item_last_time = (
all_click.groupby('user_id')
.tail(1)[['user_id', 'created_at_ts']]
.reset_index(drop=True)
)
max_min_scaler = lambda x: (x-np.min(x)) / (np.max(x)-np.min(x))
item_last_time['created_at_ts'] = item_last_time[['created_at_ts']].apply(max_min_scaler)
item_last_time_dict = dict(zip(item_last_time['user_id'], item_last_time['created_at_ts']))

return item_ids_dict, item_cats_dict, item_words_dict, item_last_time_dict


# 用户相对活跃度
def get_user_active_degree_dict(all_click_df):
all_click_df = all_click_df.groupby('user_id')['click_article_id'].count().reset_index()
mm = MinMaxScaler()
all_click_df['active_degree'] = mm.fit_transform(all_click_df[['click_article_id']])
active_dict = dict(zip(all_click_df['user_id'], all_click_df['active_degree']))

return active_dict


# 向量检索相似度计算
# 对于每一篇文章, 基于embedding的相似性返回topk个与其最相似的文章, 只不过由于文章数量太多,这里用了faiss进行加速
def embedding_sim(click_df, item_emb_df, save_path, topk):
# 保存原始索引
item_idx2rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id']))

# 准备纯数值,并单位化
cols = [x for x in item_emb_df.columns if 'emb' in x]
item_emb_np = np.ascontiguousarray(
item_emb_df[cols],
dtype=np.float32 # faiss必须是float32
)
item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)

# faiss索引:建立标准、添加数据、搜索topk个
print('faiss 正在处理(等待时间较长)...')
item_index = faiss.IndexFlatIP(item_emb_np.shape[1]) # 使用InnerProduct内积计算相似度,数据维度数为250维
item_index.add(item_emb_np)
sim_list, idx_list = item_index.search(item_emb_np, topk+1) # 由于第一个会是自己,因此要找topk+1个出来
print('faiss 处理完毕.')

# 建立成为i2i矩阵
Sim = defaultdict(lambda: defaultdict(float))
for i, sim_l, idx_l in tqdm(zip(range(len(item_emb_np)), sim_list, idx_list), desc='embedding_sim'):
target_rawid = item_idx2rawid_dict[i]
for sim, idx in zip(sim_l[1:], idx_l[1:]): # 第一个是自己,去掉
related_rawid = item_idx2rawid_dict[idx]
Sim[target_rawid][related_rawid] += sim

i2i_sim = dict(Sim)
save_name = save_path / 'emb_i2i_sim.pkl'
pickle.dump(i2i_sim, open(save_name, 'wb'))
print(f'content-based相似度矩阵保存:{save_name}')

return i2i_sim



def combine_recall_results(user_multi_recall_dict, weight_dict=None, topk=25):
# 对每一种召回结果按照用户进行归一化,方便后面多种召回结果,相同用户的物品之间权重相加
def norm_user_recall_items_sim(sorted_item_list):
# 如果冷启动中没有文章或者只有一篇文章,直接返回,出现这种情况的原因可能是冷启动召回的文章数量太少了,
# 基于规则筛选之后就没有文章了, 这里还可以做一些其他的策略性的筛选
if len(sorted_item_list) < 2:
return sorted_item_list

min_sim = sorted_item_list[-1][1]
max_sim = sorted_item_list[0][1]

norm_sorted_item_list = []
for item, score in sorted_item_list:
if max_sim > 0:
norm_score = 1.0 * (score - min_sim) / (max_sim - min_sim) if max_sim > min_sim else 1.0
else:
norm_score = 0.0
norm_sorted_item_list.append((item, norm_score))

return norm_sorted_item_list

print('多路召回合并...')
final_recall_items_dict = {}
for method, user_recall_items in tqdm(user_multi_recall_dict.items(), desc='Combine results'):
print(method + '...')
# 在计算最终召回结果的时候,也可以为每一种召回结果设置一个权重
if weight_dict == None:
recall_method_weight = 1
else:
recall_method_weight = weight_dict[method]

for user_id, sorted_item_list in user_recall_items.items(): # 进行归一化
user_recall_items[user_id] = norm_user_recall_items_sim(sorted_item_list)

for user_id, sorted_item_list in user_recall_items.items():
final_recall_items_dict.setdefault(user_id, {})
for item, score in sorted_item_list:
final_recall_items_dict[user_id].setdefault(item, 0)
final_recall_items_dict[user_id][item] += recall_method_weight * score

final_recall_items_dict_rank = {}
# 多路召回时也可以控制最终的召回数量
for user, recall_item_dict in final_recall_items_dict.items():
final_recall_items_dict_rank[user] = sorted(recall_item_dict.items(), key=lambda x: x[1], reverse=True)[:topk]

return final_recall_items_dict_rank

model_itemcf.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import pickle
import math
import numpy as np
from tqdm import tqdm
from datetime import datetime
from collections import defaultdict
from utils import get_user_item_time


# itemcf 相似度矩阵计算
# Sim = 共现权重W(i,j) / sqrt(Ni用户数, Nj用户数)
# 其中共现权重不是直接次数,而是这个用户是否活跃 1/log(活跃数+1)是权值
# 关联规则: 1. 用户点击的时间权重 2. 用户点击的顺序权重 3. 文章创建的时间权重
def itemcf_sim(df, save_path, item_creat_dict=None):
user_item_time_dict = get_user_item_time(df)

# 计算共现权重W、每个物品的用户数Ni
Wij = defaultdict(lambda: defaultdict(float))
Ni = defaultdict(int)
for user, item_list in tqdm(user_item_time_dict.items(), desc='ItemCF_sim'):
len_w = math.log(len(item_list)+1)
for iloc, (i, i_time) in enumerate(item_list):
Ni[i] += 1
for jloc, (j, j_time) in enumerate(item_list):
if i == j:
continue
dir_w = 1.0 if jloc>iloc else 0.7
loc_w = dir_w * (0.9 ** (np.abs(jloc-iloc)-1))
t_click_w = np.exp(0.7 ** np.abs(j_time-i_time))
if item_creat_dict:
t_creat_w = np.exp(0.8 ** np.abs(item_creat_dict[j]-item_creat_dict[i]))
else:
t_creat_w = 1.0
Wij[i][j] += loc_w * t_click_w * t_creat_w * 1.0 / len_w

# 计算 Sim
Sim = defaultdict(lambda: defaultdict(float))
for i, wj in Wij.items():
for j, w in wj.items():
Sim[i][j] = w / math.sqrt(Ni[i] * Ni[j])

# 转为普通字典
i2i_Sim = dict(Sim)
# 存储相似度矩阵
save_name = save_path / 'itemcf_i2i_sim.pkl'
pickle.dump(i2i_Sim, open(save_name, 'wb'))
print(f"itemcf相似度矩阵保存:{save_name}")
return i2i_Sim

# itemcf 召回
def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click,
item_creat_dict=None, emb_i2i_sim=None):
user_history = user_item_time_dict[user_id]
# user_history = [item for item, _ in user_history]

item_rank = defaultdict(float)
# 每个历史物品都找到 k 个相似物品(此处k不是最终数量)
for iloc, (i, click_time) in enumerate(user_history):
if i not in i2i_sim: # 没有相似的物品,不在相似度矩阵中
continue
items = sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]

for j, w in items:
if j in user_history:
continue
# 文章创建时间差权重
creat_w = np.exp(0.8 ** np.abs(item_creat_dict[i] - item_creat_dict[j])) \
if item_creat_dict else 1.0
# 相似文章和历史点击文章序列中历史文章所在的位置权重
loc_w = (0.9 ** (len(user_history) - iloc))
# 文章内容相似度权重
content_w = 1.0
if emb_i2i_sim:
if emb_i2i_sim.get(i, {}).get(j, None) is not None:
content_w += emb_i2i_sim[i][j]
if emb_i2i_sim.get(j, {}).get(i, None) is not None:
content_w += emb_i2i_sim[j][i]

item_rank[j] += w * creat_w * loc_w * content_w

# 数量不足则用热门补全
if len(item_rank) < recall_item_num:
for idx, i in enumerate(item_topk_click):
if i in item_rank:
continue
item_rank[i] = -idx-100
if len(item_rank) == recall_item_num:
break

# 排序输出最终所需的K个(这里是最终的召回K)
res = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
return res

# 执行 itemcf 召回
def itemcf_recall(hist_click_df, item_topk_click, i2i_sim, save_path, item_creat_dict, emb_i2i_sim, sim_item_topk=20, recall_item_num=10):
user_recall_items_dict = defaultdict(dict)
user_item_time_dict = get_user_item_time(hist_click_df)

for user in tqdm(hist_click_df['user_id'].unique(), desc='ItemCF i2i recall'):
user_recall_items_dict[user] = item_based_recommend(
user, user_item_time_dict, i2i_sim, \
sim_item_topk, recall_item_num, item_topk_click, \
item_creat_dict, emb_i2i_sim
)

return user_recall_items_dict



def cold_start_prepare(hist_click_df, i2i_sim, item_topk_click, save_path, item_creat_dict, emb_i2i_sim):
# 先进行itemcf召回,这里不需要做召回评估,这里只是一种策略
sim_item_topk = 150
recall_item_num = 100 # 稍微召回多一点文章,便于后续的规则筛选
user_recall_items_dict = itemcf_recall(hist_click_df, item_topk_click, i2i_sim, save_path, \
item_creat_dict, emb_i2i_sim,\
sim_item_topk, recall_item_num)
return user_recall_items_dict


# 基于规则进行文章过滤
# 保留文章主题与用户历史浏览主题相似的文章
# 保留文章字数与用户历史浏览文章字数相差不大的文章
# 保留最后一次点击当天的文章
# 按照相似度返回最终的结果
def cold_start_recall(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \
user_last_item_created_time_dict, item_type_dict, item_words_dict,
item_created_time_dict, click_article_ids_set, recall_item_num):
"""
冷启动的情况下召回一些文章
:param user_recall_items_dict: 基于内容embedding相似性召回来的很多文章, 字典, {user1: [item1, item2, ..], }
:param user_hist_item_typs_dict: 字典, 用户点击的文章的主题映射
:param user_hist_item_words_dict: 字典, 用户点击的历史文章的字数映射
:param user_last_item_created_time_idct: 字典,用户点击的历史文章创建时间映射
:param item_tpye_idct: 字典,文章主题映射
:param item_words_dict: 字典,文章字数映射
:param item_created_time_dict: 字典, 文章创建时间映射
:param click_article_ids_set: 集合,用户点击过得文章, 也就是日志里面出现过的文章
:param recall_item_num: 召回文章的数量, 这个指的是没有出现在日志里面的文章数量
"""
cold_start_user_items_dict = {}
for user, item_list in tqdm(user_recall_items_dict.items(), desc='Cold start recall'):
cold_start_user_items_dict.setdefault(user, [])
for item, score in item_list:
# 获取历史文章信息
hist_item_type_set = user_hist_item_typs_dict[user]
hist_mean_words = user_hist_item_words_dict[user]
hist_last_item_created_time = user_last_item_created_time_dict[user]
hist_last_item_created_time = datetime.fromtimestamp(hist_last_item_created_time)

# 获取当前召回文章的信息
curr_item_type = item_type_dict[item]
curr_item_words = item_words_dict[item]
curr_item_created_time = item_created_time_dict[item]
curr_item_created_time = datetime.fromtimestamp(curr_item_created_time)

# 删除:文章不能出现在用户的历史点击中, 然后根据文章主题,文章单词数,文章创建时间进行筛选
if item in click_article_ids_set or \
curr_item_type not in hist_item_type_set or \
abs(curr_item_words - hist_mean_words) > 200 or \
abs((curr_item_created_time - hist_last_item_created_time).days) > 90:
continue

cold_start_user_items_dict[user].append((item, score)) # {user1: [(item1, score1), (item2, score2)..]...}

# 需要控制一下冷启动召回的数量
cold_start_user_items_dict = {k: sorted(v, key=lambda x:x[1], reverse=True)[:recall_item_num] \
for k, v in cold_start_user_items_dict.items()}

return cold_start_user_items_dict

model_usercf.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import math, pickle, faiss
import numpy as np
from tqdm import tqdm
from collections import defaultdict
from utils import get_item_user_time, get_user_item_time


# 基于用户的协同过滤 UserCF 计算相似度
def usercf_sim(all_click_df, save_path, user_active_dict):
item_user_time_dict = get_item_user_time(all_click_df)

Wuv = defaultdict(lambda: defaultdict(float))
Nu = defaultdict(int)
for item, user_time in tqdm(item_user_time_dict.items(), desc='UserCF_sim'):
len_w = len(user_time)
for u, ut in user_time:
Nu[u] += 1
for v, vt in user_time:
if u == v:
continue
active_w = 100 * 0.5 * (user_active_dict[u] + user_active_dict[v])
Wuv[u][v] += active_w * 1.0 / len_w

Sim = defaultdict(lambda: defaultdict(float))
for u, wv in Wuv.items():
for v, w in wv.items():
Sim[u][v] = w / math.sqrt(Nu[u] * Nu[v])

u2u_sim = dict(Sim)
save_name = save_path / 'usercf_u2u_sim.pkl'
pickle.dump(u2u_sim, open(save_name, 'wb'))
print(f"usercf相似度矩阵保存:{save_name}")

return u2u_sim

# 基于用户的召回 u2u2i
# 第一步的 u2u 看 UserCF_sim
# 第二步的 u2i 看这个相似用户u看的完整中有没有 user_hist 中的相似物品(如何判断物品相似:看emb_sim)
def user_based_recommend(user_id, user_item_time_dict, u2u_sim, sim_user_topk, recall_item_num,
item_topk_click, item_created_time_dict, emb_i2i_sim):
# 历史交互
user_item_time_list = user_item_time_dict[user_id] # {item1: time1, item2: time2...}
user_hist_items = set([i for i, t in user_item_time_list]) # 存在一个用户与某篇文章的多次交互, 这里得去重

items_rank = {}
if user_id in u2u_sim:
for sim_u, wuv in sorted(u2u_sim[user_id].items(), key=lambda x: x[1], reverse=True)[:sim_user_topk]:
for i, click_time in user_item_time_dict[sim_u]:
if i in user_hist_items:
continue
items_rank.setdefault(i, 0)

loc_weight = 1.0
content_weight = 1.0
created_time_weight = 1.0

# 当前文章与该用户看的历史文章进行一个权重交互
for loc, (j, click_time) in enumerate(user_item_time_list):
# 点击时的相对位置权重
loc_weight += 0.9 ** (len(user_item_time_list) - loc)
# 内容相似性权重
if emb_i2i_sim.get(i, {}).get(j, None) is not None:
content_weight += emb_i2i_sim[i][j]
if emb_i2i_sim.get(j, {}).get(i, None) is not None:
content_weight += emb_i2i_sim[j][i]

# 创建时间差权重
created_time_weight += np.exp(0.8 * np.abs(item_created_time_dict[i] - item_created_time_dict[j]))

items_rank[i] += loc_weight * content_weight * created_time_weight * wuv

# 热度补全
if len(items_rank) < recall_item_num:
for i, item in enumerate(item_topk_click):
if item in items_rank.items(): # 填充的item应该不在原来的列表中
continue
items_rank[item] = - i - 100 # 随便给个复数就行
if len(items_rank) == recall_item_num:
break

items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]

return items_rank


# 使用Embedding的方式获取u2u的相似性矩阵
# topk指的是每个user, faiss搜索后返回最相似的topk个user
def u2u_embdding_sim(click_df, user_emb_dict, save_path, topk):

user_list = []
user_emb_list = []
for user_id, user_emb in user_emb_dict.items():
user_list.append(user_id)
user_emb_list.append(user_emb)

user_index_2_rawid_dict = {k: v for k, v in zip(range(len(user_list)), user_list)}

user_emb_np = np.array(user_emb_list, dtype=np.float32)

# 建立faiss索引
user_index = faiss.IndexFlatIP(user_emb_np.shape[1])
user_index.add(user_emb_np)
# 相似度查询,给每个索引位置上的向量返回topk个item以及相似度
sim, idx = user_index.search(user_emb_np, topk) # 返回的是列表

# 将向量检索的结果保存成原始id的对应关系
user_sim_dict = defaultdict(dict)
for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(user_emb_np)), sim, idx), desc='UserCF u2u_sim'):
target_raw_id = user_index_2_rawid_dict[target_idx]
# 从1开始是为了去掉商品本身, 所以最终获得的相似商品只有topk-1
for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
rele_raw_id = user_index_2_rawid_dict[rele_idx]
user_sim_dict[target_raw_id][rele_raw_id] = user_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value

# 保存i2i相似度矩阵
pickle.dump(user_sim_dict, open(save_path / 'youtube_u2u_sim.pkl', 'wb'))
return user_sim_dict

def usercf_recall(hist_click_df, item_topk_click, u2u_sim, item_creat_dict, emb_i2i_sim, sim_user_topk=20, recall_item_num=10):
user_item_time_dict = get_user_item_time(hist_click_df)
user_recall_items_dict = defaultdict(dict)
for user in tqdm(hist_click_df['user_id'].unique(), desc='UserCF u2u2i recall'):
user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, \
sim_user_topk, recall_item_num, item_topk_click, \
item_creat_dict, emb_i2i_sim)

return user_recall_items_dict



def usercf_recall_YoutubeUser(hist_click_df, item_topk_click, u2u_sim, item_creat_dict, emb_i2i_sim, save_path, sim_user_topk=20,recall_item_num=10):
# UserCF emb u2u2i
# 读取YoutubeDNN过程中产生的user embedding, 然后使用faiss计算用户之间的相似度
# 这里需要注意,这里得到的user embedding其实并不是很好,因为YoutubeDNN中使用的是用户点击序列来训练的user embedding,
# 如果序列普遍都比较短的话,其实效果并不是很好
user_recall_items_dict = defaultdict(dict)
user_item_time_dict = get_user_item_time(hist_click_df)
for user in tqdm(hist_click_df['user_id'].unique(), desc='UserCF(YoutubeUser) recall'):
user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \
recall_item_num, item_topk_click, item_creat_dict, emb_i2i_sim)

return user_recall_items_dict

model_youtubednn.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import random, pickle, faiss
import numpy as np
from tqdm import tqdm
from collections import defaultdict
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.sequence import pad_sequences
from feature import FeatureColumn
from trainer import train_model

# 生成所需数据集
# negsample 滑动窗口构建样本时负样本的数量(对于每个正样本选几个负样本)
def gen_data_set(click_df, negsample=0):
click_df = click_df.sort_values('click_timestamp')
item_ids = click_df['click_article_id'].unique()

train_set = []
test_set = []
for uid, hist in tqdm(click_df.groupby('user_id'), desc='gen_data_set'):
pos_list = hist['click_article_id'].tolist()

if negsample > 0:
candidate_set = list(set(item_ids) - set(pos_list))
neg_list = np.random.choice(candidate_set, size=len(pos_list)*negsample, replace=True) # 有可能不够,因此允许重复拿

# 滑窗构造正负样本
# 格式 ( user_id, his_item, pos_item, label, len(his_item) )
for i in range(1, len(pos_list)):
ihist = pos_list[:i]
ihist = ihist[::-1] # 倒序,最近的在最前面
# 最长的做测试,其余的做训练
if i == len(pos_list)-1:
test_set.append((uid, ihist, pos_list[i], 1, i))
else:
train_set.append((uid, ihist, pos_list[i], 1, i))
for negi in range(negsample):
train_set.append((uid, ihist, neg_list[i*negsample+negi], 0, i))

# 对于只有一个数据的,需要数据泄露一下,否则直接在train_set中丢失了
if len(pos_list) == 1:
train_set.append((uid, pos_list, pos_list[0], 1, 1))
test_set.append((uid, pos_list, pos_list[0], 1, 1))

random.shuffle(train_set)
random.shuffle(test_set)

return train_set, test_set

# 生成模型输入,保证数据规范
def gen_model_input(train_set, seq_maxlen):
# 格式 ( user_id, his_item, pos_item, label, len(his_item) )
uid = np.array([x[0] for x in train_set])
hist = [x[1] for x in train_set]
pos_item = np.array([x[2] for x in train_set])
label = np.array([x[3] for x in train_set])
hlen = np.array([x[4] for x in train_set])

hist = pad_sequences(
hist,
maxlen=seq_maxlen,
padding='post',
truncating='post',
value=0
)

train_model_input = {
"user_id": uid,
"click_article_id": pos_item,
"hist_article_id": hist,
"hist_len": hlen}

return train_model_input, label


# YoutubeDNN 召回
def YoutubeDNN_u2i_dict(click_df, save_path, topk=20):
'''
YoutubeDNN 双塔模型召回
- 训练完成后提取 user/item embedding,使用FAISS基于内积做 topk 召回
- 返回:{user1:[(item1, score1),(item2, score2)], ...}
'''
# 内联配置
SEQ_LEN = 30
emb_dim = 16
neg_sample = 20
dnn_units = [32]
label_name = 'click_article_id'

# 类别编码与回退映射
df = click_df.copy()
user_rawid = df[['user_id']].drop_duplicates('user_id')
item_rawid = df[['click_article_id']].drop_duplicates('click_article_id')

encoders = {}
feature_maxidx = {}
for col in ['user_id', 'click_article_id']:
lbe = LabelEncoder()
df[col] = lbe.fit_transform(df[col])
encoders[col] = lbe
feature_maxidx[col] = int(df[col].max())+1

user_idx = df[['user_id']].drop_duplicates('user_id')
item_idx = df[['click_article_id']].drop_duplicates('click_article_id')
user_idx2rawid = dict(zip(user_idx['user_id'], user_rawid['user_id']))
item_idx2rawid = dict(zip(item_idx['click_article_id'], item_rawid['click_article_id']))

# 构建数据集
train_set, test_set = gen_data_set(df, negsample=0) # 这里全用的正样本,没有负采样
train_model_input, train_label = gen_model_input(train_set, SEQ_LEN)
test_model_input, test_label = gen_model_input(test_set, SEQ_LEN)

# 只用必要的数据
input_keys = ['user_id', 'click_article_id', 'hist_article_id']
train_X = {k: train_model_input[k] for k in input_keys}
test_X = {k: test_model_input[k] for k in input_keys}

# 手动定义特征列(不依赖外部数据字典)
feature_columns = [
FeatureColumn(name='user_id', group=['user_dnn'],
type='sparse', vocab_size=feature_maxidx['user_id'], emb_dim=emb_dim),

FeatureColumn(name='click_article_id', group=['target_item'],
type='sparse', vocab_size=feature_maxidx['click_article_id'], emb_dim=emb_dim),

FeatureColumn(name='hist_article_id', emb_name='click_article_id', group=['raw_hist_seq'],
type='varlen_sparse', max_len=SEQ_LEN, combiner='mean', emb_dim=emb_dim,
vocab_size=feature_maxidx['click_article_id']),
]

# 组装 processed_data
processed_data = {
'train': {
'features': train_X,
'labels': None # 由 positive_sampling_labels 规则内部替换为全 1
},
'test': {
'features': test_X,
'labels': None,
'eval_data': {}
},
'all_items': {
'click_article_id': np.arange(feature_maxidx['click_article_id'], dtype=np.int32)
},
'feature_dict': {
'user_id': feature_maxidx['user_id'],
'click_article_id': feature_maxidx['click_article_id']
}
}

# 训练配置(内联)
training_config = {
'build_function': 'funrec.models.youtubednn.build_youtubednn_model',
'data_preprocessing': [
{'type': 'positive_sampling_labels'}
],
'model_params': {
'emb_dim': emb_dim,
'neg_sample': neg_sample,
'dnn_units': dnn_units,
'label_name': label_name
},
'optimizer': 'adam',
'optimizer_params': {
'learning_rate': 1e-4
},
'loss': 'sampledsoftmaxloss',
'batch_size': 128,
'epochs': 5,
'verbose': 0
}

# 训练模型(返回 main_model, user_model, item_model)
model, user_model, item_model = train_model(training_config, feature_columns, processed_data)

# 提取 user/item 的 embedding
user_inputs_for_pred = {k: test_X[k] for k in ['user_id', 'hist_article_id']}
user_embs = user_model.predict(user_inputs_for_pred, batch_size=2 ** 12, verbose=0)
item_embs = item_model.predict(processed_data['all_items'], batch_size=2 ** 12, verbose=0)
# 归一化(与现有逻辑一致)
user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True)
item_embs = item_embs / np.linalg.norm(item_embs, axis=1, keepdims=True)

# 保存 embedding(与现有逻辑一致,注意 id 回退)
raw_user_id_emb_dict = {user_idx2rawid[k]: v for k, v in zip(test_X['user_id'], user_embs)}
raw_item_id_emb_dict = {item_idx2rawid[k]: v for k, v in zip(processed_data['all_items']['click_article_id'], item_embs)}
save1 = save_path / 'user_youtube_emb.pkl'
save2 = save_path / 'item_youtube_emb.pkl'
pickle.dump(raw_user_id_emb_dict, open(save1, 'wb'))
pickle.dump(raw_item_id_emb_dict, open(save2, 'wb'))
print(f'YoutubeDNN训练后分离user向量矩阵保存:{save1}')
print(f'YoutubeDNN训练后分离item向量矩阵保存:{save2}')

# 使用 FAISS 做向量检索召回
print(f'FAISS开始处理...')
index = faiss.IndexFlatIP(emb_dim) # 直接内积就是相似度,因为前面归一化过了
index.add(item_embs.astype(np.float32))
sim_lists, idx_lists = index.search(np.ascontiguousarray(user_embs.astype(np.float32)), topk)
# 这里是把 item 在空间中保存好,用 user 去匹配最佳的 item 出来
print(f'FAISS处理完毕.')

user_recall_items_dict = defaultdict(lambda: defaultdict(float))
for target_idx, sim_l, item_l in tqdm(zip(test_X['user_id'], sim_lists, idx_lists), desc='YoutubeDNN_相似度矩阵'):
target_rawid = user_idx2rawid[int(target_idx)]
for item, sim in zip(item_l[1:], sim_l[1:]): # 去除本身
item_rawid = item_idx2rawid[int(item)]
user_recall_items_dict[target_rawid][item_rawid] += sim

# 排序并保存
user_recall_items_dict = {
k: sorted(v.items(), key=lambda x: x[1], reverse=True)
for k, v in user_recall_items_dict.items()
}

save_name = save_path / 'youtube_u2i_dict.pkl'
pickle.dump(user_recall_items_dict, open(save_name, 'wb'))
print(f'YoutubeDNN_u2i矩阵保存:{save_name}')

return user_recall_items_dict

preprocessor.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
"""
特征处理
"""

import logging, pickle
from typing import Dict, Any, List, Tuple

logger = logging.getLogger(__name__)

import numpy as np
import pandas as pd

from feature import FeatureColumn
from data_config import DATASET_CONFIG


def read_pkl_data(path):
with open(path, "rb") as f:
data_dict = pickle.load(f)
return data_dict


def apply_feature_transformations(
features_config: Dict[str, Any], data_dict: Dict[str, Any]
) -> Dict[str, Any]:
"""
根据配置规则进行特征处理

Args:
features_config: 特征配置,包含变换规则
data_dict: 数据字典,包含需要变换的数据

Returns:
变换后的数据字典
"""
transformations = features_config.get("feature_transformations", [])
if not transformations:
return data_dict

# 创建一个副本,避免修改原始数据
transformed_data = data_dict.copy()

for transform in transformations:
transform_type = transform.get("type")
source_features = transform.get("source_features", [])
target_suffix = transform.get("target_suffix", "")

if transform_type == "sequence_slice":
slice_start = transform.get("slice_start")
slice_end = transform.get("slice_end")

for source_feat in source_features:
if source_feat in transformed_data:
target_feat = source_feat + target_suffix
if slice_end is not None:
transformed_data[target_feat] = transformed_data[source_feat][
:, slice_start:slice_end
]
else:
transformed_data[target_feat] = transformed_data[source_feat][
:, slice_start:
]

elif transform_type == "sequence_copy":
for source_feat in source_features:
if source_feat in transformed_data:
target_feat = source_feat + target_suffix
transformed_data[target_feat] = transformed_data[source_feat].copy()

return transformed_data


def apply_training_preprocessing(
training_config: Dict[str, Any], train_features: Dict[str, Any], train_labels: Any
) -> Tuple[Dict[str, Any], Any]:
"""
训练数据预处理

Args:
training_config: 训练配置,包含预处理规则
train_features: 训练特征字典
train_labels: 训练标签

Returns:
Tuple of (processed_features, processed_labels)
"""
processed_features = train_features.copy()
processed_labels = train_labels

# 如果指定,采样训练数据
subsample_size = training_config.get("subsample_size")
if subsample_size is not None and subsample_size > 0:
# 获取第一个特征的采样数量
current_size = len(next(iter(processed_features.values())))
if current_size > subsample_size:
logger.debug(
f"Subsampling training data from {current_size} to {subsample_size} samples"
)
# 对所有特征进行采样
for key in processed_features:
processed_features[key] = processed_features[key][:subsample_size]
# 对标签进行采样
if isinstance(processed_labels, list):
processed_labels = [
labels[:subsample_size] for labels in processed_labels
]
else:
processed_labels = processed_labels[:subsample_size]

preprocessing_rules = training_config.get("data_preprocessing", [])
if not preprocessing_rules:
return processed_features, processed_labels

for rule in preprocessing_rules:
rule_type = rule.get("type")

if rule_type == "positive_sampling_labels":
# 将标签转换为全1,用于模型(YouTubeDNN, SDM, etc.)
processed_labels = np.ones_like(list(processed_features.values())[0])
elif rule_type == "eges_generate_walk_pairs":
# 构建EGES训练对,通过随机游走
num_walks = int(rule.get("num_walks", 10))
walk_length = int(rule.get("walk_length", 10))
window_size = int(rule.get("window_size", 5))
neg_samples = int(rule.get("neg_samples", 2))

# 导入EGES辅助函数
from ..models.eges import (
SimpleWalker,
get_graph_context_all_pairs,
generate_negative_samples,
)

# 需要从原始训练字典中获取输入
if "hist_movie_id" not in processed_features:
raise ValueError("EGES预处理需要'hist_movie_id'在训练特征中")

hist_movie_id = processed_features["hist_movie_id"]
hist_len = processed_features.get("hist_len", None)
# 构建会话列表
session_list = []
if hist_len is not None:
for seq, l in zip(hist_movie_id, hist_len):
seq = seq[-int(l) :] if isinstance(l, (int, np.integer)) else seq
session_list.append(
seq.tolist() if hasattr(seq, "tolist") else list(seq)
)
else:
for seq in hist_movie_id:
session_list.append(
(seq.tolist() if hasattr(seq, "tolist") else list(seq))
)

# 构建电影->genre映射
genre_map = {}
if "movie_id" in processed_features and "genres" in processed_features:
for m, g in zip(
processed_features["movie_id"], processed_features["genres"]
):
try:
genre_map[int(m)] = int(g)
except Exception:
continue

# 随机游走图
walker = SimpleWalker()
G, maps = walker.build_graph(session_list)
if G is None:
raise ValueError("EGES预处理无法从会话列表构建图")

walks = walker.generate_walks(
G, num_walks=num_walks, walk_length=walk_length
)
all_pairs = get_graph_context_all_pairs(walks, window_size=window_size)

# 正样本
pos_dict = {
"movie_id": np.array([int(x[0]) for x in all_pairs], dtype=np.int32),
"context_id": np.array([int(x[1]) for x in all_pairs], dtype=np.int32),
"genre_id": np.array(
[int(genre_map.get(int(x[0]), 0)) for x in all_pairs],
dtype=np.int32,
),
}

# 负样本
neg_dict = generate_negative_samples(pos_dict, num_negatives=neg_samples)

labels = np.concatenate(
[
np.ones(len(all_pairs), dtype=np.float32),
np.zeros(len(neg_dict["movie_id"]), dtype=np.float32),
]
)

new_features = {
"movie_id": np.concatenate(
[pos_dict["movie_id"], neg_dict["movie_id"]]
),
"context_id": np.concatenate(
[pos_dict["context_id"], neg_dict["context_id"]]
),
"genre_id": np.concatenate(
[pos_dict["genre_id"], neg_dict["genre_id"]]
),
}

# 打乱
idx = np.random.permutation(len(new_features["movie_id"]))
for k in new_features:
new_features[k] = new_features[k][idx]
labels = labels[idx]

processed_features = new_features
processed_labels = labels

return processed_features, processed_labels


def prepare_features(
features_config: Dict[str, Any],
train_data: Dict[str, Any],
test_data: Dict[str, Any],
) -> Tuple[List[FeatureColumn], Dict[str, Dict[str, Any]]]:
"""
准备特征列和处理后的数据

Args:
features_config: 特征配置字典
train_data: 训练数据字典
test_data: 测试数据字典

Returns:
Tuple of (feature_columns, processed_data)
"""

# 如果未定义特征,使用数据集配置中的特征字典
if not features_config or not features_config.get("features"):
feature_dict = None
dataset_name = features_config.get("dataset_name") if features_config else None
if dataset_name and dataset_name in DATASET_CONFIG:
dataset_config = DATASET_CONFIG[dataset_name]
try:
feature_dict = read_pkl_data(dataset_config["dict_path"])
except Exception:
feature_dict = None
processed = {"train": train_data, "test": test_data}
if feature_dict is not None:
processed["feature_dict"] = feature_dict
return [], processed

# 可选采样,用来加速特征准备
def _early_subsample(data: Dict[str, Any], size: int) -> Dict[str, Any]:
if not data or size is None or size <= 0:
return data
# 从第一个数组值确定数据长度
first_key = next((k for k, v in data.items() if hasattr(v, "__len__")), None)
if first_key is None:
return data
current_size = len(data[first_key])
if current_size <= size:
return data

sliced = {}
for k, v in data.items():
try:
sliced[k] = v[:size]
except Exception:
sliced[k] = v
return sliced

pre_sub_train = features_config.get(
"pre_subsample_size_train", features_config.get("pre_subsample_size")
)
pre_sub_test = features_config.get(
"pre_subsample_size_test", features_config.get("pre_subsample_size")
)
if pre_sub_train is not None:
train_data = _early_subsample(train_data, int(pre_sub_train))
if pre_sub_test is not None:
test_data = _early_subsample(test_data, int(pre_sub_test))

# 获取数据信息
dataset_name = features_config.get("dataset_name")
if not dataset_name:
raise ValueError("dataset_name must be specified in features config")

if dataset_name not in DATASET_CONFIG:
raise ValueError(f"Dataset {dataset_name} not found in DATASET_CONFIG")

# Load feature dictionary
# 加载特征字典
dataset_config = DATASET_CONFIG[dataset_name]
feature_dict = read_pkl_data(dataset_config["dict_path"])

# 获取embedding维度
emb_dim = features_config.get("emb_dim", 8)

# 创建特征列
feature_columns = []
feature_definitions = features_config.get("features", [])

# 获取序列最大长度
max_seq_len = features_config.get("max_seq_len", 50)

for feat_def in feature_definitions:
feature_name = feat_def["name"]

# 确定特征类型和参数
explicit_type = feat_def.get("type")

if feature_name == "hist_len":
# 处理hist_len 长度信息
fc = FeatureColumn(name=feature_name, emb_name=None, type="sparse")
elif explicit_type == "dense":
# 稠密特征
fc = FeatureColumn(
name=feature_name,
emb_name=None,
group=feat_def.get("group", []),
type="dense",
dimension=feat_def.get("dimension", 1),
max_len=feat_def.get("max_len", 1),
dtype=feat_def.get("dtype", "float32"),
)
elif explicit_type == "varlen_sparse" or (
feature_name.startswith("hist_") and explicit_type is None
):
# 序列特征
if feature_name == "timestamps":
fc = FeatureColumn(
name=feature_name,
emb_name=None,
emb_dim=1,
vocab_size=1,
type="varlen_sparse",
max_len=feat_def.get("max_len", max_seq_len),
)
else:
fc = FeatureColumn(
name=feature_name,
emb_name=feat_def.get("emb_name", feature_name),
emb_dim=emb_dim,
vocab_size=feature_dict[feat_def.get("emb_name", feature_name)],
group=feat_def.get("group", []),
type="varlen_sparse",
max_len=feat_def.get("max_len", max_seq_len),
combiner=feat_def.get("combiner", "mean"),
att_key_name=feat_def.get("att_key_name"),
)
else:
# 常规稀疏特征
emb_name = feat_def.get("emb_name", feature_name)

if feature_name == "timestamps":
fc = FeatureColumn(
name=feature_name,
emb_name=None,
emb_dim=1,
vocab_size=1,
type="varlen_sparse",
max_len=feat_def.get("max_len", 50),
)
else:
fc = FeatureColumn(
name=feature_name,
emb_name=emb_name,
emb_dim=emb_dim,
vocab_size=feature_dict[emb_name],
group=feat_def.get("group", []),
)

feature_columns.append(fc)

# 获取特征名称和任务名称
feature_name_list = [fc.name for fc in feature_columns]
task_name_list = features_config.get("task_names", ["label"])

# 特征变换
train_data_transformed = apply_feature_transformations(features_config, train_data)
test_data_transformed = apply_feature_transformations(features_config, test_data)

# DSIN会话特征构造
dsin_cfg = features_config.get("dsin_session", None)
if dsin_cfg:
sess_max_count = dsin_cfg.get("sess_max_count", 5)
sess_max_len = dsin_cfg.get("sess_max_len", 10)
session_feature_list = dsin_cfg.get("session_feature_list", ["video_id"])

def _create_session_features_from_session_id(
data_dict: Dict[str, Any],
) -> Dict[str, Any]:
# 构建DataFrame
df = pd.DataFrame(
{"user_id": data_dict["user_id"], "session_id": data_dict["session_id"]}
)
for feat in session_feature_list:
if feat in data_dict:
df[feat] = data_dict[feat]

# 按用户分组
user_sessions: Dict[Any, Dict[Any, Dict[str, list]]] = {}
for user_id in df["user_id"].unique():
user_df = df[df["user_id"] == user_id]
sessions: Dict[Any, Dict[str, list]] = {}
for sid in user_df["session_id"].unique():
s_df = user_df[user_df["session_id"] == sid]
sessions[sid] = {}
for feat in session_feature_list:
if feat in s_df.columns:
seq = s_df[feat].tolist()
if len(seq) > sess_max_len:
seq = seq[:sess_max_len]
elif len(seq) < sess_max_len:
seq = seq + [0] * (sess_max_len - len(seq))
sessions[sid][feat] = seq
user_sessions[user_id] = sessions

batch_size = len(data_dict["user_id"])
for sess_idx in range(sess_max_count):
for feat in session_feature_list:
key = f"sess_{sess_idx}_{feat}"
out = []
for i in range(batch_size):
uid = data_dict["user_id"][i]
user_sess = user_sessions.get(uid, {})
sids = list(user_sess.keys())
if sess_idx < len(sids):
sid = sids[sess_idx]
seq = user_sess[sid].get(feat, [0] * sess_max_len)
else:
seq = [0] * sess_max_len
out.append(seq)
data_dict[key] = np.array(out, dtype=np.int32)
return data_dict

# 应用到训练和测试数据
if "session_id" in train_data_transformed:
train_data_transformed = _create_session_features_from_session_id(
train_data_transformed
)
train_data_transformed.pop("session_id", None)
if "session_id" in test_data_transformed:
test_data_transformed = _create_session_features_from_session_id(
test_data_transformed
)
test_data_transformed.pop("session_id", None)

# 处理训练数据
train_sample_dict = {
k: v for k, v in train_data_transformed.items() if k in feature_name_list
}
train_label_list = [
v for k, v in train_data_transformed.items() if k in task_name_list
]

# 处理测试数据
test_sample_dict = {
k: v for k, v in test_data_transformed.items() if k in feature_name_list
}

# 保留测试标签和其他评估数据
test_labels = {
k: v for k, v in test_data_transformed.items() if k in task_name_list
}
test_eval_data = {
k: v
for k, v in test_data_transformed.items()
if k not in feature_name_list and k not in task_name_list
}

# 处理特定特征处理
if "movie_id" in train_sample_dict and "genres" in train_sample_dict:
movie_id_to_genre_id_dict = {
movie_id: genre_id
for movie_id, genre_id in zip(
train_sample_dict["movie_id"], train_sample_dict["genres"]
)
}
movie_id_to_genre_id_dict.update(
{
movie_id: genre_id
for movie_id, genre_id in zip(
test_sample_dict["movie_id"], test_sample_dict["genres"]
)
}
)

# 准备所有物品数据
all_item_model_input = {
"movie_id": np.array(list(range(feature_dict["movie_id"])))
}
all_item_model_input["genres"] = np.array(
[
movie_id_to_genre_id_dict.get(movie_id, 0)
for movie_id in all_item_model_input["movie_id"]
]
)
else:
all_item_model_input = None

processed_data = {
"train": {"features": train_sample_dict, "labels": train_label_list},
"test": {
"features": test_sample_dict,
"labels": test_labels,
"eval_data": test_eval_data,
},
"all_items": all_item_model_input,
"feature_dict": feature_dict,
}

return feature_columns, processed_data

trainer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""
模型训练
"""

import importlib
from typing import Dict, Any, List, Tuple, Union
import tensorflow as tf
import platform

from feature import FeatureColumn
from processor import apply_training_preprocessing


def train_model(
training_config: Dict[str, Any],
feature_columns: List[FeatureColumn],
processed_data: Dict[str, Any],
) -> Union[Tuple[tf.keras.Model, tf.keras.Model, tf.keras.Model], Any]:
"""
基于配置和处理后的数据训练模型。
参数:
training_config: 训练配置字典,包含以下内容:
- build_function: 模型构建函数的完整路径 (例如:'funrec.models.dssm.build_dssm_model')
- model_params: 模型特定参数
- classical_model: 布尔值,指示是否为经典模型 (默认: False)
- data_preprocessing: 要应用的预处理规则列表
- optimizer: 使用的优化器 (默认: 'adam')
- loss: 损失函数 (默认: ['binary_crossentropy'])
- metrics: 要跟踪的指标 (默认: ['binary_accuracy'])
- batch_size: 训练批次大小 (默认: 1024)
- epochs: 训练轮数 (默认: 1)
- validation_split: 验证集分割比例 (默认: 0.2)
- verbose: 训练详细程度 (默认: 1)
feature_columns: 特征列规范列表
processed_data: 处理后的数据字典,包含训练/测试特征和标签
返回:
对于神经网络模型: 元组 (main_model, user_model, item_model)
对于经典模型: 训练后的经典模型实例
"""
# 从配置中获取构建函数路径和参数
build_function_path = training_config.get(
"build_function", "funrec.models.dssm.build_dssm_model"
)
model_params = training_config.get("model_params", {})
is_classical = training_config.get("classical_model", False)
is_external_embedding = training_config.get("embedding_external", False)

# 解析模块和函数名
module_path, function_name = build_function_path.rsplit(".", 1)

# 动态导入并调用构建函数
module = importlib.import_module(module_path)
build_function = getattr(module, function_name)

if is_classical:
# 经典模型:直接构建和拟合
model = build_function(feature_columns, model_params)

# 经典模型:准备交互数据
# 经典模型期望用户-物品交互:[(user_id, item_id, label), ...]
train_interactions = []
# 当特征配置为空时,processed_data已经直接包含训练字典
train_features = (
processed_data["train"]["features"]
if "features" in processed_data["train"]
else processed_data["train"]
)
train_labels = processed_data["train"]["labels"]

# 从特征中提取用户和物品ID
# 假设第一个特征是user_id,第二个是item_id
user_ids = (
train_features[0]
if isinstance(train_features, list)
else train_features["user_id"]
)
item_ids = (
train_features[1]
if isinstance(train_features, list)
else train_features["item_id"]
)

# 转换为交互格式
for i in range(len(user_ids)):
train_interactions.append((user_ids[i], item_ids[i], train_labels[i]))

# 训练经典模型
model.fit(train_interactions)

# 返回模型和None作为用户和物品模型(经典模型)
return model, None, None

elif is_external_embedding:
# 外部嵌入模型(例如Item2Vec)从用户历史序列训练
# 使用自己的参数签名构建模型
model = build_function(model_params)

# 从处理后的数据准备训练序列
train_features = (
processed_data["train"]["features"]
if "features" in processed_data["train"]
else processed_data["train"]
)
# 支持多个潜在键
hist_key_candidates = [
"hist_movie_id_list",
"hist_movie_ids",
"hist_item_id_list",
"hist_item_ids",
]
hist_array = None
for key in hist_key_candidates:
if key in train_features:
hist_array = train_features[key]
break
if hist_array is None:
raise ValueError(
"外部嵌入训练需要训练特征中包含'hist_movie_id_list'(或兼容键)"
)

# 转换为token序列列表(过滤填充0)
try:
import numpy as np

if isinstance(hist_array, list):
train_sequences = [np.array(seq) for seq in hist_array]
else:
train_sequences = [hist_array[i] for i in range(len(hist_array))]
train_hist_sequences = [
seq[np.where(seq != 0)[0]].tolist() for seq in train_sequences
]
except Exception:
train_hist_sequences = []
for seq in hist_array:
try:
train_hist_sequences.append([token for token in seq if token != 0])
except Exception:
train_hist_sequences.append(list(seq))

model.fit(train_hist_sequences)

# 以统一元组形式返回
return model, None, None

else:
# 神经网络模型:原始训练流水线
model, user_model, item_model = build_function(feature_columns, model_params)

# 编译模型
optimizer_name = training_config.get("optimizer", "adam")
optimizer_params = training_config.get("optimizer_params", {})
loss = training_config.get("loss", ["binary_crossentropy"])
loss_weights = training_config.get("loss_weights", None)
metrics = training_config.get("metrics", ["binary_accuracy"])

# 处理自定义损失函数
if isinstance(loss, str) and loss == "sampledsoftmaxloss":
from loss import sampledsoftmaxloss

loss = sampledsoftmaxloss

# 处理优化器 - 对Apple Silicon使用legacy Adam
is_apple_silicon = platform.machine().lower() in ["arm64", "aarch64"]

if optimizer_name == "adam":
if is_apple_silicon:
optimizer = (
tf.keras.optimizers.legacy.Adam(**optimizer_params)
if optimizer_params
else tf.keras.optimizers.legacy.Adam()
)
else:
optimizer = (
tf.keras.optimizers.Adam(**optimizer_params)
if optimizer_params
else tf.keras.optimizers.Adam()
)
else:
optimizer = optimizer_name

# 编译时包含loss_weights如果提供的话
compile_kwargs = {"optimizer": optimizer, "loss": loss, "metrics": metrics}
if loss_weights is not None:
compile_kwargs["loss_weights"] = loss_weights
model.compile(**compile_kwargs)

# 获取训练参数
batch_size = training_config.get("batch_size", 1024)
epochs = training_config.get("epochs", 1)
validation_split = training_config.get("validation_split", 0.2)
verbose = training_config.get("verbose", 0)

# 基于配置应用训练特定的预处理
# 支持完全准备的字典和原始字典
if "features" in processed_data["train"]:
train_features = processed_data["train"]["features"]
train_labels = processed_data["train"].get("labels")
else:
train_features = processed_data["train"]
train_labels = (
processed_data["train"].get("labels")
if isinstance(processed_data["train"], dict)
else None
)
train_features, train_labels = apply_training_preprocessing(
training_config, train_features, train_labels
)

labels_for_fit = train_labels
if isinstance(labels_for_fit, list) and len(labels_for_fit) == 1:
labels_for_fit = labels_for_fit[0]

# 处理多输出模型:如果模型有多个输出但只有一个标签集,复制标签
if (
isinstance(loss, list)
and len(loss) > 1
and not isinstance(labels_for_fit, list)
):
# 对于PRS等模型:两个输出都使用相同的标签
labels_for_fit = [labels_for_fit] * len(loss)

history = model.fit(
train_features,
labels_for_fit,
batch_size=batch_size,
epochs=epochs,
verbose=verbose,
validation_split=validation_split,
)

return model, user_model, item_model

feature.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""
特征列
"""

from typing import Union, List
from dataclasses import dataclass, field
from tensorflow.keras.initializers import Initializer


@dataclass
class FeatureColumn:
"""特征列配置类,用于定义特征的属性和嵌入参数

Attributes:
name: 特征名称,唯一标识符
emb_name: 嵌入名称,可选值为字符串或字符串列表。默认与特征名称相同
emb_dim: 嵌入维度,默认为4
vocab_size: 词汇表大小,默认为1(表示未指定)
group: 特征所属组,例如线性组、DNN组、FM组等,默认为空列表
type: 特征类型,可选值为" sparse"(稀疏特征)、"dense"(稠密特征)、"varlen_sparse"(变长稀疏特征)
dimension: 稠密特征的维度,默认为1
trainable: 是否可训练,默认为True
max_len: 最大长度,默认为1
combiner: 合并方式,默认为"mean"
l2_reg: L2正则化系数,默认为0.0
initializer: 初始化器,默认为"uniform"
dtype: 数据类型,默认为"int32"
att_key_name: 注意力键名称,默认为None
"""

name: str
emb_name: Union[str, List[str]] = None # 嵌入名称,可以是单个字符串或多个字符串列表
emb_dim: int = 4 # 嵌入维度
vocab_size: int = 1 # 词汇表大小,1表示未指定
group: List[str] = field(
default_factory=list
) # 特征所属组,例如线性组、DNN组、FM组等
type: str = "sparse" # 特征类型,可选值为" sparse"、"dense"、"varlen_sparse"
dimension: int = 1 # 稠密特征的维度
trainable: bool = True # 是否可训练
max_len: int = 1 # 最大长度
combiner: str = "mean" # 变长特征聚合方式
l2_reg: float = 0.0 # L2正则化系数
initializer: Union[str, Initializer] = "uniform" # 初始化器
dtype: str = "int32" # 数据类型
att_key_name: str = (
None # 注意力键名称,当特征是变长特征且通过din方式进行聚合时使用
)

def __post_init__(self):
"""初始化后处理方法,用于设置默认值和基本验证"""
# 如果emb_name未指定,则默认与特征名称相同
if self.type in ["sparse", "varlen_sparse"] and self.emb_name is None:
self.emb_name = self.name

# 验证特征类型是否有效
valid_types = ["sparse", "dense", "varlen_sparse"]
if self.type not in valid_types:
raise ValueError(f"Invalid type: {self.type}. Must be one of {valid_types}")

# 验证combiner是否有效(仅对变长特征有意义)
if self.type == "varlen_sparse":
valid_combiners = [
"mean",
"din",
"mha",
"dien",
] # None表示不聚合, 返回原始序列特征
if self.combiner is not None:
for t in self.combiner.split(","):
if t not in valid_combiners:
raise ValueError(
f"combiner: {self.combiner}. 必须为 {valid_combiners} 中的一个"
)

# 序列din聚合时需要指定对应的key
if self.combiner is not None and "din" in self.combiner:
if self.att_key_name is None:
raise ValueError("att_key_name 不能为空,当combiner为'din'时")
if not isinstance(self.att_key_name, str):
raise ValueError("att_key_name 必须为字符串,当combiner为'din'时")

# 序列dien聚合时需要指定对应的key
if self.combiner is not None and "dien" in self.combiner:
if self.att_key_name is None:
raise ValueError("att_key_name 不能为空,当combiner为'dien'时")
if not isinstance(self.att_key_name, str):
raise ValueError("att_key_name 必须为字符串,当combiner为'dien'时")

loss.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import tensorflow as tf
from tensorflow.keras import backend as K


def contrastive_loss(y_true, y_pred, temperature=0.1):
"""
Batch内对比损失 (InfoNCE)。
参数:
y_true: 未使用,但为了与Keras兼容而保留
y_pred: 来自模型的余弦相似度分数
temperature: 温度参数控制分布的集中度
返回:
对比损失值
"""
batch_size = tf.shape(y_pred)[0]

# 通过温度参数缩放相似度
scaled_sim = y_pred / temperature

# 为正样本对创建掩码(对角线元素)
pos_mask = tf.eye(batch_size)

# 计算log softmax
log_softmax = scaled_sim - tf.math.log(
tf.reduce_sum(tf.exp(scaled_sim), axis=1, keepdims=True)
)

# 计算损失作为选择正样本的负对数似然
loss = -tf.reduce_sum(pos_mask * log_softmax, axis=1)

return tf.reduce_mean(loss)


def sampledsoftmaxloss(y_true, y_pred):
"""因为在模型构建的时候使用了sampled_softmax_loss,这里只需要计算mean就可以了"""
return K.mean(y_pred)


def sum_loss(y_true, y_pred):
return K.sum(y_pred)


def mean_loss(y_true, y_pred):
return K.mean(y_pred)

data_config.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""
数据配置
"""

import os
from utils import load_env_with_fallback

# 自动加载环境变量
load_env_with_fallback()

PROCESSED_DATA_PATH = os.getenv('FUNREC_PROCESSED_DATA_PATH')
if not PROCESSED_DATA_PATH:
raise ValueError("FUNREC_PROCESSED_DATA_PATH 未设置, 请在环境变量中设置")

DATASET_CONFIG = {
"ml_latest_small_youtubednn": {
"dataset_name": "ml_latest_small",
"links": PROCESSED_DATA_PATH + "/ml-latest-small/links.csv",
"movies": PROCESSED_DATA_PATH + "/ml-latest-small/movies.csv",
"ratings": PROCESSED_DATA_PATH + "/ml-latest-small/ratings.csv",
"tags": PROCESSED_DATA_PATH + "/ml-latest-small/tags.csv",
"dict_path": PROCESSED_DATA_PATH + "/feature_dict/ml_latest_small_youtubednn.pkl",
"raw_log_data": PROCESSED_DATA_PATH + "/feature_dict/ml_latest_small_youtubednn.csv",
"train_eval_sample_raw": PROCESSED_DATA_PATH + "/train_eval_sample_raw/ml_latest_small_youtubednn_raw.pkl",
"train_eval_sample_final": PROCESSED_DATA_PATH + "/train_eval_sample_final/ml_latest_small_youtubednn_final.pkl"
},
"e_commerce_rerank_data": {
"dataset_name": "e_commerce_rerank_data",
"dict_path": PROCESSED_DATA_PATH + "/feature_dict/rerank_feature_dict.pkl",
"train_eval_sample_final": PROCESSED_DATA_PATH + "/train_eval_sample_final/rerank_data.pkl"
},
"kuairand_data": {
"dataset_name": "kuairand_data",
"dict_path": PROCESSED_DATA_PATH + "/feature_dict/kuairand_feature_dict.pkl",
"train_sample_raw": PROCESSED_DATA_PATH + "/train_eval_sample_raw/kuairand_1k_train.csv",
"test_sample_raw": PROCESSED_DATA_PATH + "/train_eval_sample_raw/kuairand_1k_test.csv",
"train_eval_sample_final": PROCESSED_DATA_PATH + "/train_eval_sample_final/kuairand_train_eval.pkl"
},
"ml-1m_sasrec": {
"dataset_name": "ml-1m_sasrec",
"dict_path": PROCESSED_DATA_PATH + "/feature_dict/ml-1m_sequence_feature_dict.pkl",
"train_eval_sample_final": PROCESSED_DATA_PATH + "/train_eval_sample_final/ml-1m_sequence_data_dict.pkl"
},
"ml-1m_recall_data": {
"dataset_name": "ml-1m_recall_data",
"dict_path": PROCESSED_DATA_PATH + "/feature_dict/ml-1m_recall_feature_dict.pkl",
"train_eval_sample_final": PROCESSED_DATA_PATH + "/train_eval_sample_final/ml-1m_recall_train_eval.pkl"
},
"ml-1m_recall_pos_neg_data": {
"dataset_name": "ml-1m_recall_pos_neg_data",
"dict_path": PROCESSED_DATA_PATH + "/feature_dict/ml-1m_recall_pos_neg_feature_dict.pkl",
"train_eval_sample_final": PROCESSED_DATA_PATH + "/train_eval_sample_final/ml-1m_recall_pos_neg_train_eval.pkl"
},
"ml-1m_tiger": {
"dataset_name": "ml-1m_tiger",
"dense_feature_path": PROCESSED_DATA_PATH + "/dense_feature/ml-1m_dense_feature.pkl",
"dict_path": PROCESSED_DATA_PATH + "/feature_dict/ml-1m_sequence_feature_dict.pkl",
"train_eval_sample_final": PROCESSED_DATA_PATH + "/train_eval_sample_final/ml-1m_sequence_data_dict.pkl"
},
"ml_latest_small_classical": {
"dataset_name": "ml_latest_small_classical",
"dict_path": PROCESSED_DATA_PATH + "/feature_dict/ml_latest_small_classical.pkl",
"train_eval_sample_final": PROCESSED_DATA_PATH + "/train_eval_sample_final/ml_latest_small_classical.pkl"
}
}

运行输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
已加载 .env 文件: /Users/seymour/GitHub/LLM4Rec/FunRec/.env
[INFO] 14:28:47 Start...

[1] 采样数据...
[1] 新闻数据...
[2] 相似矩阵...
ItemCF_sim: 100%|███████████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 2356880.20it/s]
itemcf相似度矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/itemcf_i2i_sim.pkl
UserCF_sim: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 9622/9622 [00:06<00:00, 1533.21it/s]
usercf相似度矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/usercf_u2u_sim.pkl
faiss 正在处理(等待时间较长)...
faiss 处理完毕.
embedding_sim: 364047it [00:03, 101248.70it/s]
content-based相似度矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/emb_i2i_sim.pkl

[INFO] 1. YoutubeDNN recall ==============
gen_data_set: 100%|███████████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 91134.86it/s]
YoutubeDNN训练后分离user向量矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/user_youtube_emb.pkl
YoutubeDNN训练后分离item向量矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/item_youtube_emb.pkl
FAISS开始处理...
FAISS处理完毕.
YoutubeDNN_相似度矩阵: 20000it [00:00, 68662.67it/s]
YoutubeDNN_u2i矩阵保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/youtube_u2i_dict.pkl
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/YoutubeDNN_recall_12-21 14:30.pkl
Hit@5 2.72% (544/20000)
Hit@10 4.78% (955/20000)
Hit@15 6.61% (1322/20000)
Hit@20 7.82% (1563/20000)

[INFO] 2. ItemCF(itemcf) recall ===============
ItemCF i2i recall: 100%|█████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 398453.80it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/ItemCF(itemcf)_recall_12-21 14:30.pkl
Hit@5 5.58% (1117/20000)
Hit@10 10.18% (2036/20000)

[INFO] 3. ItemCF(emb) recall ===============
ItemCF i2i recall: 100%|██████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 62596.41it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/ItemCF(emb)_recall_12-21 14:30.pkl
Hit@5 1.24% (248/20000)
Hit@10 1.47% (293/20000)

[INFO] 4. UserCF(emb+usercf) recall ===============
UserCF u2u2i recall: 100%|█████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:02<00:00, 6715.24it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/UserCF(emb+usercf)_recall_12-21 14:31.pkl
Hit@5 12.99% (2599/20000)
Hit@10 17.82% (3564/20000)

[INFO] 5. UserCF(emb+youtubeuser) recall ===============
UserCF u2u_sim: 20000it [00:00, 99120.86it/s]
UserCF(YoutubeUser) recall: 100%|█████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 61500.24it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/UserCF(emb+youtubeuser)_recall_12-21 14:31.pkl
Hit@5 0.98% (196/20000)
Hit@10 4.23% (846/20000)

[INFO] 6. Coldstart recall ===============
ItemCF i2i recall: 100%|█████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 113369.68it/s]
Cold start recall: 100%|██████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:00<00:00, 27419.95it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/Coldstart_recall_12-21 14:31.pkl
Hit@5 0.00% (0/20000)
Hit@10 0.00% (0/20000)

[INFO] Combine recall results ===============
多路召回合并...
Combine results: 0%| | 0/6 [00:00<?, ?it/s]YoutubeDNN_recall...
Combine results: 17%|█████████████████▏ | 1/6 [00:00<00:00, 7.58it/s]ItemCF(itemcf)_recall...
ItemCF(emb)_recall...
Combine results: 50%|███████████████████████████████████████████████████▌ | 3/6 [00:00<00:00, 11.43it/s]UserCF(emb+usercf)_recall...
UserCF(emb+youtubeuser)_recall...
Combine results: 83%|█████████████████████████████████████████████████████████████████████████████████████▊ | 5/6 [00:00<00:00, 11.16it/s]Coldstart_recall...
Combine results: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████| 6/6 [00:00<00:00, 12.92it/s]
itemcf recall结果保存:/Users/seymour/GitHub/LLM4Rec/FunRec/dataset_processed/projects/news_recommendation/Final_recall_12-21 14:31.pkl
Hit@5 11.84% (2368/20000)
Hit@10 13.63% (2726/20000)
Hit@15 15.66% (3133/20000)
Hit@20 18.55% (3711/20000)
Hit@25 20.75% (4151/20000)
Hit@30 22.70% (4540/20000)

[INFO] 14:31:05 Finished. (2.30 mins)