diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala
index fcb5a5f18b127..ee4f940b1a069 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala
@@ -53,35 +53,46 @@ private class VectorRDDStatisticsAggregator(
     val currMin: BDV[Double])
   extends VectorRDDStatisticalSummary with Serializable {
   // lazy val is used for computing only once time. Same below.
-  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
+  override def mean = {
+    val realMean = BDV.zeros[Double](currMean.length)
+    var i = 0
+    while (i < currMean.length) {
+      realMean(i) = currMean(i) * nnz(i) / totalCnt
+      i += 1
+    }
+    Vectors.fromBreeze(realMean)
+  }

-  override lazy val variance = {
+  override def variance = {
+    val realVariance = BDV.zeros[Double](currM2n.length)
     val deltaMean = currMean
     var i = 0
     while (i < currM2n.size) {
-      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
-      currM2n(i) /= totalCnt
+      realVariance(i) = currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
+      realVariance(i) /= totalCnt
       i += 1
     }
-    Vectors.fromBreeze(currM2n)
+    Vectors.fromBreeze(realVariance)
   }

-  override lazy val count: Long = totalCnt.toLong
+  override def count: Long = totalCnt.toLong

-  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
+  override def numNonZeros: Vector = Vectors.fromBreeze(nnz)

-  override lazy val max: Vector = {
-    nnz.iterator.foreach {
-      case (id, count) =>
-        if ((count < totalCnt) && (currMax(id) < 0.0)) currMax(id) = 0.0
+  override def max: Vector = {
+    var i = 0
+    while (i < nnz.length) {
+      if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0
+      i += 1
     }
     Vectors.fromBreeze(currMax)
   }

-  override lazy val min: Vector = {
-    nnz.iterator.foreach {
-      case (id, count) =>
-        if ((count < totalCnt) && (currMin(id) > 0.0)) currMin(id) = 0.0
+  override def min: Vector = {
+    var i = 0
+    while (i < nnz.length) {
+      if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0
+      i += 1
     }
     Vectors.fromBreeze(currMin)
   }
@@ -117,15 +128,16 @@ private class VectorRDDStatisticsAggregator(

     val deltaMean = currMean - other.currMean

-    other.currMean.activeIterator.foreach {
-      case (id, 0.0) =>
-      case (id, value) =>
-        currMean(id) =
-          (currMean(id) * nnz(id) + other.currMean(id) * other.nnz(id)) / (nnz(id) + other.nnz(id))
+    var i = 0
+    while (i < other.currMean.length) {
+      if (other.currMean(i) != 0.0)
+        currMean(i) = (currMean(i) * nnz(i) + other.currMean(i) * other.nnz(i)) /
+          (nnz(i) + other.nnz(i))
+      i += 1
     }

-    var i = 0
-    while(i < currM2n.size) {
+    i = 0
+    while (i < currM2n.size) {
       (nnz(i), other.nnz(i)) match {
         case (0.0, 0.0) =>
         case _ => currM2n(i) +=
@@ -134,14 +146,16 @@ private class VectorRDDStatisticsAggregator(
       i += 1
     }

-    other.currMax.activeIterator.foreach {
-      case (id, value) =>
-        if (currMax(id) < value) currMax(id) = value
+    i = 0
+    while (i < other.currMax.length) {
+      if (currMax(i) < other.currMax(i)) currMax(i) = other.currMax(i)
+      i += 1
     }

-    other.currMin.activeIterator.foreach {
-      case (id, value) =>
-        if (currMin(id) > value) currMin(id) = value
+    i = 0
+    while (i < other.currMin.length) {
+      if (currMin(i) > other.currMin(i)) currMin(i) = other.currMin(i)
+      i += 1
     }

     nnz += other.nnz