From 4eaf28af25f981f147a3bfc95dcc08ab883497b6 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 9 Apr 2014 19:28:16 +0800 Subject: [PATCH] merge VectorRDDStatistics into RowMatrix --- .../mllib/linalg/distributed/RowMatrix.scala | 186 +++++++++++++++- .../spark/mllib/rdd/VectorRDDFunctions.scala | 208 ------------------ .../org/apache/spark/mllib/util/MLUtils.scala | 1 - .../linalg/distributed/RowMatrixSuite.scala | 45 ++++ .../mllib/rdd/VectorRDDFunctionsSuite.scala | 95 -------- 5 files changed, 229 insertions(+), 306 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index f65f43dd3007b..d970c3db16bf2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg.distributed import java.util -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd} +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd} import breeze.numerics.{sqrt => brzSqrt} import com.github.fommil.netlib.BLAS.{getInstance => blas} @@ -29,7 +29,171 @@ import org.apache.spark.rdd.RDD import org.apache.spark.Logging /** - * :: Experimental :: + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements + * count. + */ +trait VectorRDDStatisticalSummary { + + /** + * Computes the mean of columns in RDD[Vector]. + */ + def mean: Vector + + /** + * Computes the sample variance of columns in RDD[Vector]. + */ + def variance: Vector + + /** + * Computes number of vectors in RDD[Vector]. + */ + def count: Long + + /** + * Computes the number of non-zero elements in each column of RDD[Vector]. + */ + def numNonZeros: Vector + + /** + * Computes the maximum of each column in RDD[Vector]. + */ + def max: Vector + + /** + * Computes the minimum of each column in RDD[Vector]. + */ + def min: Vector +} + + +/** + * Aggregates [[org.apache.spark.mllib.linalg.distributed.VectorRDDStatisticalSummary + * VectorRDDStatisticalSummary]] together with add() and merge() function. Online variance solution + * used in add() function, while parallel variance solution used in merge() function. Reference here + * : [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. Solution + * here ignoring the zero elements when calling add() and merge(), for decreasing the O(n) algorithm + * to O(nnz). Real variance is computed here after we get other statistics, simply by another + * parallel combination process. + */ +private class VectorRDDStatisticsAggregator( + val currMean: BDV[Double], + val currM2n: BDV[Double], + var totalCnt: Double, + val nnz: BDV[Double], + val currMax: BDV[Double], + val currMin: BDV[Double]) + extends VectorRDDStatisticalSummary with Serializable { + + override def mean = { + val realMean = BDV.zeros[Double](currMean.length) + var i = 0 + while (i < currMean.length) { + realMean(i) = currMean(i) * nnz(i) / totalCnt + i += 1 + } + Vectors.fromBreeze(realMean) + } + + override def variance = { + val realVariance = BDV.zeros[Double](currM2n.length) + + val denominator = totalCnt - 1.0 + + // Sample variance is computed, if the denominator is 0, the variance is just 0. + if (denominator != 0.0) { + val deltaMean = currMean + var i = 0 + while (i < currM2n.size) { + realVariance(i) = + currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt + realVariance(i) /= denominator + i += 1 + } + } + + Vectors.fromBreeze(realVariance) + } + + override def count: Long = totalCnt.toLong + + override def numNonZeros: Vector = Vectors.fromBreeze(nnz) + + override def max: Vector = { + var i = 0 + while (i < nnz.length) { + if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0 + i += 1 + } + Vectors.fromBreeze(currMax) + } + + override def min: Vector = { + var i = 0 + while (i < nnz.length) { + if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0 + i += 1 + } + Vectors.fromBreeze(currMin) + } + + /** + * Aggregate function used for aggregating elements in a worker together. + */ + def add(currData: BV[Double]): this.type = { + currData.activeIterator.foreach { + // this case is used for filtering the zero elements if the vector. + case (id, 0.0) => + case (id, value) => + if (currMax(id) < value) currMax(id) = value + if (currMin(id) > value) currMin(id) = value + + val tmpPrevMean = currMean(id) + currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0) + currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean) + + nnz(id) += 1.0 + } + + totalCnt += 1.0 + this + } + + /** + * Combine function used for combining intermediate results together from every worker. + */ + def merge(other: VectorRDDStatisticsAggregator): this.type = { + + totalCnt += other.totalCnt + + val deltaMean = currMean - other.currMean + + var i = 0 + while (i < other.currMean.length) { + // merge mean together + if (other.currMean(i) != 0.0) { + currMean(i) = (currMean(i) * nnz(i) + other.currMean(i) * other.nnz(i)) / + (nnz(i) + other.nnz(i)) + } + + // merge m2n together + if (nnz(i) + other.nnz(i) != 0.0) { + currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / + (nnz(i) + other.nnz(i)) + } + + if (currMax(i) < other.currMax(i)) currMax(i) = other.currMax(i) + + if (currMin(i) > other.currMin(i)) currMin(i) = other.currMin(i) + + i += 1 + } + + nnz += other.nnz + this + } +} + +/** * Represents a row-oriented distributed Matrix with no meaningful row indices. * * @param rows rows stored as an RDD[Vector] @@ -240,6 +404,24 @@ class RowMatrix( } } + /** + * Compute full column-wise statistics for the RDD with the size of Vector as input parameter. + */ + def multiVariateSummaryStatistics(): VectorRDDStatisticalSummary = { + val zeroValue = new VectorRDDStatisticsAggregator( + BDV.zeros[Double](nCols), + BDV.zeros[Double](nCols), + 0.0, + BDV.zeros[Double](nCols), + BDV.fill(nCols)(Double.MinValue), + BDV.fill(nCols)(Double.MaxValue)) + + rows.map(_.toBreeze).aggregate[VectorRDDStatisticsAggregator](zeroValue)( + (aggregator, data) => aggregator.add(data), + (aggregator1, aggregator2) => aggregator1.merge(aggregator2) + ) + } + /** * Multiply this matrix by a local matrix on the right. * diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala deleted file mode 100644 index 0b677d9c4fdef..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.rdd - -import breeze.linalg.{Vector => BV, DenseVector => BDV} - -import org.apache.spark.mllib.linalg.{Vectors, Vector} -import org.apache.spark.rdd.RDD - -/** - * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements - * count. - */ -trait VectorRDDStatisticalSummary { - - /** - * Computes the mean of columns in RDD[Vector]. - */ - def mean: Vector - - /** - * Computes the sample variance of columns in RDD[Vector]. - */ - def variance: Vector - - /** - * Computes number of vectors in RDD[Vector]. - */ - def count: Long - - /** - * Computes the number of non-zero elements in each column of RDD[Vector]. - */ - def numNonZeros: Vector - - /** - * Computes the maximum of each column in RDD[Vector]. - */ - def max: Vector - - /** - * Computes the minimum of each column in RDD[Vector]. - */ - def min: Vector -} - -/** - * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]] - * together with add() and merge() function. Online variance solution used in add() function, while - * parallel variance solution used in merge() function. Reference here: - * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. Solution here - * ignoring the zero elements when calling add() and merge(), for decreasing the O(n) algorithm to - * O(nnz). Real variance is computed here after we get other statistics, simply by another parallel - * combination process. - */ -private class VectorRDDStatisticsAggregator( - val currMean: BDV[Double], - val currM2n: BDV[Double], - var totalCnt: Double, - val nnz: BDV[Double], - val currMax: BDV[Double], - val currMin: BDV[Double]) - extends VectorRDDStatisticalSummary with Serializable { - - override def mean = { - val realMean = BDV.zeros[Double](currMean.length) - var i = 0 - while (i < currMean.length) { - realMean(i) = currMean(i) * nnz(i) / totalCnt - i += 1 - } - Vectors.fromBreeze(realMean) - } - - override def variance = { - val realVariance = BDV.zeros[Double](currM2n.length) - val deltaMean = currMean - var i = 0 - while (i < currM2n.size) { - realVariance(i) = - currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt - realVariance(i) /= (totalCnt - 1.0) - i += 1 - } - Vectors.fromBreeze(realVariance) - } - - override def count: Long = totalCnt.toLong - - override def numNonZeros: Vector = Vectors.fromBreeze(nnz) - - override def max: Vector = { - var i = 0 - while (i < nnz.length) { - if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0 - i += 1 - } - Vectors.fromBreeze(currMax) - } - - override def min: Vector = { - var i = 0 - while (i < nnz.length) { - if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0 - i += 1 - } - Vectors.fromBreeze(currMin) - } - - /** - * Aggregate function used for aggregating elements in a worker together. - */ - def add(currData: BV[Double]): this.type = { - currData.activeIterator.foreach { - // this case is used for filtering the zero elements if the vector. - case (id, 0.0) => - case (id, value) => - if (currMax(id) < value) currMax(id) = value - if (currMin(id) > value) currMin(id) = value - - val tmpPrevMean = currMean(id) - currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0) - currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean) - - nnz(id) += 1.0 - } - - totalCnt += 1.0 - this - } - - /** - * Combine function used for combining intermediate results together from every worker. - */ - def merge(other: VectorRDDStatisticsAggregator): this.type = { - - totalCnt += other.totalCnt - - val deltaMean = currMean - other.currMean - - var i = 0 - while (i < other.currMean.length) { - // merge mean together - if (other.currMean(i) != 0.0) { - currMean(i) = (currMean(i) * nnz(i) + other.currMean(i) * other.nnz(i)) / - (nnz(i) + other.nnz(i)) - } - - // merge m2n together - if (nnz(i) + other.nnz(i) != 0.0) { - currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / - (nnz(i) + other.nnz(i)) - } - - if (currMax(i) < other.currMax(i)) currMax(i) = other.currMax(i) - - if (currMin(i) > other.currMin(i)) currMin(i) = other.currMin(i) - - i += 1 - } - - nnz += other.nnz - this - } -} - -/** - * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an - * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use - * these functions. - */ -class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { - - /** - * Compute full column-wise statistics for the RDD with the size of Vector as input parameter. - */ - def computeSummaryStatistics(): VectorRDDStatisticalSummary = { - val size = self.first().size - - val zeroValue = new VectorRDDStatisticsAggregator( - BDV.zeros[Double](size), - BDV.zeros[Double](size), - 0.0, - BDV.zeros[Double](size), - BDV.fill(size)(Double.MinValue), - BDV.fill(size)(Double.MaxValue)) - - self.map(_.toBreeze).aggregate[VectorRDDStatisticsAggregator](zeroValue)( - (aggregator, data) => aggregator.add(data), - (aggregator1, aggregator2) => aggregator1.merge(aggregator2) - ) - } -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 2bc3ab97ca2fc..ac2360c429e2b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -265,5 +265,4 @@ object MLUtils { } sqDist } - implicit def rddToVectorRDDFunctions(rdd: RDD[Vector]) = new VectorRDDFunctions(rdd) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 71ee8e8a4f6fd..19c8a7730cb09 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -137,6 +137,9 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { brzNorm(v, 1.0) < 1e-6 } + def equivVector(lhs: Vector, rhs: Vector): Boolean = + closeToZero(lhs.toBreeze.asInstanceOf[BDV[Double]] - rhs.toBreeze.asInstanceOf[BDV[Double]]) + def assertColumnEqualUpToSign(A: BDM[Double], B: BDM[Double], k: Int) { assert(A.rows === B.rows) for (j <- 0 until k) { @@ -170,4 +173,46 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { )) } } + + test("dense statistical summary") { + val summary = denseMat.multiVariateSummaryStatistics() + + assert(equivVector(summary.mean, Vectors.dense(4.5, 3.0, 4.0)), + "Dense column mean do not match.") + + assert(equivVector(summary.variance, Vectors.dense(15.0, 10.0, 10.0)), + "Dense column variance do not match.") + + assert(summary.count === 4, "Dense column cnt do not match.") + + assert(equivVector(summary.numNonZeros, Vectors.dense(3.0, 3.0, 4.0)), + "Dense column nnz do not match.") + + assert(equivVector(summary.max, Vectors.dense(9.0, 7.0, 8.0)), + "Dense column max do not match.") + + assert(equivVector(summary.min, Vectors.dense(0.0, 0.0, 1.0)), + "Dense column min do not match.") + } + + test("sparse statistical summary") { + val summary = sparseMat.multiVariateSummaryStatistics() + + assert(equivVector(summary.mean, Vectors.dense(4.5, 3.0, 4.0)), + "Sparse column mean do not match.") + + assert(equivVector(summary.variance, Vectors.dense(15.0, 10.0, 10.0)), + "Sparse column variance do not match.") + + assert(summary.count === 4, "Sparse column cnt do not match.") + + assert(equivVector(summary.numNonZeros, Vectors.dense(3.0, 3.0, 4.0)), + "Sparse column nnz do not match.") + + assert(equivVector(summary.max, Vectors.dense(9.0, 7.0, 8.0)), + "Sparse column max do not match.") + + assert(equivVector(summary.min, Vectors.dense(0.0, 0.0, 1.0)), + "Sparse column min do not match.") + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala deleted file mode 100644 index 9bf92d54429a4..0000000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.rdd - -import scala.collection.mutable.ArrayBuffer - -import org.scalatest.FunSuite - -import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.mllib.rdd.VectorRDDFunctionsSuite._ -import org.apache.spark.mllib.util.LocalSparkContext -import org.apache.spark.mllib.util.MLUtils._ - -/** - * Test suite for the summary statistics of RDD[Vector]. Both the accuracy and the time consuming - * between dense and sparse vector are tested. - */ -class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext { - - val localData = Array( - Vectors.dense(1.0, 2.0, 3.0), - Vectors.dense(4.0, 0.0, 6.0), - Vectors.dense(0.0, 8.0, 9.0) - ) - - val sparseData = ArrayBuffer(Vectors.sparse(3, Seq((0, 1.0)))) - for (i <- 0 until 97) sparseData += Vectors.sparse(3, Seq((2, 0.0))) - sparseData += Vectors.sparse(3, Seq((0, 5.0))) - sparseData += Vectors.sparse(3, Seq((1, 5.0))) - - test("dense statistical summary") { - val data = sc.parallelize(localData, 2) - val summary = data.computeSummaryStatistics() - - assert(equivVector(summary.mean, Vectors.dense(5.0 / 3.0, 10.0 / 3.0, 6.0)), - "Dense column mean do not match.") - - assert(equivVector(summary.variance, Vectors.dense(4.333333333333334, 17.333333333333336, 9.0)), - "Dense column variance do not match.") - - assert(summary.count === 3, "Dense column cnt do not match.") - - assert(equivVector(summary.numNonZeros, Vectors.dense(2.0, 2.0, 3.0)), - "Dense column nnz do not match.") - - assert(equivVector(summary.max, Vectors.dense(4.0, 8.0, 9.0)), - "Dense column max do not match.") - - assert(equivVector(summary.min, Vectors.dense(0.0, 0.0, 3.0)), - "Dense column min do not match.") - } - - test("sparse statistical summary") { - val dataForSparse = sc.parallelize(sparseData.toSeq, 2) - val summary = dataForSparse.computeSummaryStatistics() - - assert(equivVector(summary.mean, Vectors.dense(0.06, 0.05, 0.0)), - "Sparse column mean do not match.") - - assert(equivVector(summary.variance, Vectors.dense(0.258989898989899, 0.25, 0.0)), - "Sparse column variance do not match.") - - assert(summary.count === 100, "Sparse column cnt do not match.") - - assert(equivVector(summary.numNonZeros, Vectors.dense(2.0, 1.0, 0.0)), - "Sparse column nnz do not match.") - - assert(equivVector(summary.max, Vectors.dense(5.0, 5.0, 0.0)), - "Sparse column max do not match.") - - assert(equivVector(summary.min, Vectors.dense(0.0, 0.0, 0.0)), - "Sparse column min do not match.") - } -} - -object VectorRDDFunctionsSuite { - def equivVector(lhs: Vector, rhs: Vector): Boolean = { - (lhs.toBreeze - rhs.toBreeze).norm(2) < 1e-5 - } -}