Skip to content

Commit

Permalink
add scala doc, refine code and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yinxusen committed Apr 10, 2014
1 parent 036b7a5 commit 4a5c38d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
package org.apache.spark.mllib.rdd

import breeze.linalg.{Vector => BV, axpy}
import breeze.linalg.{axpy, Vector => BV}

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

/**
* Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
* elements count.
*/
case class VectorRDDStatisticalSummary(
mean: Vector,
variance: Vector,
Expand All @@ -29,6 +33,12 @@ case class VectorRDDStatisticalSummary(
min: Vector,
nonZeroCnt: Vector) extends Serializable

/**
* Case class of the aggregate value for collecting summary statistics from RDD[Vector]. These
* values are relatively with
* [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]], the
* latter is computed from the former.
*/
private case class VectorRDDStatisticalRing(
fakeMean: BV[Double],
fakeM2n: BV[Double],
Expand All @@ -45,18 +55,8 @@ private case class VectorRDDStatisticalRing(
class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {

/**
* Compute full column-wise statistics for the RDD, including
* {{{
* Mean: Vector,
* Variance: Vector,
* Count: Double,
* Non-zero count: Vector,
* Maximum elements: Vector,
* Minimum elements: Vector.
* }}},
* with the size of Vector as input parameter.
* Aggregate function used for aggregating elements in a worker together.
*/

private def seqOp(
aggregator: VectorRDDStatisticalRing,
currData: BV[Double]): VectorRDDStatisticalRing = {
Expand Down Expand Up @@ -84,6 +84,9 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
}
}

/**
* Combine function used for combining intermediate results together from every worker.
*/
private def combOp(
statistics1: VectorRDDStatisticalRing,
statistics2: VectorRDDStatisticalRing): VectorRDDStatisticalRing = {
Expand All @@ -92,27 +95,38 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
VectorRDDStatisticalRing(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
val totalCnt = cnt1 + cnt2
val deltaMean = mean2 - mean1

mean2.activeIterator.foreach {
case (id, 0.0) =>
case (id, value) => mean1(id) = (mean1(id) * nnz1(id) + mean2(id) * nnz2(id)) / (nnz1(id) + nnz2(id))
case (id, value) =>
mean1(id) = (mean1(id) * nnz1(id) + mean2(id) * nnz2(id)) / (nnz1(id) + nnz2(id))
}

m2n2.activeIterator.foreach {
case (id, 0.0) =>
case (id, value) => m2n1(id) += value + deltaMean(id) * deltaMean(id) * nnz1(id) * nnz2(id) / (nnz1(id)+nnz2(id))
case (id, value) =>
m2n1(id) +=
value + deltaMean(id) * deltaMean(id) * nnz1(id) * nnz2(id) / (nnz1(id)+nnz2(id))
}

max2.activeIterator.foreach {
case (id, value) =>
if (max1(id) < value) max1(id) = value
}

min2.activeIterator.foreach {
case (id, value) =>
if (min1(id) > value) min1(id) = value
}

axpy(1.0, nnz2, nnz1)
VectorRDDStatisticalRing(mean1, m2n1, totalCnt, nnz1, max1, min1)
}
}

/**
* Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
*/
def summarizeStatistics(size: Int): VectorRDDStatisticalSummary = {
val zeroValue = VectorRDDStatisticalRing(
BV.zeros[Double](size),
Expand All @@ -122,16 +136,17 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
BV.fill(size)(Double.MinValue),
BV.fill(size)(Double.MaxValue))

val breezeVectors = self.map(_.toBreeze)
val VectorRDDStatisticalRing(fakeMean, fakeM2n, totalCnt, nnz, fakeMax, fakeMin) =
breezeVectors.aggregate(zeroValue)(seqOp, combOp)
self.map(_.toBreeze).aggregate(zeroValue)(seqOp, combOp)

// solve real mean
val realMean = fakeMean :* nnz :/ totalCnt
// solve real variance
val deltaMean = fakeMean :- 0.0
val realVar = fakeM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
// max, min

// solve real m2n
val deltaMean = fakeMean
val realM2n = fakeM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)

// remove the initial value in max and min, i.e. the Double.MaxValue or Double.MinValue.
val max = Vectors.sparse(size, fakeMax.activeIterator.map { case (id, value) =>
if ((value == Double.MinValue) && (realMean(id) != Double.MinValue)) (id, 0.0)
else (id, value)
Expand All @@ -142,11 +157,11 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
}.toSeq)

// get variance
realVar :/= totalCnt
realM2n :/= totalCnt

VectorRDDStatisticalSummary(
Vectors.fromBreeze(realMean),
Vectors.fromBreeze(realVar),
Vectors.fromBreeze(realM2n),
totalCnt.toLong,
Vectors.fromBreeze(nnz),
max,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@

package org.apache.spark.mllib.rdd

import scala.collection.mutable.ArrayBuffer

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.mllib.util.MLUtils._
import scala.collection.mutable.ArrayBuffer

/**
* Test suite for the summary statistics of RDD[Vector]. Both the accuracy and the time consuming
* between dense and sparse vector are tested.
*/
class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
import VectorRDDFunctionsSuite._

Expand All @@ -33,13 +39,15 @@ class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
)

val sparseData = ArrayBuffer(Vectors.sparse(20, Seq((0, 1.0), (9, 2.0), (10, 7.0))))
for (i <- 0 to 10000) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
for (i <- 0 until 10000) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
sparseData += Vectors.sparse(20, Seq((0, 5.0), (9, 13.0), (16, 2.0)))
sparseData += Vectors.sparse(20, Seq((3, 5.0), (9, 13.0), (18, 2.0)))

test("full-statistics") {
val data = sc.parallelize(localData, 2)
val (VectorRDDStatisticalSummary(mean, variance, cnt, nnz, max, min), denseTime) = time(data.summarizeStatistics(3))
val (VectorRDDStatisticalSummary(mean, variance, cnt, nnz, max, min), denseTime) =
time(data.summarizeStatistics(3))

assert(equivVector(mean, Vectors.dense(4.0, 5.0, 6.0)), "Column mean do not match.")
assert(equivVector(variance, Vectors.dense(6.0, 6.0, 6.0)), "Column variance do not match.")
assert(cnt === 3, "Column cnt do not match.")
Expand All @@ -48,21 +56,12 @@ class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
assert(equivVector(min, Vectors.dense(1.0, 2.0, 3.0)), "Column min do not match.")

val dataForSparse = sc.parallelize(sparseData.toSeq, 2)
val (VectorRDDStatisticalSummary(sparseMean, sparseVariance, sparseCnt, sparseNnz, sparseMax, sparseMin), sparseTime) = time(dataForSparse.summarizeStatistics(20))
/*
assert(equivVector(sparseMean, Vectors.dense(4.0, 5.0, 6.0)), "Column mean do not match.")
assert(equivVector(sparseVariance, Vectors.dense(6.0, 6.0, 6.0)), "Column variance do not match.")
assert(sparseCnt === 3, "Column cnt do not match.")
assert(equivVector(sparseNnz, Vectors.dense(3.0, 3.0, 3.0)), "Column nnz do not match.")
assert(equivVector(sparseMax, Vectors.dense(7.0, 8.0, 9.0)), "Column max do not match.")
assert(equivVector(sparseMin, Vectors.dense(1.0, 2.0, 3.0)), "Column min do not match.")
*/


val (_, sparseTime) = time(dataForSparse.summarizeStatistics(20))

println(s"dense time is $denseTime, sparse time is $sparseTime.")
assert(relativeTime(denseTime, sparseTime),
"Relative time between dense and sparse vector doesn't match.")
}

}

object VectorRDDFunctionsSuite {
Expand All @@ -76,5 +75,10 @@ object VectorRDDFunctionsSuite {
def equivVector(lhs: Vector, rhs: Vector): Boolean = {
(lhs.toBreeze - rhs.toBreeze).norm(2) < 1e-9
}

def relativeTime(lhs: Double, rhs: Double): Boolean = {
val denominator = math.max(lhs, rhs)
math.abs(lhs - rhs) / denominator < 0.3
}
}

0 comments on commit 4a5c38d

Please sign in to comment.