From 42230b6e3edb731eb69b3b8800805805e2234d10 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 22 Sep 2018 23:37:12 +0800 Subject: [PATCH] Add RunBenchmarkWithCodegen --- .../benchmarks/AggregateBenchmark-results.txt | 76 +++++----- .../benchmark/AggregateBenchmark.scala | 131 +++++++----------- .../benchmark/RunBenchmarkWithCodegen.scala | 58 ++++++++ 3 files changed, 149 insertions(+), 116 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/RunBenchmarkWithCodegen.scala diff --git a/sql/core/benchmarks/AggregateBenchmark-results.txt b/sql/core/benchmarks/AggregateBenchmark-results.txt index 11df00c680ee9..b0ccef02cb470 100644 --- a/sql/core/benchmarks/AggregateBenchmark-results.txt +++ b/sql/core/benchmarks/AggregateBenchmark-results.txt @@ -7,8 +7,8 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz agg w/o group: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -agg w/o group wholestage off 39650 / 46049 52.9 18.9 1.0X -agg w/o group wholestage on 1224 / 1413 1713.5 0.6 32.4X +agg w/o group wholestage off 40454 / 44424 51.8 19.3 1.0X +agg w/o group wholestage on 907 / 929 2312.8 0.4 44.6X ================================================================================================ @@ -20,16 +20,16 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -stddev wholestage off 6149 / 6366 17.1 58.6 1.0X -stddev wholestage on 871 / 881 120.4 8.3 7.1X +stddev wholestage off 6281 / 6626 16.7 59.9 1.0X +stddev wholestage on 909 / 987 115.4 8.7 6.9X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -kurtosis wholestage off 28822 / 29231 3.6 274.9 1.0X -kurtosis wholestage on 929 / 944 112.9 8.9 31.0X +kurtosis wholestage off 32819 / 33789 3.2 313.0 1.0X +kurtosis wholestage on 988 / 1046 106.1 9.4 33.2X ================================================================================================ @@ -41,9 +41,9 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -codegen = F 7956 / 7967 10.5 94.8 1.0X -codegen = T hashmap = F 3872 / 4049 21.7 46.2 2.1X -codegen = T hashmap = T 872 / 883 96.3 10.4 9.1X +codegen = F 8490 / 8568 9.9 101.2 1.0X +codegen = T hashmap = F 5985 / 6916 14.0 71.4 1.4X +codegen = T hashmap = T 943 / 977 88.9 11.2 9.0X ================================================================================================ @@ -55,9 +55,9 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -codegen = F 9088 / 9240 9.2 108.3 1.0X -codegen = T hashmap = F 5065 / 5238 16.6 60.4 1.8X -codegen = T hashmap = T 1722 / 1768 48.7 20.5 5.3X +codegen = F 9509 / 9893 8.8 113.4 1.0X +codegen = T hashmap = F 5381 / 5715 15.6 64.1 1.8X +codegen = T hashmap = T 1792 / 1874 46.8 21.4 5.3X ================================================================================================ @@ -69,9 +69,9 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz Aggregate w string key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -codegen = F 3666 / 3704 5.7 174.8 1.0X -codegen = T hashmap = F 2322 / 2357 9.0 110.7 1.6X -codegen = T hashmap = T 1643 / 1676 12.8 78.3 2.2X +codegen = F 4372 / 4440 4.8 208.5 1.0X +codegen = T hashmap = F 2850 / 2940 7.4 135.9 1.5X +codegen = T hashmap = T 2033 / 2175 10.3 96.9 2.2X ================================================================================================ @@ -83,9 +83,9 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz Aggregate w decimal key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -codegen = F 2688 / 2704 7.8 128.2 1.0X -codegen = T hashmap = F 1401 / 1430 15.0 66.8 1.9X -codegen = T hashmap = T 394 / 415 53.2 18.8 6.8X +codegen = F 3819 / 4047 5.5 182.1 1.0X +codegen = T hashmap = F 2080 / 2122 10.1 99.2 1.8X +codegen = T hashmap = T 500 / 523 41.9 23.9 7.6X ================================================================================================ @@ -97,9 +97,9 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz Aggregate w multiple keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -codegen = F 5380 / 5437 3.9 256.5 1.0X -codegen = T hashmap = F 3554 / 3648 5.9 169.5 1.5X -codegen = T hashmap = T 2687 / 2719 7.8 128.1 2.0X +codegen = F 5815 / 6015 3.6 277.3 1.0X +codegen = T hashmap = F 3934 / 3955 5.3 187.6 1.5X +codegen = T hashmap = T 3196 / 3307 6.6 152.4 1.8X ================================================================================================ @@ -111,9 +111,9 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz max function bytecode size: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -codegen = F 375 / 416 1.7 572.0 1.0X -codegen = T hugeMethodLimit = 10000 231 / 245 2.8 352.0 1.6X -codegen = T hugeMethodLimit = 1500 383 / 412 1.7 583.7 1.0X +codegen = F 415 / 511 1.6 632.8 1.0X +codegen = T hugeMethodLimit = 10000 246 / 281 2.7 375.5 1.7X +codegen = T hugeMethodLimit = 1500 416 / 472 1.6 635.2 1.0X ================================================================================================ @@ -125,8 +125,8 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -cube wholestage off 2250 / 2266 2.3 429.1 1.0X -cube wholestage on 907 / 945 5.8 173.0 2.5X +cube wholestage off 2192 / 2270 2.4 418.1 1.0X +cube wholestage on 910 / 1001 5.8 173.5 2.4X ================================================================================================ @@ -138,17 +138,17 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -UnsafeRowhash 205 / 215 102.4 9.8 1.0X -murmur3 hash 104 / 111 202.0 4.9 2.0X -fast hash 55 / 60 381.2 2.6 3.7X -arrayEqual 132 / 139 158.9 6.3 1.6X -Java HashMap (Long) 89 / 103 235.9 4.2 2.3X -Java HashMap (two ints) 91 / 107 229.2 4.4 2.2X -Java HashMap (UnsafeRow) 759 / 772 27.6 36.2 0.3X -LongToUnsafeRowMap (opt=false) 384 / 406 54.7 18.3 0.5X -LongToUnsafeRowMap (opt=true) 82 / 88 256.5 3.9 2.5X -BytesToBytesMap (off Heap) 753 / 811 27.8 35.9 0.3X -BytesToBytesMap (on Heap) 765 / 784 27.4 36.5 0.3X -Aggregate HashMap 35 / 39 591.4 1.7 5.8X +UnsafeRowhash 218 / 225 96.4 10.4 1.0X +murmur3 hash 114 / 121 183.3 5.5 1.9X +fast hash 59 / 64 356.3 2.8 3.7X +arrayEqual 143 / 151 146.6 6.8 1.5X +Java HashMap (Long) 99 / 127 211.0 4.7 2.2X +Java HashMap (two ints) 101 / 136 207.2 4.8 2.1X +Java HashMap (UnsafeRow) 781 / 938 26.9 37.2 0.3X +LongToUnsafeRowMap (opt=false) 398 / 412 52.7 19.0 0.5X +LongToUnsafeRowMap (opt=true) 88 / 95 238.1 4.2 2.5X +BytesToBytesMap (off Heap) 1091 / 1113 19.2 52.0 0.2X +BytesToBytesMap (on Heap) 792 / 835 26.5 37.8 0.3X +Aggregate HashMap 36 / 41 587.9 1.7 6.1X diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index b9abb1e1a940d..8fef8998eaa7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -20,10 +20,9 @@ package org.apache.spark.sql.execution.benchmark import java.util.HashMap import org.apache.spark.SparkConf -import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.benchmark.Benchmark import org.apache.spark.internal.config._ import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager} -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap import org.apache.spark.sql.execution.vectorized.AggregateHashMap @@ -43,37 +42,13 @@ import org.apache.spark.unsafe.map.BytesToBytesMap * Results will be written to "benchmarks/AggregateBenchmark-results.txt". * }}} */ -object AggregateBenchmark extends BenchmarkBase { - - lazy val sparkSession = SparkSession.builder - .master("local[1]") - .appName(this.getClass.getSimpleName) - .config("spark.sql.shuffle.partitions", 1) - .config("spark.sql.autoBroadcastJoinThreshold", 1) - .getOrCreate() - - /** Runs function `f` with whole stage codegen on and off. */ - def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { - val benchmark = new Benchmark(name, cardinality, output = output) - - benchmark.addCase(s"$name wholestage off", numIters = 2) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false) - f - } - - benchmark.addCase(s"$name wholestage on", numIters = 5) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true) - f - } - - benchmark.run() - } +object AggregateBenchmark extends RunBenchmarkWithCodegen { override def benchmark(): Unit = { runBenchmark("aggregate without grouping") { val N = 500L << 22 runBenchmark("agg w/o group", N) { - sparkSession.range(N).selectExpr("sum(id)").collect() + spark.range(N).selectExpr("sum(id)").collect() } } @@ -81,11 +56,11 @@ object AggregateBenchmark extends BenchmarkBase { val N = 100L << 20 runBenchmark("stddev", N) { - sparkSession.range(N).groupBy().agg("id" -> "stddev").collect() + spark.range(N).groupBy().agg("id" -> "stddev").collect() } runBenchmark("kurtosis", N) { - sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect() + spark.range(N).groupBy().agg("id" -> "kurtosis").collect() } } @@ -95,25 +70,25 @@ object AggregateBenchmark extends BenchmarkBase { val benchmark = new Benchmark("Aggregate w keys", N, output = output) def f(): Unit = { - sparkSession.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect() + spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect() } benchmark.addCase(s"codegen = F", numIters = 2) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") + spark.conf.set("spark.sql.codegen.wholeStage", "false") f() } benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") + spark.conf.set("spark.sql.codegen.wholeStage", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") f() } benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") + spark.conf.set("spark.sql.codegen.wholeStage", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") f() } @@ -124,27 +99,27 @@ object AggregateBenchmark extends BenchmarkBase { val N = 20 << 22 val benchmark = new Benchmark("Aggregate w keys", N, output = output) - sparkSession.range(N).selectExpr("id", "floor(rand() * 10000) as k") + spark.range(N).selectExpr("id", "floor(rand() * 10000) as k") .createOrReplaceTempView("test") - def f(): Unit = sparkSession.sql("select k, k, sum(id) from test group by k, k").collect() + def f(): Unit = spark.sql("select k, k, sum(id) from test group by k, k").collect() benchmark.addCase(s"codegen = F", numIters = 2) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false) + spark.conf.set("spark.sql.codegen.wholeStage", value = false) f() } benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true) - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") + spark.conf.set("spark.sql.codegen.wholeStage", value = true) + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") f() } benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true) - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") + spark.conf.set("spark.sql.codegen.wholeStage", value = true) + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") f() } @@ -156,25 +131,25 @@ object AggregateBenchmark extends BenchmarkBase { val benchmark = new Benchmark("Aggregate w string key", N, output = output) - def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 1023 as string) as k") + def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 1023 as string) as k") .groupBy("k").count().collect() benchmark.addCase(s"codegen = F", numIters = 2) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") + spark.conf.set("spark.sql.codegen.wholeStage", "false") f() } benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") + spark.conf.set("spark.sql.codegen.wholeStage", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") f() } benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") + spark.conf.set("spark.sql.codegen.wholeStage", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") f() } @@ -186,25 +161,25 @@ object AggregateBenchmark extends BenchmarkBase { val benchmark = new Benchmark("Aggregate w decimal key", N, output = output) - def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 65535 as decimal) as k") + def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 65535 as decimal) as k") .groupBy("k").count().collect() benchmark.addCase(s"codegen = F") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") + spark.conf.set("spark.sql.codegen.wholeStage", "false") f() } benchmark.addCase(s"codegen = T hashmap = F") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") + spark.conf.set("spark.sql.codegen.wholeStage", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") + spark.conf.set("spark.sql.codegen.wholeStage", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") f() } @@ -216,7 +191,7 @@ object AggregateBenchmark extends BenchmarkBase { val benchmark = new Benchmark("Aggregate w multiple keys", N, output = output) - def f(): Unit = sparkSession.range(N) + def f(): Unit = spark.range(N) .selectExpr( "id", "(id & 1023) as k1", @@ -230,21 +205,21 @@ object AggregateBenchmark extends BenchmarkBase { .collect() benchmark.addCase(s"codegen = F") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") + spark.conf.set("spark.sql.codegen.wholeStage", "false") f() } benchmark.addCase(s"codegen = T hashmap = F") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") + spark.conf.set("spark.sql.codegen.wholeStage", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") - sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") + spark.conf.set("spark.sql.codegen.wholeStage", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true") + spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true") f() } @@ -256,7 +231,7 @@ object AggregateBenchmark extends BenchmarkBase { val benchmark = new Benchmark("max function bytecode size", N, output = output) - def f(): Unit = sparkSession.range(N) + def f(): Unit = spark.range(N) .selectExpr( "id", "(id & 1023) as k1", @@ -285,19 +260,19 @@ object AggregateBenchmark extends BenchmarkBase { .collect() benchmark.addCase("codegen = F") { iter => - sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false") + spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false") f() } benchmark.addCase("codegen = T hugeMethodLimit = 10000") { iter => - sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") - sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "10000") + spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") + spark.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "10000") f() } benchmark.addCase("codegen = T hugeMethodLimit = 1500") { iter => - sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") - sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "1500") + spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") + spark.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "1500") f() } @@ -309,7 +284,7 @@ object AggregateBenchmark extends BenchmarkBase { val N = 5 << 20 runBenchmark("cube", N) { - sparkSession.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2") + spark.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2") .cube("k1", "k2").sum("id").collect() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/RunBenchmarkWithCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/RunBenchmarkWithCodegen.scala new file mode 100644 index 0000000000000..e2d93fef4a7c3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/RunBenchmarkWithCodegen.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf + +/** + * Common base trait for micro benchmarks that are supposed to run standalone (i.e. not together + * with other test suites). + */ +trait RunBenchmarkWithCodegen extends BenchmarkBase { + + val spark: SparkSession = getSparkSession + + /** Subclass can override this function to build their own SparkSession */ + def getSparkSession: SparkSession = { + SparkSession.builder() + .master("local[1]") + .appName(this.getClass.getCanonicalName) + .config(SQLConf.SHUFFLE_PARTITIONS.key, 1) + .config(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, 1) + .getOrCreate() + } + + /** Runs function `f` with whole stage codegen on and off. */ + def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { + val benchmark = new Benchmark(name, cardinality, output = output) + + benchmark.addCase(s"$name wholestage off", numIters = 2) { iter => + spark.sqlContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, value = false) + f + } + + benchmark.addCase(s"$name wholestage on", numIters = 5) { iter => + spark.sqlContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, value = true) + f + } + + benchmark.run() + } +}