Skip to content

Commit

Permalink
remove useless APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
yinxusen committed Apr 10, 2014
1 parent c4651bb commit d816ac7
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
*/
package org.apache.spark.mllib.rdd

import breeze.linalg.{Vector => BV, DenseVector => BDV}
import breeze.linalg.{Vector => BV}

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.MLUtils._
import org.apache.spark.rdd.RDD

/**
Expand All @@ -29,110 +28,6 @@ import org.apache.spark.rdd.RDD
*/
class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {

/**
* Compute the mean of each column in the RDD.
*/
def colMeans(): Vector = colMeans(self.take(1).head.size)

/**
* Compute the mean of each column in the RDD with `size` as the dimension of each `Vector`.
*/
def colMeans(size: Int): Vector = {
Vectors.fromBreeze(self.map(_.toBreeze).aggregate((BV.zeros[Double](size), 0.0))(
seqOp = (c, v) => (c, v) match {
case ((prev, cnt), current) =>
(((prev :* cnt) + current) :/ (cnt + 1.0), cnt + 1.0)
},
combOp = (lhs, rhs) => (lhs, rhs) match {
case ((lhsVec, lhsCnt), (rhsVec, rhsCnt)) =>
((lhsVec :* lhsCnt) + (rhsVec :* rhsCnt) :/ (lhsCnt + rhsCnt), lhsCnt + rhsCnt)
}
)._1)
}

/**
* Compute the norm-2 of each column in the RDD.
*/
def colNorm2(): Vector = colNorm2(self.take(1).head.size)

/**
* Compute the norm-2 of each column in the RDD with `size` as the dimension of each `Vector`.
*/
def colNorm2(size: Int): Vector = Vectors.fromBreeze(self.map(_.toBreeze)
.aggregate(BV.zeros[Double](size))(
seqOp = (c, v) => c + (v :* v),
combOp = (lhs, rhs) => lhs + rhs
).map(math.sqrt)
)

/**
* Compute the standard deviation of each column in the RDD.
*/
def colSDs(): Vector = colSDs(self.take(1).head.size)

/**
* Compute the standard deviation of each column in the RDD with `size` as the dimension of each
* `Vector`.
*/
def colSDs(size: Int): Vector = {
val means = self.colMeans()
Vectors.fromBreeze(self.map(x => x.toBreeze - means.toBreeze)
.aggregate((BV.zeros[Double](size), 0.0))(
seqOp = (c, v) => (c, v) match {
case ((prev, cnt), current) =>
(((prev :* cnt) + (current :* current)) :/ (cnt + 1.0), cnt + 1.0)
},
combOp = (lhs, rhs) => (lhs, rhs) match {
case ((lhsVec, lhsCnt), (rhsVec, rhsCnt)) =>
((lhsVec :* lhsCnt) + (rhsVec :* rhsCnt) :/ (lhsCnt + rhsCnt), lhsCnt + rhsCnt)
}
)._1.map(math.sqrt)
)
}

/**
* Find the optional max or min vector in the RDD.
*/
private def maxMinOption(cmp: (Vector, Vector) => Boolean): Option[Vector] = {
def cmpMaxMin(x1: Vector, x2: Vector) = if (cmp(x1, x2)) x1 else x2
self.mapPartitions { iterator =>
Seq(iterator.reduceOption(cmpMaxMin)).iterator
}.collect { case Some(x) => x }.collect().reduceOption(cmpMaxMin)
}

/**
* Find the optional max vector in the RDD, `None` will be returned if there is no elements at
* all.
*/
def maxOption(cmp: (Vector, Vector) => Boolean) = maxMinOption(cmp)

/**
* Find the optional min vector in the RDD, `None` will be returned if there is no elements at
* all.
*/
def minOption(cmp: (Vector, Vector) => Boolean) = maxMinOption(!cmp(_, _))

/**
* Filter each column of the RDD whose standard deviation is not zero.
*/
def colShrink(): RDD[Vector] = {
val sds = self.colSDs()
self.take(1).head.toBreeze.isInstanceOf[BDV[Double]] match {
case true =>
self.map{ v =>
Vectors.dense(v.toArray.zip(sds.toArray).filter{case (x, m) => m != 0.0}.map(_._1))
}
case false =>
self.map { v =>
val filtered = v.toArray.zip(sds.toArray).filter{case (x, m) => m != 0.0}.map(_._1)
val denseVector = Vectors.dense(filtered).toBreeze
val size = denseVector.size
val iterElement = denseVector.activeIterator.toSeq
Vectors.sparse(size, iterElement)
}
}
}

/**
* Compute full column-wise statistics for the RDD, including
* {{{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,75 +31,11 @@ class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
Vectors.dense(7.0, 8.0, 9.0)
)

val colMeans = Array(4.0, 5.0, 6.0)
val colNorm2 = Array(math.sqrt(66.0), math.sqrt(93.0), math.sqrt(126.0))
val colSDs = Array(math.sqrt(6.0), math.sqrt(6.0), math.sqrt(6.0))
val colVar = Array(6.0, 6.0, 6.0)

val maxVec = Array(7.0, 8.0, 9.0)
val minVec = Array(1.0, 2.0, 3.0)

val shrinkingData = Array(
Vectors.dense(1.0, 2.0, 0.0),
Vectors.dense(0.0, 0.0, 0.0),
Vectors.dense(7.0, 8.0, 0.0)
)

val colShrinkData = Array(
Vectors.dense(1.0, 2.0),
Vectors.dense(0.0, 0.0),
Vectors.dense(7.0, 8.0)
)

test("colMeans") {
val data = sc.parallelize(localData, 2)
assert(equivVector(data.colMeans(), Vectors.dense(colMeans)),
"Column means do not match.")
}

test("colNorm2") {
val data = sc.parallelize(localData, 2)
assert(equivVector(data.colNorm2(), Vectors.dense(colNorm2)),
"Column norm2s do not match.")
}

test("colSDs") {
val data = sc.parallelize(localData, 2)
assert(equivVector(data.colSDs(), Vectors.dense(colSDs)),
"Column SDs do not match.")
}

test("maxOption") {
val data = sc.parallelize(localData, 2)
assert(equivVectorOption(
data.maxOption((lhs: Vector, rhs: Vector) => lhs.toBreeze.norm(2) >= rhs.toBreeze.norm(2)),
Some(Vectors.dense(maxVec))),
"Optional maximum does not match."
)
}

test("minOption") {
val data = sc.parallelize(localData, 2)
assert(equivVectorOption(
data.minOption((lhs: Vector, rhs: Vector) => lhs.toBreeze.norm(2) >= rhs.toBreeze.norm(2)),
Some(Vectors.dense(minVec))),
"Optional minimum does not match."
)
}

test("columnShrink") {
val data = sc.parallelize(shrinkingData, 2)
val res = data.colShrink().collect()
colShrinkData.zip(res).foreach { case (lhs, rhs) =>
assert(equivVector(lhs, rhs), "Column shrink error.")
}
}

test("full-statistics") {
val data = sc.parallelize(localData, 2)
val (mean, sd, cnt, nnz, max, min) = data.statistics(3)
assert(equivVector(mean, Vectors.dense(colMeans)), "Column means do not match.")
assert(equivVector(sd, Vectors.dense(colVar)), "Column SD do not match.")
val (mean, variance, cnt, nnz, max, min) = data.statistics(3)
assert(equivVector(mean, Vectors.dense(4.0, 5.0, 6.0)), "Column mean do not match.")
assert(equivVector(variance, Vectors.dense(6.0, 6.0, 6.0)), "Column variance do not match.")
assert(cnt === 3, "Column cnt do not match.")
assert(equivVector(nnz, Vectors.dense(3.0, 3.0, 3.0)), "Column nnz do not match.")
assert(equivVector(max, Vectors.dense(7.0, 8.0, 9.0)), "Column max do not match.")
Expand All @@ -111,13 +47,5 @@ object VectorRDDFunctionsSuite {
def equivVector(lhs: Vector, rhs: Vector): Boolean = {
(lhs.toBreeze - rhs.toBreeze).norm(2) < 1e-9
}

def equivVectorOption(lhs: Option[Vector], rhs: Option[Vector]): Boolean = {
(lhs, rhs) match {
case (Some(a), Some(b)) => (a.toBreeze - a.toBreeze).norm(2) < 1e-9
case (None, None) => true
case _ => false
}
}
}

0 comments on commit d816ac7

Please sign in to comment.