Skip to content

Commit

Permalink
[SPARK-8428][SPARK-13850] Fix integer overflows in TimSort
Browse files Browse the repository at this point in the history
This patch fixes a few integer overflows in `UnsafeSortDataFormat.copyRange()` and `ShuffleSortDataFormat.copyRange()` that seem to be the most likely cause behind a number of `TimSort` contract violation errors seen in Spark 2.0 and Spark 1.6 while sorting large datasets.

Added a test in `ExternalSorterSuite` that instantiates a large array of the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999] that triggers a `copyRange` in `TimSort.mergeLo` or `TimSort.mergeHi`. Note that the input dataset should contain at least 268.43 million rows (2^28, the point at which an `int` position multiplied by 8 bytes exceeds the 32-bit range) with a certain data distribution for an overflow to occur.

Author: Sameer Agarwal <[email protected]>

Closes #13336 from sameeragarwal/timsort-bug.

(cherry picked from commit fe6de16)
Signed-off-by: Reynold Xin <[email protected]>
  • Loading branch information
sameeragarwal authored and rxin committed May 26, 2016
1 parent 5cc1e2c commit 0b8bdf7
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) {
public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
Platform.copyMemory(
src.getBaseObject(),
src.getBaseOffset() + srcPos * 8,
src.getBaseOffset() + srcPos * 8L,
dst.getBaseObject(),
dst.getBaseOffset() + dstPos * 8,
length * 8
dst.getBaseOffset() + dstPos * 8L,
length * 8L
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) {
public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
Platform.copyMemory(
src.getBaseObject(),
src.getBaseOffset() + srcPos * 16,
src.getBaseOffset() + srcPos * 16L,
dst.getBaseObject(),
dst.getBaseOffset() + dstPos * 16,
length * 16);
dst.getBaseOffset() + dstPos * 16L,
length * 16L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.spark.util.collection

import org.apache.spark.memory.MemoryTestingUtils
import java.util.Comparator

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark._
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.unsafe.array.LongArray
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat}


class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
Expand Down Expand Up @@ -95,6 +99,25 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)(
sortWithoutBreakingSortingContracts)

// This test is ignored by default as it requires a fairly large heap size (16GB)
// It contains no assertions: it only checks that sort() completes without throwing.
ignore("sort without breaking timsort contracts for large arrays") {
// Number of long values in the backing array, also passed to sort() as the upper bound.
val size = 300000000
// To manifest the bug observed in SPARK-8428 and SPARK-13850, we explicitly use an array of
// the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999]
// that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi()
// NOTE(review): the expression below yields size / 2 + i (150000000 .. 299999999) for the
// first half and i itself (150000000 .. 299999999 again) for the second half, not
// 0 .. 149999999 as described above -- confirm which layout is intended.
val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i }
// Wrap the on-heap array in a LongArray, the buffer type handed to the sorter below.
val buf = new LongArray(MemoryBlock.fromLongArray(ref))

// Sort the range [0, size) of the buffer, ordering records by their key prefix only.
new Sorter(UnsafeSortDataFormat.INSTANCE).sort(
buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] {
override def compare(
r1: RecordPointerAndKeyPrefix,
r2: RecordPointerAndKeyPrefix): Int = {
PrefixComparators.LONG.compare(r1.keyPrefix, r2.keyPrefix)
}
})
}

test("spilling with hash collisions") {
val size = 1000
val conf = createSparkConf(loadDefaults = true, kryo = false)
Expand Down

0 comments on commit 0b8bdf7

Please sign in to comment.