diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
index 3239af9d5df47..9ecffb048907d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -42,7 +42,7 @@ abstract class Gradient extends Serializable {
 class LogisticGradient extends Gradient {
   override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
     val brzData = data.toBreeze
-    val brzWeights = data.toBreeze
+    val brzWeights = weights.toBreeze
     val margin: Double = -1.0 * brzWeights.dot(brzData)
     val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
     val gradient = brzData * gradientMultiplier
@@ -67,7 +67,7 @@ class LeastSquaresGradient extends Gradient {
   override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
     val brzData = data.toBreeze
     val brzWeights = weights.toBreeze
-    val diff: Double = brzWeights.dot(brzData) - label
+    val diff = brzWeights.dot(brzData) - label
     val loss = diff * diff
     val gradient = brzData * (2.0 * diff)
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index ad8868093435e..41752142247fc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.mllib.optimization
 
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
@@ -92,16 +91,15 @@ class GradientDescent(var gradient: Gradient, var updater: Updater)
   }
 
   def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-
-    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
-      data,
-      gradient,
-      updater,
-      stepSize,
-      numIterations,
-      regParam,
-      miniBatchFraction,
-      initialWeights)
+    val (weights, _) = GradientDescent.runMiniBatchSGD(
+      data,
+      gradient,
+      updater,
+      stepSize,
+      numIterations,
+      regParam,
+      miniBatchFraction,
+      initialWeights)
     weights
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
index a62aecae5dd0d..f9ce908a5f3b0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.spark.mllib.linalg.Vector
 
-trait Optimizer {
+trait Optimizer extends Serializable {
 
   /**
    * Solve the provided convex optimization problem.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
index e5d4c4f7d282d..2766c8dbb42a0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
@@ -63,8 +63,12 @@ abstract class Updater extends Serializable {
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
 class SimpleUpdater extends Updater {
-  override def compute(weightsOld: Vector, gradient: Vector,
-      stepSize: Double, iter: Int, regParam: Double): (Vector, Double) = {
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     val thisIterStepSize = stepSize / math.sqrt(iter)
     val brzWeights = weightsOld.toBreeze - gradient.toBreeze * thisIterStepSize
     (Vectors.fromBreeze(brzWeights), 0)
@@ -101,9 +105,11 @@ class L1Updater extends Updater {
     val brzWeights = weightsOld.toBreeze - gradient.toBreeze * thisIterStepSize
     // Apply proximal operator (soft thresholding)
     val shrinkageVal = regParam * thisIterStepSize
-    (0 until brzWeights.length).foreach { i =>
+    var i = 0
+    while (i < brzWeights.length) {
       val wi = brzWeights(i)
       brzWeights(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal)
+      i += 1
     }
 
     (Vectors.fromBreeze(brzWeights), brzNorm(brzWeights, 1.0) * regParam)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index e6d550fe3782b..797698e4909d8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -118,12 +118,12 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
 
   /** Prepends one to the input vector. */
   private def prependOne(vector: Vector): Vector = {
-    val vectorWithIntercept = vector.toBreeze match {
+    val vector1 = vector.toBreeze match {
       case dv: BDV[Double] => BDV.vertcat(BDV.ones[Double](1), dv)
       case sv: BSV[Double] => BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv)
       case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
     }
-    Vectors.fromBreeze(vectorWithIntercept)
+    Vectors.fromBreeze(vector1)
   }
 
   /**
@@ -151,10 +151,14 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
     }
 
     val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
-    val brzWeightsWithIntercept = weightsWithIntercept.toBreeze
-    val intercept = if (addIntercept) brzWeightsWithIntercept(0) else 0.0
-    val brzWeights = if (addIntercept) brzWeightsWithIntercept(1 to -1) else brzWeightsWithIntercept
-
-    createModel(Vectors.fromBreeze(brzWeights), intercept)
+    val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0
+    val weights =
+      if (addIntercept) {
+        Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size))
+      } else {
+        weightsWithIntercept
+      }
+
+    createModel(weights, intercept)
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
index 2b6dbacde22d6..4034e79ae924a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
@@ -71,9 +71,9 @@ class LassoWithSGD private (
   // We don't want to penalize the intercept, so set this to false.
  super.setIntercept(false)
 
-  var yMean = 0.0
-  var xColMean: BV[Double] = _
-  var xColSd: BV[Double] = _
+  private var yMean = 0.0
+  private var xColMean: BV[Double] = _
+  private var xColSd: BV[Double] = _
 
   /**
    * Construct a Lasso object with default parameters
@@ -141,11 +141,8 @@ object LassoWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Vector)
-    : LassoModel =
-  {
-    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): LassoModel = {
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input, initialWeights)
   }
 
   /**
@@ -165,9 +162,7 @@ object LassoWithSGD {
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
-      miniBatchFraction: Double)
-    : LassoModel =
-  {
+      miniBatchFraction: Double): LassoModel = {
     new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
   }
 
@@ -187,9 +182,7 @@ object LassoWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double)
-    : LassoModel =
-  {
+      regParam: Double): LassoModel = {
     train(input, numIterations, stepSize, regParam, 1.0)
   }
 
@@ -205,9 +198,7 @@ object LassoWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LassoModel =
-  {
+      numIterations: Int): LassoModel = {
     train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
index 13c401aca31ef..6bc8850f8f6c6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
@@ -97,11 +97,9 @@ object LinearRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeights: Vector)
-    : LinearRegressionModel =
-  {
-    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): LinearRegressionModel = {
+    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -119,9 +117,7 @@ object LinearRegressionWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      miniBatchFraction: Double)
-    : LinearRegressionModel =
-  {
+      miniBatchFraction: Double): LinearRegressionModel = {
     new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
   }
 
@@ -139,9 +135,7 @@ object LinearRegressionWithSGD {
   def train(
       input: RDD[LabeledPoint],
       numIterations: Int,
-      stepSize: Double)
-    : LinearRegressionModel =
-  {
+      stepSize: Double): LinearRegressionModel = {
     train(input, numIterations, stepSize, 1.0)
   }
 
@@ -157,9 +151,7 @@ object LinearRegressionWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LinearRegressionModel =
-  {
+      numIterations: Int): LinearRegressionModel = {
     train(input, numIterations, 1.0, 1.0)
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index d495d590fd4f6..866596ded8d5c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -82,13 +82,10 @@ object MLUtils {
    *   xColMean - Row vector with mean for every column (or feature) of the input data
    *   xColSd - Row vector standard deviation for every column (or feature) of the input data.
    */
-  def computeStats(data: RDD[LabeledPoint], numFeatures: Int, numExamples: Long)
-    : (Double, Vector, Vector) = {
-
+  def computeStats(data: RDD[LabeledPoint], numFeatures: Int, numExamples: Long): (Double, Vector, Vector) = {
     val brzData = data.map { case LabeledPoint(label, features) =>
       (label, features.toBreeze)
     }
-
     val aggStats = brzData.aggregate(
       (0L, 0.0, BDV.zeros[Double](numFeatures), BDV.zeros[Double](numFeatures))
     )(
@@ -104,9 +101,10 @@ object MLUtils {
         (n1 + n2, sumLabel1 + sumLabel2, sum1 += sum2, sumSq1 += sumSq2)
       }
     )
-
     val (nl, sumLabel, sum, sumSq) = aggStats
+    require(nl > 0, "Input data is empty.")
+    require(nl == numExamples)
 
     val n = nl.toDouble
     val yMean = sumLabel / n
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
index 9ec2749cc4872..c4b433499a091 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -57,8 +57,7 @@ object GradientDescentSuite {
       if (yVal > 0) 1 else 0
     }
 
-    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(Array(x1(i)))))
-    testData
+    (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(x1(i))))
   }
 }
 
@@ -86,7 +85,7 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
     }
 
     val dataRDD = sc.parallelize(data, 2).cache()
-    val initialWeightsWithIntercept = Vectors.dense(0.0, initialWeights: _*)
+    val initialWeightsWithIntercept = Vectors.dense(1.0, initialWeights: _*)
 
     val (_, loss) = GradientDescent.runMiniBatchSGD(
       dataRDD,
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
index bca4a3322484c..02e99fac6146f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -34,7 +34,7 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
@@ -46,7 +46,7 @@ class LassoSuite extends FunSuite with LocalSparkContext {
     testRDD.cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD)
     val weight0 = model.weights(0)
@@ -66,23 +66,23 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD with initial weights") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
     val C = 1.0e-2
 
-    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
+    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
 
     val initialB = -1.0
     val initialC = -1.0
-    val initialWeights = Vectors.dense(Array(initialB, initialC))
+    val initialWeights = Vectors.dense(initialB, initialC)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD, initialWeights)
     val weight0 = model.weights(0)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 60f053b381305..2081fe46b17ef 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -23,8 +23,10 @@ import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm,
   squaredDistance => breezeSquaredDistance}
 
 import org.apache.spark.mllib.util.MLUtils._
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
 
-class MLUtilsSuite extends FunSuite {
+class MLUtilsSuite extends FunSuite with LocalSparkContext {
 
   test("epsilon computation") {
     assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
@@ -49,4 +51,16 @@ class MLUtilsSuite extends FunSuite {
       assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
     }
   }
+
+  test("compute stats") {
+    val data = Seq.fill(3)(Seq(
+      LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0)),
+      LabeledPoint(0.0, Vectors.dense(3.0, 4.0, 5.0))
+    )).flatten
+    val rdd = sc.parallelize(data, 2)
+    val (meanLabel, mean, std) = MLUtils.computeStats(rdd, 3, 6)
+    assert(meanLabel === 0.5)
+    assert(mean === Vectors.dense(2.0, 3.0, 4.0))
+    assert(std === Vectors.dense(1.0, 1.0, 1.0))
+  }
 }
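
Note on the L1Updater hunk above: it swaps a foreach over indices for a manual while loop, but the arithmetic is unchanged. The update it performs is the L1 proximal operator (soft thresholding). The following is a minimal, self-contained Scala sketch of that operator; the object name and the softThreshold helper are illustrative only and not part of this patch.

import scala.math.{abs, max, signum}

object SoftThresholdingSketch {

  // Applies soft thresholding in place, mirroring L1Updater's loop:
  //   w(i) <- signum(w(i)) * max(0, |w(i)| - shrinkage)
  // In the patch, shrinkage corresponds to regParam * thisIterStepSize.
  def softThreshold(weights: Array[Double], shrinkage: Double): Unit = {
    var i = 0
    while (i < weights.length) {
      val wi = weights(i)
      weights(i) = signum(wi) * max(0.0, abs(wi) - shrinkage)
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val w = Array(0.8, -0.05, -1.2, 0.02)
    softThreshold(w, 0.1)
    // Coordinates smaller in magnitude than the shrinkage are zeroed;
    // the rest move toward zero by 0.1: approximately 0.7, -0.0, -1.1, 0.0.
    println(w.mkString(", "))
  }
}

This also explains the second element of L1Updater's return value, brzNorm(brzWeights, 1.0) * regParam: soft thresholding is exactly the proximal step for the L1 penalty regParam * ||w||_1, so the regularization term reported alongside the updated weights is the L1 norm scaled by regParam.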