diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 9ecffb048907d..a174d4e77d33e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.optimization +import breeze.linalg.{axpy => brzAxpy} + import org.apache.spark.mllib.linalg.{Vectors, Vector} /** @@ -33,6 +35,19 @@ abstract class Gradient extends Serializable { * @return (gradient: Vector, loss: Double) */ def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) + + /** + * Compute the gradient and loss given the features of a single data point, add the gradient to a provided vector to + * avoid creating new objects, and return loss. + * + * @param data features for one data point + * @param label label for this data point + * @param weights weights/coefficients corresponding to features + * @param gradientAddTo gradient will be added to this vector + * + * @return (gradient: Vector, loss: Double) + */ + def compute(data: Vector, label: Double, weights: Vector, gradientAddTo: Vector): Double } /** @@ -55,6 +70,21 @@ class LogisticGradient extends Gradient { (Vectors.fromBreeze(gradient), loss) } + + override def compute(data: Vector, label: Double, weights: Vector, gradientAddTo: Vector): Double = { + val brzData = data.toBreeze + val brzWeights = weights.toBreeze + val margin: Double = -1.0 * brzWeights.dot(brzData) + val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label + + brzAxpy(gradientMultiplier, brzData, gradientAddTo.toBreeze) + + if (label > 0) { + math.log(1 + math.exp(margin)) + } else { + math.log(1 + math.exp(margin)) - margin + } + } } /** @@ -73,6 +103,16 @@ class LeastSquaresGradient extends Gradient { (Vectors.fromBreeze(gradient), loss) } + + override def compute(data: Vector, label: Double, weights: Vector, gradientAddTo: Vector): Double = { + val brzData = data.toBreeze + val brzWeights = weights.toBreeze + val diff = brzWeights.dot(brzData) - label + + brzAxpy(2.0 * diff, brzData, gradientAddTo.toBreeze) + + diff * diff + } } /** @@ -96,4 +136,21 @@ class HingeGradient extends Gradient { (Vectors.dense(new Array[Double](weights.size)), 0.0) } } + + override def compute(data: Vector, label: Double, weights: Vector, gradientAddTo: Vector): Double = { + val brzData = data.toBreeze + val brzWeights = weights.toBreeze + val dotProduct = brzWeights.dot(brzData) + + // Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val labelScaled = 2 * label - 1.0 + + if (1.0 > labelScaled * dotProduct) { + brzAxpy(-labelScaled, brzData, gradientAddTo.toBreeze) + 1.0 - labelScaled * dotProduct + } else { + 0.0 + } + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 8131925cfc87d..d0777ffd63ff8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -162,13 +162,12 @@ object GradientDescent extends Logging { val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i) .aggregate((BDV.zeros[Double](weights.size), 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => - val (g, l) = gradient.compute(features, label, weights) - (grad += g.toBreeze, loss + l) + val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad)) + (grad, loss + l) }, combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => (grad1 += grad2, loss1 + loss2) - } - ) + }) /** * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration