diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala index 2071e32401df9..f4fb10ea459ae 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala @@ -157,7 +157,7 @@ object GradientBoostedTrees extends Logging { validationInput: RDD[LabeledPoint], boostingStrategy: BoostingStrategy, validate: Boolean): GradientBoostedTreesModel = { - + val sc = input.sparkContext val timer = new TimeTracker() timer.start("total") timer.start("init") @@ -166,8 +166,8 @@ object GradientBoostedTrees extends Logging { // Initialize gradient boosting parameters val numIterations = boostingStrategy.numIterations - val baseLearners = new Array[DecisionTreeModel](numIterations) - val baseLearnerWeights = new Array[Double](numIterations) + val baseLearners = sc.broadcast(new Array[DecisionTreeModel](numIterations)) + val baseLearnerWeights = sc.broadcast(new Array[Double](numIterations)) val loss = boostingStrategy.loss val learningRate = boostingStrategy.learningRate // Prepare strategy for individual trees, which use regression with variance impurity. @@ -192,25 +192,26 @@ object GradientBoostedTrees extends Logging { // Initialize tree timer.start("building tree 0") val firstTreeModel = new DecisionTree(treeStrategy).run(data) - baseLearners(0) = firstTreeModel - baseLearnerWeights(0) = 1.0 + val firstTreeWeight = 1.0 + baseLearners.value(0) = firstTreeModel + baseLearnerWeights.value(0) = firstTreeWeight val startingModel = new GradientBoostedTreesModel(Regression, Array(firstTreeModel), Array(1.0)) var predError: RDD[(Double, Double)] = GradientBoostedTreesModel. - computeInitialPredictionAndError(input, 1.0, firstTreeModel, loss) + computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss) logDebug("error of gbt = " + predError.values.mean()) // Note: A model of type regression is used since we require raw prediction timer.stop("building tree 0") var validatePredError: RDD[(Double, Double)] = GradientBoostedTreesModel. - computeInitialPredictionAndError(validationInput, 1.0, firstTreeModel, loss) + computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss) var bestValidateError = if (validate) validatePredError.values.mean() else 0.0 var bestM = 1 - // psuedo-residual for second iteration - data = predError.zip(input).map { - case ((pred, _), point) => LabeledPoint(loss.gradient(pred, point.label), point.features) + // pseudo-residual for second iteration + data = predError.zip(input).map { case ((pred, _), point) => + LabeledPoint(-loss.gradient(pred, point.label), point.features) } var m = 1 @@ -222,17 +223,18 @@ object GradientBoostedTrees extends Logging { val model = new DecisionTree(treeStrategy).run(data) timer.stop(s"building tree $m") // Create partial model - baseLearners(m) = model + baseLearners.value(m) = model // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError. // Technically, the weight should be optimized for the particular loss. // However, the behavior should be reasonable, though not optimal. - baseLearnerWeights(m) = learningRate + baseLearnerWeights.value(m) = learningRate // Note: A model of type regression is used since we require raw prediction val partialModel = new GradientBoostedTreesModel( - Regression, baseLearners.slice(0, m + 1), baseLearnerWeights.slice(0, m + 1)) + Regression, baseLearners.value.slice(0, m + 1), + baseLearnerWeights.value.slice(0, m + 1)) predError = GradientBoostedTreesModel.updatePredictionError( - input, predError, learningRate, model, loss) + input, predError, m, baseLearnerWeights, baseLearners, loss) logDebug("error of gbt = " + predError.values.mean()) if (validate) { @@ -242,21 +244,21 @@ object GradientBoostedTrees extends Logging { // We want the model returned corresponding to the best validation error. validatePredError = GradientBoostedTreesModel.updatePredictionError( - validationInput, validatePredError, learningRate, model, loss) + validationInput, validatePredError, m, baseLearnerWeights, baseLearners, loss) val currentValidateError = validatePredError.values.mean() if (bestValidateError - currentValidateError < validationTol) { return new GradientBoostedTreesModel( boostingStrategy.treeStrategy.algo, - baseLearners.slice(0, bestM), - baseLearnerWeights.slice(0, bestM)) + baseLearners.value.slice(0, bestM), + baseLearnerWeights.value.slice(0, bestM)) } else if (currentValidateError < bestValidateError) { bestValidateError = currentValidateError bestM = m + 1 } } // Update data with pseudo-residuals - data = predError.zip(input).map { - case ((pred, _), point) => LabeledPoint(-loss.gradient(pred, point.label), point.features) + data = predError.zip(input).map { case ((pred, _), point) => + LabeledPoint(-loss.gradient(pred, point.label), point.features) } m += 1 } @@ -268,11 +270,11 @@ object GradientBoostedTrees extends Logging { if (validate) { new GradientBoostedTreesModel( boostingStrategy.treeStrategy.algo, - baseLearners.slice(0, bestM), - baseLearnerWeights.slice(0, bestM)) + baseLearners.value.slice(0, bestM), + baseLearnerWeights.value.slice(0, bestM)) } else { new GradientBoostedTreesModel( - boostingStrategy.treeStrategy.algo, baseLearners, baseLearnerWeights) + boostingStrategy.treeStrategy.algo, baseLearners.value, baseLearnerWeights.value) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala index eea8725854549..6f570b4e09c79 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala @@ -37,7 +37,7 @@ object AbsoluteError extends Loss { * Method to calculate the gradients for the gradient boosting calculation for least * absolute error calculation. * The gradient with respect to F(x) is: sign(F(x) - y) - * @param prediction Predicted point + * @param prediction Predicted label. * @param label True label. * @return Loss gradient */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala index 7024d9dbeceab..24ee9f3d51293 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala @@ -39,7 +39,7 @@ object LogLoss extends Loss { * Method to calculate the loss gradients for the gradient boosting calculation for binary * classification * The gradient with respect to F(x) is: - 4 y / (1 + exp(2 y F(x))) - * @param prediction Predicted point + * @param prediction Predicted label. * @param label True label. * @return Loss gradient */ @@ -47,7 +47,6 @@ object LogLoss extends Loss { - 4.0 * label / (1.0 + math.exp(2.0 * label * prediction)) } - override def computeError(prediction: Double, label: Double): Double = { val margin = 2.0 * label * prediction // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala index 4729d72a87fd8..58857ae15e93e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala @@ -37,7 +37,7 @@ object SquaredError extends Loss { * Method to calculate the gradients for the gradient boosting calculation for least * squares error calculation. * The gradient with respect to F(x) is: - 2 (y - F(x)) - * @param prediction Predicted point + * @param prediction Predicted label. * @param label True label. * @return Loss gradient */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index cdb16ad37ce69..774a8fd18686f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -27,6 +27,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{Logging, SparkContext} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD +import org.apache.spark.broadcast.Broadcast import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.tree.configuration.Algo @@ -142,8 +143,7 @@ class GradientBoostedTreesModel( (1 until numIterations).map { nTree => predictionAndError = GradientBoostedTreesModel.updatePredictionError( - remappedData, predictionAndError, broadcastWeights.value(nTree), - broadcastTrees.value(nTree), loss) + remappedData, predictionAndError, nTree, broadcastWeights, broadcastTrees, loss) evaluationArray(nTree) = predictionAndError.values.mean() } @@ -157,49 +157,54 @@ class GradientBoostedTreesModel( object GradientBoostedTreesModel extends Loader[GradientBoostedTreesModel] { /** - * Method to compute initial error and prediction as a RDD for the first + * Compute the initial predictions and errors for a dataset for the first * iteration of gradient boosting. - * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] + * @param Training data. * @param initTreeWeight: learning rate assigned to the first tree. - * @param initTree: first DecisionTreeModel - * @param loss: evaluation metric + * @param initTree: first DecisionTreeModel. + * @param loss: evaluation metric. * @return a RDD with each element being a zip of the prediction and error * corresponding to every sample. */ def computeInitialPredictionAndError( data: RDD[LabeledPoint], initTreeWeight: Double, - initTree: DecisionTreeModel, loss: Loss): RDD[(Double, Double)] = { - data.map { i => - val pred = initTreeWeight * initTree.predict(i.features) - val error = loss.computeError(pred, i.label) + initTree: DecisionTreeModel, + loss: Loss): RDD[(Double, Double)] = { + data.map { lp => + val pred = initTreeWeight * initTree.predict(lp.features) + val error = loss.computeError(pred, lp.label) (pred, error) } } /** - * Method to update a zipped predictionError RDD + * Update a zipped predictionError RDD * (as obtained with computeInitialPredictionAndError) - * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] + * @param training data. * @param predictionAndError: predictionError RDD - * @param currentTreeWeight: learning rate. - * @param currentTree: first DecisionTree - * @param loss: evaluation metric + * @param nTree: tree index. + * @param TreeWeights: Broadcasted learning rates. + * @param Trees: Broadcasted trees. + * @param loss: evaluation metric. * @return a RDD with each element being a zip of the prediction and error - * corresponing to each sample. + * corresponding to each sample. */ def updatePredictionError( data: RDD[LabeledPoint], predictionAndError: RDD[(Double, Double)], - currentTreeWeight: Double, - currentTree: DecisionTreeModel, + nTree: Int, + TreeWeights: Broadcast[Array[Double]], + Trees: Broadcast[Array[DecisionTreeModel]], loss: Loss): RDD[(Double, Double)] = { data.zip(predictionAndError).mapPartitions { iter => + val currentTreeWeight = TreeWeights.value(nTree) + val currentTree = Trees.value(nTree) iter.map { - case (point, (pred, error)) => { - val newPred = pred + currentTree.predict(point.features) * currentTreeWeight - val newError = loss.computeError(newPred, point.label) + case (lp, (pred, error)) => { + val newPred = pred + currentTree.predict(lp.features) * currentTreeWeight + val newError = loss.computeError(newPred, lp.label) (newPred, newError) } }