[SPARK-25476][SPARK-25510][TEST] Refactor AggregateBenchmark and add a new trait to better support Dataset and DataFrame API #22484
Conversation
================================================================================================
stat functions
================================================================================================
@davies Do you know how to generate this benchmark:
spark/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
Lines 70 to 78 in 3c3eebc
Using ImperativeAggregate (as implemented in Spark 1.6):
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
stddev:                          Avg Time(ms)    Avg Rate(M/s)    Relative Rate
-------------------------------------------------------------------------------
stddev w/o codegen                    2019.04            10.39           1.00 X
stddev w codegen                      2097.29            10.00           0.96 X
kurtosis w/o codegen                  2108.99             9.94           0.96 X
kurtosis w codegen                    2090.69            10.03           0.97 X
Test build #96338 has finished for PR 22484 at commit
# Conflicts:
# sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
override def benchmark(): Unit = {
  runBenchmark("aggregate without grouping") {
    val N = 500L << 22
    runBenchmark("agg w/o group", N) {
The runBenchmark here is different from the one on line 48, but they have the same name. We should use a different name.
Yes. Do you have a suggested name?
Well, I don't have a good name in mind. How about making the runBenchmark method of RunBenchmarkWithCodegen override the one in BenchmarkBase?
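For context, a minimal sketch of the clash being discussed (simplified signatures, not the actual Spark code): BenchmarkBase exposes a one-argument runBenchmark that only groups output under a header, while the new trait adds a two-argument runBenchmark that toggles whole-stage codegen, so the two read very differently despite sharing a name.

```scala
// Illustrative sketch of the naming clash; simplified from the actual Spark traits.
trait BenchmarkBase {
  // Groups the output of several benchmarks under one header.
  def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
    println(s"================ $benchmarkName ================")
    func
  }
}

trait RunBenchmarkWithCodegen extends BenchmarkBase {
  // Same name, different intent: runs `f` once with codegen off and once with it on.
  def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
    // ... add a "codegen = F" case and a "codegen = T" case, then run them ...
  }
}
```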
Test build #96473 has finished for PR 22484 at commit
 * Common base trait for micro benchmarks that are supposed to run standalone (i.e. not together
 * with other test suites).
 */
trait RunBenchmarkWithCodegen extends BenchmarkBase {
How about RunBenchmarkWithCodegen -> SqlBaseBenchmark?
  }

  /** Runs function `f` with whole stage codegen on and off. */
  def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
How about runBenchmark -> runBenchmarkWithCodegen?
Test build #96500 has finished for PR 22484 at commit
retest this please
Test build #96502 has finished for PR 22484 at commit
LGTM, cc @dongjoon-hyun for sign-off
@wangyum. Could you make the title and description up-to-date for this PR content? Also, please update the JIRA title and description, too.
val N = 5 << 20
benchmark.addCase("codegen = T hugeMethodLimit = 1500") { iter =>
  spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
  spark.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "1500")
Although this is not a problem of this refactoring, this test suite seems to be unhealthy because the configuration from the previous benchmark is propagated to the next benchmark.
Can we fix this test suite to use withSQLConf?
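A rough sketch of the suggested shape, reusing the names from the hunk above (benchmark, f, SQLConf) and assuming a withSQLConf helper that restores the previous values when the block finishes:

```scala
// Sketch: scope the configuration to this case so it cannot leak into the next benchmark.
benchmark.addCase("codegen = T hugeMethodLimit = 1500") { _ =>
  withSQLConf(
    SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
    SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "1500") {
    f()
  }
}
```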
Thank you for updating, @wangyum.
spark.conf.set("spark.sql.codegen.wholeStage", "false") | ||
f() | ||
benchmark.addCase(s"codegen = F", numIters = 2) { _ => | ||
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> false.toString) { |
"false"
instead of false.toString
?
When we use Seq(true, false).foreach { value => ... }, we usually do s"$value". But for this, I think "false" is the simplest and the best.
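For reference, a small sketch of the pattern being referred to (an illustrative fragment; benchmark, withSQLConf, and f are assumed from the surrounding code):

```scala
// When looping over both settings, interpolating the loop variable is the natural choice.
Seq(true, false).foreach { value =>
  benchmark.addCase(s"codegen = $value") { _ =>
    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> s"$value") {
      f()
    }
  }
}
```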
benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> true.toString,
    SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> false.toString,
    "spark.sql.codegen.aggregate.map.vectorized.enable" -> false.toString) {
withSQLConf(
  SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
  SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
  "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
@wangyum. This one is also about indentation. Please note that withSQLConf( sits on its own line above the first configuration.
Do you mean change
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
  SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
  "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
  f()
}
to
withSQLConf(
  SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
  SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
  "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
  f()
}
?
Yes!
Fixed
benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { _ =>
  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> true.toString,
    SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> true.toString,
    "spark.sql.codegen.aggregate.map.vectorized.enable" -> true.toString) {
ditto.
spark.conf.set("spark.sql.codegen.wholeStage", value = false) | ||
f() | ||
benchmark.addCase(s"codegen = F", numIters = 2) { _ => | ||
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> false.toString) { |
"false"
instead of false.toString
.
benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> true.toString,
    SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> false.toString,
    "spark.sql.codegen.aggregate.map.vectorized.enable" -> false.toString) {
ditto.
 * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
 * configurations.
 */
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
Shall we avoid duplicating the existing withSQLConf logic? Let me try to fix it.
@wangyum. Thank you for waiting. Since SPARK-25534 is merged, could you use SQLHelper.withSQLConf instead?
Yes, I will do it now.
@dongjoon-hyun I finished it.
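For readers following the thread, a rough approximation of what a withSQLConf-style helper does (this is a simplified sketch, not the actual SQLHelper implementation, and it assumes a spark: SparkSession in scope): it remembers the current values of the given keys, applies the new ones, runs the body, and then restores or unsets the originals.

```scala
// Simplified approximation of a withSQLConf-style helper; not the real SQLHelper code.
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
  val conf = spark.conf
  // Remember the previous value (if any) of every key we are about to change.
  val previous: Seq[(String, Option[String])] =
    pairs.map { case (key, _) => key -> conf.getOption(key) }
  pairs.foreach { case (key, value) => conf.set(key, value) }
  try f finally {
    // Restore old values, or unset keys that had no previous value.
    previous.foreach {
      case (key, Some(old)) => conf.set(key, old)
      case (key, None) => conf.unset(key)
    }
  }
}
```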
I made an official PR for
Test build #96616 has finished for PR 22484 at commit
retest this please
Test build #96624 has finished for PR 22484 at commit
@wangyum Could you review and merge wangyum#12, too?
def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 1023 as string) as k")
  .groupBy("k").count().collect()
benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
  spark.conf.set("spark.sql.codegen.wholeStage", "false")
Shall we remove this redundant line 148?
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}
benchmark.addCase(s"codegen = F", numIters = 2) { _ =>
s"codegen = F"
-> "codegen = F"
?
Thanks @dongjoon-hyun. I plan to add EmptyInterpolatedStringChecker to scalastyle-config.xml to avoid this issue: SPARK-25553
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
f()
}
benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { _ =>
s"codegen = T hashmap = F"
-> "codegen = T hashmap = F"
Could you fix all instances like this?
Test build #96667 has finished for PR 22484 at commit
Test build #96666 has finished for PR 22484 at commit
retest this please
Test build #96686 has finished for PR 22484 at commit
@dongjoon-hyun Other refactorings are waiting for this commit.
/**
 * Common base trait to run benchmark with the Dataset and DataFrame API.
 */
trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper {
@wangyum and @gengliangwang. What is the future plan for the usage of both SqlBasedBenchmark and BenchmarkWithCodegen? I'm wondering what the criteria are for choosing each trait.
I think we can remove BenchmarkWithCodegen after all the refactoring is finished.
So, if @gengliangwang agrees with that, SqlBasedBenchmark is another refactoring (renaming and improvement) like [SPARK-25499][TEST] Refactor BenchmarkBase and Benchmark. Could you do that in a separate PR in advance?
Actually, I don't think the name SqlBasedBenchmark is appropriate. From the name we can't tell it is about benchmarking with/without whole-stage codegen. I will try to come up with a better name, or we can discuss it in this thread.
@dongjoon-hyun, in #22522 I feel that it would be better to have an example refactoring, so we can see how the new trait is used. We can move back to #22522. I am OK either way.
How about CodegenBenchmarkBase? This is the best I can think of. @wangyum @dongjoon-hyun @cloud-fan
Maybe we can add more common functions in the future, e.g. runBenchmarkWithCodegen, runBenchmarkWithParquetPushDown, runBenchmarkWithOrcPushDown...
Then each function could be in a different trait... I don't think that runBenchmarkWithCodegen has much in common with runBenchmarkWithParquetPushDown.
Thank you, @gengliangwang and @wangyum . Let me think about this again.
For the naming, let's keep the current one for now.
 */
trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper {

  val spark: SparkSession = getSparkSession
val spark -> protected val spark
  }

  /** Runs function `f` with whole stage codegen on and off. */
  def runBenchmarkWithCodegen(name: String, cardinality: Long)(f: => Unit): Unit = {
This should be final def runBenchmarkWithCodegen instead of def runBenchmarkWithCodegen.
      .getOrCreate()
  }

  /** Runs function `f` with whole stage codegen on and off. */
Can we use codegenBenchmark instead? runBenchmarkWithCodegen looks like an extension of runBenchmark. It's more like bitEncodingBenchmark or sortBenchmark.
Test build #96811 has finished for PR 22484 at commit
+1, LGTM.
Merged to master.
Thank you, @wangyum, @cloud-fan and @gengliangwang!
## What changes were proposed in this pull request?
Remove `BenchmarkWithCodegen` as we don't use it anymore. More details: #22484 (comment)
## How was this patch tested?
N/A
Closes #22985 from wangyum/SPARK-25510.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
…a new trait to better support Dataset and DataFrame API
## What changes were proposed in this pull request?
This PR does 2 things:
1. Add a new trait (`SqlBasedBenchmark`) to better support Dataset and DataFrame API.
2. Refactor `AggregateBenchmark` to use main method.
Generate benchmark result:
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.AggregateBenchmark"
```
## How was this patch tested?
manual tests
Closes apache#22484 from wangyum/SPARK-25476.
Lead-authored-by: Yuming Wang <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
This PR does 2 things:
1. Add a new trait (SqlBasedBenchmark) to better support Dataset and DataFrame API.
2. Refactor AggregateBenchmark to use main method.
Generate benchmark result:
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.AggregateBenchmark"
How was this patch tested?
manual tests
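As an illustration of the refactored pattern, a main-method benchmark built on the new trait might look roughly like this (a sketch only: the object name and query are made up, the entry-point hook is named as in the hunk quoted earlier, and the real AggregateBenchmark covers many more cases):

```scala
// Hypothetical, trimmed-down benchmark using the trait from this PR.
object AggregateBenchmarkSketch extends SqlBasedBenchmark {

  // Entry point invoked by the benchmark runner (name follows the snippet quoted above).
  override def benchmark(): Unit = {
    runBenchmark("aggregate without grouping") {
      val N = 500L << 22
      codegenBenchmark("agg w/o group", N) {
        spark.range(N).selectExpr("sum(id)").collect()
      }
    }
  }
}
```

Its result file would then be produced with the SPARK_GENERATE_BENCHMARK_FILES command shown in the description above.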