From d2a600d5c0ab8a068cb23bdd422645d8b1a39f0b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 13 Mar 2014 01:47:45 -0700 Subject: [PATCH 01/19] add sliding to rdd --- .../main/scala/org/apache/spark/rdd/RDD.scala | 16 +++ .../org/apache/spark/rdd/SlidedRDD.scala | 100 ++++++++++++++++++ .../scala/org/apache/spark/rdd/RDDSuite.scala | 14 +++ 3 files changed, 130 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 4afa7523dd802..9c69843008754 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -950,6 +950,22 @@ abstract class RDD[T: ClassTag]( */ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse) + /** + * Returns a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding + * window over them. The ordering is first based on the partition index and then the ordering of + * items within each partition. This is similar to sliding in Scala collections, except that it + * becomes an empty RDD if the window size is greater than the total number of items. It needs to + * trigger a Spark job if the parent RDD has more than one partitions and the window size is + * greater than 1. + */ + def sliding(windowSize: Int): RDD[Array[T]] = { + if (windowSize == 1) { + this.map(Array(_)) + } else { + new SlidedRDD[T](this, windowSize) + } + } + /** * Save this RDD as a text file, using string representations of elements. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala new file mode 100644 index 0000000000000..e89f4cc0936de --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.collection.mutable +import scala.reflect.ClassTag + +import org.apache.spark.{TaskContext, Partition} + +private[spark] +class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T]) + extends Partition with Serializable { + override val index: Int = idx +} + +/** + * Represents a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding + * window over them. The ordering is first based on the partition index and then the ordering of + * items within each partition. This is similar to sliding in Scala collections, except that it + * becomes an empty RDD if the window size is greater than the total number of items. It needs to + * trigger a Spark job if the parent RDD has more than one partitions. 
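+ *
+ * For example (a sketch of the mechanism), with windowSize = 2 and parent partitions
+ * [1, 2, 3] and [4, 5], the first partition appends the head item 4 of the second
+ * partition as its tail and emits (1,2), (2,3), (3,4); the second partition emits (4,5).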
+ * + * @param parent the parent RDD + * @param windowSize the window size, must be greater than 1 + * + * @see [[org.apache.spark.rdd.RDD#sliding]] + */ +private[spark] +class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) + extends RDD[Array[T]](parent) { + + require(windowSize > 1, "Window size must be greater than 1.") + + override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = { + val part = split.asInstanceOf[SlidedRDDPartition[T]] + (firstParent[T].iterator(part.prev, context) ++ part.tail) + .sliding(windowSize) + .map(_.toArray) + .filter(_.size == windowSize) + } + + override def getPreferredLocations(split: Partition): Seq[String] = + firstParent[T].preferredLocations(split.asInstanceOf[SlidedRDDPartition[T]].prev) + + override def getPartitions: Array[Partition] = { + val parentPartitions = parent.partitions + val n = parentPartitions.size + if (n == 0) { + Array.empty + } else if (n == 1) { + Array(new SlidedRDDPartition[T](0, parentPartitions(0), Array.empty)) + } else { + val n1 = n - 1 + val w1 = windowSize - 1 + // Get the first w1 items of each partition, starting from the second partition. + val nextHeads = + parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true) + val partitions = mutable.ArrayBuffer[SlidedRDDPartition[T]]() + var i = 0 + var partitionIndex = 0 + while (i < n1) { + var j = i + val tail = mutable.ArrayBuffer[T]() + // Keep appending to the current tail until appended a head of size w1. + while (j < n1 && nextHeads(j).size < w1) { + tail ++= nextHeads(j) + j += 1 + } + if (j < n1) { + tail ++= nextHeads(j) + j += 1 + } + partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray) + partitionIndex += 1 + // Skip appended heads. + i = j + } + // If the head of last partition has size w1, we also need to add this partition. 
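+      // Otherwise the last partition has fewer than w1 items; they were all appended to the
+      // tail of a preceding partition, so it cannot contribute a complete window of its own.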
+ if (nextHeads(n1 - 1).size == w1) { + partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty) + } + partitions.toArray + } + } +} diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 60bcada55245b..a5962406b2e1a 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -553,4 +553,18 @@ class RDDSuite extends FunSuite with SharedSparkContext { val ids = ranked.map(_._1).distinct().collect() assert(ids.length === n) } + + test("sliding") { + val data = 0 until 6 + for (numPartitions <- 1 to 8) { + val rdd = sc.parallelize(data, numPartitions) + for (windowSize <- 1 to 6) { + val slided = rdd.sliding(windowSize).collect().map(_.toList).toList + val expected = data.sliding(windowSize).map(_.toList).toList + assert(slided === expected) + } + assert(rdd.sliding(7).collect().isEmpty, + "Should return an empty RDD if the window size is greater than the number of items.") + } + } } From 5ee6001471b1897400fef1e35b5e10fbfb47395f Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 13 Mar 2014 11:49:04 -0700 Subject: [PATCH 02/19] add TODO --- core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala index e89f4cc0936de..5d14bcdfdcb66 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala @@ -97,4 +97,6 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) partitions.toArray } } + + // TODO: Override methods such as aggregate, which only requires one Spark job. } From c1c6c2228a446ed42bf4382d4703309865f6dc54 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 13 Mar 2014 13:47:11 -0700 Subject: [PATCH 03/19] add AreaUnderCurve --- .../mllib/evaluation/AreaUnderCurve.scala | 55 +++++++++++++++++++ .../evaluation/AreaUnderCurveSuite.scala | 47 ++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala new file mode 100644 index 0000000000000..8d014c9f38726 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.evaluation + +import org.apache.spark.rdd.RDD + +/** + * Computes the area under the curve (AUC) using the trapezoidal rule. + */ +object AreaUnderCurve { + + private def trapezoid(points: Array[(Double, Double)]): Double = { + require(points.length == 2) + (points(1)._1 - points(0)._1) * (points(1)._2 + points(0)._2 ) / 2.0 + } + + /** + * Returns the area under the given curve. + * + * @param curve a RDD of ordered 2D points stored in pairs representing a curve + */ + def of(curve: RDD[(Double, Double)]): Double = { + curve.sliding(2).aggregate(0.0)( + seqOp = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points), + combOp = (_ + _) + ) + } + + /** + * Returns the area under the given curve. + * + * @param curve an iterable of ordered 2D points stored in pairs representing a curve + */ + def of(curve: Iterable[(Double, Double)]): Double = { + curve.sliding(2).map(_.toArray).filter(_.size == 2).aggregate(0.0)( + seqop = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points), + combop = (_ + _) + ) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala new file mode 100644 index 0000000000000..78dd65c1721b6 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.evaluation + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.util.LocalSparkContext + +class AreaUnderCurveSuite extends FunSuite with LocalSparkContext { + + test("auc computation") { + val curve = Seq((0.0, 0.0), (1.0, 1.0), (2.0, 3.0), (3.0, 0.0)) + val auc = 4.0 + assert(AreaUnderCurve.of(curve) === auc) + val rddCurve = sc.parallelize(curve, 2) + assert(AreaUnderCurve.of(rddCurve) == auc) + } + + test("auc of an empty curve") { + val curve = Seq.empty[(Double, Double)] + assert(AreaUnderCurve.of(curve) === 0.0) + val rddCurve = sc.parallelize(curve, 2) + assert(AreaUnderCurve.of(rddCurve) === 0.0) + } + + test("auc of a curve with a single point") { + val curve = Seq((1.0, 1.0)) + assert(AreaUnderCurve.of(curve) === 0.0) + val rddCurve = sc.parallelize(curve, 2) + assert(AreaUnderCurve.of(rddCurve) === 0.0) + } +} From 284d991cf8c79a1ef7db79a9caa35a238e02338a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sat, 15 Mar 2014 10:12:41 -0700 Subject: [PATCH 04/19] change SlidedRDD to SlidingRDD --- .../main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../rdd/{SlidedRDD.scala => SlidingRDD.scala} | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) rename core/src/main/scala/org/apache/spark/rdd/{SlidedRDD.scala => SlidingRDD.scala} (81%) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 9c69843008754..dfffa50cb2aed 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -962,7 +962,7 @@ abstract class RDD[T: ClassTag]( if (windowSize == 1) { this.map(Array(_)) } else { - new SlidedRDD[T](this, windowSize) + new SlidingRDD[T](this, windowSize) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala similarity index 81% rename from core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala rename to core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala index 5d14bcdfdcb66..ac0016b18298d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala @@ -23,7 +23,7 @@ import scala.reflect.ClassTag import org.apache.spark.{TaskContext, Partition} private[spark] -class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T]) +class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T]) extends Partition with Serializable { override val index: Int = idx } @@ -41,13 +41,13 @@ class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T * @see [[org.apache.spark.rdd.RDD#sliding]] */ private[spark] -class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) +class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) extends RDD[Array[T]](parent) { - require(windowSize > 1, "Window size must be greater than 1.") + require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.") override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = { - val part = split.asInstanceOf[SlidedRDDPartition[T]] + val part = split.asInstanceOf[SlidingRDDPartition[T]] (firstParent[T].iterator(part.prev, context) ++ part.tail) .sliding(windowSize) .map(_.toArray) @@ -55,7 +55,7 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) } override def getPreferredLocations(split: 
Partition): Seq[String] = - firstParent[T].preferredLocations(split.asInstanceOf[SlidedRDDPartition[T]].prev) + firstParent[T].preferredLocations(split.asInstanceOf[SlidingRDDPartition[T]].prev) override def getPartitions: Array[Partition] = { val parentPartitions = parent.partitions @@ -63,14 +63,14 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) if (n == 0) { Array.empty } else if (n == 1) { - Array(new SlidedRDDPartition[T](0, parentPartitions(0), Array.empty)) + Array(new SlidingRDDPartition[T](0, parentPartitions(0), Array.empty)) } else { val n1 = n - 1 val w1 = windowSize - 1 // Get the first w1 items of each partition, starting from the second partition. val nextHeads = parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true) - val partitions = mutable.ArrayBuffer[SlidedRDDPartition[T]]() + val partitions = mutable.ArrayBuffer[SlidingRDDPartition[T]]() var i = 0 var partitionIndex = 0 while (i < n1) { @@ -85,14 +85,14 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) tail ++= nextHeads(j) j += 1 } - partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray) + partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray) partitionIndex += 1 // Skip appended heads. i = j } // If the head of last partition has size w1, we also need to add this partition. if (nextHeads(n1 - 1).size == w1) { - partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty) + partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty) } partitions.toArray } From 9916202e0c6bc9d183bc35f3f16302bb7fbbb644 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sat, 15 Mar 2014 10:46:35 -0700 Subject: [PATCH 05/19] change RDD.sliding return type to RDD[Seq[T]] --- .../main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- .../scala/org/apache/spark/rdd/SlidingRDD.scala | 17 ++++++++--------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index dfffa50cb2aed..debe57883ed33 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -958,9 +958,9 @@ abstract class RDD[T: ClassTag]( * trigger a Spark job if the parent RDD has more than one partitions and the window size is * greater than 1. 
*/ - def sliding(windowSize: Int): RDD[Array[T]] = { + def sliding(windowSize: Int): RDD[Seq[T]] = { if (windowSize == 1) { - this.map(Array(_)) + this.map(Seq(_)) } else { new SlidingRDD[T](this, windowSize) } diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala index ac0016b18298d..96e3442b878c1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala @@ -23,7 +23,7 @@ import scala.reflect.ClassTag import org.apache.spark.{TaskContext, Partition} private[spark] -class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T]) +class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]) extends Partition with Serializable { override val index: Int = idx } @@ -42,16 +42,15 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[ */ private[spark] class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) - extends RDD[Array[T]](parent) { + extends RDD[Seq[T]](parent) { require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.") - override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = { + override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = { val part = split.asInstanceOf[SlidingRDDPartition[T]] (firstParent[T].iterator(part.prev, context) ++ part.tail) .sliding(windowSize) - .map(_.toArray) - .filter(_.size == windowSize) + .withPartial(false) } override def getPreferredLocations(split: Partition): Seq[String] = @@ -63,7 +62,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int if (n == 0) { Array.empty } else if (n == 1) { - Array(new SlidingRDDPartition[T](0, parentPartitions(0), Array.empty)) + Array(new SlidingRDDPartition[T](0, parentPartitions(0), Seq.empty)) } else { val n1 = n - 1 val w1 = windowSize - 1 @@ -75,7 +74,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int var partitionIndex = 0 while (i < n1) { var j = i - val tail = mutable.ArrayBuffer[T]() + val tail = mutable.ListBuffer[T]() // Keep appending to the current tail until appended a head of size w1. while (j < n1 && nextHeads(j).size < w1) { tail ++= nextHeads(j) @@ -85,14 +84,14 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int tail ++= nextHeads(j) j += 1 } - partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray) + partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq) partitionIndex += 1 // Skip appended heads. i = j } // If the head of last partition has size w1, we also need to add this partition. 
if (nextHeads(n1 - 1).size == w1) { - partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty) + partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Seq.empty) } partitions.toArray } From db6cb30da9ef7ce5ca473f32e709aedb2eeabc34 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sat, 15 Mar 2014 10:59:13 -0700 Subject: [PATCH 06/19] remove unnecessary toSeq --- core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala index 96e3442b878c1..9d1f56732ea41 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala @@ -84,7 +84,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int tail ++= nextHeads(j) j += 1 } - partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq) + partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail) partitionIndex += 1 // Skip appended heads. i = j From cab9a52349a7ffcefeae7660836a6ea1b77d910f Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sat, 15 Mar 2014 11:06:32 -0700 Subject: [PATCH 07/19] use last for the last element --- core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala index 9d1f56732ea41..df87bc5459699 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala @@ -90,7 +90,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int i = j } // If the head of last partition has size w1, we also need to add this partition. - if (nextHeads(n1 - 1).size == w1) { + if (nextHeads.last.size == w1) { partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Seq.empty) } partitions.toArray From a9b250a22e61192fd7c90b936b5eb798d1a5039e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 21 Mar 2014 17:52:44 -0700 Subject: [PATCH 08/19] move sliding to mllib --- .../main/scala/org/apache/spark/rdd/RDD.scala | 16 ------ .../scala/org/apache/spark/rdd/RDDSuite.scala | 14 ----- .../apache/spark/mllib/rdd/RDDFunctions.scala | 53 +++++++++++++++++++ .../apache/spark/mllib}/rdd/SlidingRDD.scala | 13 +++-- .../spark/mllib/rdd/RDDFunctionsSuite.scala | 40 ++++++++++++++ 5 files changed, 101 insertions(+), 35 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala rename {core/src/main/scala/org/apache/spark => mllib/src/main/scala/org/apache/spark/mllib}/rdd/SlidingRDD.scala (92%) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index debe57883ed33..4afa7523dd802 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -950,22 +950,6 @@ abstract class RDD[T: ClassTag]( */ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse) - /** - * Returns a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding - * window over them. 
The ordering is first based on the partition index and then the ordering of - * items within each partition. This is similar to sliding in Scala collections, except that it - * becomes an empty RDD if the window size is greater than the total number of items. It needs to - * trigger a Spark job if the parent RDD has more than one partitions and the window size is - * greater than 1. - */ - def sliding(windowSize: Int): RDD[Seq[T]] = { - if (windowSize == 1) { - this.map(Seq(_)) - } else { - new SlidingRDD[T](this, windowSize) - } - } - /** * Save this RDD as a text file, using string representations of elements. */ diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index a5962406b2e1a..60bcada55245b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -553,18 +553,4 @@ class RDDSuite extends FunSuite with SharedSparkContext { val ids = ranked.map(_._1).distinct().collect() assert(ids.length === n) } - - test("sliding") { - val data = 0 until 6 - for (numPartitions <- 1 to 8) { - val rdd = sc.parallelize(data, numPartitions) - for (windowSize <- 1 to 6) { - val slided = rdd.sliding(windowSize).collect().map(_.toList).toList - val expected = data.sliding(windowSize).map(_.toList).toList - assert(slided === expected) - } - assert(rdd.sliding(7).collect().isEmpty, - "Should return an empty RDD if the window size is greater than the number of items.") - } - } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala new file mode 100644 index 0000000000000..873de871fd884 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import scala.reflect.ClassTag + +import org.apache.spark.rdd.RDD + +/** + * Machine learning specific RDD functions. + */ +private[mllib] +class RDDFunctions[T: ClassTag](self: RDD[T]) { + + /** + * Returns a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding + * window over them. The ordering is first based on the partition index and then the ordering of + * items within each partition. This is similar to sliding in Scala collections, except that it + * becomes an empty RDD if the window size is greater than the total number of items. It needs to + * trigger a Spark job if the parent RDD has more than one partitions and the window size is + * greater than 1. 
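+   *
+   * A usage sketch (`sc` is a SparkContext):
+   * {{{
+   *   import org.apache.spark.mllib.rdd.RDDFunctions._
+   *   sc.parallelize(0 until 6, 2).sliding(3).collect()
+   *   // windows: Seq(0, 1, 2), Seq(1, 2, 3), Seq(2, 3, 4), Seq(3, 4, 5)
+   * }}}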
+ */ + def sliding(windowSize: Int): RDD[Seq[T]] = { + require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.") + if (windowSize == 1) { + self.map(Seq(_)) + } else { + new SlidingRDD[T](self, windowSize) + } + } +} + +private[mllib] +object RDDFunctions { + + /** Implicit conversion from an RDD to RDDFunctions. */ + implicit def fromRDD[T: ClassTag](rdd: RDD[T]) = new RDDFunctions[T](rdd) +} diff --git a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala similarity index 92% rename from core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala rename to mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala index df87bc5459699..dd80782c0f001 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SlidingRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala @@ -15,14 +15,15 @@ * limitations under the License. */ -package org.apache.spark.rdd +package org.apache.spark.mllib.rdd import scala.collection.mutable import scala.reflect.ClassTag import org.apache.spark.{TaskContext, Partition} +import org.apache.spark.rdd.RDD -private[spark] +private[mllib] class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]) extends Partition with Serializable { override val index: Int = idx @@ -33,14 +34,16 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T] * window over them. The ordering is first based on the partition index and then the ordering of * items within each partition. This is similar to sliding in Scala collections, except that it * becomes an empty RDD if the window size is greater than the total number of items. It needs to - * trigger a Spark job if the parent RDD has more than one partitions. + * trigger a Spark job if the parent RDD has more than one partitions. To make this operation + * efficient, the number of items per partition should be larger than the window size and the + * window size should be small, e.g., 2. * * @param parent the parent RDD * @param windowSize the window size, must be greater than 1 * - * @see [[org.apache.spark.rdd.RDD#sliding]] + * @see [[org.apache.spark.mllib.rdd.RDDFunctions#sliding]] */ -private[spark] +private[mllib] class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int) extends RDD[Seq[T]](parent) { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala new file mode 100644 index 0000000000000..a3245d731a15b --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.rdd.RDDFunctions._ + +class RDDFunctionsSuite extends FunSuite with LocalSparkContext { + + test("sliding") { + val data = 0 until 6 + for (numPartitions <- 1 to 8) { + val rdd = sc.parallelize(data, numPartitions) + for (windowSize <- 1 to 6) { + val slided = rdd.sliding(windowSize).collect().map(_.toList).toList + val expected = data.sliding(windowSize).map(_.toList).toList + assert(slided === expected) + } + assert(rdd.sliding(7).collect().isEmpty, + "Should return an empty RDD if the window size is greater than the number of items.") + } + } +} From 221ebced1b36b0b625ce1bc19316f310a7e9f44c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 31 Mar 2014 15:03:08 -0700 Subject: [PATCH 09/19] add a new test to sliding --- .../mllib/evaluation/AreaUnderCurve.scala | 25 ++++++++++++------- .../evaluation/AreaUnderCurveSuite.scala | 6 ++--- .../spark/mllib/rdd/RDDFunctionsSuite.scala | 13 ++++++++-- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala index 8d014c9f38726..fc878c658e096 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala @@ -18,15 +18,22 @@ package org.apache.spark.mllib.evaluation import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.rdd.RDDFunctions._ /** * Computes the area under the curve (AUC) using the trapezoidal rule. */ object AreaUnderCurve { - private def trapezoid(points: Array[(Double, Double)]): Double = { + /** + * Uses the trapezoidal rule to compute the area under the line connecting the two input points. + * @param points two 2D points stored in Seq + */ + private def trapezoid(points: Seq[(Double, Double)]): Double = { require(points.length == 2) - (points(1)._1 - points(0)._1) * (points(1)._2 + points(0)._2 ) / 2.0 + val x = points.head + val y = points.last + (y._1 - x._1) * (y._2 + x._2) / 2.0 } /** @@ -36,20 +43,20 @@ object AreaUnderCurve { */ def of(curve: RDD[(Double, Double)]): Double = { curve.sliding(2).aggregate(0.0)( - seqOp = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points), - combOp = (_ + _) + seqOp = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points), + combOp = _ + _ ) } /** * Returns the area under the given curve. 
* - * @param curve an iterable of ordered 2D points stored in pairs representing a curve + * @param curve an iterator over ordered 2D points stored in pairs representing a curve */ - def of(curve: Iterable[(Double, Double)]): Double = { - curve.sliding(2).map(_.toArray).filter(_.size == 2).aggregate(0.0)( - seqop = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points), - combop = (_ + _) + def of(curve: Iterator[(Double, Double)]): Double = { + curve.sliding(2).withPartial(false).aggregate(0.0)( + seqop = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points), + combop = _ + _ ) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala index 78dd65c1721b6..b2b406afc7636 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala @@ -26,21 +26,21 @@ class AreaUnderCurveSuite extends FunSuite with LocalSparkContext { test("auc computation") { val curve = Seq((0.0, 0.0), (1.0, 1.0), (2.0, 3.0), (3.0, 0.0)) val auc = 4.0 - assert(AreaUnderCurve.of(curve) === auc) + assert(AreaUnderCurve.of(curve.toIterator) === auc) val rddCurve = sc.parallelize(curve, 2) assert(AreaUnderCurve.of(rddCurve) == auc) } test("auc of an empty curve") { val curve = Seq.empty[(Double, Double)] - assert(AreaUnderCurve.of(curve) === 0.0) + assert(AreaUnderCurve.of(curve.toIterator) === 0.0) val rddCurve = sc.parallelize(curve, 2) assert(AreaUnderCurve.of(rddCurve) === 0.0) } test("auc of a curve with a single point") { val curve = Seq((1.0, 1.0)) - assert(AreaUnderCurve.of(curve) === 0.0) + assert(AreaUnderCurve.of(curve.toIterator) === 0.0) val rddCurve = sc.parallelize(curve, 2) assert(AreaUnderCurve.of(rddCurve) === 0.0) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala index a3245d731a15b..3f3b10dfff35e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala @@ -29,12 +29,21 @@ class RDDFunctionsSuite extends FunSuite with LocalSparkContext { for (numPartitions <- 1 to 8) { val rdd = sc.parallelize(data, numPartitions) for (windowSize <- 1 to 6) { - val slided = rdd.sliding(windowSize).collect().map(_.toList).toList + val sliding = rdd.sliding(windowSize).collect().map(_.toList).toList val expected = data.sliding(windowSize).map(_.toList).toList - assert(slided === expected) + assert(sliding === expected) } assert(rdd.sliding(7).collect().isEmpty, "Should return an empty RDD if the window size is greater than the number of items.") } } + + test("sliding with empty partitions") { + val data = Seq(Seq(1, 2, 3), Seq.empty[Int], Seq(4), Seq.empty[Int], Seq(5, 6, 7)) + val rdd = sc.parallelize(data, data.length).flatMap(s => s) + assert(rdd.partitions.size === data.length) + val sliding = rdd.sliding(3) + val expected = data.flatMap(x => x).sliding(3).toList + assert(sliding.collect().toList === expected) + } } From aa7e278d589fb342dd505c23b35a789eb1f7ed55 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 31 Mar 2014 15:30:25 -0700 Subject: [PATCH 10/19] add initial version of binary classification evaluator --- .../mllib/evaluation/AreaUnderCurve.scala | 6 +- .../BinaryClassificationEvaluator.scala | 109 ++++++++++++++++++ 
.../evaluation/AreaUnderCurveSuite.scala | 6 +- .../BinaryClassificationEvaluationSuite.scala | 13 +++ 4 files changed, 128 insertions(+), 6 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluationSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala index fc878c658e096..5fdd8d8cb2480 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala @@ -23,7 +23,7 @@ import org.apache.spark.mllib.rdd.RDDFunctions._ /** * Computes the area under the curve (AUC) using the trapezoidal rule. */ -object AreaUnderCurve { +private[mllib] object AreaUnderCurve { /** * Uses the trapezoidal rule to compute the area under the line connecting the two input points. @@ -53,8 +53,8 @@ object AreaUnderCurve { * * @param curve an iterator over ordered 2D points stored in pairs representing a curve */ - def of(curve: Iterator[(Double, Double)]): Double = { - curve.sliding(2).withPartial(false).aggregate(0.0)( + def of(curve: Iterable[(Double, Double)]): Double = { + curve.toIterator.sliding(2).withPartial(false).aggregate(0.0)( seqop = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points), combop = _ + _ ) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala new file mode 100644 index 0000000000000..784d03b36400d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala @@ -0,0 +1,109 @@ +package org.apache.spark.mllib.evaluation + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + +class BinaryClassificationEvaluator(scoreAndLabel: RDD[(Double, Double)]) { + +} + +object BinaryClassificationEvaluator { + + def get(rdd: RDD[(Double, Double)]) { + // Create a bin for each distinct score value, count positives and negatives within each bin, + // and then sort by score values in descending order. 
+ val counts = rdd.combineByKey( + createCombiner = (label: Double) => new Counter(0L, 0L) += label, + mergeValue = (c: Counter, label: Double) => c += label, + mergeCombiners = (c1: Counter, c2: Counter) => c1 += c2 + ).sortByKey(ascending = false) + println(counts.collect().toList) + val agg = counts.values.mapPartitions((iter: Iterator[Counter]) => { + val agg = new Counter() + iter.foreach(agg += _) + Iterator(agg) + }, preservesPartitioning = true).collect() + println(agg.toList) + val cum = agg.scanLeft(new Counter())((agg: Counter, c: Counter) => agg + c) + val total = cum.last + println(total) + println(cum.toList) + val cumCountsRdd = counts.mapPartitionsWithIndex((index: Int, iter: Iterator[(Double, Counter)]) => { + val cumCount = cum(index) + iter.map { case (score, c) => + cumCount += c + (score, cumCount.clone()) + } + }, preservesPartitioning = true) + println("cum: " + cumCountsRdd.collect().toList) + val rocAUC = AreaUnderCurve.of(cumCountsRdd.values.map((c: Counter) => { + (1.0 * c.numNegatives / total.numNegatives, + 1.0 * c.numPositives / total.numPositives) + })) + println(rocAUC) + val prAUC = AreaUnderCurve.of(cumCountsRdd.values.map((c: Counter) => { + (1.0 * c.numPositives / total.numPositives, + 1.0 * c.numPositives / (c.numPositives + c.numNegatives)) + })) + println(prAUC) + } + + def get(data: Iterable[(Double, Double)]) { + val counts = data.groupBy(_._1).mapValues { s => + val c = new Counter() + s.foreach(c += _._2) + c + }.toSeq.sortBy(- _._1) + println("counts: " + counts.toList) + val total = new Counter() + val cum = counts.map { s => + total += s._2 + (s._1, total.clone()) + } + println("cum: " + cum.toList) + val roc = cum.map { case (s, c) => + (1.0 * c.numNegatives / total.numNegatives, 1.0 * c.numPositives / total.numPositives) + } + val rocAUC = AreaUnderCurve.of(roc) + println(rocAUC) + val pr = cum.map { case (s, c) => + (1.0 * c.numPositives / total.numPositives, + 1.0 * c.numPositives / (c.numPositives + c.numNegatives)) + } + val prAUC = AreaUnderCurve.of(pr) + println(prAUC) + } +} + +class Counter(var numPositives: Long = 0L, var numNegatives: Long = 0L) extends Serializable { + + def +=(label: Double): Counter = { + // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle + // -1.0 for negative as well. 
+ if (label > 0.5) numPositives += 1L else numNegatives += 1L + this + } + + def +=(other: Counter): Counter = { + numPositives += other.numPositives + numNegatives += other.numNegatives + this + } + + def +(label: Double): Counter = { + this.clone() += label + } + + def +(other: Counter): Counter = { + this.clone() += other + } + + def sum: Long = numPositives + numNegatives + + override def clone(): Counter = { + new Counter(numPositives, numNegatives) + } + + override def toString(): String = s"[$numPositives,$numNegatives]" +} + diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala index b2b406afc7636..78dd65c1721b6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala @@ -26,21 +26,21 @@ class AreaUnderCurveSuite extends FunSuite with LocalSparkContext { test("auc computation") { val curve = Seq((0.0, 0.0), (1.0, 1.0), (2.0, 3.0), (3.0, 0.0)) val auc = 4.0 - assert(AreaUnderCurve.of(curve.toIterator) === auc) + assert(AreaUnderCurve.of(curve) === auc) val rddCurve = sc.parallelize(curve, 2) assert(AreaUnderCurve.of(rddCurve) == auc) } test("auc of an empty curve") { val curve = Seq.empty[(Double, Double)] - assert(AreaUnderCurve.of(curve.toIterator) === 0.0) + assert(AreaUnderCurve.of(curve) === 0.0) val rddCurve = sc.parallelize(curve, 2) assert(AreaUnderCurve.of(rddCurve) === 0.0) } test("auc of a curve with a single point") { val curve = Seq((1.0, 1.0)) - assert(AreaUnderCurve.of(curve.toIterator) === 0.0) + assert(AreaUnderCurve.of(curve) === 0.0) val rddCurve = sc.parallelize(curve, 2) assert(AreaUnderCurve.of(rddCurve) === 0.0) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluationSuite.scala new file mode 100644 index 0000000000000..db5cffe280f60 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluationSuite.scala @@ -0,0 +1,13 @@ +package org.apache.spark.mllib.evaluation + +import org.scalatest.FunSuite +import org.apache.spark.mllib.util.LocalSparkContext + +class BinaryClassificationEvaluationSuite extends FunSuite with LocalSparkContext { + test("test") { + val data = Seq((0.0, 0.0), (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0), (0.9, 1.0)) + BinaryClassificationEvaluator.get(data) + val rdd = sc.parallelize(data, 3) + BinaryClassificationEvaluator.get(rdd) + } +} From dda82d5253f448b3e3f37ba712d420fe942efd26 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 15:51:51 -0700 Subject: [PATCH 11/19] add confusion matrix --- .../BinaryClassificationEvaluator.scala | 180 ++++++++++++++---- 1 file changed, 142 insertions(+), 38 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala index 784d03b36400d..27a282f4bce20 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala @@ -1,61 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.evaluation import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -class BinaryClassificationEvaluator(scoreAndLabel: RDD[(Double, Double)]) { - +/** + * Binary confusion matrix. + * + * @param count label counter for labels with scores greater than or equal to the current score + * @param total label counter for all labels + */ +case class BinaryConfusionMatrix( + private val count: LabelCounter, + private val total: LabelCounter) extends Serializable { + + /** number of true positives */ + def tp: Long = count.numPositives + + /** number of false positives */ + def fp: Long = count.numNegatives + + /** number of false negatives */ + def fn: Long = total.numPositives - count.numPositives + + /** number of true negatives */ + def tn: Long = total.numNegatives - count.numNegatives + + /** number of positives */ + def p: Long = total.numPositives + + /** number of negatives */ + def n: Long = total.numNegatives } -object BinaryClassificationEvaluator { +private trait Metric { + def apply(c: BinaryConfusionMatrix): Double +} + +object Precision extends Metric { + override def apply(c: BinaryConfusionMatrix): Double = + c.tp.toDouble / (c.tp + c.fp) +} - def get(rdd: RDD[(Double, Double)]) { +object FalsePositiveRate extends Metric { + override def apply(c: BinaryConfusionMatrix): Double = + c.fp.toDouble / c.n +} + +object Recall extends Metric { + override def apply(c: BinaryConfusionMatrix): Double = + c.tp.toDouble / c.p +} + +case class FMeasure(beta: Double) extends Metric { + private val beta2 = beta * beta + override def apply(c: BinaryConfusionMatrix): Double = { + val precision = Precision(c) + val recall = Recall(c) + (1.0 + beta2) * (precision * recall) / (beta2 * precision + recall) + } +} + +/** + * Evaluator for binary classification. + * + * @param scoreAndlabels an RDD of (score, label) pairs. + */ +class BinaryClassificationEvaluator(scoreAndlabels: RDD[(Double, Double)]) extends Serializable { + + private lazy val (cumCounts: RDD[(Double, LabelCounter)], totalCount: LabelCounter, scoreAndConfusion: RDD[(Double, BinaryConfusionMatrix)]) = { // Create a bin for each distinct score value, count positives and negatives within each bin, // and then sort by score values in descending order. 
- val counts = rdd.combineByKey( - createCombiner = (label: Double) => new Counter(0L, 0L) += label, - mergeValue = (c: Counter, label: Double) => c += label, - mergeCombiners = (c1: Counter, c2: Counter) => c1 += c2 + val counts = scoreAndlabels.combineByKey( + createCombiner = (label: Double) => new LabelCounter(0L, 0L) += label, + mergeValue = (c: LabelCounter, label: Double) => c += label, + mergeCombiners = (c1: LabelCounter, c2: LabelCounter) => c1 += c2 ).sortByKey(ascending = false) - println(counts.collect().toList) - val agg = counts.values.mapPartitions((iter: Iterator[Counter]) => { - val agg = new Counter() + val agg = counts.values.mapPartitions({ iter => + val agg = new LabelCounter() iter.foreach(agg += _) Iterator(agg) }, preservesPartitioning = true).collect() - println(agg.toList) - val cum = agg.scanLeft(new Counter())((agg: Counter, c: Counter) => agg + c) - val total = cum.last - println(total) - println(cum.toList) - val cumCountsRdd = counts.mapPartitionsWithIndex((index: Int, iter: Iterator[(Double, Counter)]) => { + val cum = agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg + c) + val totalCount = cum.last + val cumCounts = counts.mapPartitionsWithIndex((index: Int, iter: Iterator[(Double, LabelCounter)]) => { val cumCount = cum(index) iter.map { case (score, c) => cumCount += c (score, cumCount.clone()) } }, preservesPartitioning = true) - println("cum: " + cumCountsRdd.collect().toList) - val rocAUC = AreaUnderCurve.of(cumCountsRdd.values.map((c: Counter) => { - (1.0 * c.numNegatives / total.numNegatives, - 1.0 * c.numPositives / total.numPositives) - })) - println(rocAUC) - val prAUC = AreaUnderCurve.of(cumCountsRdd.values.map((c: Counter) => { - (1.0 * c.numPositives / total.numPositives, - 1.0 * c.numPositives / (c.numPositives + c.numNegatives)) - })) - println(prAUC) + cumCounts.persist() + val scoreAndConfusion = cumCounts.map { case (score, cumCount) => + (score, BinaryConfusionMatrix(cumCount, totalCount)) + } + (cumCounts, totalCount, scoreAndConfusion) + } + + def unpersist() { + cumCounts.unpersist() } + def rocCurve(): RDD[(Double, Double)] = createCurve(FalsePositiveRate, Recall) + + def rocAUC(): Double = AreaUnderCurve.of(rocCurve()) + + def prCurve(): RDD[(Double, Double)] = createCurve(Recall, Precision) + + def prAUC(): Double = AreaUnderCurve.of(prCurve()) + + def fMeasureByThreshold(beta: Double): RDD[(Double, Double)] = createCurve(FMeasure(beta)) + + def fMeasureByThreshold() = fMeasureByThreshold(1.0) + + private def createCurve(y: Metric): RDD[(Double, Double)] = { + scoreAndConfusion.map { case (s, c) => + (s, y(c)) + } + } + + private def createCurve(x: Metric, y: Metric): RDD[(Double, Double)] = { + scoreAndConfusion.map { case (_, c) => + (x(c), y(c)) + } + } +} + +class LocalBinaryClassificationEvaluator { def get(data: Iterable[(Double, Double)]) { val counts = data.groupBy(_._1).mapValues { s => - val c = new Counter() + val c = new LabelCounter() s.foreach(c += _._2) c }.toSeq.sortBy(- _._1) println("counts: " + counts.toList) - val total = new Counter() + val total = new LabelCounter() val cum = counts.map { s => total += s._2 (s._1, total.clone()) @@ -75,35 +170,44 @@ object BinaryClassificationEvaluator { } } -class Counter(var numPositives: Long = 0L, var numNegatives: Long = 0L) extends Serializable { +/** + * A counter for positives and negatives. 
+ * + * @param numPositives + * @param numNegatives + */ +private[evaluation] +class LabelCounter(var numPositives: Long = 0L, var numNegatives: Long = 0L) extends Serializable { - def +=(label: Double): Counter = { + /** Process a label. */ + def +=(label: Double): LabelCounter = { // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle // -1.0 for negative as well. if (label > 0.5) numPositives += 1L else numNegatives += 1L this } - def +=(other: Counter): Counter = { + /** Merge another counter. */ + def +=(other: LabelCounter): LabelCounter = { numPositives += other.numPositives numNegatives += other.numNegatives this } - def +(label: Double): Counter = { + def +(label: Double): LabelCounter = { this.clone() += label } - def +(other: Counter): Counter = { + def +(other: LabelCounter): LabelCounter = { this.clone() += other } def sum: Long = numPositives + numNegatives - override def clone(): Counter = { - new Counter(numPositives, numNegatives) + override def clone: LabelCounter = { + new LabelCounter(numPositives, numNegatives) } - override def toString(): String = s"[$numPositives,$numNegatives]" + override def toString: String = s"[$numPositives,$numNegatives]" } From 8f78958cf366ae2bdecbf987bfa6f23d29c36c71 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 16:29:53 -0700 Subject: [PATCH 12/19] add PredictionAndResponse --- .../evaluation/PredictionAndResponse.scala | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/PredictionAndResponse.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/PredictionAndResponse.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/PredictionAndResponse.scala new file mode 100644 index 0000000000000..d72c432838057 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/PredictionAndResponse.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation + +/** + * Wrapper for (prediction/score, response/label) pairs. + * + * @param prediction prediction + * @param response true response + * @tparam P prediction type + * @tparam R response/label type + */ +case class PredictionAndResponse[@specialized(Double) P, @specialized(Double) R](prediction: P, response: R) { + /** Alias for prediction. */ + def score: P = prediction + + /** Alias for response. 
*/ + def label: R = response +} From 3d71525d05ef3b5619c9af8d436ec585d648c1c9 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 18:12:39 -0700 Subject: [PATCH 13/19] move binary evalution classes to evaluation.binary --- .../BinaryClassificationEvaluator.scala | 130 +++++++----------- .../binary/BinaryClassificationMetrics.scala | 57 ++++++++ .../binary/BinaryConfusionMatrix.scala | 41 ++++++ 3 files changed, 149 insertions(+), 79 deletions(-) rename mllib/src/main/scala/org/apache/spark/mllib/evaluation/{ => binary}/BinaryClassificationEvaluator.scala (58%) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala similarity index 58% rename from mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala rename to mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala index 27a282f4bce20..4f25b524716cb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala @@ -15,66 +15,40 @@ * limitations under the License. */ -package org.apache.spark.mllib.evaluation +package org.apache.spark.mllib.evaluation.binary import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.evaluation.AreaUnderCurve +import org.apache.spark.Logging /** - * Binary confusion matrix. + * Implementation of [[org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrix]]. 
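+ *
+ * For example (hypothetical counts), count = [3,1] and totalCount = [4,5] give
+ * tp = 3, fp = 1, fn = 1, tn = 4, p = 4, n = 5.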
* * @param count label counter for labels with scores greater than or equal to the current score - * @param total label counter for all labels + * @param totalCount label counter for all labels */ -case class BinaryConfusionMatrix( +private case class BinaryConfusionMatrixImpl( private val count: LabelCounter, - private val total: LabelCounter) extends Serializable { + private val totalCount: LabelCounter) extends BinaryConfusionMatrix with Serializable { /** number of true positives */ - def tp: Long = count.numPositives + override def tp: Long = count.numPositives /** number of false positives */ - def fp: Long = count.numNegatives + override def fp: Long = count.numNegatives /** number of false negatives */ - def fn: Long = total.numPositives - count.numPositives + override def fn: Long = totalCount.numPositives - count.numPositives /** number of true negatives */ - def tn: Long = total.numNegatives - count.numNegatives + override def tn: Long = totalCount.numNegatives - count.numNegatives /** number of positives */ - def p: Long = total.numPositives + override def p: Long = totalCount.numPositives /** number of negatives */ - def n: Long = total.numNegatives -} - -private trait Metric { - def apply(c: BinaryConfusionMatrix): Double -} - -object Precision extends Metric { - override def apply(c: BinaryConfusionMatrix): Double = - c.tp.toDouble / (c.tp + c.fp) -} - -object FalsePositiveRate extends Metric { - override def apply(c: BinaryConfusionMatrix): Double = - c.fp.toDouble / c.n -} - -object Recall extends Metric { - override def apply(c: BinaryConfusionMatrix): Double = - c.tp.toDouble / c.p -} - -case class FMeasure(beta: Double) extends Metric { - private val beta2 = beta * beta - override def apply(c: BinaryConfusionMatrix): Double = { - val precision = Precision(c) - val recall = Recall(c) - (1.0 + beta2) * (precision * recall) / (beta2 * precision + recall) - } + override def n: Long = totalCount.numNegatives } /** @@ -82,9 +56,11 @@ case class FMeasure(beta: Double) extends Metric { * * @param scoreAndlabels an RDD of (score, label) pairs. */ -class BinaryClassificationEvaluator(scoreAndlabels: RDD[(Double, Double)]) extends Serializable { +class BinaryClassificationEvaluator(scoreAndlabels: RDD[(Double, Double)]) extends Serializable with Logging { - private lazy val (cumCounts: RDD[(Double, LabelCounter)], totalCount: LabelCounter, scoreAndConfusion: RDD[(Double, BinaryConfusionMatrix)]) = { + private lazy val ( + cumCounts: RDD[(Double, LabelCounter)], + confusionByThreshold: RDD[(Double, BinaryConfusionMatrix)]) = { // Create a bin for each distinct score value, count positives and negatives within each bin, // and then sort by score values in descending order. 
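+    // E.g., (score, label) pairs (0.6, 1.0), (0.6, 0.0), (0.1, 1.0) bin to
+    // (0.6, [1,1]) and (0.1, [1,0]), sorted by descending score.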
val counts = scoreAndlabels.combineByKey( @@ -99,6 +75,7 @@ class BinaryClassificationEvaluator(scoreAndlabels: RDD[(Double, Double)]) exten }, preservesPartitioning = true).collect() val cum = agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg + c) val totalCount = cum.last + logInfo(s"Total counts: totalCount") val cumCounts = counts.mapPartitionsWithIndex((index: Int, iter: Iterator[(Double, LabelCounter)]) => { val cumCount = cum(index) iter.map { case (score, c) => @@ -108,76 +85,71 @@ class BinaryClassificationEvaluator(scoreAndlabels: RDD[(Double, Double)]) exten }, preservesPartitioning = true) cumCounts.persist() val scoreAndConfusion = cumCounts.map { case (score, cumCount) => - (score, BinaryConfusionMatrix(cumCount, totalCount)) + (score, BinaryConfusionMatrixImpl(cumCount, totalCount)) } (cumCounts, totalCount, scoreAndConfusion) } + /** Unpersist intermediate RDDs used in the computation. */ def unpersist() { cumCounts.unpersist() } + /** + * Returns the receiver operating characteristic (ROC) curve. + * @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic + */ def rocCurve(): RDD[(Double, Double)] = createCurve(FalsePositiveRate, Recall) + /** + * Computes the area under the receiver operating characteristic (ROC) curve. + */ def rocAUC(): Double = AreaUnderCurve.of(rocCurve()) + /** + * Returns the precision-recall curve. + * @see http://en.wikipedia.org/wiki/Precision_and_recall + */ def prCurve(): RDD[(Double, Double)] = createCurve(Recall, Precision) + /** + * Computes the area under the precision-recall curve. + */ def prAUC(): Double = AreaUnderCurve.of(prCurve()) + /** + * Returns the (threshold, F-Measure) curve. + * @param beta the beta factor in F-Measure computation. + * @return an RDD of (threshold, F-Measure) pairs. + * @see http://en.wikipedia.org/wiki/F1_score + */ def fMeasureByThreshold(beta: Double): RDD[(Double, Double)] = createCurve(FMeasure(beta)) + /** Returns the (threshold, F-Measure) curve with beta = 1.0. */ def fMeasureByThreshold() = fMeasureByThreshold(1.0) - private def createCurve(y: Metric): RDD[(Double, Double)] = { - scoreAndConfusion.map { case (s, c) => + /** Creates a curve of (threshold, metric). */ + private def createCurve(y: BinaryClassificationMetric): RDD[(Double, Double)] = { + confusionByThreshold.map { case (s, c) => (s, y(c)) } } - private def createCurve(x: Metric, y: Metric): RDD[(Double, Double)] = { - scoreAndConfusion.map { case (_, c) => + /** Creates a curve of (metricX, metricY). */ + private def createCurve(x: BinaryClassificationMetric, y: BinaryClassificationMetric): RDD[(Double, Double)] = { + confusionByThreshold.map { case (_, c) => (x(c), y(c)) } } } -class LocalBinaryClassificationEvaluator { - def get(data: Iterable[(Double, Double)]) { - val counts = data.groupBy(_._1).mapValues { s => - val c = new LabelCounter() - s.foreach(c += _._2) - c - }.toSeq.sortBy(- _._1) - println("counts: " + counts.toList) - val total = new LabelCounter() - val cum = counts.map { s => - total += s._2 - (s._1, total.clone()) - } - println("cum: " + cum.toList) - val roc = cum.map { case (s, c) => - (1.0 * c.numNegatives / total.numNegatives, 1.0 * c.numPositives / total.numPositives) - } - val rocAUC = AreaUnderCurve.of(roc) - println(rocAUC) - val pr = cum.map { case (s, c) => - (1.0 * c.numPositives / total.numPositives, - 1.0 * c.numPositives / (c.numPositives + c.numNegatives)) - } - val prAUC = AreaUnderCurve.of(pr) - println(prAUC) - } -} - /** * A counter for positives and negatives. 
* - * @param numPositives - * @param numNegatives + * @param numPositives number of positive labels + * @param numNegatives number of negative labels */ -private[evaluation] -class LabelCounter(var numPositives: Long = 0L, var numNegatives: Long = 0L) extends Serializable { +private class LabelCounter(var numPositives: Long = 0L, var numNegatives: Long = 0L) extends Serializable { /** Process a label. */ def +=(label: Double): LabelCounter = { @@ -208,6 +180,6 @@ class LabelCounter(var numPositives: Long = 0L, var numNegatives: Long = 0L) ext new LabelCounter(numPositives, numNegatives) } - override def toString: String = s"[$numPositives,$numNegatives]" + override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}" } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala new file mode 100644 index 0000000000000..09581bcc75c2c --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation.binary + +/** + * Trait for a binary classification evaluation metric. + */ +private[evaluation] trait BinaryClassificationMetric { + def apply(c: BinaryConfusionMatrix): Double +} + +/** Precision. */ +private[evaluation] object Precision extends BinaryClassificationMetric { + override def apply(c: BinaryConfusionMatrix): Double = + c.tp.toDouble / (c.tp + c.fp) +} + +/** False positive rate. */ +private[evaluation] object FalsePositiveRate extends BinaryClassificationMetric { + override def apply(c: BinaryConfusionMatrix): Double = + c.fp.toDouble / c.n +} + +/** Recall. */ +private[evalution] object Recall extends BinaryClassificationMetric { + override def apply(c: BinaryConfusionMatrix): Double = + c.tp.toDouble / c.p +} + +/** + * F-Measure. 
+ * @param beta the beta constant in F-Measure + * @see http://en.wikipedia.org/wiki/F1_score + */ +private[evaluation] case class FMeasure(beta: Double) extends BinaryClassificationMetric { + private val beta2 = beta * beta + override def apply(c: BinaryConfusionMatrix): Double = { + val precision = Precision(c) + val recall = Recall(c) + (1.0 + beta2) * (precision * recall) / (beta2 * precision + recall) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala new file mode 100644 index 0000000000000..f846d05cd894c --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation.binary + +/** + * Trait for a binary confusion matrix. + */ +private[evaluation] trait BinaryConfusionMatrix { + /** number of true positives */ + def tp: Long + + /** number of false positives */ + def fp: Long + + /** number of false negatives */ + def fn: Long + + /** number of true negatives */ + def tn: Long + + /** number of positives */ + def p: Long = tp + fn + + /** number of negatives */ + def n: Long = fp + tn +} From ca31da590e25a8b18e347534a07b5e8392e1036e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 18:13:20 -0700 Subject: [PATCH 14/19] remove PredictionAndResponse --- .../evaluation/PredictionAndResponse.scala | 34 ------------------- 1 file changed, 34 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/PredictionAndResponse.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/PredictionAndResponse.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/PredictionAndResponse.scala deleted file mode 100644 index d72c432838057..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/PredictionAndResponse.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.evaluation - -/** - * Wrapper for (prediction/score, response/label) pairs. - * - * @param prediction prediction - * @param response true response - * @tparam P prediction type - * @tparam R response/label type - */ -case class PredictionAndResponse[@specialized(Double) P, @specialized(Double) R](prediction: P, response: R) { - /** Alias for prediction. */ - def score: P = prediction - - /** Alias for response. */ - def label: R = response -} From 9dc35182725c8dca5293cee7ab7dccca9a258c06 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 19:16:52 -0700 Subject: [PATCH 15/19] add tests for BinaryClassificationEvaluator --- .../mllib/evaluation/AreaUnderCurve.scala | 2 +- .../BinaryClassificationEvaluator.scala | 44 +++++++---------- .../binary/BinaryClassificationMetrics.scala | 4 +- .../evaluation/AreaUnderCurveSuite.scala | 1 - .../BinaryClassificationEvaluationSuite.scala | 13 ----- .../BinaryClassificationEvaluatorSuite.scala | 49 +++++++++++++++++++ 6 files changed, 71 insertions(+), 42 deletions(-) delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluationSuite.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluatorSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala index 5fdd8d8cb2480..7858ec602483f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala @@ -23,7 +23,7 @@ import org.apache.spark.mllib.rdd.RDDFunctions._ /** * Computes the area under the curve (AUC) using the trapezoidal rule. */ -private[mllib] object AreaUnderCurve { +private[evaluation] object AreaUnderCurve { /** * Uses the trapezoidal rule to compute the area under the line connecting the two input points. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala index 4f25b524716cb..290d8fe127ec7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala @@ -29,8 +29,8 @@ import org.apache.spark.Logging * @param totalCount label counter for all labels */ private case class BinaryConfusionMatrixImpl( - private val count: LabelCounter, - private val totalCount: LabelCounter) extends BinaryConfusionMatrix with Serializable { + count: LabelCounter, + totalCount: LabelCounter) extends BinaryConfusionMatrix with Serializable { /** number of true positives */ override def tp: Long = count.numPositives @@ -54,16 +54,16 @@ private case class BinaryConfusionMatrixImpl( /** * Evaluator for binary classification. * - * @param scoreAndlabels an RDD of (score, label) pairs. + * @param scoreAndLabels an RDD of (score, label) pairs. 
*/ -class BinaryClassificationEvaluator(scoreAndlabels: RDD[(Double, Double)]) extends Serializable with Logging { +class BinaryClassificationEvaluator(scoreAndLabels: RDD[(Double, Double)]) extends Serializable with Logging { private lazy val ( cumCounts: RDD[(Double, LabelCounter)], - confusionByThreshold: RDD[(Double, BinaryConfusionMatrix)]) = { + confusions: RDD[(Double, BinaryConfusionMatrix)]) = { // Create a bin for each distinct score value, count positives and negatives within each bin, // and then sort by score values in descending order. - val counts = scoreAndlabels.combineByKey( + val counts = scoreAndLabels.combineByKey( createCombiner = (label: Double) => new LabelCounter(0L, 0L) += label, mergeValue = (c: LabelCounter, label: Double) => c += label, mergeCombiners = (c1: LabelCounter, c2: LabelCounter) => c1 += c2 @@ -73,21 +73,21 @@ class BinaryClassificationEvaluator(scoreAndlabels: RDD[(Double, Double)]) exten iter.foreach(agg += _) Iterator(agg) }, preservesPartitioning = true).collect() - val cum = agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg + c) - val totalCount = cum.last - logInfo(s"Total counts: totalCount") + val partitionwiseCumCounts = agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg + c) + val totalCount = partitionwiseCumCounts.last + logInfo(s"Total counts: $totalCount") val cumCounts = counts.mapPartitionsWithIndex((index: Int, iter: Iterator[(Double, LabelCounter)]) => { - val cumCount = cum(index) + val cumCount = partitionwiseCumCounts(index) iter.map { case (score, c) => cumCount += c (score, cumCount.clone()) } }, preservesPartitioning = true) cumCounts.persist() - val scoreAndConfusion = cumCounts.map { case (score, cumCount) => - (score, BinaryConfusionMatrixImpl(cumCount, totalCount)) + val confusions = cumCounts.map { case (score, cumCount) => + (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix]) } - (cumCounts, totalCount, scoreAndConfusion) + (cumCounts, confusions) } /** Unpersist intermediate RDDs used in the computation. */ @@ -126,18 +126,18 @@ class BinaryClassificationEvaluator(scoreAndlabels: RDD[(Double, Double)]) exten def fMeasureByThreshold(beta: Double): RDD[(Double, Double)] = createCurve(FMeasure(beta)) /** Returns the (threshold, F-Measure) curve with beta = 1.0. */ - def fMeasureByThreshold() = fMeasureByThreshold(1.0) + def fMeasureByThreshold(): RDD[(Double, Double)] = fMeasureByThreshold(1.0) /** Creates a curve of (threshold, metric). */ private def createCurve(y: BinaryClassificationMetric): RDD[(Double, Double)] = { - confusionByThreshold.map { case (s, c) => + confusions.map { case (s, c) => (s, y(c)) } } /** Creates a curve of (metricX, metricY). */ private def createCurve(x: BinaryClassificationMetric, y: BinaryClassificationMetric): RDD[(Double, Double)] = { - confusionByThreshold.map { case (_, c) => + confusions.map { case (_, c) => (x(c), y(c)) } } @@ -151,7 +151,7 @@ class BinaryClassificationEvaluator(scoreAndlabels: RDD[(Double, Double)]) exten */ private class LabelCounter(var numPositives: Long = 0L, var numNegatives: Long = 0L) extends Serializable { - /** Process a label. */ + /** Processes a label. */ def +=(label: Double): LabelCounter = { // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle // -1.0 for negative as well. @@ -159,27 +159,21 @@ private class LabelCounter(var numPositives: Long = 0L, var numNegatives: Long = this } - /** Merge another counter. 
*/ + /** Merges another counter. */ def +=(other: LabelCounter): LabelCounter = { numPositives += other.numPositives numNegatives += other.numNegatives this } - def +(label: Double): LabelCounter = { - this.clone() += label - } - + /** Sums this counter and another counter and returns the result in a new counter. */ def +(other: LabelCounter): LabelCounter = { this.clone() += other } - def sum: Long = numPositives + numNegatives - override def clone: LabelCounter = { new LabelCounter(numPositives, numNegatives) } override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}" } - diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala index 09581bcc75c2c..11581586de817 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.evaluation.binary /** * Trait for a binary classification evaluation metric. */ -private[evaluation] trait BinaryClassificationMetric { +private[evaluation] trait BinaryClassificationMetric extends Serializable { def apply(c: BinaryConfusionMatrix): Double } @@ -37,7 +37,7 @@ private[evaluation] object FalsePositiveRate extends BinaryClassificationMetric } /** Recall. */ -private[evalution] object Recall extends BinaryClassificationMetric { +private[evaluation] object Recall extends BinaryClassificationMetric { override def apply(c: BinaryConfusionMatrix): Double = c.tp.toDouble / c.p } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala index 78dd65c1721b6..1c9844f289fe0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/AreaUnderCurveSuite.scala @@ -22,7 +22,6 @@ import org.scalatest.FunSuite import org.apache.spark.mllib.util.LocalSparkContext class AreaUnderCurveSuite extends FunSuite with LocalSparkContext { - test("auc computation") { val curve = Seq((0.0, 0.0), (1.0, 1.0), (2.0, 3.0), (3.0, 0.0)) val auc = 4.0 diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluationSuite.scala deleted file mode 100644 index db5cffe280f60..0000000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationEvaluationSuite.scala +++ /dev/null @@ -1,13 +0,0 @@ -package org.apache.spark.mllib.evaluation - -import org.scalatest.FunSuite -import org.apache.spark.mllib.util.LocalSparkContext - -class BinaryClassificationEvaluationSuite extends FunSuite with LocalSparkContext { - test("test") { - val data = Seq((0.0, 0.0), (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0), (0.9, 1.0)) - BinaryClassificationEvaluator.get(data) - val rdd = sc.parallelize(data, 3) - BinaryClassificationEvaluator.get(rdd) - } -} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluatorSuite.scala new file mode 100644 index 0000000000000..8a3ef5a8713a3 --- /dev/null +++ 
b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluatorSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation.binary + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.evaluation.AreaUnderCurve + +class BinaryClassificationEvaluatorSuite extends FunSuite with LocalSparkContext { + test("binary evaluation metrics") { + val scoreAndLabels = sc.parallelize( + Seq((0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)), 2) + val evaluator = new BinaryClassificationEvaluator(scoreAndLabels) + val score = Seq(0.8, 0.6, 0.4, 0.1) + val tp = Seq(1, 3, 3, 4) + val fp = Seq(0, 1, 2, 3) + val p = 4 + val n = 3 + val precision = tp.zip(fp).map { case (t, f) => t.toDouble / (t + f) } + val recall = tp.map(t => t.toDouble / p) + val fpr = fp.map(f => f.toDouble / n) + val roc = fpr.zip(recall) + val pr = recall.zip(precision) + val f1 = pr.map { case (re, prec) => 2.0 * (prec * re) / (prec + re) } + val f2 = pr.map { case (re, prec) => 5.0 * (prec * re) / (4.0 * prec + re)} + assert(evaluator.rocCurve().collect().toSeq === roc) + assert(evaluator.rocAUC() === AreaUnderCurve.of(roc)) + assert(evaluator.prCurve().collect().toSeq === pr) + assert(evaluator.prAUC() === AreaUnderCurve.of(pr)) + assert(evaluator.fMeasureByThreshold().collect().toSeq === score.zip(f1)) + assert(evaluator.fMeasureByThreshold(2.0).collect().toSeq === score.zip(f2)) + } +} From b1b7dab57dbc10e7a661cd6f670d7470f6c63cd0 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 20:11:47 -0700 Subject: [PATCH 16/19] fix code styles --- .../BinaryClassificationEvaluator.scala | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala index 290d8fe127ec7..56a14c6e42adb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala @@ -56,7 +56,8 @@ private case class BinaryConfusionMatrixImpl( * * @param scoreAndLabels an RDD of (score, label) pairs. 
*/ -class BinaryClassificationEvaluator(scoreAndLabels: RDD[(Double, Double)]) extends Serializable with Logging { +class BinaryClassificationEvaluator(scoreAndLabels: RDD[(Double, Double)]) + extends Serializable with Logging { private lazy val ( cumCounts: RDD[(Double, LabelCounter)], @@ -73,16 +74,18 @@ class BinaryClassificationEvaluator(scoreAndLabels: RDD[(Double, Double)]) exten iter.foreach(agg += _) Iterator(agg) }, preservesPartitioning = true).collect() - val partitionwiseCumCounts = agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg + c) + val partitionwiseCumCounts = + agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg.clone() += c) val totalCount = partitionwiseCumCounts.last logInfo(s"Total counts: $totalCount") - val cumCounts = counts.mapPartitionsWithIndex((index: Int, iter: Iterator[(Double, LabelCounter)]) => { - val cumCount = partitionwiseCumCounts(index) - iter.map { case (score, c) => - cumCount += c - (score, cumCount.clone()) - } - }, preservesPartitioning = true) + val cumCounts = counts.mapPartitionsWithIndex( + (index: Int, iter: Iterator[(Double, LabelCounter)]) => { + val cumCount = partitionwiseCumCounts(index) + iter.map { case (score, c) => + cumCount += c + (score, cumCount.clone()) + } + }, preservesPartitioning = true) cumCounts.persist() val confusions = cumCounts.map { case (score, cumCount) => (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix]) @@ -136,7 +139,9 @@ class BinaryClassificationEvaluator(scoreAndLabels: RDD[(Double, Double)]) exten } /** Creates a curve of (metricX, metricY). */ - private def createCurve(x: BinaryClassificationMetric, y: BinaryClassificationMetric): RDD[(Double, Double)] = { + private def createCurve( + x: BinaryClassificationMetric, + y: BinaryClassificationMetric): RDD[(Double, Double)] = { confusions.map { case (_, c) => (x(c), y(c)) } @@ -149,7 +154,9 @@ class BinaryClassificationEvaluator(scoreAndLabels: RDD[(Double, Double)]) exten * @param numPositives number of positive labels * @param numNegatives number of negative labels */ -private class LabelCounter(var numPositives: Long = 0L, var numNegatives: Long = 0L) extends Serializable { +private class LabelCounter( + var numPositives: Long = 0L, + var numNegatives: Long = 0L) extends Serializable { /** Processes a label. */ def +=(label: Double): LabelCounter = { @@ -166,11 +173,6 @@ private class LabelCounter(var numPositives: Long = 0L, var numNegatives: Long = this } - /** Sums this counter and another counter and returns the result in a new counter. 
*/ - def +(other: LabelCounter): LabelCounter = { - this.clone() += other - } - override def clone: LabelCounter = { new LabelCounter(numPositives, numNegatives) } From fb4b6d2b5a1d49837a022b0be08c1e732ec9612f Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 20:31:06 -0700 Subject: [PATCH 17/19] rename Evaluator to Metrics and add more metrics --- .../BinaryClassificationEvaluator.scala | 181 ----------------- .../BinaryClassificationMetricComputers.scala | 57 ++++++ .../binary/BinaryClassificationMetrics.scala | 187 +++++++++++++++--- ...=> BinaryClassificationMetricsSuite.scala} | 19 +- 4 files changed, 229 insertions(+), 215 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala rename mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/{BinaryClassificationEvaluatorSuite.scala => BinaryClassificationMetricsSuite.scala} (69%) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala deleted file mode 100644 index 56a14c6e42adb..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluator.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.evaluation.binary - -import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.evaluation.AreaUnderCurve -import org.apache.spark.Logging - -/** - * Implementation of [[org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrix]]. - * - * @param count label counter for labels with scores greater than or equal to the current score - * @param totalCount label counter for all labels - */ -private case class BinaryConfusionMatrixImpl( - count: LabelCounter, - totalCount: LabelCounter) extends BinaryConfusionMatrix with Serializable { - - /** number of true positives */ - override def tp: Long = count.numPositives - - /** number of false positives */ - override def fp: Long = count.numNegatives - - /** number of false negatives */ - override def fn: Long = totalCount.numPositives - count.numPositives - - /** number of true negatives */ - override def tn: Long = totalCount.numNegatives - count.numNegatives - - /** number of positives */ - override def p: Long = totalCount.numPositives - - /** number of negatives */ - override def n: Long = totalCount.numNegatives -} - -/** - * Evaluator for binary classification. 
- * - * @param scoreAndLabels an RDD of (score, label) pairs. - */ -class BinaryClassificationEvaluator(scoreAndLabels: RDD[(Double, Double)]) - extends Serializable with Logging { - - private lazy val ( - cumCounts: RDD[(Double, LabelCounter)], - confusions: RDD[(Double, BinaryConfusionMatrix)]) = { - // Create a bin for each distinct score value, count positives and negatives within each bin, - // and then sort by score values in descending order. - val counts = scoreAndLabels.combineByKey( - createCombiner = (label: Double) => new LabelCounter(0L, 0L) += label, - mergeValue = (c: LabelCounter, label: Double) => c += label, - mergeCombiners = (c1: LabelCounter, c2: LabelCounter) => c1 += c2 - ).sortByKey(ascending = false) - val agg = counts.values.mapPartitions({ iter => - val agg = new LabelCounter() - iter.foreach(agg += _) - Iterator(agg) - }, preservesPartitioning = true).collect() - val partitionwiseCumCounts = - agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg.clone() += c) - val totalCount = partitionwiseCumCounts.last - logInfo(s"Total counts: $totalCount") - val cumCounts = counts.mapPartitionsWithIndex( - (index: Int, iter: Iterator[(Double, LabelCounter)]) => { - val cumCount = partitionwiseCumCounts(index) - iter.map { case (score, c) => - cumCount += c - (score, cumCount.clone()) - } - }, preservesPartitioning = true) - cumCounts.persist() - val confusions = cumCounts.map { case (score, cumCount) => - (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix]) - } - (cumCounts, confusions) - } - - /** Unpersist intermediate RDDs used in the computation. */ - def unpersist() { - cumCounts.unpersist() - } - - /** - * Returns the receiver operating characteristic (ROC) curve. - * @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic - */ - def rocCurve(): RDD[(Double, Double)] = createCurve(FalsePositiveRate, Recall) - - /** - * Computes the area under the receiver operating characteristic (ROC) curve. - */ - def rocAUC(): Double = AreaUnderCurve.of(rocCurve()) - - /** - * Returns the precision-recall curve. - * @see http://en.wikipedia.org/wiki/Precision_and_recall - */ - def prCurve(): RDD[(Double, Double)] = createCurve(Recall, Precision) - - /** - * Computes the area under the precision-recall curve. - */ - def prAUC(): Double = AreaUnderCurve.of(prCurve()) - - /** - * Returns the (threshold, F-Measure) curve. - * @param beta the beta factor in F-Measure computation. - * @return an RDD of (threshold, F-Measure) pairs. - * @see http://en.wikipedia.org/wiki/F1_score - */ - def fMeasureByThreshold(beta: Double): RDD[(Double, Double)] = createCurve(FMeasure(beta)) - - /** Returns the (threshold, F-Measure) curve with beta = 1.0. */ - def fMeasureByThreshold(): RDD[(Double, Double)] = fMeasureByThreshold(1.0) - - /** Creates a curve of (threshold, metric). */ - private def createCurve(y: BinaryClassificationMetric): RDD[(Double, Double)] = { - confusions.map { case (s, c) => - (s, y(c)) - } - } - - /** Creates a curve of (metricX, metricY). */ - private def createCurve( - x: BinaryClassificationMetric, - y: BinaryClassificationMetric): RDD[(Double, Double)] = { - confusions.map { case (_, c) => - (x(c), y(c)) - } - } -} - -/** - * A counter for positives and negatives. 
- * - * @param numPositives number of positive labels - * @param numNegatives number of negative labels - */ -private class LabelCounter( - var numPositives: Long = 0L, - var numNegatives: Long = 0L) extends Serializable { - - /** Processes a label. */ - def +=(label: Double): LabelCounter = { - // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle - // -1.0 for negative as well. - if (label > 0.5) numPositives += 1L else numNegatives += 1L - this - } - - /** Merges another counter. */ - def +=(other: LabelCounter): LabelCounter = { - numPositives += other.numPositives - numNegatives += other.numNegatives - this - } - - override def clone: LabelCounter = { - new LabelCounter(numPositives, numNegatives) - } - - override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}" -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala new file mode 100644 index 0000000000000..34583da800079 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation.binary + +/** + * Trait for a binary classification evaluation metric computer. + */ +private[evaluation] trait BinaryClassificationMetricComputer extends Serializable { + def apply(c: BinaryConfusionMatrix): Double +} + +/** Precision. */ +private[evaluation] object Precision extends BinaryClassificationMetricComputer { + override def apply(c: BinaryConfusionMatrix): Double = + c.tp.toDouble / (c.tp + c.fp) +} + +/** False positive rate. */ +private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricComputer { + override def apply(c: BinaryConfusionMatrix): Double = + c.fp.toDouble / c.n +} + +/** Recall. */ +private[evaluation] object Recall extends BinaryClassificationMetricComputer { + override def apply(c: BinaryConfusionMatrix): Double = + c.tp.toDouble / c.p +} + +/** + * F-Measure. 
+ * @param beta the beta constant in F-Measure + * @see http://en.wikipedia.org/wiki/F1_score + */ +private[evaluation] case class FMeasure(beta: Double) extends BinaryClassificationMetricComputer { + private val beta2 = beta * beta + override def apply(c: BinaryConfusionMatrix): Double = { + val precision = Precision(c) + val recall = Recall(c) + (1.0 + beta2) * (precision * recall) / (beta2 * precision + recall) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala index 11581586de817..c95cbf525a2e8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala @@ -17,41 +17,176 @@ package org.apache.spark.mllib.evaluation.binary +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.evaluation.AreaUnderCurve +import org.apache.spark.Logging + /** - * Trait for a binary classification evaluation metric. + * Implementation of [[org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrix]]. + * + * @param count label counter for labels with scores greater than or equal to the current score + * @param totalCount label counter for all labels */ -private[evaluation] trait BinaryClassificationMetric extends Serializable { - def apply(c: BinaryConfusionMatrix): Double -} +private case class BinaryConfusionMatrixImpl( + count: LabelCounter, + totalCount: LabelCounter) extends BinaryConfusionMatrix with Serializable { -/** Precision. */ -private[evaluation] object Precision extends BinaryClassificationMetric { - override def apply(c: BinaryConfusionMatrix): Double = - c.tp.toDouble / (c.tp + c.fp) -} + /** number of true positives */ + override def tp: Long = count.numPositives + + /** number of false positives */ + override def fp: Long = count.numNegatives -/** False positive rate. */ -private[evaluation] object FalsePositiveRate extends BinaryClassificationMetric { - override def apply(c: BinaryConfusionMatrix): Double = - c.fp.toDouble / c.n + /** number of false negatives */ + override def fn: Long = totalCount.numPositives - count.numPositives + + /** number of true negatives */ + override def tn: Long = totalCount.numNegatives - count.numNegatives + + /** number of positives */ + override def p: Long = totalCount.numPositives + + /** number of negatives */ + override def n: Long = totalCount.numNegatives } -/** Recall. */ -private[evaluation] object Recall extends BinaryClassificationMetric { - override def apply(c: BinaryConfusionMatrix): Double = - c.tp.toDouble / c.p +/** + * Evaluator for binary classification. + * + * @param scoreAndLabels an RDD of (score, label) pairs. + */ +class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) + extends Serializable with Logging { + + private lazy val ( + cumCounts: RDD[(Double, LabelCounter)], + confusions: RDD[(Double, BinaryConfusionMatrix)]) = { + // Create a bin for each distinct score value, count positives and negatives within each bin, + // and then sort by score values in descending order. 
+ val counts = scoreAndLabels.combineByKey( + createCombiner = (label: Double) => new LabelCounter(0L, 0L) += label, + mergeValue = (c: LabelCounter, label: Double) => c += label, + mergeCombiners = (c1: LabelCounter, c2: LabelCounter) => c1 += c2 + ).sortByKey(ascending = false) + val agg = counts.values.mapPartitions({ iter => + val agg = new LabelCounter() + iter.foreach(agg += _) + Iterator(agg) + }, preservesPartitioning = true).collect() + val partitionwiseCumCounts = + agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg.clone() += c) + val totalCount = partitionwiseCumCounts.last + logInfo(s"Total counts: $totalCount") + val cumCounts = counts.mapPartitionsWithIndex( + (index: Int, iter: Iterator[(Double, LabelCounter)]) => { + val cumCount = partitionwiseCumCounts(index) + iter.map { case (score, c) => + cumCount += c + (score, cumCount.clone()) + } + }, preservesPartitioning = true) + cumCounts.persist() + val confusions = cumCounts.map { case (score, cumCount) => + (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix]) + } + (cumCounts, confusions) + } + + /** Unpersist intermediate RDDs used in the computation. */ + def unpersist() { + cumCounts.unpersist() + } + + /** Returns thresholds in descending order. */ + def thresholds(): RDD[Double] = cumCounts.map(_._1) + + /** + * Returns the receiver operating characteristic (ROC) curve, + * which is an RDD of (false positive rate, true positive rate). + * @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic + */ + def roc(): RDD[(Double, Double)] = createCurve(FalsePositiveRate, Recall) + + /** + * Computes the area under the receiver operating characteristic (ROC) curve. + */ + def areaUnderROC(): Double = AreaUnderCurve.of(roc()) + + /** + * Returns the precision-recall curve, + * which is an RDD of (recall, precision), NOT (precision, recall). + * @see http://en.wikipedia.org/wiki/Precision_and_recall + */ + def pr(): RDD[(Double, Double)] = createCurve(Recall, Precision) + + /** + * Computes the area under the precision-recall curve. + */ + def areaUnderPR(): Double = AreaUnderCurve.of(pr()) + + /** + * Returns the (threshold, F-Measure) curve. + * @param beta the beta factor in F-Measure computation. + * @return an RDD of (threshold, F-Measure) pairs. + * @see http://en.wikipedia.org/wiki/F1_score + */ + def fMeasureByThreshold(beta: Double): RDD[(Double, Double)] = createCurve(FMeasure(beta)) + + /** Returns the (threshold, F-Measure) curve with beta = 1.0. */ + def fMeasureByThreshold(): RDD[(Double, Double)] = fMeasureByThreshold(1.0) + + /** Returns the (threshold, precision) curve. */ + def precisionByThreshold(): RDD[(Double, Double)] = createCurve(Precision) + + /** Returns the (threshold, recall) curve. */ + def recallByThreshold(): RDD[(Double, Double)] = createCurve(Recall) + + /** Creates a curve of (threshold, metric). */ + private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = { + confusions.map { case (s, c) => + (s, y(c)) + } + } + + /** Creates a curve of (metricX, metricY). */ + private def createCurve( + x: BinaryClassificationMetricComputer, + y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = { + confusions.map { case (_, c) => + (x(c), y(c)) + } + } } /** - * F-Measure. - * @param beta the beta constant in F-Measure - * @see http://en.wikipedia.org/wiki/F1_score + * A counter for positives and negatives. 
+ * + * @param numPositives number of positive labels + * @param numNegatives number of negative labels */ -private[evaluation] case class FMeasure(beta: Double) extends BinaryClassificationMetric { - private val beta2 = beta * beta - override def apply(c: BinaryConfusionMatrix): Double = { - val precision = Precision(c) - val recall = Recall(c) - (1.0 + beta2) * (precision * recall) / (beta2 * precision + recall) +private class LabelCounter( + var numPositives: Long = 0L, + var numNegatives: Long = 0L) extends Serializable { + + /** Processes a label. */ + def +=(label: Double): LabelCounter = { + // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle + // -1.0 for negative as well. + if (label > 0.5) numPositives += 1L else numNegatives += 1L + this + } + + /** Merges another counter. */ + def +=(other: LabelCounter): LabelCounter = { + numPositives += other.numPositives + numNegatives += other.numNegatives + this + } + + override def clone: LabelCounter = { + new LabelCounter(numPositives, numNegatives) } + + override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}" } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala similarity index 69% rename from mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluatorSuite.scala rename to mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala index 8a3ef5a8713a3..a1f3c44becb66 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala @@ -22,11 +22,11 @@ import org.scalatest.FunSuite import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.mllib.evaluation.AreaUnderCurve -class BinaryClassificationEvaluatorSuite extends FunSuite with LocalSparkContext { +class BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext { test("binary evaluation metrics") { val scoreAndLabels = sc.parallelize( Seq((0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)), 2) - val evaluator = new BinaryClassificationEvaluator(scoreAndLabels) + val metrics = new BinaryClassificationMetrics(scoreAndLabels) val score = Seq(0.8, 0.6, 0.4, 0.1) val tp = Seq(1, 3, 3, 4) val fp = Seq(0, 1, 2, 3) @@ -39,11 +39,14 @@ class BinaryClassificationEvaluatorSuite extends FunSuite with LocalSparkContext val pr = recall.zip(precision) val f1 = pr.map { case (re, prec) => 2.0 * (prec * re) / (prec + re) } val f2 = pr.map { case (re, prec) => 5.0 * (prec * re) / (4.0 * prec + re)} - assert(evaluator.rocCurve().collect().toSeq === roc) - assert(evaluator.rocAUC() === AreaUnderCurve.of(roc)) - assert(evaluator.prCurve().collect().toSeq === pr) - assert(evaluator.prAUC() === AreaUnderCurve.of(pr)) - assert(evaluator.fMeasureByThreshold().collect().toSeq === score.zip(f1)) - assert(evaluator.fMeasureByThreshold(2.0).collect().toSeq === score.zip(f2)) + assert(metrics.thresholds().collect().toSeq === score) + assert(metrics.roc().collect().toSeq === roc) + assert(metrics.areaUnderROC() === AreaUnderCurve.of(roc)) + assert(metrics.pr().collect().toSeq === pr) + assert(metrics.areaUnderPR() === AreaUnderCurve.of(pr)) + 
assert(metrics.fMeasureByThreshold().collect().toSeq === score.zip(f1)) + assert(metrics.fMeasureByThreshold(2.0).collect().toSeq === score.zip(f2)) + assert(metrics.precisionByThreshold().collect().toSeq === score.zip(precision)) + assert(metrics.recallByThreshold().collect().toSeq === score.zip(recall)) } } From 3f42e985351c70050dc2329b649f2450d9ef57f7 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 21:53:12 -0700 Subject: [PATCH 18/19] add (0, 0), (1, 1) to roc, and (0, 1) to pr --- .../binary/BinaryClassificationMetrics.scala | 24 ++++++++++++++----- .../BinaryClassificationMetricsSuite.scala | 11 +++++---- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala index c95cbf525a2e8..658142b0ebf53 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.evaluation.binary -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{UnionRDD, RDD} import org.apache.spark.SparkContext._ import org.apache.spark.mllib.evaluation.AreaUnderCurve import org.apache.spark.Logging @@ -103,10 +103,17 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) /** * Returns the receiver operating characteristic (ROC) curve, - * which is an RDD of (false positive rate, true positive rate). + * which is an RDD of (false positive rate, true positive rate) + * with (0.0, 0.0) prepended and (1.0, 1.0) appended to it. * @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic */ - def roc(): RDD[(Double, Double)] = createCurve(FalsePositiveRate, Recall) + def roc(): RDD[(Double, Double)] = { + val rocCurve = createCurve(FalsePositiveRate, Recall) + val sc = confusions.context + val first = sc.makeRDD(Seq((0.0, 0.0)), 1) + val last = sc.makeRDD(Seq((1.0, 1.0)), 1) + new UnionRDD[(Double, Double)](sc, Seq(first, rocCurve, last)) + } /** * Computes the area under the receiver operating characteristic (ROC) curve. @@ -114,11 +121,16 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) def areaUnderROC(): Double = AreaUnderCurve.of(roc()) /** - * Returns the precision-recall curve, - * which is an RDD of (recall, precision), NOT (precision, recall). + * Returns the precision-recall curve, which is an RDD of (recall, precision), + * NOT (precision, recall), with (0.0, 1.0) prepended to it. * @see http://en.wikipedia.org/wiki/Precision_and_recall */ - def pr(): RDD[(Double, Double)] = createCurve(Recall, Precision) + def pr(): RDD[(Double, Double)] = { + val prCurve = createCurve(Recall, Precision) + val sc = confusions.context + val first = sc.makeRDD(Seq((0.0, 1.0)), 1) + first.union(prCurve) + } /** * Computes the area under the precision-recall curve. 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala index a1f3c44becb66..f92adbe8378c6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala @@ -35,15 +35,16 @@ class BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext { val precision = tp.zip(fp).map { case (t, f) => t.toDouble / (t + f) } val recall = tp.map(t => t.toDouble / p) val fpr = fp.map(f => f.toDouble / n) - val roc = fpr.zip(recall) + val rocCurve = Seq((0.0, 0.0)) ++ fpr.zip(recall) ++ Seq((1.0, 1.0)) val pr = recall.zip(precision) + val prCurve = Seq((0.0, 1.0)) ++ pr val f1 = pr.map { case (re, prec) => 2.0 * (prec * re) / (prec + re) } val f2 = pr.map { case (re, prec) => 5.0 * (prec * re) / (4.0 * prec + re)} assert(metrics.thresholds().collect().toSeq === score) - assert(metrics.roc().collect().toSeq === roc) - assert(metrics.areaUnderROC() === AreaUnderCurve.of(roc)) - assert(metrics.pr().collect().toSeq === pr) - assert(metrics.areaUnderPR() === AreaUnderCurve.of(pr)) + assert(metrics.roc().collect().toSeq === rocCurve) + assert(metrics.areaUnderROC() === AreaUnderCurve.of(rocCurve)) + assert(metrics.pr().collect().toSeq === prCurve) + assert(metrics.areaUnderPR() === AreaUnderCurve.of(prCurve)) assert(metrics.fMeasureByThreshold().collect().toSeq === score.zip(f1)) assert(metrics.fMeasureByThreshold(2.0).collect().toSeq === score.zip(f2)) assert(metrics.precisionByThreshold().collect().toSeq === score.zip(precision)) From a05941db7fac61222ac23e69e78f86e165e41ad2 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 10 Apr 2014 01:05:45 -0700 Subject: [PATCH 19/19] replace TP/FP/TN/FN by their full names --- .../BinaryClassificationMetricComputers.scala | 6 ++-- .../binary/BinaryClassificationMetrics.scala | 34 +++++++++---------- .../binary/BinaryConfusionMatrix.scala | 12 +++---- .../BinaryClassificationMetricsSuite.scala | 32 +++++++++-------- 4 files changed, 43 insertions(+), 41 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala index 34583da800079..562663ad36b40 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala @@ -27,19 +27,19 @@ private[evaluation] trait BinaryClassificationMetricComputer extends Serializabl /** Precision. */ private[evaluation] object Precision extends BinaryClassificationMetricComputer { override def apply(c: BinaryConfusionMatrix): Double = - c.tp.toDouble / (c.tp + c.fp) + c.numTruePositives.toDouble / (c.numTruePositives + c.numFalsePositives) } /** False positive rate. */ private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricComputer { override def apply(c: BinaryConfusionMatrix): Double = - c.fp.toDouble / c.n + c.numFalsePositives.toDouble / c.numNegatives } /** Recall. 
*/ private[evaluation] object Recall extends BinaryClassificationMetricComputer { override def apply(c: BinaryConfusionMatrix): Double = - c.tp.toDouble / c.p + c.numTruePositives.toDouble / c.numPositives } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala index 658142b0ebf53..ed7b0fc943367 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetrics.scala @@ -33,22 +33,22 @@ private case class BinaryConfusionMatrixImpl( totalCount: LabelCounter) extends BinaryConfusionMatrix with Serializable { /** number of true positives */ - override def tp: Long = count.numPositives + override def numTruePositives: Long = count.numPositives /** number of false positives */ - override def fp: Long = count.numNegatives + override def numFalsePositives: Long = count.numNegatives /** number of false negatives */ - override def fn: Long = totalCount.numPositives - count.numPositives + override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives /** number of true negatives */ - override def tn: Long = totalCount.numNegatives - count.numNegatives + override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives /** number of positives */ - override def p: Long = totalCount.numPositives + override def numPositives: Long = totalCount.numPositives /** number of negatives */ - override def n: Long = totalCount.numNegatives + override def numNegatives: Long = totalCount.numNegatives } /** @@ -57,10 +57,10 @@ private case class BinaryConfusionMatrixImpl( * @param scoreAndLabels an RDD of (score, label) pairs. */ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) - extends Serializable with Logging { + extends Serializable with Logging { private lazy val ( - cumCounts: RDD[(Double, LabelCounter)], + cumulativeCounts: RDD[(Double, LabelCounter)], confusions: RDD[(Double, BinaryConfusionMatrix)]) = { // Create a bin for each distinct score value, count positives and negatives within each bin, // and then sort by score values in descending order. 
@@ -74,32 +74,32 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) iter.foreach(agg += _) Iterator(agg) }, preservesPartitioning = true).collect() - val partitionwiseCumCounts = + val partitionwiseCumulativeCounts = agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg.clone() += c) - val totalCount = partitionwiseCumCounts.last + val totalCount = partitionwiseCumulativeCounts.last logInfo(s"Total counts: $totalCount") - val cumCounts = counts.mapPartitionsWithIndex( + val cumulativeCounts = counts.mapPartitionsWithIndex( (index: Int, iter: Iterator[(Double, LabelCounter)]) => { - val cumCount = partitionwiseCumCounts(index) + val cumCount = partitionwiseCumulativeCounts(index) iter.map { case (score, c) => cumCount += c (score, cumCount.clone()) } }, preservesPartitioning = true) - cumCounts.persist() - val confusions = cumCounts.map { case (score, cumCount) => + cumulativeCounts.persist() + val confusions = cumulativeCounts.map { case (score, cumCount) => (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix]) } - (cumCounts, confusions) + (cumulativeCounts, confusions) } /** Unpersist intermediate RDDs used in the computation. */ def unpersist() { - cumCounts.unpersist() + cumulativeCounts.unpersist() } /** Returns thresholds in descending order. */ - def thresholds(): RDD[Double] = cumCounts.map(_._1) + def thresholds(): RDD[Double] = cumulativeCounts.map(_._1) /** * Returns the receiver operating characteristic (ROC) curve, diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala index f846d05cd894c..75a75b216002a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala @@ -22,20 +22,20 @@ package org.apache.spark.mllib.evaluation.binary */ private[evaluation] trait BinaryConfusionMatrix { /** number of true positives */ - def tp: Long + def numTruePositives: Long /** number of false positives */ - def fp: Long + def numFalsePositives: Long /** number of false negatives */ - def fn: Long + def numFalseNegatives: Long /** number of true negatives */ - def tn: Long + def numTrueNegatives: Long /** number of positives */ - def p: Long = tp + fn + def numPositives: Long = numTruePositives + numFalseNegatives /** number of negatives */ - def n: Long = fp + tn + def numNegatives: Long = numFalsePositives + numTrueNegatives } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala index f92adbe8378c6..173fdaefab3da 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricsSuite.scala @@ -27,27 +27,29 @@ class BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext { val scoreAndLabels = sc.parallelize( Seq((0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)), 2) val metrics = new BinaryClassificationMetrics(scoreAndLabels) - val score = Seq(0.8, 0.6, 0.4, 0.1) - val tp = Seq(1, 3, 3, 4) - val fp = Seq(0, 1, 2, 3) - val p = 4 - val n = 3 - val precision = tp.zip(fp).map { case (t, 
f) => t.toDouble / (t + f) } - val recall = tp.map(t => t.toDouble / p) - val fpr = fp.map(f => f.toDouble / n) + val threshold = Seq(0.8, 0.6, 0.4, 0.1) + val numTruePositives = Seq(1, 3, 3, 4) + val numFalsePositives = Seq(0, 1, 2, 3) + val numPositives = 4 + val numNegatives = 3 + val precision = numTruePositives.zip(numFalsePositives).map { case (t, f) => + t.toDouble / (t + f) + } + val recall = numTruePositives.map(t => t.toDouble / numPositives) + val fpr = numFalsePositives.map(f => f.toDouble / numNegatives) val rocCurve = Seq((0.0, 0.0)) ++ fpr.zip(recall) ++ Seq((1.0, 1.0)) val pr = recall.zip(precision) val prCurve = Seq((0.0, 1.0)) ++ pr - val f1 = pr.map { case (re, prec) => 2.0 * (prec * re) / (prec + re) } - val f2 = pr.map { case (re, prec) => 5.0 * (prec * re) / (4.0 * prec + re)} - assert(metrics.thresholds().collect().toSeq === score) + val f1 = pr.map { case (r, p) => 2.0 * (p * r) / (p + r) } + val f2 = pr.map { case (r, p) => 5.0 * (p * r) / (4.0 * p + r)} + assert(metrics.thresholds().collect().toSeq === threshold) assert(metrics.roc().collect().toSeq === rocCurve) assert(metrics.areaUnderROC() === AreaUnderCurve.of(rocCurve)) assert(metrics.pr().collect().toSeq === prCurve) assert(metrics.areaUnderPR() === AreaUnderCurve.of(prCurve)) - assert(metrics.fMeasureByThreshold().collect().toSeq === score.zip(f1)) - assert(metrics.fMeasureByThreshold(2.0).collect().toSeq === score.zip(f2)) - assert(metrics.precisionByThreshold().collect().toSeq === score.zip(precision)) - assert(metrics.recallByThreshold().collect().toSeq === score.zip(recall)) + assert(metrics.fMeasureByThreshold().collect().toSeq === threshold.zip(f1)) + assert(metrics.fMeasureByThreshold(2.0).collect().toSeq === threshold.zip(f2)) + assert(metrics.precisionByThreshold().collect().toSeq === threshold.zip(precision)) + assert(metrics.recallByThreshold().collect().toSeq === threshold.zip(recall)) } }
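
For reference, a minimal driver-side sketch of how the BinaryClassificationMetrics API introduced by this series could be exercised once the patches are applied. The local SparkContext setup, the object and app names, and the (score, label) values below are illustrative assumptions, not part of the patches themselves.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.binary.BinaryClassificationMetrics

object BinaryClassificationMetricsExample {
  def main(args: Array[String]): Unit = {
    // Local Spark context, assumed only for this sketch.
    val conf = new SparkConf().setMaster("local[2]").setAppName("BinaryClassificationMetricsExample")
    val sc = new SparkContext(conf)

    // (score, label) pairs, with 1.0 marking positive labels and 0.0 negative labels.
    val scoreAndLabels = sc.parallelize(
      Seq((0.1, 0.0), (0.4, 0.0), (0.6, 1.0), (0.6, 0.0), (0.8, 1.0)), 2)

    val metrics = new BinaryClassificationMetrics(scoreAndLabels)

    // Thresholds come back in descending order, one per distinct score value.
    println("thresholds: " + metrics.thresholds().collect().toSeq)
    // ROC curve as (false positive rate, true positive rate), with (0,0) prepended
    // and (1,1) appended, plus the area under it.
    println("roc: " + metrics.roc().collect().toSeq)
    println("area under ROC: " + metrics.areaUnderROC())
    // Precision-recall curve as (recall, precision), with (0,1) prepended, plus its area.
    println("pr: " + metrics.pr().collect().toSeq)
    println("area under PR: " + metrics.areaUnderPR())
    // Per-threshold curves.
    println("f1 by threshold: " + metrics.fMeasureByThreshold().collect().toSeq)
    println("precision by threshold: " + metrics.precisionByThreshold().collect().toSeq)
    println("recall by threshold: " + metrics.recallByThreshold().collect().toSeq)

    // Release the cached cumulative counts once all curves have been computed.
    metrics.unpersist()
    sc.stop()
  }
}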