diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala
index 3ddc507a2e601..57f4eec312cb7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala
@@ -49,7 +49,7 @@ private class Aggregator(
     val deltaMean = currMean
     var i = 0
     while(i < currM2n.size) {
-      currM2n(i) -= deltaMean(i) * deltaMean(i) * nnz(i) * (nnz(i)-totalCnt) / totalCnt
+      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
       currM2n(i) /= totalCnt
       i += 1
     }
@@ -61,7 +61,7 @@ private class Aggregator(
   override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
 
   override lazy val max: Vector = {
-    nnz.activeIterator.foreach {
+    nnz.iterator.foreach {
       case (id, count) =>
        if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0))) currMax(id) = 0.0
     }
@@ -69,7 +69,7 @@ private class Aggregator(
   }
 
   override lazy val min: Vector = {
-    nnz.activeIterator.foreach {
+    nnz.iterator.foreach {
       case (id, count) =>
        if ((count == 0.0) || ((count < totalCnt) && (currMin(id) > 0.0))) currMin(id) = 0.0
     }
@@ -88,7 +88,7 @@ private class Aggregator(
       if (currMin(id) > value) currMin(id) = value
 
       val tmpPrevMean = currMean(id)
-      currMean(id) = (currMean(id) * totalCnt + value) / (totalCnt + 1.0)
+      currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0)
       currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean)
 
       nnz(id) += 1.0
@@ -114,11 +114,14 @@ private class Aggregator(
           (currMean(id) * nnz(id) + other.currMean(id) * other.nnz(id)) / (nnz(id) + other.nnz(id))
       }
 
-      other.currM2n.activeIterator.foreach {
-        case (id, 0.0) =>
-        case (id, value) =>
-          currM2n(id) +=
-            value + deltaMean(id) * deltaMean(id) * nnz(id) * other.nnz(id) / (nnz(id)+other.nnz(id))
+      var i = 0
+      while(i < currM2n.size) {
+        (nnz(i), other.nnz(i)) match {
+          case (0.0, 0.0) =>
+          case _ => currM2n(i) +=
+            other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i)+other.nnz(i))
+        }
+        i += 1
       }
 
       other.currMax.activeIterator.foreach {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala
index 5eb9d8e2c3da8..b621bf79b6e8b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala
@@ -38,54 +38,59 @@ class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
     Vectors.dense(7.0, 8.0, 9.0)
   )
 
-  val sparseData = ArrayBuffer(Vectors.sparse(20, Seq((0, 1.0), (9, 2.0), (10, 7.0))))
-  for (i <- 0 until 100) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
-  sparseData += Vectors.sparse(20, Seq((0, 5.0), (9, 13.0), (16, 2.0)))
-  sparseData += Vectors.sparse(20, Seq((3, 5.0), (9, 13.0), (18, 2.0)))
+  val sparseData = ArrayBuffer(Vectors.sparse(3, Seq((0, 1.0))))
+  for (i <- 0 until 97) sparseData += Vectors.sparse(3, Seq((2, 0.0)))
+  sparseData += Vectors.sparse(3, Seq((0, 5.0)))
+  sparseData += Vectors.sparse(3, Seq((1, 5.0)))
 
-  test("full-statistics") {
+  test("dense statistical summary") {
     val data = sc.parallelize(localData, 2)
-    val (summary, denseTime) =
-      time(data.summarizeStatistics())
+    val summary = data.summarizeStatistics()
 
     assert(equivVector(summary.mean, Vectors.dense(4.0, 5.0, 6.0)),
-      "Column mean do not match.")
+      "Dense column mean do not match.")
 
     assert(equivVector(summary.variance, Vectors.dense(6.0, 6.0, 6.0)),
-      "Column variance do not match.")
+      "Dense column variance do not match.")
 
-    assert(summary.totalCount === 3, "Column cnt do not match.")
+    assert(summary.totalCount === 3, "Dense column cnt do not match.")
 
     assert(equivVector(summary.numNonZeros, Vectors.dense(3.0, 3.0, 3.0)),
-      "Column nnz do not match.")
+      "Dense column nnz do not match.")
 
     assert(equivVector(summary.max, Vectors.dense(7.0, 8.0, 9.0)),
-      "Column max do not match.")
+      "Dense column max do not match.")
 
     assert(equivVector(summary.min, Vectors.dense(1.0, 2.0, 3.0)),
-      "Column min do not match.")
+      "Dense column min do not match.")
+  }
 
+  test("sparse statistical summary") {
     val dataForSparse = sc.parallelize(sparseData.toSeq, 2)
-    val (_, sparseTime) = time(dataForSparse.summarizeStatistics())
+    val summary = dataForSparse.summarizeStatistics()
+
+    assert(equivVector(summary.mean, Vectors.dense(0.06, 0.05, 0.0)),
+      "Sparse column mean do not match.")
+
+    assert(equivVector(summary.variance, Vectors.dense(0.2564, 0.2475, 0.0)),
+      "Sparse column variance do not match.")
+
+    assert(summary.totalCount === 100, "Sparse column cnt do not match.")
+
+    assert(equivVector(summary.numNonZeros, Vectors.dense(2.0, 1.0, 0.0)),
+      "Sparse column nnz do not match.")
 
-    println(s"dense time is $denseTime, sparse time is $sparseTime.")
+    assert(equivVector(summary.max, Vectors.dense(5.0, 5.0, 0.0)),
+      "Sparse column max do not match.")
+
+    assert(equivVector(summary.min, Vectors.dense(0.0, 0.0, 0.0)),
+      "Sparse column min do not match.")
   }
 }
 
 object VectorRDDFunctionsSuite {
-  def time[R](block: => R): (R, Double) = {
-    val t0 = System.nanoTime()
-    val result = block
-    val t1 = System.nanoTime()
-    (result, (t1 - t0).toDouble / 1.0e9)
-  }
 
   def equivVector(lhs: Vector, rhs: Vector): Boolean = {
     (lhs.toBreeze - rhs.toBreeze).norm(2) < 1e-9
   }
-
-  def relativeTime(lhs: Double, rhs: Double): Boolean = {
-    val denominator = math.max(lhs, rhs)
-    math.abs(lhs - rhs) / denominator < 0.3
-  }
 }
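
Note on the variance fix: the corrected Aggregator uses Welford's one-pass update in add() and the pairwise combine rule M2 = M2a + M2b + delta^2 * nA * nB / (nA + nB) in merge(); the old merge() subtracted the cross term instead of adding it. The standalone Scala sketch below illustrates both rules and checks that a split-and-merge pass matches a sequential pass. It is illustrative only, not MLlib code; Stats and its methods are made-up names, and variance here is the population variance (M2 / n), matching the M2n / totalCnt finalization in the patch.

// Standalone sketch of the two update rules the corrected Aggregator relies on.
object VarianceMergeSketch {

  // Running statistics for one column: count, mean, and M2 (sum of squared deviations).
  final case class Stats(n: Double, mean: Double, m2: Double) {

    // Welford's one-pass update, mirroring add(): advance the mean first,
    // then grow M2 using both the old and the new mean.
    def add(value: Double): Stats = {
      val newMean = (mean * n + value) / (n + 1.0)
      Stats(n + 1.0, newMean, m2 + (value - newMean) * (value - mean))
    }

    // Pairwise combine, mirroring the fixed merge() loop: the cross term
    // delta^2 * nA * nB / (nA + nB) is added, never subtracted.
    def merge(other: Stats): Stats = {
      if (n == 0.0) other
      else if (other.n == 0.0) this
      else {
        val totalN = n + other.n
        val delta = other.mean - mean
        Stats(totalN,
          (mean * n + other.mean * other.n) / totalN,
          m2 + other.m2 + delta * delta * n * other.n / totalN)
      }
    }

    // Population variance, matching the M2n / totalCnt finalization above.
    def variance: Double = if (n == 0.0) 0.0 else m2 / n
  }

  def main(args: Array[String]): Unit = {
    val xs = Seq(1.0, 4.0, 7.0, 0.0, 5.0)
    val zero = Stats(0.0, 0.0, 0.0)
    val sequential = xs.foldLeft(zero)(_.add(_))
    val (a, b) = xs.splitAt(2)
    val merged = a.foldLeft(zero)(_.add(_)).merge(b.foldLeft(zero)(_.add(_)))
    // Split-and-merge must agree with the sequential pass (both give 6.64 here).
    assert(math.abs(sequential.variance - merged.variance) < 1e-9)
  }
}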