Skip to content

Commit

Permalink
Made the following changes
Browse files Browse the repository at this point in the history
Used mapPartitions instead of map
Refactored computeError and unpersisted broadcast variables
  • Loading branch information
MechCoder committed Mar 11, 2015
1 parent bc99ac6 commit 6e8aa10
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,15 @@ object AbsoluteError extends Loss {
if ((point.label - model.predict(point.features)) < 0) 1.0 else -1.0
}

/**
* Method to calculate loss of the base learner for the gradient boosting calculation.
* Note: This method is not used by the gradient boosting algorithm but is useful for debugging
* purposes.
* @param model Ensemble model
* @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* @return Mean absolute error of model on data
*/
override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
  // Mean absolute error of the ensemble over the dataset:
  // average of |prediction - label| across all points.
  data
    .map(point => math.abs(model.predict(point.features) - point.label))
    .mean()
}

/**
* Method to calculate loss when the predictions are already known.
* Note: This method is used in the method evaluateEachIteration to avoid recomputing the
* predicted values from previously fit trees.
* @param datum: LabeledPoint
* @param prediction: Predicted label.
* @return Absolute error of model on the given datapoint.
* @param prediction Predicted label.
* @param datum LabeledPoint.
* @return Absolute error of model on the given datapoint.
*/
override def computeError(datum: LabeledPoint, prediction: Double): Double = {
override def computeError(prediction: Double, datum: LabeledPoint): Double = {
  // Absolute deviation between the true label and the given prediction.
  math.abs(datum.label - prediction)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,15 @@ object LogLoss extends Loss {
- 4.0 * point.label / (1.0 + math.exp(2.0 * point.label * prediction))
}

/**
* Method to calculate loss of the base learner for the gradient boosting calculation.
* Note: This method is not used by the gradient boosting algorithm but is useful for debugging
* purposes.
* @param model Ensemble model
* @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* @return Mean log loss of model on data
*/
override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
  // Mean log loss of the ensemble over the dataset.
  data.map { point =>
    val margin = 2.0 * point.label * model.predict(point.features)
    // Equivalent to 2.0 * log(1 + exp(-margin)), but numerically stable.
    2.0 * MLUtils.log1pExp(-margin)
  }.mean()
}

/**
* Method to calculate loss when the predictions are already known.
* Note: This method is used in the method evaluateEachIteration to avoid recomputing the
* predicted values from previously fit trees.
* @param datum: LabeledPoint
* @param prediction: Predicted label.
* @param prediction Predicted label.
* @param datum LabeledPoint
* @return log loss of model on the datapoint.
*/
override def computeError(datum: LabeledPoint, prediction: Double): Double = {
override def computeError(prediction: Double, datum: LabeledPoint): Double = {
val margin = 2.0 * datum.label * prediction
// The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
2.0 * MLUtils.log1pExp(-margin)
Expand Down
10 changes: 6 additions & 4 deletions mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ trait Loss extends Serializable {
* @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* @return Measure of model error on data
*/
def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double
def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
  // Default implementation: average the per-point loss (delegated to the
  // single-point computeError overload) over the whole dataset.
  val pointwiseErrors = data.map { point =>
    computeError(model.predict(point.features), point)
  }
  pointwiseErrors.mean()
}

/**
* Method to calculate loss when the predictions are already known.
* Note: This method is used in the method evaluateEachIteration to avoid recomputing the
* predicted values from previously fit trees.
* @param datum: LabeledPoint
* @param prediction: Predicted label.
* @param prediction Predicted label.
* @param datum LabeledPoint
* @return Measure of model error on datapoint.
*/
def computeError(datum: LabeledPoint, prediction: Double) : Double
def computeError(prediction: Double, datum: LabeledPoint): Double

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,15 @@ object SquaredError extends Loss {
2.0 * (model.predict(point.features) - point.label)
}

/**
* Method to calculate loss of the base learner for the gradient boosting calculation.
* Note: This method is not used by the gradient boosting algorithm but is useful for debugging
* purposes.
* @param model Ensemble model
* @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* @return Mean squared error of model on data
*/
override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
  // Mean squared error of the ensemble over the dataset.
  data.map { point =>
    val residual = model.predict(point.features) - point.label
    residual * residual
  }.mean()
}

/**
* Method to calculate loss when the predictions are already known.
* Note: This method is used in the method evaluateEachIteration to avoid recomputing the
* predicted values from previously fit trees.
* @param datum: LabeledPoint
* @param prediction: Predicted label.
* @return Mean squared error of model on datapoint.
* @param prediction Predicted label.
* @param datum LabeledPoint
* @return Squared error of model on the given datapoint.
*/
override def computeError(datum: LabeledPoint, prediction: Double): Double = {
override def computeError(prediction: Double, datum: LabeledPoint): Double = {
  // Squared residual between the prediction and the true label.
  val residual = prediction - datum.label
  residual * residual
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,47 +113,52 @@ class GradientBoostedTreesModel(

/**
* Method to compute error or loss for every iteration of gradient boosting.
* @param data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
* @param loss: evaluation metric.
* @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
* @param loss evaluation metric.
* @return an array with index i having the losses or errors for the ensemble
* containing trees 1 to i + 1
*/
// NOTE(review): this span is a commit-diff rendering with the +/- gutter stripped,
// so removed (old) and added (new) lines sit adjacent with no markers. It is NOT
// compilable as-is; inline notes below flag the old/new duplicate pairs.
// Purpose (from the new lines): compute the loss of the ensemble after each
// boosting iteration, returning one value per iteration.
def evaluateEachIteration(
data: RDD[LabeledPoint],
loss: Loss) : Array[Double] = { // old signature line (removed: space before ':')
loss: Loss): Array[Double] = { // new signature line (added)

val sc = data.sparkContext
// For classification, remap {0,1} labels to {-1,+1} as the losses expect.
val remappedData = algo match {
case Classification => data.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
case _ => data
}
val initialTree = trees(0) // old (removed): only used by the removed seeding below

val numIterations = trees.length
val evaluationArray = Array.fill(numIterations)(0.0)

// Initial weight is 1.0
// old (removed) seeding of the per-point (prediction, error) RDD:
var predictionErrorModel = remappedData.map {i =>
val pred = initialTree.predict(i.features)
val error = loss.computeError(i, pred)
// new (added) seeding — applies treeWeights(0) and the (prediction, datum) arg order:
var predictionAndError: RDD[(Double, Double)] = remappedData.map { i =>
val pred = treeWeights(0) * trees(0).predict(i.features)
val error = loss.computeError(pred, i)
(pred, error)
}
evaluationArray(0) = predictionErrorModel.values.mean() // old (removed)
evaluationArray(0) = predictionAndError.values.mean() // new (added)

// Avoid the model being copied across numIterations.
val broadcastTrees = sc.broadcast(trees)
val broadcastWeights = sc.broadcast(treeWeights)

// old (removed) per-iteration update using zip + map with per-record broadcast lookups:
(1 until numIterations).map {nTree =>
predictionErrorModel = (remappedData zip predictionErrorModel) map {
case (point, (pred, error)) => {
val newPred = pred + (
broadcastTrees.value(nTree).predict(point.features) * broadcastWeights.value(nTree))
val newError = loss.computeError(point, newPred)
(newPred, newError)
// new (added) per-iteration update — hoists broadcast lookups out of the record
// loop and uses mapPartitions, per the commit message:
(1 until numIterations).map { nTree =>
val currentTree = broadcastTrees.value(nTree)
val currentTreeWeight = broadcastWeights.value(nTree)
predictionAndError = remappedData.zip(predictionAndError).mapPartitions { iter =>
iter map {
case (point, (pred, error)) => {
val newPred = pred + currentTree.predict(point.features) * currentTreeWeight
val newError = loss.computeError(newPred, point)
(newPred, newError)
}
}
}
evaluationArray(nTree) = predictionErrorModel.values.mean() // old (removed)
evaluationArray(nTree) = predictionAndError.values.mean() // new (added)
}

// Release the broadcasts once all iterations have been evaluated (added by commit).
broadcastTrees.unpersist()
broadcastWeights.unpersist()
evaluationArray
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
assert(evaluationArray(numTrees) > evaluationArray(numTrees - 1))
var i = 1
while (i < numTrees) {
assert(evaluationArray(i) < evaluationArray(i - 1))
assert(evaluationArray(i) <= evaluationArray(i - 1))
i += 1
}
}
Expand Down

0 comments on commit 6e8aa10

Please sign in to comment.