forked from apache/spark
SPARK-25299: Add rest of shuffle writer benchmarks #507
Merged: bulldozer-bot merged 80 commits into spark-25299 from yh/add-benchmarks-and-ci-two-writers on Mar 18, 2019
Commits (hash, message, author):
c7abec6 add initial bypass merge sort shuffle writer benchmarks (yifeih)
22ef648 dd unsafe shuffle writer benchmarks (yifeih)
4084e27 changes in bypassmergesort benchmarks (yifeih)
fb8266d cleanup (yifeih)
89104e2 add circle script (yifeih)
b90b381 add this branch for testing (yifeih)
5e13dd8 fix circle attempt 1 (yifeih)
845e645 checkout code (yifeih)
a68f459 add some caches? (yifeih)
757f6fe why is it not pull caches... (yifeih)
0bcd5d9 save as artifact instead of publishing (yifeih)
26c01ec mkdir (yifeih)
0d7a036 typo (yifeih)
3fc5331 try uploading artifacts again (yifeih)
8c33701 try print per iteration to avoid circle erroring out on idle (yifeih)
9546397 blah (#495) (yifeih)
d72ba73 make a PR comment (yifeih)
1859805 actually delete files (yifeih)
c20f0be run benchmarks on test build branch (yifeih)
444d46a oops forgot to enable upload (yifeih)
2322933 add sort shuffle writer benchmarks (yifeih)
da0d91c add stdev (yifeih)
e590917 cleanup sort a bit (yifeih)
cbfdb99 fix stdev text (yifeih)
cbe38c6 fix sort shuffle (yifeih)
acdda71 initial code for read side (yifeih)
fd7a7c5 format (yifeih)
d82618b use times and sample stdev (yifeih)
610ea1d add assert for at least one iteration (yifeih)
295d7f3 cleanup shuffle write to use fewer mocks and single base interface (yifeih)
0c696dc shuffle read works with transport client... needs lots of cleaning (yifeih)
323a296 test running in cicle (yifeih)
85836c2 scalastyle (yifeih)
b67d1f3 dont publish results yet (yifeih)
252963d cleanup writer code (yifeih)
f72afb2 get only git message (yifeih)
3bcd35e fix command to get PR number (yifeih)
d8b5d79 add SortshuffleWriterBenchmark (yifeih)
d9fb78a writer code (yifeih)
b142951 cleanup (yifeih)
d0466b8 Merge remote-tracking branch 'origin' into yh/add-benchmarks-and-ci (yifeih)
f91dfad fix benchmark script (yifeih)
5839b1d use ArgumentMatchers (yifeih)
0b8c7ed also in shufflewriterbenchmarkbase (yifeih)
d11f87f scalastyle (yifeih)
6f2779f add apache license (yifeih)
bbe9edc fix some scale stuff (yifeih)
567d372 fix up tests (yifeih)
47c1938 only copy benchmarks we care about (yifeih)
e79ac28 increase size for reader again (yifeih)
c3858df delete two writers and reader for PR (yifeih)
68d6f62 Revert "delete two writers and reader for PR" (yifeih)
7c8d52e delete reader (yifeih)
9d46fae SPARK-25299: Add shuffle reader benchmarks (#506) (yifeih)
9f51758 Revert "SPARK-25299: Add shuffle reader benchmarks (#506)" (yifeih)
f169acd start tests and fix timer (yifeih)
bcb09c5 add -e to bash script (yifeih)
d4a1b52 Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci… (yifeih)
25da723 blah (yifeih)
8559264 Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci… (yifeih)
13703fa enable upload as a PR comment and prevent running benchmarks on this … (yifeih)
e3751cd Revert "enable upload as a PR comment and prevent running benchmarks … (yifeih)
33a1b72 try machine execution (yifeih)
fa1b96c try uploading benchmarks (#498) (yifeih)
b38abb0 Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci… (yifeih)
96c66c9 don't upload results yet (yifeih)
37cef1f only upload results when merging into the feature branch (yifeih)
459e1b5 lock down machine image (yifeih)
4cabdbd don't write input data to disk (yifeih)
9225bb7 Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci… (yifeih)
a3b0ee5 dont write input data to disk (yifeih)
47d2dcf run benchmark test (yifeih)
c78e491 stop creating file cleanup threads for every block manager (yifeih)
f28b75c use alphanumeric again (yifeih)
a85acf4 use a new random everytime (yifeih)
f26ab40 close the writers -__________- (yifeih)
e5481b4 Merge branch 'yh/add-benchmarks-and-ci' into yh/add-benchmarks-and-ci… (yifeih)
6151cab Merge branch 'spark-25299' into yh/add-benchmarks-and-ci-two-writers (yifeih)
da1c2d0 refactor (yifeih)
350eb6e style (yifeih)
93 changes: 93 additions & 0 deletions
.../src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala
@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.Utils

/**
 * Benchmark to measure the write performance of BypassMergeSortShuffleWriter.
 * {{{
 *   To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object BypassMergeSortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  private val shuffleHandle: BypassMergeSortShuffleHandle[String, String] =
    new BypassMergeSortShuffleHandle[String, String](
      shuffleId = 0,
      numMaps = 1,
      dependency)

  private val MIN_NUM_ITERS = 10
  private val DATA_SIZE_SMALL = 1000
  private val DATA_SIZE_LARGE =
    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES/4/DEFAULT_DATA_STRING_SIZE

  def getWriter(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = {
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.file.transferTo", String.valueOf(transferTo))
    conf.set("spark.shuffle.file.buffer", "32k")

    val shuffleWriter = new BypassMergeSortShuffleWriter[String, String](
      blockManager,
      blockResolver,
      shuffleHandle,
      0,
      conf,
      taskContext.taskMetrics().shuffleWriteMetrics
    )

    shuffleWriter
  }

  def writeBenchmarkWithLargeDataset(): Unit = {
    val size = DATA_SIZE_LARGE
    val benchmark = new Benchmark(
      "BypassMergeSortShuffleWrite with spill",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)

    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false))
    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true))
    benchmark.run()
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = DATA_SIZE_SMALL
    val benchmark = new Benchmark("BypassMergeSortShuffleWrite without spill",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark, "small dataset without disk spill", size, () => getWriter(false))
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("BypassMergeSortShuffleWriter write") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithLargeDataset()
    }
  }
}
91 changes: 91 additions & 0 deletions
core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala
@@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.shuffle.sort

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.Utils

/**
 * Benchmark to measure the write performance of UnsafeShuffleWriter.
 * {{{
 *   To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object UnsafeShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  private val shuffleHandle: SerializedShuffleHandle[String, String] =
    new SerializedShuffleHandle[String, String](0, 0, this.dependency)

  private val MIN_NUM_ITERS = 10
  private val DATA_SIZE_SMALL = 1000
  private val DATA_SIZE_LARGE =
    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES/2/DEFAULT_DATA_STRING_SIZE

  def getWriter(transferTo: Boolean): UnsafeShuffleWriter[String, String] = {
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.file.transferTo", String.valueOf(transferTo))

    new UnsafeShuffleWriter[String, String](
      blockManager,
      blockResolver,
      taskMemoryManager,
      shuffleHandle,
      0,
      taskContext,
      conf,
      taskContext.taskMetrics().shuffleWriteMetrics
    )
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = DATA_SIZE_SMALL
    val benchmark = new Benchmark("UnsafeShuffleWriter without spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark,
      "small dataset without spills",
      size,
      () => getWriter(false),
      Some(1)) // The single temp file is for the temp index file
    benchmark.run()
  }

  def writeBenchmarkWithSpill(): Unit = {
    val size = DATA_SIZE_LARGE
    val benchmark = new Benchmark("UnsafeShuffleWriter with spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output,
      outputPerIteration = true)
    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false), Some(7))
    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true), Some(7))
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("UnsafeShuffleWriter write") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithSpill()
    }
  }
}
@@ -50,12 +50,16 @@ done

echo "Running SPARK-25299 benchmarks"

SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriterBenchmark"

Review comment on the line above: Not for right now, but if this list grows beyond size 5, let's make a text file with all the classes that we need to benchmark, and then for-loop over it.

SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.SortShuffleWriterBenchmark"
SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.UnsafeShuffleWriterBenchmark"

SPARK_DIR=`pwd`

mkdir -p /tmp/artifacts
cp $SPARK_DIR/sql/core/benchmarks/BypassMergeSortShuffleWriterBenchmark-results.txt /tmp/artifacts/
cp $SPARK_DIR/sql/core/benchmarks/SortShuffleWriterBenchmark-results.txt /tmp/artifacts/
cp $SPARK_DIR/sql/core/benchmarks/UnsafeShuffleWriterBenchmark-results.txt /tmp/artifacts/

if [ "$UPLOAD" = false ]; then
  exit 0
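The review comment on this script suggests moving the benchmark class names into a text file and looping over it once the list grows. A minimal sketch of that idea follows. It is not part of this PR: the function name run_benchmarks, the RUNNER override, and the list file name used in the usage note are all hypothetical, and only the sbt invocation itself is taken from the script above.

```shell
#!/usr/bin/env bash
# Sketch only: run_benchmarks, RUNNER and the list file are hypothetical
# names introduced for illustration; they do not exist in this PR.

# Command used to launch one benchmark. Overridable so the loop can be
# dry-run without starting sbt.
RUNNER="${RUNNER:-./build/sbt}"

# Run every benchmark class listed (one fully qualified name per line) in
# the file passed as $1. Blank lines and lines starting with '#' are skipped.
run_benchmarks() {
  local list_file="$1"
  local class
  # Read the list on file descriptor 3 so that the launched command cannot
  # swallow the remaining class names through its standard input.
  while IFS= read -r class <&3 || [ -n "$class" ]; do
    case "$class" in
      ''|'#'*) continue ;;
    esac
    # Same invocation as the script above, with the class name substituted.
    # Stop at the first failing benchmark.
    SPARK_GENERATE_BENCHMARK_FILES=1 "$RUNNER" "sql/test:runMain $class" || return 1
  done 3< "$list_file"
}
```

Usage would look like run_benchmarks benchmark-classes.txt, where the (hypothetical) file contains the three class names currently hard-coded in the script. Reading the list on a separate file descriptor is a deliberate choice: a command launched inside a plain while-read loop may otherwise consume the rest of the list from stdin.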
One idea I just thought of - why not make newWriter(transferTo: Boolean) an abstract method in ShuffleWriterBenchmarkBase, then pass transferTo to addBenchmarkCase and remove the need to pass the writer supplier? The superclass can just call such an abstract method in addBenchmarkCase. Thoughts?

The transferTo flag only applies to BypassMergeSortShuffleWriter and UnsafeShuffleWriter, but the SortShuffleWriter tests have other parameters, none of which are transferTo.