From 0946097fd9df65d2f7ad0b69b347e27a8a14c0d9 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 23 May 2018 17:43:23 +0800
Subject: [PATCH 1/7] Add parquet write benchmark

---
 .../parquet/ParquetWriteBenchmark.scala       | 128 ++++++++++++++++++
 1 file changed, 128 insertions(+)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala
new file mode 100644
index 0000000000000..4e846f86f5c55
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * Benchmark to measure parquet write performance.
+ * To run this:
+ *  spark-submit --class <this class> --jars <spark sql test jar>
+ */
+object ParquetWriteBenchmark {
+  val conf = new SparkConf()
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("parquet-write-benchmark")
+    .config(conf)
+    .getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+ spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { + val path = Utils.createTempDir() + path.delete() + try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { + try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { + val (keys, values) = pairs.unzip + val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) + (keys, values).zipped.foreach(spark.conf.set) + try f finally { + keys.zip(currentValues).foreach { + case (key, Some(value)) => spark.conf.set(key, value) + case (key, None) => spark.conf.unset(key) + } + } + } + + def runSQL(name: String, sql: String, values: Int): Unit = { + withTempTable("t1") { + spark.range(values).createOrReplaceTempView("t1") + val benchmark = new Benchmark(name, values) + benchmark.addCase("Parquet Writer") { _ => + withTempPath { dir => + spark.sql("select cast(id as INT) as id from t1").write.parquet(dir.getCanonicalPath) + } + } + benchmark.run() + } + } + + def intWriteBenchmark(values: Int): Unit = { + runSQL("Output Single Int Column", "select cast(id as INT) as id from t1", values) + } + + def intStringScanBenchmark(values: Int): Unit = { + runSQL(name = "Output Int and String Column", + sql = "select cast(id as INT) as c1, cast(id as STRING) as c2 from t1", + values = values) + } + + def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = { + runSQL(name = "String with Nulls", + sql = s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " + + s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1", + values = values) + } + + def partitionTableScanBenchmark(values: Int): Unit = { + withTempTable("t1", "tempTable") { + spark.range(values).createOrReplaceTempView("t1") + val benchmark = new Benchmark("Partitioned Table", values) + benchmark.addCase("Parquet Writer") { _ => + withTempPath { dir => + spark.sql("select id % 2 as p, cast(id as INT) as id from t1") + .write.partitionBy("p").parquet(dir.getCanonicalPath) + } + } + + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + + Partitioned Table: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + --------------------------------------------------------------------------------------------- + Parquet Writer 4163 / 4173 3.8 264.7 1.0X + */ + benchmark.run() + } + } + + def main(args: Array[String]): Unit = { + intWriteBenchmark(1024 * 1024 * 15) + intStringScanBenchmark(1024 * 1024 * 10) + partitionTableScanBenchmark(1024 * 1024 * 15) + for (fractionOfNulls <- List(0.0, 0.50, 0.95)) { + stringWithNullsScanBenchmark(1024 * 1024 * 10, fractionOfNulls) + } + } +} From 528a877dbda290d386af4fe981cceeadf2047897 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 23 May 2018 19:30:56 +0800 Subject: [PATCH 2/7] address comments --- .../parquet/ParquetWriteBenchmark.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala index 4e846f86f5c55..280b3d03b7cc3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala @@ -71,7 +71,7 @@ 
object ParquetWriteBenchmark { val benchmark = new Benchmark(name, values) benchmark.addCase("Parquet Writer") { _ => withTempPath { dir => - spark.sql("select cast(id as INT) as id from t1").write.parquet(dir.getCanonicalPath) + spark.sql(sql).write.parquet(dir.getCanonicalPath) } } benchmark.run() @@ -79,24 +79,31 @@ object ParquetWriteBenchmark { } def intWriteBenchmark(values: Int): Unit = { + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + + Output Single Int Column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Parquet Writer 2536 / 2610 6.2 161.3 1.0X + */ runSQL("Output Single Int Column", "select cast(id as INT) as id from t1", values) } - def intStringScanBenchmark(values: Int): Unit = { + def intStringWriteBenchmark(values: Int): Unit = { + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + + Output Int and String Column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Parquet Writer 4644 / 4673 2.3 442.9 1.0X + */ runSQL(name = "Output Int and String Column", sql = "select cast(id as INT) as c1, cast(id as STRING) as c2 from t1", values = values) } - def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = { - runSQL(name = "String with Nulls", - sql = s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " + - s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1", - values = values) - } - - def partitionTableScanBenchmark(values: Int): Unit = { - withTempTable("t1", "tempTable") { + def partitionTableWriteBenchmark(values: Int): Unit = { + withTempTable("t1") { spark.range(values).createOrReplaceTempView("t1") val benchmark = new Benchmark("Partitioned Table", values) benchmark.addCase("Parquet Writer") { _ => @@ -119,10 +126,7 @@ object ParquetWriteBenchmark { def main(args: Array[String]): Unit = { intWriteBenchmark(1024 * 1024 * 15) - intStringScanBenchmark(1024 * 1024 * 10) - partitionTableScanBenchmark(1024 * 1024 * 15) - for (fractionOfNulls <- List(0.0, 0.50, 0.95)) { - stringWithNullsScanBenchmark(1024 * 1024 * 10, fractionOfNulls) - } + intStringWriteBenchmark(1024 * 1024 * 10) + partitionTableWriteBenchmark(1024 * 1024 * 15) } } From 209a33faea1a7fd55461dd79a6a660ae760e881b Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 23 May 2018 22:14:38 +0800 Subject: [PATCH 3/7] address comments --- .../execution/datasources/parquet/ParquetWriteBenchmark.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala index 280b3d03b7cc3..e52687404c473 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala @@ -86,7 +86,9 @@ object ParquetWriteBenchmark { ------------------------------------------------------------------------------------------------ Parquet Writer 2536 / 2610 6.2 161.3 1.0X */ - runSQL("Output Single Int Column", "select cast(id as INT) as id from t1", values) + runSQL(name = "Output Single Int Column", + sql = "select cast(id as INT) as id from t1", + values = values) } def intStringWriteBenchmark(values: 
Int): Unit = { From 536e27102888efa2146ec889ad68ed7e10f5e560 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Sat, 26 May 2018 20:32:55 +0800 Subject: [PATCH 4/7] add bucket --- .../parquet/ParquetWriteBenchmark.scala | 67 +++++++++++-------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala index e52687404c473..c22c65f23f838 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala @@ -31,6 +31,8 @@ import org.apache.spark.util.{Benchmark, Utils} * spark-submit --class --jars */ object ParquetWriteBenchmark { + val tempTable = "temp" + val format = "orc" val conf = new SparkConf() conf.set("spark.sql.parquet.compression.codec", "snappy") @@ -53,6 +55,14 @@ object ParquetWriteBenchmark { try f finally tableNames.foreach(spark.catalog.dropTempView) } + def withTable(tableNames: String*)(f: => Unit): Unit = { + try f finally { + tableNames.foreach { name => + spark.sql(s"DROP TABLE IF EXISTS $name") + } + } + } + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { val (keys, values) = pairs.unzip val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) @@ -65,13 +75,13 @@ object ParquetWriteBenchmark { } } - def runSQL(name: String, sql: String, values: Int): Unit = { - withTempTable("t1") { - spark.range(values).createOrReplaceTempView("t1") + def runSQL(values: Int, name: String, sql: String, table: String = "t"): Unit = { + withTempTable(tempTable) { + spark.range(values).createOrReplaceTempView(tempTable) val benchmark = new Benchmark(name, values) benchmark.addCase("Parquet Writer") { _ => - withTempPath { dir => - spark.sql(sql).write.parquet(dir.getCanonicalPath) + withTable(table) { + spark.sql(sql) } } benchmark.run() @@ -86,9 +96,9 @@ object ParquetWriteBenchmark { ------------------------------------------------------------------------------------------------ Parquet Writer 2536 / 2610 6.2 161.3 1.0X */ - runSQL(name = "Output Single Int Column", - sql = "select cast(id as INT) as id from t1", - values = values) + runSQL(values = values, + name = "Output Single Int Column", + sql = s"create table t using $format as select cast(id as INT) as id from $tempTable") } def intStringWriteBenchmark(values: Int): Unit = { @@ -99,36 +109,37 @@ object ParquetWriteBenchmark { ------------------------------------------------------------------------------------------------ Parquet Writer 4644 / 4673 2.3 442.9 1.0X */ - runSQL(name = "Output Int and String Column", - sql = "select cast(id as INT) as c1, cast(id as STRING) as c2 from t1", - values = values) + runSQL(values = values, + name = "Output Int and String Column", + sql = s"create table t using $format as select cast(id as INT)" + + s" as c1, cast(id as STRING) as c2 from $tempTable") } def partitionTableWriteBenchmark(values: Int): Unit = { - withTempTable("t1") { - spark.range(values).createOrReplaceTempView("t1") - val benchmark = new Benchmark("Partitioned Table", values) - benchmark.addCase("Parquet Writer") { _ => - withTempPath { dir => - spark.sql("select id % 2 as p, cast(id as INT) as id from t1") - .write.partitionBy("p").parquet(dir.getCanonicalPath) - } - } + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - /* - 
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + Partitioned Table: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + --------------------------------------------------------------------------------------------- + Parquet Writer 4163 / 4173 3.8 264.7 1.0X + */ + runSQL(values = values, + name = "Partitioned Table", + sql = s"create table t using $format partitioned by (p) as select id % 2 as p," + + s" cast(id as INT) as id from $tempTable") + } - Partitioned Table: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - --------------------------------------------------------------------------------------------- - Parquet Writer 4163 / 4173 3.8 264.7 1.0X - */ - benchmark.run() - } + def clusteredTableWriteBenchmark(values: Int): Unit = { + runSQL(values = values, + name = "Clustered Table", + sql = s"create table t using $format CLUSTERED by (p) INTO 8 buckets as select id as p," + + s" cast(id as INT) as id from $tempTable") } def main(args: Array[String]): Unit = { intWriteBenchmark(1024 * 1024 * 15) intStringWriteBenchmark(1024 * 1024 * 10) partitionTableWriteBenchmark(1024 * 1024 * 15) + clusteredTableWriteBenchmark(1024 * 1024 * 15) } } From bbe6925297ccc21c357362d25192990469b34099 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 28 May 2018 16:56:15 -0700 Subject: [PATCH 5/7] refactor --- .../parquet/ParquetWriteBenchmark.scala | 115 +++++++----------- 1 file changed, 42 insertions(+), 73 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala index c22c65f23f838..664cb4f70e348 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala @@ -32,8 +32,12 @@ import org.apache.spark.util.{Benchmark, Utils} */ object ParquetWriteBenchmark { val tempTable = "temp" - val format = "orc" + val table = "t" + val format = "parquet" + val numRows = 1024 * 1024 * 15 val conf = new SparkConf() + val benchmark = new Benchmark(s"$format writer benchmark", numRows) + conf.set("spark.sql.parquet.compression.codec", "snappy") val spark = SparkSession.builder @@ -45,11 +49,6 @@ object ParquetWriteBenchmark { // Set default configs. Individual cases will change them if necessary. 
spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") - def withTempPath(f: File => Unit): Unit = { - val path = Utils.createTempDir() - path.delete() - try f(path) finally Utils.deleteRecursively(path) - } def withTempTable(tableNames: String*)(f: => Unit): Unit = { try f finally tableNames.foreach(spark.catalog.dropTempView) @@ -63,83 +62,53 @@ object ParquetWriteBenchmark { } } - def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { - val (keys, values) = pairs.unzip - val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) - (keys, values).zipped.foreach(spark.conf.set) - try f finally { - keys.zip(currentValues).foreach { - case (key, Some(value)) => spark.conf.set(key, value) - case (key, None) => spark.conf.unset(key) - } + def writeInt(table: String, benchmark: Benchmark): Unit = { + spark.sql(s"create table $table(c1 INT, c2 STRING) using $format") + benchmark.addCase("Output Single Int Column") { _ => + spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + + s"c1, cast(id as STRING) as c2 from $tempTable") } } - def runSQL(values: Int, name: String, sql: String, table: String = "t"): Unit = { - withTempTable(tempTable) { - spark.range(values).createOrReplaceTempView(tempTable) - val benchmark = new Benchmark(name, values) - benchmark.addCase("Parquet Writer") { _ => - withTable(table) { - spark.sql(sql) - } - } - benchmark.run() + def writeIntString(table: String, benchmark: Benchmark): Unit = { + spark.sql(s"create table $table(c1 INT, c2 STRING) using $format") + benchmark.addCase("Output Int and String Column") { _ => + spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + + s"c1, cast(id as STRING) as c2 from $tempTable") } } - def intWriteBenchmark(values: Int): Unit = { - /* - Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - - Output Single Int Column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - Parquet Writer 2536 / 2610 6.2 161.3 1.0X - */ - runSQL(values = values, - name = "Output Single Int Column", - sql = s"create table t using $format as select cast(id as INT) as id from $tempTable") - } - - def intStringWriteBenchmark(values: Int): Unit = { - /* - Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - - Output Int and String Column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - Parquet Writer 4644 / 4673 2.3 442.9 1.0X - */ - runSQL(values = values, - name = "Output Int and String Column", - sql = s"create table t using $format as select cast(id as INT)" + - s" as c1, cast(id as STRING) as c2 from $tempTable") - } - - def partitionTableWriteBenchmark(values: Int): Unit = { - /* - Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - - Partitioned Table: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - --------------------------------------------------------------------------------------------- - Parquet Writer 4163 / 4173 3.8 264.7 1.0X - */ - runSQL(values = values, - name = "Partitioned Table", - sql = s"create table t using $format partitioned by (p) as select id % 2 as p," + - s" cast(id as INT) as id from $tempTable") + def writePartition(table: String, benchmark: Benchmark): Unit = { + spark.sql(s"create table $table(p INT, id INT) using $format PARTITIONED BY (p)") + benchmark.addCase("Output Partitions") { _ => + spark.sql(s"INSERT overwrite table $table select cast(id as INT) as id," + + s" cast(id % 2 as 
INT) as p from $tempTable") + } } - def clusteredTableWriteBenchmark(values: Int): Unit = { - runSQL(values = values, - name = "Clustered Table", - sql = s"create table t using $format CLUSTERED by (p) INTO 8 buckets as select id as p," + - s" cast(id as INT) as id from $tempTable") + def writeBucket(table: String, benchmark: Benchmark): Unit = { + spark.sql(s"create table $table(c1 INT, c2 INT) using $format CLUSTERED BY (c2) INTO 2 BUCKETS") + benchmark.addCase("Output Buckets") { _ => + spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + + s"c1, cast(id as INT) as c2 from $tempTable") + } } def main(args: Array[String]): Unit = { - intWriteBenchmark(1024 * 1024 * 15) - intStringWriteBenchmark(1024 * 1024 * 10) - partitionTableWriteBenchmark(1024 * 1024 * 15) - clusteredTableWriteBenchmark(1024 * 1024 * 15) + val tableInt = "tableInt" + val tableIntString = "tableIntString" + val tablePartition = "tablePartition" + val tableBucket = "tableBucket" + withTempTable(tempTable) { + spark.range(numRows).createOrReplaceTempView(tempTable) + withTable(tableInt, tableIntString, tablePartition, tableBucket) { + val benchmark = new Benchmark(s"$format writer benchmark", numRows) + writeInt(tableInt, benchmark) + writeIntString(tableIntString, benchmark) + writePartition(tablePartition, benchmark) + writeBucket(tableBucket, benchmark) + benchmark.run() + } + } } } From 8ffba61a3ebd6e06eec2fdf03e19a65cb5b40787 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 28 May 2018 19:02:54 -0700 Subject: [PATCH 6/7] write benchmark for all datasources --- .../benchmark/DataSourceWriteBenchmark.scala | 145 ++++++++++++++++++ .../parquet/ParquetWriteBenchmark.scala | 114 -------------- 2 files changed, 145 insertions(+), 114 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala new file mode 100644 index 0000000000000..f6ffd6eba2263 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Benchmark + +/** + * Benchmark to measure data source write performance. 
+ * By default it measures 4 data source format: Parquet, ORC, JSON, CSV:
+ *  spark-submit --class <this class> <spark sql test jar>
+ * To measure specified formats, run it with arguments:
+ *  spark-submit --class <this class> <spark sql test jar> format1 [format2] [...]
+ */
+object DataSourceWriteBenchmark {
+  val conf = new SparkConf()
+    .setAppName("DataSourceWriteBenchmark")
+    .setIfMissing("spark.master", "local[1]")
+    .set("spark.sql.parquet.compression.codec", "snappy")
+    .set("spark.sql.orc.compression.codec", "snappy")
+
+  val spark = SparkSession.builder.config(conf).getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+
+  val tempTable = "temp"
+  val numRows = 1024 * 1024 * 15
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally {
+      tableNames.foreach { name =>
+        spark.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
+  }
+
+  def writeInt(table: String, format: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"create table $table(c1 INT, c2 STRING) using $format")
+    benchmark.addCase("Output Single Int Column") { _ =>
+      spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " +
+        s"c1, cast(id as STRING) as c2 from $tempTable")
+    }
+  }
+
+  def writeIntString(table: String, format: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"create table $table(c1 INT, c2 STRING) using $format")
+    benchmark.addCase("Output Int and String Column") { _ =>
+      spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " +
+        s"c1, cast(id as STRING) as c2 from $tempTable")
+    }
+  }
+
+  def writePartition(table: String, format: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"create table $table(p INT, id INT) using $format PARTITIONED BY (p)")
+    benchmark.addCase("Output Partitions") { _ =>
+      spark.sql(s"INSERT overwrite table $table select cast(id as INT) as id," +
+        s" cast(id % 2 as INT) as p from $tempTable")
+    }
+  }
+
+  def writeBucket(table: String, format: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"create table $table(c1 INT, c2 INT) using $format CLUSTERED BY (c2) INTO 2 BUCKETS")
+    benchmark.addCase("Output Buckets") { _ =>
+      spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " +
+        s"c1, cast(id as INT) as c2 from $tempTable")
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    val tableInt = "tableInt"
+    val tableIntString = "tableIntString"
+    val tablePartition = "tablePartition"
+    val tableBucket = "tableBucket"
+    // If the
+    val formats: Seq[String] = if (args.isEmpty) {
+      Seq("Parquet", "ORC", "JSON", "CSV")
+    } else {
+      args
+    }
+    /*
+    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
+    Parquet writer benchmark:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    Output Single Int Column                      6054 / 6070          2.6         384.9       1.0X
+    Output Int and String Column                  5784 / 5800          2.7         367.8       1.0X
+    Output Partitions                             3891 / 3904          4.0         247.4       1.6X
+    Output Buckets                                5446 / 5729          2.9         346.2       1.1X
+
+    ORC writer benchmark:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    Output Single Int Column                      5734 / 5823          2.7         364.6       1.0X
+    Output Int and String Column                  5802 / 5839          2.7         368.9       1.0X
+    Output Partitions                             3384 / 3671          4.6         215.1       1.7X
+    Output Buckets                                4950 / 4988          3.2         314.7       1.2X
+
+    JSON writer
benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Output Single Int Column 5576 / 5594 2.8 354.5 1.0X + Output Int and String Column 5550 / 5620 2.8 352.9 1.0X + Output Partitions 3727 / 4100 4.2 237.0 1.5X + Output Buckets 5316 / 5852 3.0 338.0 1.0X + + CSV writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Output Single Int Column 7064 / 8714 2.2 449.1 1.0X + Output Int and String Column 7114 / 7663 2.2 452.3 1.0X + Output Partitions 5771 / 6228 2.7 366.9 1.2X + Output Buckets 7414 / 7479 2.1 471.3 1.0X + */ + withTempTable(tempTable) { + spark.range(numRows).createOrReplaceTempView(tempTable) + formats.foreach { format => + withTable(tableInt, tableIntString, tablePartition, tableBucket) { + val benchmark = new Benchmark(s"$format writer benchmark", numRows) + writeInt(tableInt, format, benchmark) + writeIntString(tableIntString, format, benchmark) + writePartition(tablePartition, format, benchmark) + writeBucket(tableBucket, format, benchmark) + benchmark.run() + } + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala deleted file mode 100644 index 664cb4f70e348..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources.parquet - -import java.io.File - -import scala.util.Try - -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.{Benchmark, Utils} - -/** - * Benchmark to measure parquet write performance. - * To run this: - * spark-submit --class --jars - */ -object ParquetWriteBenchmark { - val tempTable = "temp" - val table = "t" - val format = "parquet" - val numRows = 1024 * 1024 * 15 - val conf = new SparkConf() - val benchmark = new Benchmark(s"$format writer benchmark", numRows) - - conf.set("spark.sql.parquet.compression.codec", "snappy") - - val spark = SparkSession.builder - .master("local[1]") - .appName("parquet-write-benchmark") - .config(conf) - .getOrCreate() - - // Set default configs. Individual cases will change them if necessary. 
- spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") - - - def withTempTable(tableNames: String*)(f: => Unit): Unit = { - try f finally tableNames.foreach(spark.catalog.dropTempView) - } - - def withTable(tableNames: String*)(f: => Unit): Unit = { - try f finally { - tableNames.foreach { name => - spark.sql(s"DROP TABLE IF EXISTS $name") - } - } - } - - def writeInt(table: String, benchmark: Benchmark): Unit = { - spark.sql(s"create table $table(c1 INT, c2 STRING) using $format") - benchmark.addCase("Output Single Int Column") { _ => - spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + - s"c1, cast(id as STRING) as c2 from $tempTable") - } - } - - def writeIntString(table: String, benchmark: Benchmark): Unit = { - spark.sql(s"create table $table(c1 INT, c2 STRING) using $format") - benchmark.addCase("Output Int and String Column") { _ => - spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + - s"c1, cast(id as STRING) as c2 from $tempTable") - } - } - - def writePartition(table: String, benchmark: Benchmark): Unit = { - spark.sql(s"create table $table(p INT, id INT) using $format PARTITIONED BY (p)") - benchmark.addCase("Output Partitions") { _ => - spark.sql(s"INSERT overwrite table $table select cast(id as INT) as id," + - s" cast(id % 2 as INT) as p from $tempTable") - } - } - - def writeBucket(table: String, benchmark: Benchmark): Unit = { - spark.sql(s"create table $table(c1 INT, c2 INT) using $format CLUSTERED BY (c2) INTO 2 BUCKETS") - benchmark.addCase("Output Buckets") { _ => - spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + - s"c1, cast(id as INT) as c2 from $tempTable") - } - } - - def main(args: Array[String]): Unit = { - val tableInt = "tableInt" - val tableIntString = "tableIntString" - val tablePartition = "tablePartition" - val tableBucket = "tableBucket" - withTempTable(tempTable) { - spark.range(numRows).createOrReplaceTempView(tempTable) - withTable(tableInt, tableIntString, tablePartition, tableBucket) { - val benchmark = new Benchmark(s"$format writer benchmark", numRows) - writeInt(tableInt, benchmark) - writeIntString(tableIntString, benchmark) - writePartition(tablePartition, benchmark) - writeBucket(tableBucket, benchmark) - benchmark.run() - } - } - } -} From e90fa00e8963eb985bdd30d9a262c61f6ca1ce61 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 29 May 2018 10:55:33 -0700 Subject: [PATCH 7/7] Address comments --- .../benchmark/DataSourceWriteBenchmark.scala | 70 ++++++++++--------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala index f6ffd6eba2263..2d2cdebd067c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala @@ -55,44 +55,43 @@ object DataSourceWriteBenchmark { } } - def writeInt(table: String, format: String, benchmark: Benchmark): Unit = { - spark.sql(s"create table $table(c1 INT, c2 STRING) using $format") - benchmark.addCase("Output Single Int Column") { _ => - spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + - s"c1, cast(id as STRING) as c2 from $tempTable") + def writeNumeric(table: String, format: String, benchmark: Benchmark, dataType: String): Unit = { + spark.sql(s"create table 
$table(id $dataType) using $format") + benchmark.addCase(s"Output Single $dataType Column") { _ => + spark.sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS $dataType) AS c1 FROM $tempTable") } } def writeIntString(table: String, format: String, benchmark: Benchmark): Unit = { - spark.sql(s"create table $table(c1 INT, c2 STRING) using $format") + spark.sql(s"CREATE TABLE $table(c1 INT, c2 STRING) USING $format") benchmark.addCase("Output Int and String Column") { _ => - spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + - s"c1, cast(id as STRING) as c2 from $tempTable") + spark.sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS " + + s"c1, CAST(id AS STRING) AS c2 FROM $tempTable") } } def writePartition(table: String, format: String, benchmark: Benchmark): Unit = { - spark.sql(s"create table $table(p INT, id INT) using $format PARTITIONED BY (p)") + spark.sql(s"CREATE TABLE $table(p INT, id INT) USING $format PARTITIONED BY (p)") benchmark.addCase("Output Partitions") { _ => - spark.sql(s"INSERT overwrite table $table select cast(id as INT) as id," + - s" cast(id % 2 as INT) as p from $tempTable") + spark.sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS id," + + s" CAST(id % 2 AS INT) AS p FROM $tempTable") } } def writeBucket(table: String, format: String, benchmark: Benchmark): Unit = { - spark.sql(s"create table $table(c1 INT, c2 INT) using $format CLUSTERED BY (c2) INTO 2 BUCKETS") + spark.sql(s"CREATE TABLE $table(c1 INT, c2 INT) USING $format CLUSTERED BY (c2) INTO 2 BUCKETS") benchmark.addCase("Output Buckets") { _ => - spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + - s"c1, cast(id as INT) as c2 from $tempTable") + spark.sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS " + + s"c1, CAST(id AS INT) AS c2 FROM $tempTable") } } def main(args: Array[String]): Unit = { val tableInt = "tableInt" + val tableDouble = "tableDouble" val tableIntString = "tableIntString" val tablePartition = "tablePartition" val tableBucket = "tableBucket" - // If the val formats: Seq[String] = if (args.isEmpty) { Seq("Parquet", "ORC", "JSON", "CSV") } else { @@ -102,38 +101,43 @@ object DataSourceWriteBenchmark { Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz Parquet writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Output Single Int Column 6054 / 6070 2.6 384.9 1.0X - Output Int and String Column 5784 / 5800 2.7 367.8 1.0X - Output Partitions 3891 / 3904 4.0 247.4 1.6X - Output Buckets 5446 / 5729 2.9 346.2 1.1X + Output Single Int Column 1815 / 1932 8.7 115.4 1.0X + Output Single Double Column 1877 / 1878 8.4 119.3 1.0X + Output Int and String Column 6265 / 6543 2.5 398.3 0.3X + Output Partitions 4067 / 4457 3.9 258.6 0.4X + Output Buckets 5608 / 5820 2.8 356.6 0.3X ORC writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Output Single Int Column 5734 / 5823 2.7 364.6 1.0X - Output Int and String Column 5802 / 5839 2.7 368.9 1.0X - Output Partitions 3384 / 3671 4.6 215.1 1.7X - Output Buckets 4950 / 4988 3.2 314.7 1.2X + Output Single Int Column 1201 / 1239 13.1 76.3 1.0X + Output Single Double Column 1542 / 1600 10.2 98.0 0.8X + Output Int and String Column 6495 / 6580 2.4 412.9 0.2X + Output Partitions 3648 / 3842 4.3 231.9 0.3X + Output Buckets 5022 / 5145 3.1 319.3 0.2X JSON writer benchmark: 
Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Output Single Int Column 5576 / 5594 2.8 354.5 1.0X - Output Int and String Column 5550 / 5620 2.8 352.9 1.0X - Output Partitions 3727 / 4100 4.2 237.0 1.5X - Output Buckets 5316 / 5852 3.0 338.0 1.0X + Output Single Int Column 1988 / 2093 7.9 126.4 1.0X + Output Single Double Column 2854 / 2911 5.5 181.4 0.7X + Output Int and String Column 6467 / 6653 2.4 411.1 0.3X + Output Partitions 4548 / 5055 3.5 289.1 0.4X + Output Buckets 5664 / 5765 2.8 360.1 0.4X CSV writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - Output Single Int Column 7064 / 8714 2.2 449.1 1.0X - Output Int and String Column 7114 / 7663 2.2 452.3 1.0X - Output Partitions 5771 / 6228 2.7 366.9 1.2X - Output Buckets 7414 / 7479 2.1 471.3 1.0X + Output Single Int Column 3025 / 3190 5.2 192.3 1.0X + Output Single Double Column 3575 / 3634 4.4 227.3 0.8X + Output Int and String Column 7313 / 7399 2.2 464.9 0.4X + Output Partitions 5105 / 5190 3.1 324.6 0.6X + Output Buckets 6986 / 6992 2.3 444.1 0.4X */ withTempTable(tempTable) { spark.range(numRows).createOrReplaceTempView(tempTable) formats.foreach { format => - withTable(tableInt, tableIntString, tablePartition, tableBucket) { + withTable(tableInt, tableDouble, tableIntString, tablePartition, tableBucket) { val benchmark = new Benchmark(s"$format writer benchmark", numRows) - writeInt(tableInt, format, benchmark) + writeNumeric(tableInt, format, benchmark, "Int") + writeNumeric(tableDouble, format, benchmark, "Double") writeIntString(tableIntString, format, benchmark) writePartition(tablePartition, format, benchmark) writeBucket(tableBucket, format, benchmark)
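
The series above converges on one pattern: create a table with CREATE TABLE ... USING <format>, then time INSERT OVERWRITE statements against it with Spark's internal Benchmark utility. What follows is a minimal, self-contained sketch of that pattern, not part of the patch series itself; it assumes only the org.apache.spark.util.Benchmark test utility already imported in the patches, and the object name, table name, and row count are illustrative.

// A minimal sketch of the write-benchmark pattern used in the patches above.
// Assumptions: org.apache.spark.util.Benchmark (Spark's internal test utility)
// is on the classpath; WriteBenchmarkSketch and sketch_int are illustrative names.
package org.apache.spark.sql.execution.benchmark

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.Benchmark

object WriteBenchmarkSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WriteBenchmarkSketch")
      .setIfMissing("spark.master", "local[1]")
    val spark = SparkSession.builder.config(conf).getOrCreate()

    // Source rows live in a temp view; each case rewrites them into a target table.
    val numRows = 1024 * 1024
    spark.range(numRows).createOrReplaceTempView("temp")

    val format = "parquet" // any format the benchmark supports, e.g. orc, json, csv
    val benchmark = new Benchmark(s"$format writer benchmark", numRows)

    spark.sql(s"CREATE TABLE sketch_int(c1 INT) USING $format")
    benchmark.addCase("Output Single Int Column") { _ =>
      spark.sql("INSERT OVERWRITE TABLE sketch_int SELECT CAST(id AS INT) AS c1 FROM temp")
    }

    benchmark.run()
    spark.sql("DROP TABLE IF EXISTS sketch_int")
  }
}

Per the scaladoc in the final patch, the full benchmark is launched with spark-submit; assuming the class is packaged in the sql/core test jar, an invocation along the lines of spark-submit --class org.apache.spark.sql.execution.benchmark.DataSourceWriteBenchmark <spark sql test jar> ORC CSV would measure only the listed formats.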