SPARK-25299: Add rest of shuffle writer benchmarks (apache-spark-on-k…
yifeih authored and ifilonenko committed Apr 4, 2019
1 parent cb3cfb4 commit ef563da
Showing 5 changed files with 228 additions and 53 deletions.
BypassMergeSortShuffleWriterBenchmark.scala
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.Utils

/**
 * Benchmark to measure the performance of the BypassMergeSortShuffleWriter.
 * {{{
 * To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object BypassMergeSortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  private val shuffleHandle: BypassMergeSortShuffleHandle[String, String] =
    new BypassMergeSortShuffleHandle[String, String](
      shuffleId = 0,
      numMaps = 1,
      dependency)

  private val MIN_NUM_ITERS = 10
  private val DATA_SIZE_SMALL = 1000
  private val DATA_SIZE_LARGE =
    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 4 / DEFAULT_DATA_STRING_SIZE

  def getWriter(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = {
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.file.transferTo", String.valueOf(transferTo))
    conf.set("spark.shuffle.file.buffer", "32k")

    val shuffleWriter = new BypassMergeSortShuffleWriter[String, String](
      blockManager,
      blockResolver,
      shuffleHandle,
      0,
      conf,
      taskContext.taskMetrics().shuffleWriteMetrics
    )

    shuffleWriter
  }

  def writeBenchmarkWithLargeDataset(): Unit = {
    val size = DATA_SIZE_LARGE
    val benchmark = new Benchmark(
      "BypassMergeSortShuffleWrite with spill",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)

    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false))
    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true))
    benchmark.run()
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = DATA_SIZE_SMALL
    val benchmark = new Benchmark("BypassMergeSortShuffleWrite without spill",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark, "small dataset without disk spill", size, () => getWriter(false))
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("BypassMergeSortShuffleWriter write") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithLargeDataset()
    }
  }
}
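A quick aside on the sizing above: DATA_SIZE_LARGE is derived from the sorter's maximum page size divided by the per-record string length, so the "with spill" case writes far more data than the small case. Below is a rough, self-contained Scala sketch of that arithmetic; the 1L << 27 value matches PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES in Spark, while the DEFAULT_DATA_STRING_SIZE of 5 is only a placeholder assumption for the constant defined in ShuffleWriterBenchmarkBase.

object DataSizeSketch {
  // Matches PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES (1 << 27, i.e. 128 MiB).
  val MAXIMUM_PAGE_SIZE_BYTES: Long = 1L << 27
  // Placeholder assumption; the real constant lives in ShuffleWriterBenchmarkBase.
  val DEFAULT_DATA_STRING_SIZE: Int = 5

  def main(args: Array[String]): Unit = {
    // Same shape as the benchmark: a quarter of a page's worth of records.
    val dataSizeLarge = MAXIMUM_PAGE_SIZE_BYTES / 4 / DEFAULT_DATA_STRING_SIZE
    println(s"DATA_SIZE_LARGE ~= $dataSizeLarge records")  // 6710886 with these numbers
  }
}
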
ShuffleWriterBenchmarkBase.scala
@@ -17,7 +17,7 @@

package org.apache.spark.shuffle.sort

import java.io.{BufferedInputStream, Closeable, File, FileInputStream, FileOutputStream}
import java.io.File
import java.util.UUID

import org.apache.commons.io.FileUtils
@@ -35,7 +35,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer.{KryoSerializer, Serializer, SerializerManager}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter}
import org.apache.spark.storage.{BlockManager, DiskBlockManager, TempShuffleBlockId}
import org.apache.spark.util.Utils

@@ -121,10 +121,26 @@ abstract class ShuffleWriterBenchmarkBase extends BenchmarkBase {
blockManager)
}

  def addBenchmarkCase(benchmark: Benchmark, name: String)(func: Benchmark.Timer => Unit): Unit = {
  def addBenchmarkCase(
      benchmark: Benchmark,
      name: String,
      size: Int,
      writerSupplier: () => ShuffleWriter[String, String],
      numSpillFiles: Option[Int] = Option.empty): Unit = {
    benchmark.addTimerCase(name) { timer =>
      setup()
      func(timer)
      val writer = writerSupplier()
      val dataIterator = createDataIterator(size)
      try {
        timer.startTiming()
        writer.write(dataIterator)
        timer.stopTiming()
        if (numSpillFiles.isDefined) {
          assert(tempFilesCreated.length == numSpillFiles.get)
        }
      } finally {
        writer.stop(true)
      }
      teardown()
    }
  }
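To make the refactored helper easier to read outside the diff, here is a small, self-contained sketch (plain Scala with a hypothetical FakeWriter, not Spark's API) of the supplier-plus-assertion pattern it introduces; the real addBenchmarkCase additionally calls setup(), createDataIterator(size), and teardown() from this base class and times the write with the Benchmark timer.

object SupplierCaseSketch {
  // Stand-in for a ShuffleWriter[String, String]; not Spark's API.
  final class FakeWriter {
    def write(records: Iterator[String]): Int = records.length
    def stop(success: Boolean): Unit = ()
  }

  def addBenchmarkCase(
      name: String,
      size: Int,
      writerSupplier: () => FakeWriter,
      numSpillFiles: Option[Int] = None): Unit = {
    val writer = writerSupplier()  // a fresh writer for every timed case
    val start = System.nanoTime()
    try {
      val written = writer.write(Iterator.fill(size)("record"))
      val elapsedMs = (System.nanoTime() - start) / 1e6
      // Placeholder for the spill-file assertion the real helper performs.
      numSpillFiles.foreach(expected => assert(expected >= 0))
      println(f"$name: $written records in $elapsedMs%.2f ms")
    } finally {
      writer.stop(true)
    }
  }

  def main(args: Array[String]): Unit =
    addBenchmarkCase("example case", 1000, () => new FakeWriter, Some(0))
}
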
SortShuffleWriterBenchmark.scala
@@ -92,74 +92,45 @@ object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark, "small dataset without spills") { timer =>
      val shuffleWriter = getWriter(Option.empty, Option.empty)
      val dataIterator = createDataIterator(size)
      try {
        timer.startTiming()
        shuffleWriter.write(dataIterator)
        timer.stopTiming()
        assert(tempFilesCreated.isEmpty)
      } finally {
        shuffleWriter.stop(true)
      }
    }
    addBenchmarkCase(benchmark,
      "small dataset without spills",
      size,
      () => getWriter(Option.empty, Option.empty),
      Some(0))
    benchmark.run()
  }

  def writeBenchmarkWithSpill(): Unit = {
    val size = DATA_SIZE_LARGE

    val benchmark = new Benchmark("SortShuffleWriter with spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output,
      outputPerIteration = true)
    addBenchmarkCase(benchmark, "no map side combine") { timer =>
      val shuffleWriter = getWriter(Option.empty, Option.empty)
      val dataIterator = createDataIterator(size)
      try {
        timer.startTiming()
        shuffleWriter.write(dataIterator)
        timer.stopTiming()
        assert(tempFilesCreated.length == 7)
      } finally {
        shuffleWriter.stop(true)
      }
    }
    addBenchmarkCase(benchmark,
      "no map side combine",
      size,
      () => getWriter(Option.empty, Option.empty),
      Some(7))

    def createCombiner(i: String): String = i
    def mergeValue(i: String, j: String): String = if (Ordering.String.compare(i, j) > 0) i else j
    def mergeCombiners(i: String, j: String): String =
      if (Ordering.String.compare(i, j) > 0) i else j
    val aggregator =
      new Aggregator[String, String, String](createCombiner, mergeValue, mergeCombiners)
    addBenchmarkCase(benchmark, "with map side aggregation") { timer =>
      val shuffleWriter = getWriter(Some(aggregator), Option.empty)
      val dataIterator = createDataIterator(size)
      try {
        timer.startTiming()
        shuffleWriter.write(dataIterator)
        timer.stopTiming()
        assert(tempFilesCreated.length == 7)
      } finally {
        shuffleWriter.stop(true)
      }
    }
    addBenchmarkCase(benchmark,
      "with map side aggregation",
      size,
      () => getWriter(Some(aggregator), Option.empty),
      Some(7))

    val sorter = Ordering.String
    addBenchmarkCase(benchmark, "with map side sort") { timer =>
      val shuffleWriter = getWriter(Option.empty, Some(sorter))
      val dataIterator = createDataIterator(size)
      try {
        timer.startTiming()
        shuffleWriter.write(dataIterator)
        timer.stopTiming()
        assert(tempFilesCreated.length == 7)
      } finally {
        shuffleWriter.stop(true)
      }
    }
    addBenchmarkCase(benchmark,
      "with map side sort",
      size,
      () => getWriter(Option.empty, Some(sorter)),
      Some(7))
    benchmark.run()
  }
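The "with map side aggregation" case above relies on an Aggregator whose merge functions keep the lexicographically larger value for each key. A Spark-free sketch of that combine logic, with hypothetical sample records, is below.

object MapSideCombineSketch {
  def createCombiner(i: String): String = i
  def mergeValue(i: String, j: String): String =
    if (Ordering.String.compare(i, j) > 0) i else j

  def main(args: Array[String]): Unit = {
    // Hypothetical sample records; keys repeat so the map-side combine has work to do.
    val records = Seq("k1" -> "apple", "k1" -> "pear", "k2" -> "fig")
    val combined = records.groupBy(_._1).map { case (key, pairs) =>
      key -> pairs.map(_._2).map(createCombiner).reduceLeft(mergeValue)
    }
    println(combined)  // e.g. Map(k1 -> pear, k2 -> fig)
  }
}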

UnsafeShuffleWriterBenchmark.scala
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle.sort

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.Utils

/**
 * Benchmark to measure the performance of the UnsafeShuffleWriter.
 * {{{
 * To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object UnsafeShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  private val shuffleHandle: SerializedShuffleHandle[String, String] =
    new SerializedShuffleHandle[String, String](0, 0, this.dependency)

  private val MIN_NUM_ITERS = 10
  private val DATA_SIZE_SMALL = 1000
  private val DATA_SIZE_LARGE =
    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 2 / DEFAULT_DATA_STRING_SIZE

  def getWriter(transferTo: Boolean): UnsafeShuffleWriter[String, String] = {
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.file.transferTo", String.valueOf(transferTo))

    new UnsafeShuffleWriter[String, String](
      blockManager,
      blockResolver,
      taskMemoryManager,
      shuffleHandle,
      0,
      taskContext,
      conf,
      taskContext.taskMetrics().shuffleWriteMetrics
    )
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = DATA_SIZE_SMALL
    val benchmark = new Benchmark("UnsafeShuffleWriter without spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark,
      "small dataset without spills",
      size,
      () => getWriter(false),
      Some(1)) // The single temp file is for the temp index file
    benchmark.run()
  }

  def writeBenchmarkWithSpill(): Unit = {
    val size = DATA_SIZE_LARGE
    val benchmark = new Benchmark("UnsafeShuffleWriter with spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output,
      outputPerIteration = true)
    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false), Some(7))
    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true), Some(7))
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("UnsafeShuffleWriter write") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithSpill()
    }
  }
}
dev/run-spark-25299-benchmarks.sh (4 additions, 0 deletions)
@@ -50,12 +50,16 @@ done

echo "Running SPARK-25299 benchmarks"

SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriterBenchmark"
SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.SortShuffleWriterBenchmark"
SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.UnsafeShuffleWriterBenchmark"

SPARK_DIR=`pwd`

mkdir -p /tmp/artifacts
cp $SPARK_DIR/sql/core/benchmarks/BypassMergeSortShuffleWriterBenchmark-results.txt /tmp/artifacts/
cp $SPARK_DIR/sql/core/benchmarks/SortShuffleWriterBenchmark-results.txt /tmp/artifacts/
cp $SPARK_DIR/sql/core/benchmarks/UnsafeShuffleWriterBenchmark-results.txt /tmp/artifacts/

if [ "$UPLOAD" = false ]; then
  exit 0
