diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala
new file mode 100644
index 0000000000000..7f67affe56364
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.util.Utils
+
+/**
+ * Benchmark to measure performance of the BypassMergeSortShuffleWriter.
+ * {{{
+ *   To run this benchmark:
+ *   1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/<this class>-results.txt".
+ * }}}
+ */
+object BypassMergeSortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
+
+  private val shuffleHandle: BypassMergeSortShuffleHandle[String, String] =
+    new BypassMergeSortShuffleHandle[String, String](
+      shuffleId = 0,
+      numMaps = 1,
+      dependency)
+
+  private val MIN_NUM_ITERS = 10
+  private val DATA_SIZE_SMALL = 1000
+  private val DATA_SIZE_LARGE =
+    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES/4/DEFAULT_DATA_STRING_SIZE
+
+  def getWriter(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set("spark.file.transferTo", String.valueOf(transferTo))
+    conf.set("spark.shuffle.file.buffer", "32k")
+
+    val shuffleWriter = new BypassMergeSortShuffleWriter[String, String](
+      blockManager,
+      blockResolver,
+      shuffleHandle,
+      0,
+      conf,
+      taskContext.taskMetrics().shuffleWriteMetrics
+    )
+
+    shuffleWriter
+  }
+
+  def writeBenchmarkWithLargeDataset(): Unit = {
+    val size = DATA_SIZE_LARGE
+    val benchmark = new Benchmark(
+      "BypassMergeSortShuffleWrite with spill",
+      size,
+      minNumIters = MIN_NUM_ITERS,
+      output = output)
+
+    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false))
+    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true))
+    benchmark.run()
+  }
+
+  def writeBenchmarkWithSmallDataset(): Unit = {
+    val size = DATA_SIZE_SMALL
+    val benchmark = new Benchmark("BypassMergeSortShuffleWrite without spill",
+      size,
+      minNumIters = MIN_NUM_ITERS,
+      output = output)
+    addBenchmarkCase(benchmark, "small dataset without disk spill", size, () => getWriter(false))
+    benchmark.run()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("BypassMergeSortShuffleWriter write") {
+      writeBenchmarkWithSmallDataset()
+      writeBenchmarkWithLargeDataset()
+    }
+  }
+}
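Note: `DATA_SIZE_LARGE` above scales the record count to a quarter of `PackedRecordPointer`'s maximum page size so the large-dataset cases actually spill. A minimal sketch of that arithmetic follows; `DEFAULT_DATA_STRING_SIZE` is defined in `ShuffleWriterBenchmarkBase` and not shown in this diff, so the value used here is only an illustrative assumption.

```scala
// Sketch of the DATA_SIZE_LARGE sizing used above.
object DataSizeSketch {
  val MAXIMUM_PAGE_SIZE_BYTES: Long = 1L << 27 // PackedRecordPointer's page-size limit, 128 MB
  val DEFAULT_DATA_STRING_SIZE: Int = 5        // assumption: per-record payload size in characters

  def main(args: Array[String]): Unit = {
    val dataSizeLarge = MAXIMUM_PAGE_SIZE_BYTES / 4 / DEFAULT_DATA_STRING_SIZE
    println(s"records per large-dataset iteration: $dataSizeLarge") // ~6.7M with these values
  }
}
```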
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleWriterBenchmarkBase.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleWriterBenchmarkBase.scala
index 8e6a69fb7080c..8cca6c331ff24 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleWriterBenchmarkBase.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleWriterBenchmarkBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle.sort
 
-import java.io.{BufferedInputStream, Closeable, File, FileInputStream, FileOutputStream}
+import java.io.File
 import java.util.UUID
 
 import org.apache.commons.io.FileUtils
@@ -35,7 +35,7 @@
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer.{KryoSerializer, Serializer, SerializerManager}
-import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter}
 import org.apache.spark.storage.{BlockManager, DiskBlockManager, TempShuffleBlockId}
 import org.apache.spark.util.Utils
@@ -121,10 +121,26 @@ abstract class ShuffleWriterBenchmarkBase extends BenchmarkBase {
       blockManager)
   }
 
-  def addBenchmarkCase(benchmark: Benchmark, name: String)(func: Benchmark.Timer => Unit): Unit = {
+  def addBenchmarkCase(
+      benchmark: Benchmark,
+      name: String,
+      size: Int,
+      writerSupplier: () => ShuffleWriter[String, String],
+      numSpillFiles: Option[Int] = Option.empty): Unit = {
     benchmark.addTimerCase(name) { timer =>
       setup()
-      func(timer)
+      val writer = writerSupplier()
+      val dataIterator = createDataIterator(size)
+      try {
+        timer.startTiming()
+        writer.write(dataIterator)
+        timer.stopTiming()
+        if (numSpillFiles.isDefined) {
+          assert(tempFilesCreated.length == numSpillFiles.get)
+        }
+      } finally {
+        writer.stop(true)
+      }
       teardown()
     }
  }
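Note: the refactored `addBenchmarkCase` above centralizes the per-case boilerplate that each benchmark previously repeated: build a fresh writer, time only the write, optionally assert the spill-file count, and always stop the writer. A self-contained sketch of that control flow, using stand-in types instead of Spark's `Benchmark.Timer` and `ShuffleWriter`:

```scala
// Stand-in mirror of the timed-case flow now centralized in addBenchmarkCase.
object TimerCaseFlowSketch {
  trait Writer {
    def write(records: Iterator[(String, String)]): Unit
    def stop(success: Boolean): Unit
  }

  def runCase(
      writerSupplier: () => Writer,      // fresh writer per timed iteration
      records: Iterator[(String, String)],
      expectedSpillFiles: Option[Int],   // None: skip the spill-file assertion
      observedSpillFiles: () => Int): Unit = {
    val writer = writerSupplier()
    try {
      // timer.startTiming() / stopTiming() wrap only the write call in the real code
      writer.write(records)
      expectedSpillFiles.foreach(n => assert(observedSpillFiles() == n))
    } finally {
      writer.stop(true)                  // runs even if write() throws
    }
  }
}
```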
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
index 317cd23279ede..62cc13fa107f0 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
@@ -92,41 +92,26 @@ object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
       size,
       minNumIters = MIN_NUM_ITERS,
       output = output)
-    addBenchmarkCase(benchmark, "small dataset without spills") { timer =>
-      val shuffleWriter = getWriter(Option.empty, Option.empty)
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.isEmpty)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "small dataset without spills",
+      size,
+      () => getWriter(Option.empty, Option.empty),
+      Some(0))
     benchmark.run()
   }
 
   def writeBenchmarkWithSpill(): Unit = {
     val size = DATA_SIZE_LARGE
-
     val benchmark = new Benchmark("SortShuffleWriter with spills",
       size,
       minNumIters = MIN_NUM_ITERS,
       output = output,
       outputPerIteration = true)
-    addBenchmarkCase(benchmark, "no map side combine") { timer =>
-      val shuffleWriter = getWriter(Option.empty, Option.empty)
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.length == 7)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "no map side combine",
+      size,
+      () => getWriter(Option.empty, Option.empty),
+      Some(7))
     def createCombiner(i: String): String = i
     def mergeValue(i: String, j: String): String =
       if (Ordering.String.compare(i, j) > 0) i else j
@@ -134,32 +119,18 @@ object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
       if (Ordering.String.compare(i, j) > 0) i else j
     val aggregator = new Aggregator[String, String, String](createCombiner, mergeValue,
      mergeCombiners)
-    addBenchmarkCase(benchmark, "with map side aggregation") { timer =>
-      val shuffleWriter = getWriter(Some(aggregator), Option.empty)
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.length == 7)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "with map side aggregation",
+      size,
+      () => getWriter(Some(aggregator), Option.empty),
+      Some(7))
 
     val sorter = Ordering.String
-    addBenchmarkCase(benchmark, "with map side sort") { timer =>
-      val shuffleWriter = getWriter(Option.empty, Some(sorter))
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.length == 7)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "with map side sort",
+      size,
+      () => getWriter(Option.empty, Some(sorter)),
+      Some(7))
 
     benchmark.run()
   }
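Note: the map-side-aggregation case above combines values by keeping the lexicographically larger string (`createCombiner`, `mergeValue`, and `mergeCombiners` all reduce to a max). A small standalone illustration of what that combine computes, on plain collections rather than through Spark's `Aggregator`:

```scala
// Standalone illustration of the max-string combine functions used in the benchmark.
object MaxStringAggregationSketch {
  def createCombiner(i: String): String = i
  def mergeValue(i: String, j: String): String =
    if (Ordering.String.compare(i, j) > 0) i else j

  def main(args: Array[String]): Unit = {
    val records = Seq("k1" -> "apple", "k1" -> "pear", "k2" -> "fig")
    val combined = records.groupBy(_._1).map { case (key, kvs) =>
      key -> kvs.map(_._2).map(createCombiner).reduce(mergeValue)
    }
    println(combined) // keeps the larger value per key: k1 -> pear, k2 -> fig
  }
}
```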
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala
new file mode 100644
index 0000000000000..ac62b496406fd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.sort
+
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.util.Utils
+
+/**
+ * Benchmark to measure performance of the UnsafeShuffleWriter.
+ * {{{
+ *   To run this benchmark:
+ *   1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/<this class>-results.txt".
+ * }}}
+ */
+object UnsafeShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
+
+  private val shuffleHandle: SerializedShuffleHandle[String, String] =
+    new SerializedShuffleHandle[String, String](0, 0, this.dependency)
+
+  private val MIN_NUM_ITERS = 10
+  private val DATA_SIZE_SMALL = 1000
+  private val DATA_SIZE_LARGE =
+    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES/2/DEFAULT_DATA_STRING_SIZE
+
+  def getWriter(transferTo: Boolean): UnsafeShuffleWriter[String, String] = {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set("spark.file.transferTo", String.valueOf(transferTo))
+
+    new UnsafeShuffleWriter[String, String](
+      blockManager,
+      blockResolver,
+      taskMemoryManager,
+      shuffleHandle,
+      0,
+      taskContext,
+      conf,
+      taskContext.taskMetrics().shuffleWriteMetrics
+    )
+  }
+
+  def writeBenchmarkWithSmallDataset(): Unit = {
+    val size = DATA_SIZE_SMALL
+    val benchmark = new Benchmark("UnsafeShuffleWriter without spills",
+      size,
+      minNumIters = MIN_NUM_ITERS,
+      output = output)
+    addBenchmarkCase(benchmark,
+      "small dataset without spills",
+      size,
+      () => getWriter(false),
+      Some(1)) // The single temp file is for the temp index file
+    benchmark.run()
+  }
+
+  def writeBenchmarkWithSpill(): Unit = {
+    val size = DATA_SIZE_LARGE
+    val benchmark = new Benchmark("UnsafeShuffleWriter with spills",
+      size,
+      minNumIters = MIN_NUM_ITERS,
+      output = output,
+      outputPerIteration = true)
+    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false), Some(7))
+    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true), Some(7))
+    benchmark.run()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("UnsafeShuffleWriter write") {
+      writeBenchmarkWithSmallDataset()
+      writeBenchmarkWithSpill()
+    }
+  }
+}
diff --git a/dev/run-spark-25299-benchmarks.sh b/dev/run-spark-25299-benchmarks.sh
index 2a0fe2088f219..d11060a50d111 100755
--- a/dev/run-spark-25299-benchmarks.sh
+++ b/dev/run-spark-25299-benchmarks.sh
@@ -50,12 +50,16 @@
 done
 
 echo "Running SPARK-25299 benchmarks"
 
+SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriterBenchmark"
 SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.SortShuffleWriterBenchmark"
+SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.UnsafeShuffleWriterBenchmark"
 
 SPARK_DIR=`pwd`
 mkdir -p /tmp/artifacts
+cp $SPARK_DIR/sql/core/benchmarks/BypassMergeSortShuffleWriterBenchmark-results.txt /tmp/artifacts/
 cp $SPARK_DIR/sql/core/benchmarks/SortShuffleWriterBenchmark-results.txt /tmp/artifacts/
+cp $SPARK_DIR/sql/core/benchmarks/UnsafeShuffleWriterBenchmark-results.txt /tmp/artifacts/
 
 if [ "$UPLOAD" = false ]; then
   exit 0
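Note: the `cp` lines added above pick up the `*-results.txt` files that the benchmark harness writes when `SPARK_GENERATE_BENCHMARK_FILES=1` is set, as described in the scaladoc of the new benchmark objects. A simplified sketch of that convention follows; the real logic lives in Spark's benchmark base class `main`, which is not part of this diff.

```scala
import java.io.File

// Simplified sketch: where benchmark results land depending on SPARK_GENERATE_BENCHMARK_FILES.
object ResultFileSketch {
  def resultFileFor(benchmarkName: String): Option[File] =
    if (sys.env.get("SPARK_GENERATE_BENCHMARK_FILES").contains("1")) {
      Some(new File(s"benchmarks/$benchmarkName-results.txt")) // the dev script copies these out of the build tree
    } else {
      None // without the env var, results only go to stdout
    }

  def main(args: Array[String]): Unit =
    println(resultFileFor("UnsafeShuffleWriterBenchmark"))
}
```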