Skip to content

Commit

Permalink
fix session recommender (#1558)
Browse files Browse the repository at this point in the history
  • Loading branch information
songhappy authored Aug 8, 2019
1 parent b98d97a commit 78afc61
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import random

from zoo.pipeline.api.keras.layers import *
from zoo.models.recommendation.session_recommender import SessionRecommender
from zoo.models.recommendation import SessionRecommender
from test.zoo.pipeline.utils.test_utils import ZooTestCase

np.random.seed(1337) # for reproducibility
Expand Down
1 change: 1 addition & 0 deletions pyzoo/zoo/models/recommendation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
from .recommender import *
from .neuralcf import *
from .wide_and_deep import *
from .session_recommender import *
13 changes: 8 additions & 5 deletions pyzoo/zoo/models/recommendation/session_recommender.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ class SessionRecommender(Recommender):
mlp_hidden_layers: Units of hidden layers for the mlp model. Array of positive integers.
history_length: The max number of items in the sequence of historical purchase
"""
def __init__(self, item_count, item_embed, rnn_hidden_layers, session_length,
include_history=False, mlp_hidden_layers=[10, 5], his_length=2,
def __init__(self, item_count, item_embed, rnn_hidden_layers=[40, 20], session_length=0,
include_history=False, mlp_hidden_layers=[40, 20], history_length=0,
bigdl_type="float"):
assert session_length > 0, "session_length should align with input features"
if include_history:
assert history_length > 0, "history_length should align with input features"
self.item_count = int(item_count)
self.item_embed = int(item_embed)
self.mlp_hidden_layers = [int(unit) for unit in mlp_hidden_layers]
self.rnn_hidden_layers = [int(unit) for unit in rnn_hidden_layers]
self.include_history = include_history
self.session_length = int(session_length)
self.his_length = int(his_length)
self.history_length = int(history_length)
self.bigdl_type = bigdl_type
self.model = self.build_model()
super(SessionRecommender, self).__init__(None, self.bigdl_type,
Expand All @@ -58,7 +61,7 @@ def __init__(self, item_count, item_embed, rnn_hidden_layers, session_length,
self.session_length,
self.include_history,
self.mlp_hidden_layers,
self.his_length,
self.history_length,
self.model)

def build_model(self):
Expand All @@ -72,7 +75,7 @@ def build_model(self):
rnn = Dense(self.item_count)(gru_last)

if self.include_history:
input_mlp = Input(shape=(self.his_length,))
input_mlp = Input(shape=(self.history_length,))
his_table = Embedding(self.item_count + 1, self.item_embed, init="uniform")(input_mlp)
embedSum = KerasLayerWrapper(Sum(dimension=2))(his_table)
flatten = Flatten()(embedSum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ object SessionRecExp {
val model = SessionRecommender[Float](
itemCount = itemCount,
itemEmbed = params.embedOutDim,
mlpHiddenLayers = Array(20, 10),
sessionLength = params.sessionLength,
includeHistory = true,
mlpHiddenLayers = Array(20, 10),
historyLength = params.historyLength)

val optimMethod = new RMSprop[Float](
Expand Down Expand Up @@ -178,7 +178,7 @@ object SessionRecExp {

// dataFrame to rdd of sample
val samples = paddedDF.rdd.map(r => {
rows2sample(r, sessionLength, true, historyLength)
row2sampleSession(r, sessionLength, true, historyLength)
})
samples
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class SessionRecommender[T: ClassTag](
val itemCount: Int,
val itemEmbed: Int = 100,
val rnnHiddenLayers: Array[Int] = Array(40, 20),
val sessionLength: Int = 5,
val includeHistory: Boolean = true,
val sessionLength: Int = 0,
val includeHistory: Boolean = false,
val mlpHiddenLayers: Array[Int] = Array(40, 20),
val historyLength: Int = 10)(implicit ev: TensorNumeric[T]) extends Recommender[T] {
val historyLength: Int = 0)(implicit ev: TensorNumeric[T]) extends Recommender[T] {

override def buildModel(): AbstractModule[Tensor[T], Tensor[T], T] = {
val inputRnn: ModuleNode[T] = Input(inputShape = Shape(sessionLength))
Expand All @@ -72,15 +72,17 @@ class SessionRecommender[T: ClassTag](
val hisTable = Embedding[T](itemCount + 1, itemEmbed, inputLength = historyLength)
.inputs(inputMlp)
val sum = new KerasLayerWrapper[T](Sum[T](2)).inputs(hisTable)
var mlp = Dense(mlpHiddenLayers(0), activation = "relu").inputs(sum)
val flatten = Flatten().inputs(sum)

var mlp = Dense(mlpHiddenLayers(0), activation = "relu").inputs(flatten)
for (i <- 1 until mlpHiddenLayers.length) {
mlp = Dense(mlpHiddenLayers(i), activation = "relu").inputs(mlp)
}
val mlpLast = Dense(itemCount).inputs(mlp)
// combine rnn and mlp
val merged = Merge.merge[T](List(mlpLast, rnn), "sum")
val merged = Merge.merge[T](List(rnn, mlpLast), "sum")
val out = Activation("softmax").inputs(merged)
Model(Array(inputMlp, inputRnn), out).asInstanceOf[AbstractModule[Tensor[T], Tensor[T], T]]
Model(Array(inputRnn, inputMlp), out).asInstanceOf[AbstractModule[Tensor[T], Tensor[T], T]]
}
else {
val out = Activation("softmax").inputs(rnn)
Expand All @@ -103,13 +105,31 @@ class SessionRecommender[T: ClassTag](
recommends.toArray
}

/**
* recommend for sessions given rdd of samples
*
* @param sessions: rdd of samples
* @param maxItems: Number of items to be recommended to each user. Positive integer.
* @param zeroBasedLabel: True if data starts from 0, False if data starts from 1
* @return rdd of array of (item, probability)
*
*/
def recommendForSession(sessions: RDD[Sample[T]],
maxItems: Int,
zeroBasedLabel: Boolean): RDD[Array[(Int, Float)]] = {
val raw = predict(sessions)
raw.map(x => topk(x.toTensor[T], maxItems, zeroBasedLabel))
}

/**
* recommend for sessions given array of samples
*
* @param sessions: array of samples
* @param maxItems: Number of items to be recommended to each user. Positive integer.
* @param zeroBasedLabel: True if data starts from 0, False if data starts from 1
* @return array of array of (item, probability)
*
*/
def recommendForSession(sessions: Array[Sample[T]],
maxItems: Int,
zeroBasedLabel: Boolean): Array[Array[(Int, Float)]] = {
Expand Down Expand Up @@ -151,10 +171,15 @@ object SessionRecommender {
itemCount: Int,
itemEmbed: Int = 100,
rnnHiddenLayers: Array[Int] = Array(40, 20),
sessionLength: Int = 5,
includeHistory: Boolean = true,
sessionLength: Int = 0,
includeHistory: Boolean = false,
mlpHiddenLayers: Array[Int] = Array(40, 20),
historyLength: Int = 10)(implicit ev: TensorNumeric[T]): SessionRecommender[T] = {
historyLength: Int = 0)(implicit ev: TensorNumeric[T]): SessionRecommender[T] = {
require(sessionLength > 0, s"sessionLength should align with input features")
if (includeHistory) {
require(historyLength > 0, s"historyLength should align with input features")
}

new SessionRecommender[T](
itemCount,
itemEmbed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ object Utils {
deepTensor
}

def rows2sample(r: Row,
sessionLength: Int,
includeHistory: Boolean,
historyLength: Int): Sample[Float] = {
def row2sampleSession(r: Row,
sessionLength: Int,
includeHistory: Boolean,
historyLength: Int): Sample[Float] = {
val label = Tensor[Float](T(r.getAs[Float]("label")))
val rnnFeature: Array[Float] = r
.getAs[mutable.WrappedArray[java.lang.Float]]("session").array.map(_.toFloat)
Expand All @@ -283,7 +283,7 @@ object Utils {
val mlpFeature: Array[Float] = r
.getAs[mutable.WrappedArray[java.lang.Float]]("purchase_history").array.map(_.toFloat)
val mlpTensor = Tensor(mlpFeature, Array(historyLength))
Sample[Float](Array(mlpTensor, rnnTensor), Array(label))
Sample[Float](Array(rnnTensor, mlpTensor), Array(label))
}
else {
Sample[Float](Array(rnnTensor), Array(label))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class SessionRecommenderSpec extends ZooSpecHelper {

val itemCount = 100
val sessionLength = 10
val model = SessionRecommender[Float](itemCount, sessionLength, includeHistory = false)
val model = SessionRecommender[Float](itemCount, sessionLength = sessionLength)
val ran = new Random(42L)
val data = (1 to 100).map { x =>
val items: Seq[Float] = for (i <- 1 to sessionLength) yield
Expand All @@ -69,7 +69,7 @@ class SessionRecommenderSpec extends ZooSpecHelper {
val itemCount = 100
val sessionLength = 10
val historyLength = 5
val model = SessionRecommender[Float](itemCount, sessionLength,
val model = SessionRecommender[Float](itemCount, sessionLength = sessionLength,
includeHistory = true, historyLength = historyLength)
val ran = new Random(42L)
val data = (1 to 100).map { x =>
Expand All @@ -91,26 +91,10 @@ class SessionRecommenderSpec extends ZooSpecHelper {
val itemCount = 100
val sessionLength = 10
val historyLength = 5
val model = SessionRecommender[Float](itemCount, sessionLength,
val model = SessionRecommender[Float](itemCount, sessionLength = sessionLength,
includeHistory = true, historyLength = historyLength)
val ran = new Random(42L)
val data1: RDD[Sample[Float]] = sc.parallelize(1 to 100)
.map { x =>
val items1: Seq[Float] = for (i <- 1 to sessionLength) yield ran.nextInt(itemCount).toFloat
val items2: Seq[Float] = for (i <- 1 to historyLength) yield ran.nextInt(itemCount).toFloat
val input1 = Tensor(items1.toArray, Array(sessionLength))
val input2 = Tensor(items2.toArray, Array(historyLength))
Sample[Float](Array(input1, input2))
}

val recommedations1 = model.recommendForSession(data1, 3, zeroBasedLabel = false)
recommedations1.take(10)
.map { x =>
assert(x.size == 3)
assert(x(0)._2 >= x(1)._2)
}

val data2: Array[Sample[Float]] = (1 to 10)
val data1: Array[Sample[Float]] = (1 to 10)
.map { x =>
val items1: Seq[Float] = for (i <- 1 to sessionLength) yield ran.nextInt(itemCount).toFloat
val items2: Seq[Float] = for (i <- 1 to historyLength) yield ran.nextInt(itemCount).toFloat
Expand All @@ -119,19 +103,26 @@ class SessionRecommenderSpec extends ZooSpecHelper {
Sample[Float](Array(input1, input2))
}.toArray

val recommedations2 = model.recommendForSession(data2, 4, zeroBasedLabel = false)
recommedations2.map { x =>
val recommedations1 = model.recommendForSession(data1, 4, zeroBasedLabel = false)
recommedations1.map { x =>
assert(x.size == 4)
assert(x(0)._2 >= x(1)._2)
}

val data2: RDD[Sample[Float]] = sc.parallelize(data1)
val recommedations2 = model.recommendForSession(data2, 3, zeroBasedLabel = false)
recommedations2.take(10).map { x =>
assert(x.size == 3)
assert(x(0)._2 >= x(1)._2)
}
}

"SessionRecommender compile and fit" should "work properly" in {

val itemCount = 100
val sessionLength = 10
val historyLength = 5
val model = SessionRecommender[Float](itemCount, sessionLength,
val model = SessionRecommender[Float](itemCount, 10, sessionLength = sessionLength,
includeHistory = true, historyLength = historyLength)
val ran = new Random(42L)
val data1 = sc.parallelize(1 to 100)
Expand All @@ -144,6 +135,7 @@ class SessionRecommenderSpec extends ZooSpecHelper {
Sample(Array(input1, input2), Array(label))
}
model.compile(optimizer = "rmsprop", loss = "sparse_categorical_crossentropy")
model.summary()
model.fit(data1, nbEpoch = 1)
}
}
Expand All @@ -153,7 +145,7 @@ class SessionRecommenderSerialTest extends ModuleSerializationTest {
val ran = new Random(42L)
val itemCount = 100
val sessionLength = 10
val model = SessionRecommender[Float](100, 10, includeHistory = false)
val model = SessionRecommender[Float](100, sessionLength = 10)
val items: Seq[Float] = for (i <- 1 to sessionLength) yield
ran.nextInt(itemCount - 1).toFloat + 1
val data = Tensor(items.toArray, Array(sessionLength)).resize(1, sessionLength)
Expand Down

0 comments on commit 78afc61

Please sign in to comment.