Skip to content

Commit

Permalink
[SPARK-3394] [SQL] Fix crash in TakeOrdered when limit is 0
Browse files Browse the repository at this point in the history
This resolves https://issues.apache.org/jira/browse/SPARK-3394

Author: Eric Liang <[email protected]>

Closes #2264 from ericl/spark-3394 and squashes the following commits:

c87355b [Eric Liang] refactor
bfb6140 [Eric Liang] change RDD takeOrdered instead
7a51528 [Eric Liang] fix takeordered when limit = 0

(cherry picked from commit 6754570)
Signed-off-by: Matei Zaharia <[email protected]>
  • Loading branch information
Eric Liang authored and mateiz committed Sep 8, 2014
1 parent 4d3ab29 commit ae6f554
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1051,15 +1051,19 @@ abstract class RDD[T: ClassTag](
* @return an array of top elements
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
if (num == 0) {
Array.empty
} else {
mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}

/**
Expand Down
7 changes: 7 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sortedLowerK === Array(1, 2, 3, 4, 5))
}

test("takeOrdered with limit 0") {
val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.makeRDD(nums, 2)
val sortedLowerK = rdd.takeOrdered(0)
assert(sortedLowerK.size === 0)
}

test("takeOrdered with custom ordering") {
val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
implicit val ord = implicitly[Ordering[Int]].reverse
Expand Down

0 comments on commit ae6f554

Please sign in to comment.