SPARK-25299: Add rest of shuffle writer benchmarks #507

Merged
merged 80 commits into from
Mar 18, 2019
Changes from 79 commits
Commits
c7abec6
add initial bypass merge sort shuffle writer benchmarks
yifeih Feb 23, 2019
22ef648
dd unsafe shuffle writer benchmarks
yifeih Feb 25, 2019
4084e27
changes in bypassmergesort benchmarks
yifeih Feb 25, 2019
fb8266d
cleanup
yifeih Feb 26, 2019
89104e2
add circle script
yifeih Feb 26, 2019
b90b381
add this branch for testing
yifeih Feb 26, 2019
5e13dd8
fix circle attempt 1
yifeih Feb 26, 2019
845e645
checkout code
yifeih Feb 26, 2019
a68f459
add some caches?
yifeih Feb 26, 2019
757f6fe
why is it not pull caches...
yifeih Feb 26, 2019
0bcd5d9
save as artifact instead of publishing
yifeih Feb 26, 2019
26c01ec
mkdir
yifeih Feb 26, 2019
0d7a036
typo
yifeih Feb 26, 2019
3fc5331
try uploading artifacts again
yifeih Feb 26, 2019
8c33701
try print per iteration to avoid circle erroring out on idle
yifeih Feb 26, 2019
9546397
blah (#495)
yifeih Feb 27, 2019
d72ba73
make a PR comment
yifeih Feb 27, 2019
1859805
actually delete files
yifeih Feb 27, 2019
c20f0be
run benchmarks on test build branch
yifeih Feb 27, 2019
444d46a
oops forgot to enable upload
yifeih Feb 27, 2019
2322933
add sort shuffle writer benchmarks
yifeih Feb 27, 2019
da0d91c
add stdev
yifeih Feb 27, 2019
e590917
cleanup sort a bit
yifeih Feb 27, 2019
cbfdb99
fix stdev text
yifeih Feb 27, 2019
cbe38c6
fix sort shuffle
yifeih Feb 27, 2019
acdda71
initial code for read side
yifeih Feb 28, 2019
fd7a7c5
format
yifeih Feb 28, 2019
d82618b
use times and sample stdev
yifeih Feb 28, 2019
610ea1d
add assert for at least one iteration
yifeih Feb 28, 2019
295d7f3
cleanup shuffle write to use fewer mocks and single base interface
yifeih Mar 1, 2019
0c696dc
shuffle read works with transport client... needs lots of cleaning
yifeih Mar 1, 2019
323a296
test running in cicle
yifeih Mar 1, 2019
85836c2
scalastyle
yifeih Mar 1, 2019
b67d1f3
dont publish results yet
yifeih Mar 1, 2019
252963d
cleanup writer code
yifeih Mar 4, 2019
f72afb2
get only git message
yifeih Mar 4, 2019
3bcd35e
fix command to get PR number
yifeih Mar 4, 2019
d8b5d79
add SortshuffleWriterBenchmark
yifeih Mar 4, 2019
d9fb78a
writer code
yifeih Mar 4, 2019
b142951
cleanup
yifeih Mar 5, 2019
d0466b8
Merge remote-tracking branch 'origin' into yh/add-benchmarks-and-ci
yifeih Mar 5, 2019
f91dfad
fix benchmark script
yifeih Mar 5, 2019
5839b1d
use ArgumentMatchers
yifeih Mar 5, 2019
0b8c7ed
also in shufflewriterbenchmarkbase
yifeih Mar 5, 2019
d11f87f
scalastyle
yifeih Mar 5, 2019
6f2779f
add apache license
yifeih Mar 5, 2019
bbe9edc
fix some scale stuff
yifeih Mar 5, 2019
567d372
fix up tests
yifeih Mar 5, 2019
47c1938
only copy benchmarks we care about
yifeih Mar 5, 2019
e79ac28
increase size for reader again
yifeih Mar 5, 2019
c3858df
delete two writers and reader for PR
yifeih Mar 5, 2019
68d6f62
Revert "delete two writers and reader for PR"
yifeih Mar 5, 2019
7c8d52e
delete reader
yifeih Mar 5, 2019
9d46fae
SPARK-25299: Add shuffle reader benchmarks (#506)
yifeih Mar 5, 2019
9f51758
Revert "SPARK-25299: Add shuffle reader benchmarks (#506)"
yifeih Mar 5, 2019
f169acd
start tests and fix timer
yifeih Mar 5, 2019
bcb09c5
add -e to bash script
yifeih Mar 5, 2019
d4a1b52
Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci…
yifeih Mar 5, 2019
25da723
blah
yifeih Mar 5, 2019
8559264
Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci…
yifeih Mar 5, 2019
13703fa
enable upload as a PR comment and prevent running benchmarks on this …
yifeih Mar 6, 2019
e3751cd
Revert "enable upload as a PR comment and prevent running benchmarks …
yifeih Mar 6, 2019
33a1b72
try machine execution
yifeih Mar 6, 2019
fa1b96c
try uploading benchmarks (#498)
yifeih Mar 7, 2019
b38abb0
Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci…
yifeih Mar 7, 2019
96c66c9
don't upload results yet
yifeih Mar 7, 2019
37cef1f
only upload results when merging into the feature branch
yifeih Mar 11, 2019
459e1b5
lock down machine image
yifeih Mar 12, 2019
4cabdbd
don't write input data to disk
yifeih Mar 13, 2019
9225bb7
Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci…
yifeih Mar 13, 2019
a3b0ee5
dont write input data to disk
yifeih Mar 13, 2019
47d2dcf
run benchmark test
yifeih Mar 13, 2019
c78e491
stop creating file cleanup threads for every block manager
yifeih Mar 13, 2019
f28b75c
use alphanumeric again
yifeih Mar 13, 2019
a85acf4
use a new random everytime
yifeih Mar 13, 2019
f26ab40
close the writers -__________-
yifeih Mar 13, 2019
e5481b4
Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci…
yifeih Mar 13, 2019
6151cab
Merge branch 'spark-25299' into yh/add-benchmarks-and-ci-two-writers
yifeih Mar 14, 2019
da1c2d0
refactor
yifeih Mar 14, 2019
350eb6e
style
yifeih Mar 18, 2019
@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.Utils

/**
 * Benchmark to measure the write performance of BypassMergeSortShuffleWriter.
 * {{{
 *   To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object BypassMergeSortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  private val shuffleHandle: BypassMergeSortShuffleHandle[String, String] =
    new BypassMergeSortShuffleHandle[String, String](
      shuffleId = 0,
      numMaps = 1,
      dependency)

  private val MIN_NUM_ITERS = 10
  private val DATA_SIZE_SMALL = 1000
  private val DATA_SIZE_LARGE =
    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES/4/DEFAULT_DATA_STRING_SIZE

  def getWriter(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = {
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.file.transferTo", String.valueOf(transferTo))
    conf.set("spark.shuffle.file.buffer", "32k")

    val shuffleWriter = new BypassMergeSortShuffleWriter[String, String](
      blockManager,
      blockResolver,
      shuffleHandle,
      0,
      conf,
      taskContext.taskMetrics().shuffleWriteMetrics
    )

    shuffleWriter
  }

  def writeBenchmarkWithLargeDataset(): Unit = {
    val size = DATA_SIZE_LARGE
    val benchmark = new Benchmark(
      "BypassMergeSortShuffleWrite with spill",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)

    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false))
    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true))
    benchmark.run()
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = DATA_SIZE_SMALL
    val benchmark = new Benchmark("BypassMergeSortShuffleWrite without spill",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark, "small dataset without disk spill", size, () => getWriter(false))
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("BypassMergeSortShuffleWriter write") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithLargeDataset()
    }
  }
}
@@ -17,7 +17,7 @@

 package org.apache.spark.shuffle.sort

-import java.io.{BufferedInputStream, Closeable, File, FileInputStream, FileOutputStream}
+import java.io.File
 import java.util.UUID

 import org.apache.commons.io.FileUtils
@@ -35,7 +35,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer.{KryoSerializer, Serializer, SerializerManager}
-import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter}
 import org.apache.spark.storage.{BlockManager, DiskBlockManager, TempShuffleBlockId}
 import org.apache.spark.util.Utils

@@ -121,10 +121,23 @@ abstract class ShuffleWriterBenchmarkBase extends BenchmarkBase {
       blockManager)
   }

-  def addBenchmarkCase(benchmark: Benchmark, name: String)(func: Benchmark.Timer => Unit): Unit = {
+  def addBenchmarkCase(benchmark: Benchmark, name: String, size: Int,
+    writerSupplier: () => ShuffleWriter[String, String],

Nit: Start args on this line, then 1 arg per line, with 4-space indentation from def.

+    numSpillFiles: Option[Int] = Option.empty): Unit = {
     benchmark.addTimerCase(name) { timer =>
       setup()
-      func(timer)
+      val writer = writerSupplier()
+      val dataIterator = createDataIterator(size)
+      try {
+        timer.startTiming()
+        writer.write(dataIterator)
+        timer.stopTiming()
+        if (numSpillFiles.isDefined) {
+          assert(tempFilesCreated.length == numSpillFiles.get)
+        }
+      } finally {
+        writer.stop(true)
+      }
       teardown()
     }
   }
@@ -92,74 +92,45 @@ object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
       size,
       minNumIters = MIN_NUM_ITERS,
       output = output)
-    addBenchmarkCase(benchmark, "small dataset without spills") { timer =>
-      val shuffleWriter = getWriter(Option.empty, Option.empty)
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.isEmpty)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "small dataset without spills",
+      size,
+      () => getWriter(Option.empty, Option.empty),
+      Some(0))
     benchmark.run()
   }

   def writeBenchmarkWithSpill(): Unit = {
     val size = DATA_SIZE_LARGE

     val benchmark = new Benchmark("SortShuffleWriter with spills",
       size,
       minNumIters = MIN_NUM_ITERS,
       output = output,
       outputPerIteration = true)
-    addBenchmarkCase(benchmark, "no map side combine") { timer =>
-      val shuffleWriter = getWriter(Option.empty, Option.empty)
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.length == 7)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "no map side combine",
+      size,
+      () => getWriter(Option.empty, Option.empty),
+      Some(7))

     def createCombiner(i: String): String = i
     def mergeValue(i: String, j: String): String = if (Ordering.String.compare(i, j) > 0) i else j
     def mergeCombiners(i: String, j: String): String =
       if (Ordering.String.compare(i, j) > 0) i else j
     val aggregator =
       new Aggregator[String, String, String](createCombiner, mergeValue, mergeCombiners)
-    addBenchmarkCase(benchmark, "with map side aggregation") { timer =>
-      val shuffleWriter = getWriter(Some(aggregator), Option.empty)
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.length == 7)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "with map side aggregation",
+      size,
+      () => getWriter(Some(aggregator), Option.empty),
+      Some(7))

     val sorter = Ordering.String
-    addBenchmarkCase(benchmark, "with map side sort") { timer =>
-      val shuffleWriter = getWriter(Option.empty, Some(sorter))
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.length == 7)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "with map side sort",
+      size,
+      () => getWriter(Option.empty, Some(sorter)),
+      Some(7))
     benchmark.run()
   }

@@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.shuffle.sort

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.Utils

/**
 * Benchmark to measure the write performance of UnsafeShuffleWriter.
 * {{{
 *   To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object UnsafeShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  private val shuffleHandle: SerializedShuffleHandle[String, String] =
    new SerializedShuffleHandle[String, String](0, 0, this.dependency)

  private val MIN_NUM_ITERS = 10
  private val DATA_SIZE_SMALL = 1000
  private val DATA_SIZE_LARGE =
    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES/2/DEFAULT_DATA_STRING_SIZE

  def getWriter(transferTo: Boolean): UnsafeShuffleWriter[String, String] = {
One idea I just thought of - why not make newWriter(transferTo: Boolean) an abstract method in ShuffleWriterBenchmarkBase, then pass transferTo to addBenchmarkCase, which removes the need to pass the writer supplier? The superclass can just call that abstract method in addBenchmarkCase. Thoughts?

Author

The transferTo flag only applies to BypassMergeSortShuffleWriter and UnsafeShuffleWriter, but the SortShuffleWriter tests have other parameters, none of which are transferTo

    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.file.transferTo", String.valueOf(transferTo))

    new UnsafeShuffleWriter[String, String](
      blockManager,
      blockResolver,
      taskMemoryManager,
      shuffleHandle,
      0,
      taskContext,
      conf,
      taskContext.taskMetrics().shuffleWriteMetrics
    )
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = DATA_SIZE_SMALL
    val benchmark = new Benchmark("UnsafeShuffleWriter without spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark,
      "small dataset without spills",
      size,
      () => getWriter(false),
      Some(1)) // The single temp file is for the temp index file
    benchmark.run()
  }

  def writeBenchmarkWithSpill(): Unit = {
    val size = DATA_SIZE_LARGE
    val benchmark = new Benchmark("UnsafeShuffleWriter with spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output,
      outputPerIteration = true)
    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false), Some(7))
    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true), Some(7))
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("UnsafeShuffleWriter write") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithSpill()
    }
  }
}
4 changes: 4 additions & 0 deletions dev/run-spark-25299-benchmarks.sh
@@ -50,12 +50,16 @@ done

 echo "Running SPARK-25299 benchmarks"

+SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriterBenchmark"
Not for right now, but if this list grows beyond size 5, let's make a text file with all the classes that we need to benchmark, and then for-loop over it.
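The for-loop idea from the comment above could look roughly like the following. This is only a sketch under stated assumptions: the function name `run_benchmarks`, the `DRY_RUN` switch, and the list-file layout (one fully qualified class name per line, `#` for comments) are hypothetical and not part of this PR. The sbt invocation and the results path are the ones the script already uses.

```shell
# Sketch only: run_benchmarks, DRY_RUN and the list-file layout are
# hypothetical names for illustration, not part of this PR.
run_benchmarks() {
  list_file="$1"      # one fully qualified benchmark class per line
  artifacts_dir="$2"  # where the *-results.txt files get copied
  mkdir -p "$artifacts_dir"
  # The "|| [ -n ... ]" part also handles a last line without a newline.
  while IFS= read -r class || [ -n "$class" ]; do
    # Skip blank lines and '#' comments in the list file.
    case "$class" in
      ''|'#'*) continue ;;
    esac
    # Strip the package prefix to get the results file name.
    short_name="${class##*.}"
    if [ "${DRY_RUN:-0}" = 1 ]; then
      echo "would run: $class"
      continue
    fi
    # </dev/null keeps sbt from consuming the rest of the list on stdin.
    SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain $class" </dev/null
    cp "sql/core/benchmarks/${short_name}-results.txt" "$artifacts_dir/"
  done < "$list_file"
}
```

With a list file in place, the three run lines and three `cp` lines would collapse into one call such as `run_benchmarks <list file> /tmp/artifacts`, run from the Spark root directory. Setting `DRY_RUN=1` only prints the class names, which may be handy for checking the list without starting sbt.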

 SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.SortShuffleWriterBenchmark"
+SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.UnsafeShuffleWriterBenchmark"

 SPARK_DIR=`pwd`

 mkdir -p /tmp/artifacts
+cp $SPARK_DIR/sql/core/benchmarks/BypassMergeSortShuffleWriterBenchmark-results.txt /tmp/artifacts/
 cp $SPARK_DIR/sql/core/benchmarks/SortShuffleWriterBenchmark-results.txt /tmp/artifacts/
+cp $SPARK_DIR/sql/core/benchmarks/UnsafeShuffleWriterBenchmark-results.txt /tmp/artifacts/

 if [ "$UPLOAD" = false ]; then
   exit 0