From 7b50eb5225b664c43cd5dd66a49024741d2ca19c Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 18 Sep 2018 00:51:03 +0800
Subject: [PATCH 1/3] Refactor FilterPushdownBenchmark

---
 .../org/apache/spark/util/BenchmarkBase.scala |  57 +++
 .../benchmark/FilterPushdownBenchmark.scala   | 335 ++++++++----------
 2 files changed, 207 insertions(+), 185 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/util/BenchmarkBase.scala

diff --git a/core/src/main/scala/org/apache/spark/util/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/util/BenchmarkBase.scala
new file mode 100644
index 0000000000000..c84032b8726db
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/BenchmarkBase.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{File, FileOutputStream, OutputStream}
+
+/**
+ * A base class for generating benchmark results to a file.
+ */
+abstract class BenchmarkBase {
+  var output: Option[OutputStream] = None
+
+  def benchmark(): Unit
+
+  final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
+    val separator = "=" * 96
+    val testHeader = (separator + '\n' + benchmarkName + '\n' + separator + '\n' + '\n').getBytes
+    output.foreach(_.write(testHeader))
+    func
+    output.foreach(_.write('\n'))
+  }
+
+  def main(args: Array[String]): Unit = {
+    val regenerateBenchmarkFiles: Boolean = System.getenv("SPARK_GENERATE_BENCHMARK_FILES") == "1"
+    if (regenerateBenchmarkFiles) {
+      val resultFileName = s"${this.getClass.getSimpleName.replace("$", "")}-results.txt"
+      val file = new File(s"benchmarks/$resultFileName")
+      if (!file.exists()) {
+        file.createNewFile()
+      }
+      output = Some(new FileOutputStream(file))
+    }
+
+    benchmark()
+
+    output.foreach { o =>
+      if (o != null) {
+        o.close()
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index d6dfdec45a0e8..bb12cb2281c6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -17,29 +17,28 @@
 package org.apache.spark.sql.execution.benchmark
 
-import java.io.{File, FileOutputStream, OutputStream}
+import java.io.File
 
 import scala.util.{Random, Try}
 
-import org.scalatest.{BeforeAndAfterEachTestData, Suite, TestData}
-
 import org.apache.spark.SparkConf
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.functions.monotonically_increasing_id
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType}
-import org.apache.spark.util.{Benchmark, Utils}
+import org.apache.spark.util.{Benchmark, BenchmarkBase => FileBenchmarkBase, Utils}
 
 /**
  * Benchmark to measure read performance with Filter pushdown.
- * To run this:
- *  build/sbt "sql/test-only *FilterPushdownBenchmark"
- *
- * Results will be written to "benchmarks/FilterPushdownBenchmark-results.txt".
+ * To run this benchmark:
+ *  1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
+ *  2. build/sbt "sql/test:runMain <this class>"
+ *  3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *  Results will be written to "benchmarks/FilterPushdownBenchmark-results.txt".
 */
-class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfterEachTest {
+object FilterPushdownBenchmark extends FileBenchmarkBase {
+
   private val conf = new SparkConf()
     .setAppName(this.getClass.getSimpleName)
     // Since `spark.master` always exists, overrides this value
@@ -50,7 +49,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
     .setIfMissing("orc.compression", "snappy")
     .setIfMissing("spark.sql.parquet.compression.codec", "snappy")
 
-  private val numRows = 1024 * 1024 * 15
+  private val numRows = 1024
   private val width = 5
   private val mid = numRows / 2
   // For Parquet/ORC, we will use the same value for block size and compression size
@@ -58,33 +57,6 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
 
   private val spark = SparkSession.builder().config(conf).getOrCreate()
 
-  private var out: OutputStream = _
-
-  override def beforeAll() {
-    super.beforeAll()
-    out = new FileOutputStream(new File("benchmarks/FilterPushdownBenchmark-results.txt"))
-  }
-
-  override def beforeEach(td: TestData) {
-    super.beforeEach(td)
-    val separator = "=" * 96
-    val testHeader = (separator + '\n' + td.name + '\n' + separator + '\n' + '\n').getBytes
-    out.write(testHeader)
-  }
-
-  override def afterEach(td: TestData) {
-    out.write('\n')
-    super.afterEach(td)
-  }
-
-  override def afterAll() {
-    try {
-      out.close()
-    } finally {
-      super.afterAll()
-    }
-  }
-
   def withTempPath(f: File => Unit): Unit = {
     val path = Utils.createTempDir()
     path.delete()
@@ -154,7 +126,7 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
       title: String,
       whereExpr: String,
       selectExpr: String = "*"): Unit = {
-    val benchmark = new Benchmark(title, values, minNumIters = 5, output = Some(out))
+    val benchmark = new Benchmark(title, values, minNumIters = 5, output = output)
 
     Seq(false, true).foreach { pushDownEnabled =>
       val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
@@ -241,191 +213,184 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
     }
   }
 
-  ignore("Pushdown for many distinct value case") {
-    withTempPath { dir =>
-      withTempTable("orcTable", "parquetTable") {
-        Seq(true, false).foreach { useStringForValue =>
-          prepareTable(dir, numRows, width, useStringForValue)
-          if (useStringForValue) {
-            runStringBenchmark(numRows, width, mid, "string")
-          } else {
-            runIntBenchmark(numRows, width, mid)
+  override def benchmark(): Unit = {
+    runBenchmark("Pushdown for many distinct value case") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          Seq(true, false).foreach { useStringForValue =>
+            prepareTable(dir, numRows, width, useStringForValue)
+            if (useStringForValue) {
+              
runStringBenchmark(numRows, width, mid, "string") + } else { + runIntBenchmark(numRows, width, mid) + } } } } } - } - ignore("Pushdown for few distinct value case (use dictionary encoding)") { - withTempPath { dir => - val numDistinctValues = 200 + runBenchmark("Pushdown for few distinct value case (use dictionary encoding)") { + withTempPath { dir => + val numDistinctValues = 200 - withTempTable("orcTable", "parquetTable") { - prepareStringDictTable(dir, numRows, numDistinctValues, width) - runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string") + withTempTable("orcTable", "parquetTable") { + prepareStringDictTable(dir, numRows, numDistinctValues, width) + runStringBenchmark(numRows, width, numDistinctValues / 2, "distinct string") + } } } - } - ignore("Pushdown benchmark for StringStartsWith") { - withTempPath { dir => - withTempTable("orcTable", "parquetTable") { - prepareTable(dir, numRows, width, true) - Seq( - "value like '10%'", - "value like '1000%'", - s"value like '${mid.toString.substring(0, mid.toString.length - 1)}%'" - ).foreach { whereExpr => - val title = s"StringStartsWith filter: ($whereExpr)" - filterPushDownBenchmark(numRows, title, whereExpr) + runBenchmark("Pushdown benchmark for StringStartsWith") { + withTempPath { dir => + withTempTable("orcTable", "parquetTable") { + prepareTable(dir, numRows, width, true) + Seq( + "value like '10%'", + "value like '1000%'", + s"value like '${mid.toString.substring(0, mid.toString.length - 1)}%'" + ).foreach { whereExpr => + val title = s"StringStartsWith filter: ($whereExpr)" + filterPushDownBenchmark(numRows, title, whereExpr) + } } } } - } - - ignore(s"Pushdown benchmark for ${DecimalType.simpleString}") { - withTempPath { dir => - Seq( - s"decimal(${Decimal.MAX_INT_DIGITS}, 2)", - s"decimal(${Decimal.MAX_LONG_DIGITS}, 2)", - s"decimal(${DecimalType.MAX_PRECISION}, 2)" - ).foreach { dt => - val columns = (1 to width).map(i => s"CAST(id AS string) c$i") - val valueCol = if (dt.equalsIgnoreCase(s"decimal(${Decimal.MAX_INT_DIGITS}, 2)")) { - monotonically_increasing_id() % 9999999 - } else { - monotonically_increasing_id() - } - val df = spark.range(numRows).selectExpr(columns: _*).withColumn("value", valueCol.cast(dt)) - withTempTable("orcTable", "parquetTable") { - saveAsTable(df, dir) - Seq(s"value = $mid").foreach { whereExpr => - val title = s"Select 1 $dt row ($whereExpr)".replace("value AND value", "value") - filterPushDownBenchmark(numRows, title, whereExpr) + runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") { + withTempPath { dir => + Seq( + s"decimal(${Decimal.MAX_INT_DIGITS}, 2)", + s"decimal(${Decimal.MAX_LONG_DIGITS}, 2)", + s"decimal(${DecimalType.MAX_PRECISION}, 2)" + ).foreach { dt => + val columns = (1 to width).map(i => s"CAST(id AS string) c$i") + val valueCol = if (dt.equalsIgnoreCase(s"decimal(${Decimal.MAX_INT_DIGITS}, 2)")) { + monotonically_increasing_id() % 9999999 + } else { + monotonically_increasing_id() } + val df = spark.range(numRows) + .selectExpr(columns: _*).withColumn("value", valueCol.cast(dt)) + withTempTable("orcTable", "parquetTable") { + saveAsTable(df, dir) + + Seq(s"value = $mid").foreach { whereExpr => + val title = s"Select 1 $dt row ($whereExpr)".replace("value AND value", "value") + filterPushDownBenchmark(numRows, title, whereExpr) + } - val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)") - Seq(10, 50, 90).foreach { percent => - filterPushDownBenchmark( - numRows, - s"Select $percent% $dt rows (value < ${numRows 
* percent / 100})", - s"value < ${numRows * percent / 100}", - selectExpr - ) + val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)") + Seq(10, 50, 90).foreach { percent => + filterPushDownBenchmark( + numRows, + s"Select $percent% $dt rows (value < ${numRows * percent / 100})", + s"value < ${numRows * percent / 100}", + selectExpr + ) + } } } } } - } - ignore("Pushdown benchmark for InSet -> InFilters") { - withTempPath { dir => - withTempTable("orcTable", "parquetTable") { - prepareTable(dir, numRows, width, false) - Seq(5, 10, 50, 100).foreach { count => - Seq(10, 50, 90).foreach { distribution => - val filter = - Range(0, count).map(r => scala.util.Random.nextInt(numRows * distribution / 100)) - val whereExpr = s"value in(${filter.mkString(",")})" - val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)" - filterPushDownBenchmark(numRows, title, whereExpr) + runBenchmark("Pushdown benchmark for InSet -> InFilters") { + withTempPath { dir => + withTempTable("orcTable", "parquetTable") { + prepareTable(dir, numRows, width, false) + Seq(5, 10, 50, 100).foreach { count => + Seq(10, 50, 90).foreach { distribution => + val filter = + Range(0, count).map(r => scala.util.Random.nextInt(numRows * distribution / 100)) + val whereExpr = s"value in(${filter.mkString(",")})" + val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)" + filterPushDownBenchmark(numRows, title, whereExpr) + } } } } } - } - ignore(s"Pushdown benchmark for ${ByteType.simpleString}") { - withTempPath { dir => - val columns = (1 to width).map(i => s"CAST(id AS string) c$i") - val df = spark.range(numRows).selectExpr(columns: _*) - .withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType)) - .orderBy("value") - withTempTable("orcTable", "parquetTable") { - saveAsTable(df, dir) - - Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})") - .foreach { whereExpr => - val title = s"Select 1 ${ByteType.simpleString} row ($whereExpr)" - .replace("value AND value", "value") - filterPushDownBenchmark(numRows, title, whereExpr) - } + runBenchmark(s"Pushdown benchmark for ${ByteType.simpleString}") { + withTempPath { dir => + val columns = (1 to width).map(i => s"CAST(id AS string) c$i") + val df = spark.range(numRows).selectExpr(columns: _*) + .withColumn("value", (monotonically_increasing_id() % Byte.MaxValue).cast(ByteType)) + .orderBy("value") + withTempTable("orcTable", "parquetTable") { + saveAsTable(df, dir) - val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)") - Seq(10, 50, 90).foreach { percent => - filterPushDownBenchmark( - numRows, - s"Select $percent% ${ByteType.simpleString} rows " + - s"(value < CAST(${Byte.MaxValue * percent / 100} AS ${ByteType.simpleString}))", - s"value < CAST(${Byte.MaxValue * percent / 100} AS ${ByteType.simpleString})", - selectExpr - ) + Seq(s"value = CAST(${Byte.MaxValue / 2} AS ${ByteType.simpleString})") + .foreach { whereExpr => + val title = s"Select 1 ${ByteType.simpleString} row ($whereExpr)" + .replace("value AND value", "value") + filterPushDownBenchmark(numRows, title, whereExpr) + } + + val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)") + Seq(10, 50, 90).foreach { percent => + filterPushDownBenchmark( + numRows, + s"Select $percent% ${ByteType.simpleString} rows " + + s"(value < CAST(${Byte.MaxValue * percent / 100} AS ${ByteType.simpleString}))", + s"value < CAST(${Byte.MaxValue * 
percent / 100} AS ${ByteType.simpleString})", + selectExpr + ) + } } } } - } - ignore(s"Pushdown benchmark for Timestamp") { - withTempPath { dir => - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> true.toString) { - ParquetOutputTimestampType.values.toSeq.map(_.toString).foreach { fileType => - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) { - val columns = (1 to width).map(i => s"CAST(id AS string) c$i") - val df = spark.range(numRows).selectExpr(columns: _*) - .withColumn("value", monotonically_increasing_id().cast(TimestampType)) - withTempTable("orcTable", "parquetTable") { - saveAsTable(df, dir) - - Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr => - val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)" - .replace("value AND value", "value") - filterPushDownBenchmark(numRows, title, whereExpr) - } - - val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)") - Seq(10, 50, 90).foreach { percent => - filterPushDownBenchmark( - numRows, - s"Select $percent% timestamp stored as $fileType rows " + - s"(value < CAST(${numRows * percent / 100} AS timestamp))", - s"value < CAST(${numRows * percent / 100} as timestamp)", - selectExpr - ) + runBenchmark(s"Pushdown benchmark for Timestamp") { + withTempPath { dir => + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key -> true.toString) { + ParquetOutputTimestampType.values.toSeq.map(_.toString).foreach { fileType => + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) { + val columns = (1 to width).map(i => s"CAST(id AS string) c$i") + val df = spark.range(numRows).selectExpr(columns: _*) + .withColumn("value", monotonically_increasing_id().cast(TimestampType)) + withTempTable("orcTable", "parquetTable") { + saveAsTable(df, dir) + + Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr => + val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)" + .replace("value AND value", "value") + filterPushDownBenchmark(numRows, title, whereExpr) + } + + val selectExpr = (1 to width) + .map(i => s"MAX(c$i)").mkString("", ",", ", MAX(value)") + Seq(10, 50, 90).foreach { percent => + filterPushDownBenchmark( + numRows, + s"Select $percent% timestamp stored as $fileType rows " + + s"(value < CAST(${numRows * percent / 100} AS timestamp))", + s"value < CAST(${numRows * percent / 100} as timestamp)", + selectExpr + ) + } } } } } } } - } - ignore(s"Pushdown benchmark with many filters") { - val numRows = 1 - val width = 500 - - withTempPath { dir => - val columns = (1 to width).map(i => s"id c$i") - val df = spark.range(1).selectExpr(columns: _*) - withTempTable("orcTable", "parquetTable") { - saveAsTable(df, dir) - Seq(1, 250, 500).foreach { numFilter => - val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ") - // Note: InferFiltersFromConstraints will add more filters to this given filters - filterPushDownBenchmark(numRows, s"Select 1 row with $numFilter filters", whereExpr) + runBenchmark(s"Pushdown benchmark with many filters") { + val numRows = 1 + val width = 500 + + withTempPath { dir => + val columns = (1 to width).map(i => s"id c$i") + val df = spark.range(1).selectExpr(columns: _*) + withTempTable("orcTable", "parquetTable") { + saveAsTable(df, dir) + Seq(1, 250, 500).foreach { numFilter => + val whereExpr = (1 to numFilter).map(i => s"c$i = 0").mkString(" and ") + // Note: InferFiltersFromConstraints will add more filters to this given filters + filterPushDownBenchmark(numRows, 
s"Select 1 row with $numFilter filters", whereExpr) + } } } } } } - -trait BenchmarkBeforeAndAfterEachTest extends BeforeAndAfterEachTestData { this: Suite => - - override def beforeEach(td: TestData) { - super.beforeEach(td) - } - - override def afterEach(td: TestData) { - super.afterEach(td) - } -} From 6e7cfc85d4d7719ee31254317b0ca81173be7128 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 18 Sep 2018 00:53:55 +0800 Subject: [PATCH 2/3] Revert numRows --- .../spark/sql/execution/benchmark/FilterPushdownBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index bb12cb2281c6d..60d215c46740f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -49,7 +49,7 @@ object FilterPushdownBenchmark extends FileBenchmarkBase { .setIfMissing("orc.compression", "snappy") .setIfMissing("spark.sql.parquet.compression.codec", "snappy") - private val numRows = 1024 + private val numRows = 1024 * 1024 * 15 private val width = 5 private val mid = numRows / 2 // For Parquet/ORC, we will use the same value for block size and compression size From 075ef7a0696332c3b9d35ff1750ea7ade08e7c3d Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Wed, 19 Sep 2018 09:16:08 +0800 Subject: [PATCH 3/3] Remove the one space indent before the numbered list --- .../sql/execution/benchmark/FilterPushdownBenchmark.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index 60d215c46740f..9ecea99f12895 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -32,10 +32,10 @@ import org.apache.spark.util.{Benchmark, BenchmarkBase => FileBenchmarkBase, Uti /** * Benchmark to measure read performance with Filter pushdown. * To run this benchmark: - * 1. without sbt: bin/spark-submit --class - * 2. build/sbt "sql/test:runMain " - * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " - * Results will be written to "benchmarks/FilterPushdownBenchmark-results.txt". + * 1. without sbt: bin/spark-submit --class + * 2. build/sbt "sql/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/FilterPushdownBenchmark-results.txt". */ object FilterPushdownBenchmark extends FileBenchmarkBase {