From 834ada23f66e871576ab8e3f38a4929f0c913a12 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 26 Mar 2014 13:49:49 -0700
Subject: [PATCH] optimized MLUtils.computeStats

update some ml algorithms to use Vector (cont.)
---
 .../mllib/optimization/GradientDescent.scala  | 19 +++--
 .../spark/mllib/optimization/Updater.scala    |  5 +-
 .../GeneralizedLinearAlgorithm.scala          |  4 +-
 .../mllib/regression/LinearRegression.scala   | 17 ++---
 .../mllib/regression/RidgeRegression.scala    | 43 ++++++-----
 .../org/apache/spark/mllib/util/MLUtils.scala | 73 ++++++++++---------
 .../regression/RidgeRegressionSuite.scala     |  4 +-
 7 files changed, 84 insertions(+), 81 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index e5555cc7f73e3..ad8868093435e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -22,7 +22,7 @@ import org.apache.spark.rdd.RDD
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * Class used to solve an optimization problem using Gradient Descent.
@@ -139,7 +139,7 @@ object GradientDescent extends Logging {
       numIterations: Int,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Vector): (Vector, Vector) = {
+      initialWeights: Vector): (Vector, Array[Double]) = {
 
     val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
 
@@ -147,24 +147,23 @@ object GradientDescent extends Logging {
     val miniBatchSize = nexamples * miniBatchFraction
 
     // Initialize weights as a column vector
-    var weights = initialWeights.toBreeze.toDenseVector
+    var weights = Vectors.dense(initialWeights.toArray)
 
     /**
      * For the first iteration, the regVal will be initialized as the sum of the squared
      * weights if it's an L2 update; for an L1 update, the same logic is followed.
      */
     var regVal = updater.compute(
-      weights, new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2
+      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
 
     for (i <- 1 to numIterations) {
       // Sample a subset (fraction miniBatchFraction) of the total data
       // compute and sum up the subgradients on this subset (this is one map-reduce)
       val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i).map {
         case (y, features) =>
-          val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
-          val (grad, loss) = gradient.compute(featuresCol, y, weights)
-          (grad, loss)
-      }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
+          val (grad, loss) = gradient.compute(features, y, weights)
+          (grad.toBreeze, loss)
+      }.reduce((a, b) => (a._1 += b._1, a._2 + b._2))
 
       /**
        * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
@@ -172,7 +171,7 @@ object GradientDescent extends Logging {
        */
       stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
       val update = updater.compute(
-        weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
+        weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
       weights = update._1
       regVal = update._2
     }
@@ -180,6 +179,6 @@ object GradientDescent extends Logging {
     logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
      stochasticLossHistory.takeRight(10).mkString(", ")))
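Note: the new reduce above sums per-example gradients in place with Breeze
instead of jblas. A standalone sketch of the same accumulation pattern on a
plain Scala collection (toy gradients and losses; not part of the patch):

    import breeze.linalg.{DenseVector => BDV}

    // hypothetical per-example (subgradient, loss) pairs
    val contributions = Seq(
      (BDV(0.1, -0.2), 0.50),
      (BDV(0.3, 0.4), 0.70),
      (BDV(-0.1, 0.0), 0.60))

    // a._1 += b._1 adds into the left vector in place, so each reduce step
    // reuses the accumulator instead of allocating a fresh vector
    val (gradientSum, lossSum) =
      contributions.reduce((a, b) => (a._1 += b._1, a._2 + b._2))
    // gradientSum == BDV(0.3, 0.2), lossSum == 1.8
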
Last 10 stochastic losses %s".format( stochasticLossHistory.takeRight(10).mkString(", "))) - (weights.toArray, stochasticLossHistory.toArray) + (weights, stochasticLossHistory.toArray) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala index 6070071c5c18b..e5d4c4f7d282d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala @@ -18,7 +18,6 @@ package org.apache.spark.mllib.optimization import scala.math._ -import org.jblas.DoubleMatrix import breeze.linalg.{norm => brzNorm} @@ -122,7 +121,7 @@ class SquaredL2Updater extends Updater { gradient: Vector, stepSize: Double, iter: Int, - regParam: Double): (DoubleMatrix, Double) = { + regParam: Double): (Vector, Double) = { // add up both updates from the gradient of the loss (= step) as well as // the gradient of the regularizer (= regParam * weightsOld) // w' = w - thisIterStepSize * (gradient + regParam * w) @@ -132,7 +131,7 @@ class SquaredL2Updater extends Updater { (gradient.toBreeze * thisIterStepSize) val norm = brzNorm(brzWeights, 2.0) - (Vectors.fromBreeze(newWeights), 0.5 * regParam * norm * norm) + (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 643ce63f62eeb..e6d550fe3782b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -118,8 +118,8 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] /** Prepends one to the input vector. */ private def prependOne(vector: Vector): Vector = { - val vectorWithIntercept = vector match { - case dv: BDV[Double] => BDV.vertcat(BDV.ones(1), dv) + val vectorWithIntercept = vector.toBreeze match { + case dv: BDV[Double] => BDV.vertcat(BDV.ones[Double](1), dv) case sv: BSV[Double] => BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv) case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala index b4aafbe8bcaff..13c401aca31ef 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala @@ -21,8 +21,7 @@ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.util.MLUtils - -import org.jblas.DoubleMatrix +import org.apache.spark.mllib.linalg.{Vector, Vectors} /** * Regression model trained using LinearRegression. @@ -31,15 +30,15 @@ import org.jblas.DoubleMatrix * @param intercept Intercept computed for this model. 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
index b4aafbe8bcaff..13c401aca31ef 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
@@ -21,8 +21,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.util.MLUtils
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 
 /**
  * Regression model trained using LinearRegression.
@@ -31,15 +30,15 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class LinearRegressionModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
 
   override protected def predictPoint(
-      dataMatrix: DoubleMatrix,
-      weightMatrix: DoubleMatrix,
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double): Double = {
-    dataMatrix.dot(weightMatrix) + intercept
+    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
   }
 }
@@ -69,7 +68,7 @@ class LinearRegressionWithSGD private (
    */
   def this() = this(1.0, 100, 1.0)
 
-  override protected def createModel(weights: Array[Double], intercept: Double) = {
+  override protected def createModel(weights: Vector, intercept: Double) = {
     new LinearRegressionModel(weights, intercept)
   }
 }
@@ -98,7 +97,7 @@ object LinearRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
+      initialWeights: Vector)
     : LinearRegressionModel = {
     new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input,
@@ -172,7 +171,7 @@ object LinearRegressionWithSGD {
     val sc = new SparkContext(args(0), "LinearRegression")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()
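Note: the user-facing change is that initial weights are now passed as a
Vector instead of an Array[Double]. A minimal sketch of the updated train call
(toy two-feature data; assumes an existing SparkContext named sc):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

    val data = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
      LabeledPoint(2.0, Vectors.dense(0.0, 1.0))))

    // initialWeights is now a Vector rather than an Array[Double]
    val model = LinearRegressionWithSGD.train(
      data, 100, 1.0, 1.0, Vectors.dense(0.0, 0.0))
    println("Weights: " + model.weights)
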
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
index d5371f5c33414..22a0e8b495957 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
@@ -21,7 +21,9 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+
+import breeze.linalg.{Vector => BV, DenseVector => BDV}
 
 /**
  * Regression model trained using RidgeRegression.
@@ -36,10 +38,10 @@ class RidgeRegressionModel(
     with RegressionModel with Serializable {
 
   override protected def predictPoint(
-      dataMatrix: DoubleMatrix,
-      weightMatrix: DoubleMatrix,
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double): Double = {
-    dataMatrix.dot(weightMatrix) + intercept
+    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
   }
 }
 
@@ -71,8 +73,8 @@ class RidgeRegressionWithSGD private (
   super.setIntercept(false)
 
   var yMean = 0.0
-  var xColMean: DoubleMatrix = _
-  var xColSd: DoubleMatrix = _
+  var xColMean: BV[Double] = _
+  var xColSd: BV[Double] = _
 
   /**
    * Construct a RidgeRegression object with default parameters
@@ -85,33 +87,33 @@ class RidgeRegressionWithSGD private (
     this
   }
 
-  override protected def createModel(weights: Array[Double], intercept: Double) = {
-    val weightsMat = new DoubleMatrix(weights.length, 1, weights: _*)
-    val weightsScaled = weightsMat.div(xColSd)
-    val interceptScaled = yMean - weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)
+  override protected def createModel(weights: Vector, intercept: Double) = {
+    val weightsMat = weights.toBreeze
+    val weightsScaled = weightsMat :/ xColSd
+    val interceptScaled = yMean - weightsMat.dot(xColMean :/ xColSd)
 
-    new RidgeRegressionModel(weightsScaled.data, interceptScaled)
+    new RidgeRegressionModel(Vectors.fromBreeze(weightsScaled), interceptScaled)
   }
 
   override def run(
       input: RDD[LabeledPoint],
-      initialWeights: Array[Double])
+      initialWeights: Vector)
     : RidgeRegressionModel = {
-    val nfeatures: Int = input.first().features.length
+    val nfeatures: Int = input.first().features.size
     val nexamples: Long = input.count()
 
     // To avoid penalizing the intercept, we center and scale the data.
     val stats = MLUtils.computeStats(input, nfeatures, nexamples)
     yMean = stats._1
-    xColMean = stats._2
-    xColSd = stats._3
+    xColMean = stats._2.toBreeze
+    xColSd = stats._3.toBreeze
 
     val normalizedData = input.map { point =>
       val yNormalized = point.label - yMean
-      val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
-      val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
-      LabeledPoint(yNormalized, featuresNormalized.toArray)
+      val featuresMat = point.features.toBreeze
+      val featuresNormalized = (featuresMat - xColMean) :/ xColSd
+      LabeledPoint(yNormalized, Vectors.fromBreeze(featuresNormalized))
     }
 
     super.run(normalizedData, initialWeights)
@@ -143,7 +145,7 @@ object RidgeRegressionWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
+      initialWeights: Vector)
     : RidgeRegressionModel = {
     new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(
@@ -220,7 +222,8 @@ object RidgeRegressionWithSGD {
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble,
       args(3).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()
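Note: ridge regression still centers and scales features before running
gradient descent, now with element-wise Breeze ops (:/ is element-wise
division). A toy illustration of the normalization and of the weight
un-scaling done in createModel (all values made up; not part of the patch):

    import breeze.linalg.DenseVector

    val xColMean = DenseVector(1.0, 10.0)
    val xColSd = DenseVector(0.5, 2.0)

    // (x - mean) :/ sd, as in the normalizedData map above
    val features = DenseVector(2.0, 14.0)
    val normalized = (features - xColMean) :/ xColSd   // [2.0, 2.0]

    // createModel maps weights learned on scaled data back to the original scale
    val weights = DenseVector(0.8, 0.4)
    val weightsScaled = weights :/ xColSd              // [1.6, 0.2]
    val yMean = 3.0
    val interceptScaled = yMean - weights.dot(xColMean :/ xColSd)
    // 3.0 - (0.8 * 2.0 + 0.4 * 5.0) = -0.6
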
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 08cd9ab05547b..d495d590fd4f6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.mllib.util
 
+import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
+  squaredDistance => breezeSquaredDistance}
+
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
-
-import org.jblas.DoubleMatrix
-
 import org.apache.spark.mllib.regression.LabeledPoint
-
-import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance}
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 
 /**
  * Helper methods to load, save and pre-process data used in ML Lib.
@@ -54,7 +52,7 @@ object MLUtils {
     sc.textFile(dir).map { line =>
       val parts = line.split(',')
       val label = parts(0).toDouble
-      val features = parts(1).trim().split(' ').map(_.toDouble)
+      val features = Vectors.dense(parts(1).trim().split(' ').map(_.toDouble))
       LabeledPoint(label, features)
     }
   }
@@ -68,7 +66,7 @@ object MLUtils {
    * @param dir Directory to save the data.
    */
   def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
-    val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
+    val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" "))
     dataStr.saveAsTextFile(dir)
   }
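Note: the computeStats rewrite below replaces the old per-column shuffle with
a single pass that aggregates (count, label sum, per-feature sums, per-feature
squared sums) and derives mean and standard deviation at the end. A
plain-Scala sketch of the same bookkeeping on toy data (foldLeft standing in
for RDD.aggregate; not part of the patch):

    import breeze.linalg.{DenseVector => BDV}

    val points = Seq(
      (1.0, BDV(1.0, 2.0)),   // (label, features), made-up values
      (2.0, BDV(3.0, 6.0)))

    val zero = (0L, 0.0, BDV.zeros[Double](2), BDV.zeros[Double](2))
    val (n, labelSum, sum, sumSq) = points.foldLeft(zero) {
      case ((n, ls, s, sq), (label, features)) =>
        // accumulate in place, as the seqOp below does
        (n + 1L, ls + label, s += features, sq += (features :* features))
    }

    val nd = n.toDouble
    val yMean = labelSum / nd               // 1.5
    val mean = sum / nd                     // [2.0, 4.0]
    val std = Array.tabulate(2) { i =>
      math.sqrt(sumSq(i) / nd - mean(i) * mean(i))
    }                                       // [1.0, 2.0]

One pass over the data with in-place accumulators replaces the flatMap +
reduceByKey shuffle of the old implementation.
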
@@ -76,44 +74,51 @@ object MLUtils {
    * Utility function to compute mean and standard deviation on a given dataset.
    *
    * @param data - input data set whose statistics are computed
-   * @param nfeatures - number of features
-   * @param nexamples - number of examples in input dataset
+   * @param numFeatures - number of features
+   * @param numExamples - number of examples in input dataset
    *
    * @return (yMean, xColMean, xColSd) - Tuple consisting of
    *     yMean - mean of the labels
    *     xColMean - Row vector with mean for every column (or feature) of the input data
    *     xColSd - Row vector with standard deviation for every column (or feature) of the input data.
    */
-  def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long):
-      (Double, DoubleMatrix, DoubleMatrix) = {
-    val yMean: Double = data.map { labeledPoint => labeledPoint.label }.reduce(_ + _) / nexamples
+  def computeStats(data: RDD[LabeledPoint], numFeatures: Int, numExamples: Long)
+    : (Double, Vector, Vector) = {
 
-    // NOTE: We shuffle X by column here to compute column sum and sum of squares.
-    val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint =>
-      val nCols = labeledPoint.features.length
-      // Traverse over every column and emit (col, value, value^2)
-      Iterator.tabulate(nCols) { i =>
-        (i, (labeledPoint.features(i), labeledPoint.features(i)*labeledPoint.features(i)))
-      }
-    }.reduceByKey { case(x1, x2) =>
-      (x1._1 + x2._1, x1._2 + x2._2)
+    val brzData = data.map { case LabeledPoint(label, features) =>
+      (label, features.toBreeze)
     }
-    val xColSumsMap = xColSumSq.collectAsMap()
 
-    val xColMean = DoubleMatrix.zeros(nfeatures, 1)
-    val xColSd = DoubleMatrix.zeros(nfeatures, 1)
+    val aggStats = brzData.aggregate(
+      (0L, 0.0, BDV.zeros[Double](numFeatures), BDV.zeros[Double](numFeatures))
+    )(
+      seqOp = (c, v) => (c, v) match {
+        case ((n, sumLabel, sum, sumSq), (label, features)) =>
+          features.activeIterator.foreach { case (i, x) =>
+            sumSq(i) += x * x
+          }
+          (n + 1L, sumLabel + label, sum += features, sumSq)
+      },
+      combOp = (c1, c2) => (c1, c2) match {
+        case ((n1, sumLabel1, sum1, sumSq1), (n2, sumLabel2, sum2, sumSq2)) =>
+          (n1 + n2, sumLabel1 + sumLabel2, sum1 += sum2, sumSq1 += sumSq2)
+      }
+    )
+
+    val (nl, sumLabel, sum, sumSq) = aggStats
+    require(nl > 0, "Input data is empty.")
 
-    // Compute mean and unbiased variance using column sums
-    var col = 0
-    while (col < nfeatures) {
-      xColMean.put(col, xColSumsMap(col)._1 / nexamples)
-      val variance =
-        (xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) / nexamples
-      xColSd.put(col, math.sqrt(variance))
-      col += 1
+    val n = nl.toDouble
+    val yMean = sumLabel / n
+    val mean: BDV[Double] = sum / n
+    val std = new Array[Double](sum.length)
+    var i = 0
+    while (i < numFeatures) {
+      std(i) = math.sqrt(sumSq(i) / n - mean(i) * mean(i))
+      i += 1
     }
 
-    (yMean, xColMean, xColSd)
+    (yMean, Vectors.fromBreeze(mean), Vectors.dense(std))
   }
 
   /**
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
index 67dd06cc0f5eb..a1f72bc8536c0 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.mllib.regression
 
+import org.scalatest.FunSuite
 import org.jblas.DoubleMatrix
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
-
 class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
 
   def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = {