diff --git a/python/orca/tutorial/pytorch/NCF/model.py b/python/orca/tutorial/pytorch/NCF/model.py
new file mode 100644
index 00000000000..a2ba6e2195c
--- /dev/null
+++ b/python/orca/tutorial/pytorch/NCF/model.py
@@ -0,0 +1,128 @@
+#
+# Copyright 2016 The BigDL Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+# Most of the PyTorch code is adapted from guoyang9's NCF implementation for
+# the ml-1m dataset.
+# https://github.com/guoyang9/NCF
+#
+
+import torch
+import torch.nn as nn
+
+
+class NCF(nn.Module):
+    def __init__(self, user_num, item_num, factor_num, num_layers,
+                 dropout, model, GMF_model=None, MLP_model=None):
+        """
+        user_num: number of users;
+        item_num: number of items;
+        factor_num: number of predictive factors;
+        num_layers: number of layers in the MLP model;
+        dropout: dropout rate between fully connected layers;
+        model: one of 'MLP', 'GMF', 'NeuMF-end' or 'NeuMF-pre';
+        GMF_model: pre-trained GMF weights;
+        MLP_model: pre-trained MLP weights.
+        """
+        super(NCF, self).__init__()
+        self.dropout = dropout
+        self.model = model
+        self.GMF_model = GMF_model
+        self.MLP_model = MLP_model
+
+        self.embed_user_GMF = nn.Embedding(user_num, factor_num)
+        self.embed_item_GMF = nn.Embedding(item_num, factor_num)
+        self.embed_user_MLP = nn.Embedding(user_num,
+                                           factor_num * (2 ** (num_layers - 1)))
+        self.embed_item_MLP = nn.Embedding(item_num,
+                                           factor_num * (2 ** (num_layers - 1)))
+
+        MLP_modules = []
+        for i in range(num_layers):
+            input_size = factor_num * (2 ** (num_layers - i))
+            MLP_modules.append(nn.Dropout(p=self.dropout))
+            MLP_modules.append(nn.Linear(input_size, input_size // 2))
+            MLP_modules.append(nn.ReLU())
+        self.MLP_layers = nn.Sequential(*MLP_modules)
+
+        if self.model in ['MLP', 'GMF']:
+            predict_size = factor_num
+        else:
+            predict_size = factor_num * 2
+        output_modules = []
+        output_modules.append(nn.Linear(predict_size, 1))
+        output_modules.append(nn.Sigmoid())
+        self.predict_layer = nn.Sequential(*output_modules)
+
+        self._init_weight_()
+
+    def _init_weight_(self):
+        """Initialize the model weights."""
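+        # For all variants except 'NeuMF-pre', embeddings are drawn from
+        # N(0, 0.01), the MLP linear layers use Xavier initialization and all
+        # biases are zeroed; 'NeuMF-pre' instead copies the weights of the
+        # pre-trained GMF and MLP models and averages their prediction layers.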
""" + if not self.model == 'NeuMF-pre': + nn.init.normal_(self.embed_user_GMF.weight, std=0.01) + nn.init.normal_(self.embed_user_MLP.weight, std=0.01) + nn.init.normal_(self.embed_item_GMF.weight, std=0.01) + nn.init.normal_(self.embed_item_MLP.weight, std=0.01) + + for m in self.MLP_layers: + if isinstance(m, nn.Linear): + nn.init.xavier_uniform_(m.weight) + + for m in self.modules(): + if isinstance(m, nn.Linear) and m.bias is not None: + m.bias.data.zero_() + else: + # embedding layers + self.embed_user_GMF.weight.data.copy_(self.GMF_model.embed_user_GMF.weight) + self.embed_item_GMF.weight.data.copy_(self.GMF_model.embed_item_GMF.weight) + self.embed_user_MLP.weight.data.copy_(self.MLP_model.embed_user_MLP.weight) + self.embed_item_MLP.weight.data.copy_(self.MLP_model.embed_item_MLP.weight) + + # mlp layers + for (m1, m2) in zip(self.MLP_layers, self.MLP_model.MLP_layers): + if isinstance(m1, nn.Linear) and isinstance(m2, nn.Linear): + m1.weight.data.copy_(m2.weight) + m1.bias.data.copy_(m2.bias) + + # predict layers + predict_weight = torch.cat([ + self.GMF_model.predict_layer.weight, + self.MLP_model.predict_layer.weight], dim=1) + precit_bias = self.GMF_model.predict_layer.bias + \ + self.MLP_model.predict_layer.bias + + self.predict_layer.weight.data.copy_(0.5 * predict_weight) + self.predict_layer.bias.data.copy_(0.5 * precit_bias) + + def forward(self, user, item): + if not self.model == 'MLP': + embed_user_GMF = self.embed_user_GMF(user) + embed_item_GMF = self.embed_item_GMF(item) + output_GMF = embed_user_GMF * embed_item_GMF + if not self.model == 'GMF': + embed_user_MLP = self.embed_user_MLP(user) + embed_item_MLP = self.embed_item_MLP(item) + interaction = torch.cat((embed_user_MLP, embed_item_MLP), -1) + output_MLP = self.MLP_layers(interaction) + + if self.model == 'GMF': + concat = output_GMF + elif self.model == 'MLP': + concat = output_MLP + else: + concat = torch.cat((output_GMF, output_MLP), -1) + + prediction = self.predict_layer(concat) + return prediction.view(-1) diff --git a/python/orca/tutorial/pytorch/NCF/train_data_loader.py b/python/orca/tutorial/pytorch/NCF/train_data_loader.py new file mode 100644 index 00000000000..649895f0d27 --- /dev/null +++ b/python/orca/tutorial/pytorch/NCF/train_data_loader.py @@ -0,0 +1,192 @@ +# +# Copyright 2016 The BigDL Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +# Most of the pytorch code is adapted from guoyang9's NCF implementation for +# ml-1m dataset. 
diff --git a/python/orca/tutorial/pytorch/NCF/train_data_loader.py b/python/orca/tutorial/pytorch/NCF/train_data_loader.py
new file mode 100644
index 00000000000..649895f0d27
--- /dev/null
+++ b/python/orca/tutorial/pytorch/NCF/train_data_loader.py
@@ -0,0 +1,192 @@
+#
+# Copyright 2016 The BigDL Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+# Most of the PyTorch code is adapted from guoyang9's NCF implementation for
+# the ml-1m dataset.
+# https://github.com/guoyang9/NCF
+#
+
+import numpy as np
+import pandas as pd
+
+# Step 1: Init Orca Context
+
+from bigdl.orca import init_orca_context, stop_orca_context
+init_orca_context()
+
+# Step 2: Define Train and Test Datasets
+
+from sklearn.model_selection import train_test_split
+import torch.utils.data as data
+
+
+class NCFData(data.Dataset):
+    def __init__(self, df):
+        self.data = df.values.tolist()
+
+    def __len__(self):
+        return len(self.data)
+
+    def __getitem__(self, idx):
+        user = int(self.data[idx][0])
+        item = int(self.data[idx][1])
+        label = np.float32(self.data[idx][2])
+        return user, item, label
+
+
+def load_dataset():
+    # read the (user, item) rating pairs from the ml-1m ratings file
+    data_X = pd.read_csv(
+        "ml-1m/ratings.dat",
+        sep="::", header=None, names=['user', 'item'],
+        usecols=[0, 1], dtype={0: np.int64, 1: np.int64}, engine='python')
+    user_num = data_X['user'].max() + 1
+    item_num = data_X['item'].max() + 1
+    return data_X, user_num, item_num
+
+
+def prepare_data():
+    # The train and test loader functions need the same preprocessed data,
+    # so the shared logic lives here instead of being duplicated in each.
+    data_X, user_num, item_num = load_dataset()
+    features_ps = data_X.values.tolist()
+
+    # load positive ratings as a dok matrix for fast membership tests
+    import scipy.sparse as sp
+    train_mat = sp.dok_matrix((user_num, item_num), dtype=np.int64)
+    for x in features_ps:
+        train_mat[x[0], x[1]] = 1
+
+    # sample 4 negative items per positive sample for the training datasets
+    np.random.seed(0)
+    features_ng = []
+    for x in features_ps:
+        u = x[0]
+        for t in range(4):
+            j = np.random.randint(item_num)
+            while (u, j) in train_mat:
+                j = np.random.randint(item_num)
+            features_ng.append([u, j])
+    features = features_ps + features_ng
+    labels = [1] * len(features_ps) + [0] * len(features_ng)
+    data_X = pd.DataFrame(features, columns=["user", "item"], dtype=np.int64)
+    data_X["label"] = labels
+
+    # train test split
+    data_X = data_X.values.tolist()
+    train_data, test_data = train_test_split(data_X, test_size=0.2, random_state=100)
+    train_data = pd.DataFrame(train_data, columns=["user", "item", "label"], dtype=np.int64)
+    test_data = pd.DataFrame(test_data, columns=["user", "item", "label"], dtype=np.int64)
+    # np.float was removed from recent NumPy versions; use float32, which also
+    # matches the dtype BCE losses expect for the targets
+    train_data["label"] = train_data["label"].astype(np.float32)
+    test_data["label"] = test_data["label"].astype(np.float32)
+    return train_data, test_data
+
+
+def train_loader_func(config, batch_size):
+    train_data, _ = prepare_data()
+    train_dataset = NCFData(train_data)
+    train_loader = data.DataLoader(train_dataset, batch_size=batch_size,
+                                   shuffle=True, num_workers=0)
+    return train_loader
+
+
+def test_loader_func(config, batch_size):
+    _, test_data = prepare_data()
+    test_dataset = NCFData(test_data)
+    test_loader = data.DataLoader(test_dataset, batch_size=batch_size,
+                                  shuffle=False, num_workers=0)
+    return test_loader
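+
+# Orca calls the two loader functions above on each worker, passing the
+# estimator config and the per-worker batch size. A quick local sanity check
+# (assuming the ml-1m data has been downloaded) could look like:
+#
+#     loader = train_loader_func(config={}, batch_size=32)
+#     users, items, labels = next(iter(loader))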
+
+# Step 3: Define the Model
+
+from model import NCF
+import torch.nn as nn
+import torch.optim as optim
+
+
+def model_creator(config):
+    # the embedding sizes are derived from the dataset
+    _, user_num, item_num = load_dataset()
+    model = NCF(user_num, item_num,
+                factor_num=32, num_layers=3, dropout=0.0, model="NeuMF-end")
+    model.train()
+    return model
+
+
+def optimizer_creator(model, config):
+    return optim.Adam(model.parameters(), lr=0.001)
+
+# The model already ends with a Sigmoid layer, so use plain BCELoss here;
+# BCEWithLogitsLoss would apply a second sigmoid to the outputs.
+loss_function = nn.BCELoss()
+
+# Step 4: Fit with Orca Estimator
+
+from bigdl.orca.learn.pytorch import Estimator
+from bigdl.orca.learn.metrics import Accuracy, Precision, Recall
+
+# Create the estimator
+backend = "ray"  # "ray" or "spark"
+est = Estimator.from_torch(model=model_creator, optimizer=optimizer_creator,
+                           loss=loss_function, metrics=[Accuracy(), Precision(), Recall()],
+                           backend=backend)
+
+# Fit the estimator
+batch_size = 1024
+est.fit(data=train_loader_func, epochs=10, batch_size=batch_size)
+
+# Step 5: Evaluate and Save the Model
+
+# Evaluate the model
+result = est.evaluate(data=test_loader_func, batch_size=batch_size)
+print('Evaluate results:')
+for r in result:
+    print(r, ":", result[r])
+
+# Save the model
+est.save("NCF_model")
+
+# Stop orca context when the program finishes
+stop_orca_context()
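+
+# Note: the saved model can later be restored into a new Estimator before
+# further fitting or inference; a sketch (assuming the same creator functions,
+# an active Orca context, and that Estimator.load accepts the path used by
+# Estimator.save):
+#
+#     est = Estimator.from_torch(model=model_creator, optimizer=optimizer_creator,
+#                                loss=loss_function, metrics=[Accuracy()],
+#                                backend=backend)
+#     est.load("NCF_model")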
diff --git a/python/orca/tutorial/pytorch/NCF/train_xshards.py b/python/orca/tutorial/pytorch/NCF/train_xshards.py
new file mode 100644
index 00000000000..bcc1a2a12fb
--- /dev/null
+++ b/python/orca/tutorial/pytorch/NCF/train_xshards.py
@@ -0,0 +1,145 @@
+#
+# Copyright 2016 The BigDL Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+# Most of the PyTorch code is adapted from guoyang9's NCF implementation for
+# the ml-1m dataset.
+# https://github.com/guoyang9/NCF
+#
+
+import numpy as np
+import pandas as pd
+
+# Step 1: Init Orca Context
+
+from bigdl.orca import init_orca_context, stop_orca_context
+init_orca_context()
+
+# Step 2: Define Dataset
+
+from bigdl.orca.data.pandas import read_csv
+from sklearn.model_selection import train_test_split
+
+
+def preprocess_data():
+    # read_csv returns an XShards of pandas DataFrames
+    data_X = read_csv(
+        "ml-1m/ratings.dat",
+        sep="::", header=None, names=['user', 'item'],
+        usecols=[0, 1], dtype={0: np.int64, 1: np.int64})
+    data_X = data_X.partition_by("user")
+
+    user_set = set(data_X["user"].unique())
+    item_set = set(data_X["item"].unique())
+    user_num = max(user_set) + 1
+    item_num = max(item_set) + 1
+    return data_X, user_num, item_num
+
+
+def ng_sampling(data):
+    data_X = data.values.tolist()
+
+    # calculate a dok matrix for fast membership tests
+    import scipy.sparse as sp
+    train_mat = sp.dok_matrix((user_num, item_num), dtype=np.int64)
+    for row in data_X:
+        train_mat[row[0], row[1]] = 1
+
+    # negative sampling
+    np.random.seed(0)
+    features_ps = data_X
+    features_ng = []
+    for x in features_ps:
+        u = x[0]
+        for t in range(4):  # sample 4 negative items for training
+            j = np.random.randint(item_num)
+            while (u, j) in train_mat:
+                j = np.random.randint(item_num)
+            features_ng.append([u, j])
+
+    labels_ps = [1 for _ in range(len(features_ps))]
+    labels_ng = [0 for _ in range(len(features_ng))]
+
+    features_fill = features_ps + features_ng
+    labels_fill = labels_ps + labels_ng
+    data_XY = pd.DataFrame(data=features_fill, columns=["user", "item"], dtype=np.int64)
+    data_XY["label"] = labels_fill
+    # np.float was removed from recent NumPy versions; use float32, which also
+    # matches the dtype BCE losses expect for the targets
+    data_XY["label"] = data_XY["label"].astype(np.float32)
+    return data_XY
+
+
+def split_dataset(data):
+    # split the data into training and testing sets
+    train_data, test_data = train_test_split(data, test_size=0.2, random_state=100)
+    return train_data, test_data
+
+# Prepare the train and test datasets
+data_X, user_num, item_num = preprocess_data()
+
+# Construct the train and test xshards
+data_X = data_X.transform_shard(ng_sampling)
+train_shards, test_shards = data_X.transform_shard(split_dataset).split()
+
+# Step 3: Define the Model
+
+import torch.nn as nn
+import torch.optim as optim
+from model import NCF
+
+
+def model_creator(config):
+    model = NCF(config['user_num'], config['item_num'],
+                factor_num=32, num_layers=3, dropout=0.0, model="NeuMF-end")
+    model.train()
+    return model
+
+
+def optimizer_creator(model, config):
+    return optim.Adam(model.parameters(), lr=0.001)
+
+# The model already ends with a Sigmoid layer, so use plain BCELoss here;
+# BCEWithLogitsLoss would apply a second sigmoid to the outputs.
+loss_function = nn.BCELoss()
+
+# Step 4: Fit with Orca Estimator
+
+from bigdl.orca.learn.pytorch import Estimator
+from bigdl.orca.learn.metrics import Accuracy, Precision, Recall
+
+# Create the estimator
+backend = "spark"  # "ray" or "spark"
+est = Estimator.from_torch(model=model_creator,
+                           optimizer=optimizer_creator,
+                           loss=loss_function,
+                           metrics=[Accuracy(), Precision(), Recall()],
+                           config={'user_num': user_num, 'item_num': item_num},
+                           backend=backend)
+
+# Fit the estimator
+batch_size = 1024
+est.fit(data=train_shards, epochs=10, batch_size=batch_size,
+        feature_cols=["user", "item"], label_cols=["label"])
+
+# Step 5: Evaluate and Save the Model
+
+# Evaluate the model
+result = est.evaluate(data=test_shards,
+                      feature_cols=["user", "item"],
+                      label_cols=["label"], batch_size=batch_size)
+print('Evaluate results:')
+for r in result:
+    print(r, ":", result[r])
+
+# Save the model
+est.save("NCF_model")
+
+# Stop orca context when the program finishes
+stop_orca_context()
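+
+# As a follow-up, the fitted estimator could also produce predictions before
+# the context is stopped; a sketch (assuming est.predict accepts SparkXShards
+# with feature_cols, as described in the Orca PyTorch Estimator docs):
+#
+#     predictions = est.predict(data=test_shards, feature_cols=["user", "item"])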