diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala index 6ac0dd5f9b634..9ec7712142b1f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.mllib.rdd -import breeze.linalg.{Vector => BV, *} +import breeze.linalg.{Vector => BV, DenseVector => BDV} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.util.MLUtils._ @@ -28,14 +28,23 @@ import org.apache.spark.rdd.RDD */ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { + /** + * Compute the mean of each `Vector` in the RDD. + */ def rowMeans(): RDD[Double] = { self.map(x => x.toArray.sum / x.size) } + /** + * Compute the norm-2 of each `Vector` in the RDD. + */ def rowNorm2(): RDD[Double] = { self.map(x => math.sqrt(x.toArray.map(x => x*x).sum)) } + /** + * Compute the standard deviation of each `Vector` in the RDD. + */ def rowSDs(): RDD[Double] = { val means = self.rowMeans() self.zip(means) @@ -43,8 +52,14 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { .map{ x => math.sqrt(x.toArray.map(x => x*x).sum / x.size) } } + /** + * Compute the mean of each column in the RDD. + */ def colMeans(): Vector = colMeans(self.take(1).head.size) + /** + * Compute the mean of each column in the RDD with `size` as the dimension of each `Vector`. + */ def colMeans(size: Int): Vector = { Vectors.fromBreeze(self.map(_.toBreeze).aggregate((BV.zeros[Double](size), 0.0))( seqOp = (c, v) => (c, v) match { @@ -58,15 +73,27 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { )._1) } + /** + * Compute the norm-2 of each column in the RDD. + */ def colNorm2(): Vector = colNorm2(self.take(1).head.size) + /** + * Compute the norm-2 of each column in the RDD with `size` as the dimension of each `Vector`. + */ def colNorm2(size: Int): Vector = Vectors.fromBreeze(self.map(_.toBreeze).aggregate(BV.zeros[Double](size))( seqOp = (c, v) => c + (v :* v), combOp = (lhs, rhs) => lhs + rhs ).map(math.sqrt)) + /** + * Compute the standard deviation of each column in the RDD. + */ def colSDs(): Vector = colSDs(self.take(1).head.size) + /** + * Compute the standard deviation of each column in the RDD with `size` as the dimension of each `Vector`. + */ def colSDs(size: Int): Vector = { val means = self.colMeans() Vectors.fromBreeze(self.map(x => x.toBreeze - means.toBreeze).aggregate((BV.zeros[Double](size), 0.0))( @@ -81,6 +108,9 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { )._1.map(math.sqrt)) } + /** + * Find the optional max or min vector in the RDD. + */ private def maxMinOption(cmp: (Vector, Vector) => Boolean): Option[Vector] = { def cmpMaxMin(x1: Vector, x2: Vector) = if (cmp(x1, x2)) x1 else x2 self.mapPartitions { iterator => @@ -88,14 +118,39 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { }.collect { case Some(x) => x }.collect().reduceOption(cmpMaxMin) } + /** + * Find the optional max vector in the RDD, `None` will be returned if there is no elements at all. + */ def maxOption(cmp: (Vector, Vector) => Boolean) = maxMinOption(cmp) + /** + * Find the optional min vector in the RDD, `None` will be returned if there is no elements at all. + */ def minOption(cmp: (Vector, Vector) => Boolean) = maxMinOption(!cmp(_, _)) - def rowShrink(): RDD[Vector] = self.filter(x => x.toArray.sum != 0) + /** + * Filter the vectors whose standard deviation is not zero. + */ + def rowShrink(): RDD[Vector] = self.zip(self.rowSDs()).filter(_._2 != 0.0).map(_._1) + /** + * Filter each column of the RDD whose standard deviation is not zero. + */ def colShrink(): RDD[Vector] = { - val means = self.colMeans() - self.map( v => Vectors.dense(v.toArray.zip(means.toArray).filter{ case (x, m) => m != 0.0 }.map(_._1))) + val sds = self.colSDs() + self.take(1).head.toBreeze.isInstanceOf[BDV[Double]] match { + case true => + self.map{ v => + Vectors.dense(v.toArray.zip(sds.toArray).filter{case (x, m) => m != 0.0}.map(_._1)) + } + case false => + self.map { v => + val filtered = v.toArray.zip(sds.toArray).filter{case (x, m) => m != 0.0}.map(_._1) + val denseVector = Vectors.dense(filtered).toBreeze + val size = denseVector.size + val iterElement = denseVector.activeIterator.toSeq + Vectors.sparse(size, iterElement) + } + } } }