Skip to content

Commit

Permalink
separate seqop and combop out as independent functions
Browse files Browse the repository at this point in the history
  • Loading branch information
yinxusen committed Apr 10, 2014
1 parent a6d5a2e commit 4e4fbd1
Showing 1 changed file with 62 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ case class VectorRDDStatisticalSummary(
min: Vector,
nonZeroCnt: Vector) extends Serializable

private case class VectorRDDStatisticalRing(
fakeMean: BV[Double],
fakeM2n: BV[Double],
totalCnt: Double,
nnz: BV[Double],
max: BV[Double],
min: BV[Double])

/**
* Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
* implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
Expand All @@ -49,57 +57,71 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
* }}},
* with the size of Vector as input parameter.
*/

private def seqOp(aggregator: VectorRDDStatisticalRing, currData: BV[Double]): VectorRDDStatisticalRing = {
aggregator match {
case VectorRDDStatisticalRing(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
currData.activeIterator.foreach {
case (id, value) =>
if (maxVec(id) < value) maxVec(id) = value
if (minVec(id) > value) minVec(id) = value

val tmpPrevMean = prevMean(id)
prevMean(id) = (prevMean(id) * cnt + value) / (cnt + 1.0)
prevM2n(id) += (value - prevMean(id)) * (value - tmpPrevMean)

nnzVec(id) += 1.0
}

VectorRDDStatisticalRing(prevMean,
prevM2n,
cnt + 1.0,
nnzVec,
maxVec,
minVec)
}
}

private def combOp(statistics1: VectorRDDStatisticalRing, statistics2: VectorRDDStatisticalRing): VectorRDDStatisticalRing = {
(statistics1, statistics2) match {
case (VectorRDDStatisticalRing(mean1, m2n1, cnt1, nnz1, max1, min1),
VectorRDDStatisticalRing(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
val totalCnt = cnt1 + cnt2
val deltaMean = mean2 - mean1
val totalMean = ((mean1 :* nnz1) + (mean2 :* nnz2)) :/ (nnz1 + nnz2)
val totalM2n = m2n1 + m2n2 + ((deltaMean :* deltaMean) :* (nnz1 :* nnz2) :/ (nnz1 + nnz2))
max2.activeIterator.foreach {
case (id, value) =>
if (max1(id) < value) max1(id) = value
}
min2.activeIterator.foreach {
case (id, value) =>
if (min1(id) > value) min1(id) = value
}
VectorRDDStatisticalRing(totalMean, totalM2n, totalCnt, nnz1 + nnz2, max1, min1)
}
}

def summarizeStatistics(size: Int): VectorRDDStatisticalSummary = {
val (fakeMean, fakeM2n, totalCnt, nnz, max, min) = self.map(_.toBreeze).aggregate((
val zeroValue = VectorRDDStatisticalRing(
BV.zeros[Double](size),
BV.zeros[Double](size),
0.0,
BV.zeros[Double](size),
BV.fill(size){Double.MinValue},
BV.fill(size){Double.MaxValue}))(
seqOp = (c, v) => (c, v) match {
case ((prevMean, prevM2n, cnt, nnzVec, maxVec, minVec), currData) =>
currData.activeIterator.map{ case (id, value) =>
val tmpPrevMean = prevMean(id)
prevMean(id) = (prevMean(id) * cnt + value) / (cnt + 1.0)
if (maxVec(id) < value) maxVec(id) = value
if (minVec(id) > value) minVec(id) = value
nnzVec(id) += 1.0
prevM2n(id) += (value - prevMean(id)) * (value - tmpPrevMean)
}

(prevMean,
prevM2n,
cnt + 1.0,
nnzVec,
maxVec,
minVec)
},
combOp = (c, v) => (c, v) match {
case (
(mean1, m2n1, cnt1, nnz1, max1, min1),
(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
val totalCnt = cnt1 + cnt2
val deltaMean = mean2 - mean1
val totalMean = ((mean1 :* nnz1) + (mean2 :* nnz2)) :/ (nnz1 + nnz2)
val totalM2n = m2n1 + m2n2 + ((deltaMean :* deltaMean) :* (nnz1 :* nnz2) :/ (nnz1 + nnz2))
max2.activeIterator.foreach { case (id, value) =>
if (max1(id) < value) max1(id) = value
}
min2.activeIterator.foreach { case (id, value) =>
if (min1(id) > value) min1(id) = value
}
(totalMean, totalM2n, totalCnt, nnz1 + nnz2, max1, min1)
}
)
BV.fill(size)(Double.MinValue),
BV.fill(size)(Double.MaxValue))

val breezeVectors = self.collect().map(_.toBreeze)
val VectorRDDStatisticalRing(fakeMean, fakeM2n, totalCnt, nnz, max, min) = breezeVectors.aggregate(zeroValue)(seqOp, combOp)

// solve real mean
val realMean = fakeMean :* nnz :/ totalCnt
// solve real variance
val deltaMean = fakeMean :- 0.0
val realVar = fakeM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
max :+= 0.0
min :+= 0.0
// max, min process, in case of a column is all zero.
// max :+= 0.0
// min :+= 0.0

realVar :/= totalCnt

Expand Down

0 comments on commit 4e4fbd1

Please sign in to comment.