From 9bbfe6ef4b5a418373c2250ad676233fb05df7f7 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 10:29:53 +0800 Subject: [PATCH 01/21] [SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to be considered. ## What changes were proposed in this pull request? 1. Acquire 'compressionCodecClassName' from `parquet.compression` as well; the order of precedence is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`. 2. Change `spark.sql.parquet.compression.codec` to support "none". `ParquetOptions` already treats "none" as equivalent to "uncompressed", but the configuration could not be set to "none". ## How was this patch tested? Manual test. --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 84fe4bb711a4e..13410870cbb0e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -324,10 +324,10 @@ object SQLConf { val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec") .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " + - "uncompressed, snappy, gzip, lzo.") + "none, uncompressed, snappy, gzip, lzo.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) - .checkValues(Set("uncompressed", "snappy", "gzip", "lzo")) + .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo")) .createWithDefault("snappy") val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown") From 48cf108ed5c3298eb860d9735b439ac89d65765e Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 10:30:24 +0800 Subject: [PATCH 02/21] [SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to be considered. ## What changes were proposed in this pull request? 1. Acquire 'compressionCodecClassName' from `parquet.compression` as well; the order of precedence is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`. 2. Change `spark.sql.parquet.compression.codec` to support "none". `ParquetOptions` already treats "none" as equivalent to "uncompressed", but the configuration could not be set to "none". ## How was this patch tested? Manual test.
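As a hedged illustration of the intended precedence (not part of the patch; `df`, the session, and the output path are hypothetical):

```scala
// Sketch only: assumes an active SparkSession `spark` and an existing DataFrame `df`.
// After this change, the writer resolves the Parquet codec in this order:
// `compression` option > `parquet.compression` > spark.sql.parquet.compression.codec.
spark.conf.set("spark.sql.parquet.compression.codec", "snappy") // lowest precedence
df.write
  .option("parquet.compression", "gzip") // overrides the session conf
  .option("compression", "none")         // highest precedence; wins, writes uncompressed
  .parquet("/tmp/spark-21786-demo")
```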
--- .../datasources/parquet/ParquetOptions.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index 772d4565de548..a81022409a119 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.Locale import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.internal.SQLConf @@ -42,8 +43,15 @@ private[parquet] class ParquetOptions( * Acceptable values are defined in [[shortParquetCompressionCodecNames]]. */ val compressionCodecClassName: String = { - val codecName = parameters.getOrElse("compression", - sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT) + // `compression`, `parquet.compression`(i.e., ParquetOutputFormat.COMPRESSION), and + // `spark.sql.parquet.compression.codec` + // are in order of precedence from highest to lowest. + val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION) + val codecName = parameters + .get("compression") + .orElse(parquetCompressionConf) + .getOrElse(sqlConf.parquetCompressionCodec) + .toLowerCase(Locale.ROOT) if (!shortParquetCompressionCodecNames.contains(codecName)) { val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT)) From 5dbd3edf9e086433d3d3fe9c0ead887d799c61d3 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 10:34:29 +0800 Subject: [PATCH 03/21] [SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to be considered. ## What changes were proposed in this pull request? 1. Acquire 'compressionCodecClassName' from `parquet.compression` as well; the order of precedence is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`. 2. Change `spark.sql.parquet.compression.codec` to support "none". `ParquetOptions` already treats "none" as equivalent to "uncompressed", but the configuration could not be set to "none". ## How was this patch tested? Manual test. --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index f02f46236e2b0..51c1c068df1c4 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -954,7 +954,7 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession snappy Sets the compression codec use when writing Parquet files. Acceptable values include: - uncompressed, snappy, gzip, lzo. + none, uncompressed, snappy, gzip, lzo. From 5124f1b560e942c0dc23af31336317a4b995dd8f Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 15:06:26 +0800 Subject: [PATCH 04/21] [SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to be considered. ## What changes were proposed in this pull request?
1. Acquire 'compressionCodecClassName' from `parquet.compression` as well; the order of precedence is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`. 2. Change `spark.sql.parquet.compression.codec` to support "none". `ParquetOptions` already treats "none" as equivalent to "uncompressed", but the configuration could not be set to "none". 3. Change `compressionCodec` to `compressionCodecClassName`. ## How was this patch tested? Manual test. --- docs/sql-programming-guide.md | 4 +- .../datasources/orc/OrcFileFormat.scala | 2 +- .../datasources/orc/OrcOptions.scala | 2 +- .../datasources/orc/OrcSourceSuite.scala | 13 ++-- .../spark/sql/hive/orc/OrcFileFormat.scala | 2 +- .../sql/hive/CompressionCodecSuite.scala | 61 +++++++++++++++++++ 6 files changed, 74 insertions(+), 10 deletions(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 51c1c068df1c4..bd17d73683f22 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -953,7 +953,9 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession spark.sql.parquet.compression.codec snappy - Sets the compression codec use when writing Parquet files. Acceptable values include: + Sets the compression codec use when writing Parquet files. If other compression codec + configuration was found through hive or parquet, the precedence would be `compression`, + `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, snappy, gzip, lzo. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index f7471cd7debce..b6c135bb23219 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -94,7 +94,7 @@ class OrcFileFormat conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, dataSchema.catalogString) - conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec) + conf.set(COMPRESS.getAttribute, orcOptions.compressionCodecClassName) conf.asInstanceOf[JobConf] .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala index c866dd834a525..12930c8e3b971 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala @@ -41,7 +41,7 @@ class OrcOptions( * Compression codec to use. * Acceptable values are defined in [[shortOrcCompressionCodecNames]]. */ - val compressionCodec: String = { + val compressionCodecClassName: String = { // `compression`, `orc.compress`(i.e., OrcConf.COMPRESS), and `spark.sql.orc.compression.codec` // are in order of precedence from highest to lowest.
val orcCompressionConf = parameters.get(COMPRESS.getAttribute) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 6f5f2fd795f74..beef8c2f1c3b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -134,29 +134,30 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { val conf = spark.sessionState.conf val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf) - assert(option.compressionCodec == "NONE") + assert(option.compressionCodecClassName == "NONE") } test("SPARK-21839: Add SQL config for ORC compression") { val conf = spark.sessionState.conf // Test if the default of spark.sql.orc.compression.codec is snappy - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY") + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "SNAPPY") // OrcOptions's parameters have a higher priority than SQL configuration. // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec` withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") { - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE") + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "NONE") val map1 = Map(COMPRESS.getAttribute -> "zlib") val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo") - assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB") - assert(new OrcOptions(map2, conf).compressionCodec == "LZO") + assert(new OrcOptions(map1, conf).compressionCodecClassName == "ZLIB") + assert(new OrcOptions(map2, conf).compressionCodecClassName == "LZO") } // Test all the valid options of spark.sql.orc.compression.codec Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c => withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) { val expected = if (c == "UNCOMPRESSED") "NONE" else c - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected) + assert( + new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == expected) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 95741c7b30289..5715893920ac6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -74,7 +74,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable val configuration = job.getConfiguration - configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec) + configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodecClassName) configuration match { case conf: JobConf => conf.setOutputFormat(classOf[OrcOutputFormat]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala new file mode 100644 index 0000000000000..8ce362b99e61a --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala @@ -0,0 +1,61 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.parquet.hadoop.ParquetOutputFormat + +import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils + +class CompressionCodecSuite extends TestHiveSingleton with SQLTestUtils { + test("Test `spark.sql.parquet.compression.codec` config") { + Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c => + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { + val expected = if (c == "NONE") "UNCOMPRESSED" else c + val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf) + assert(option.compressionCodecClassName == expected) + } + } + } + + test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") { + // When "compression" is configured, it should be the first choice. + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip") + val option = new ParquetOptions(props, spark.sessionState.conf) + assert(option.compressionCodecClassName == "UNCOMPRESSED") + } + + // When "compression" is not configured, "parquet.compression" should be the preferred choice. + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip") + val option = new ParquetOptions(props, spark.sessionState.conf) + assert(option.compressionCodecClassName == "GZIP") + } + + // When both "compression" and "parquet.compression" are not configured, + // spark.sql.parquet.compression.codec should be the right choice. 
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + val props = Map.empty[String, String] + val option = new ParquetOptions(props, spark.sessionState.conf) + assert(option.compressionCodecClassName == "SNAPPY") + } + } +} From 1b087df2b2730eafee1f6c4e8662092a8027c8ea Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 16:59:28 +0800 Subject: [PATCH 05/21] Fix scala style --- .../sql/execution/datasources/parquet/ParquetOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index a81022409a119..ef67ea7d17cea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.Locale -import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.internal.SQLConf From 05e52b6a5b63febe881569993ef3b8b2508d7aad Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 19:55:53 +0800 Subject: [PATCH 06/21] Change compressionCodecClassName to compressionCodecName --- .../execution/datasources/orc/OrcFileFormat.scala | 2 +- .../sql/execution/datasources/orc/OrcOptions.scala | 2 +- .../datasources/parquet/ParquetFileFormat.scala | 2 +- .../datasources/parquet/ParquetOptions.scala | 6 +++--- .../execution/datasources/orc/OrcSourceSuite.scala | 13 ++++++------- .../datasources/parquet/ParquetIOSuite.scala | 2 +- .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 2 +- 7 files changed, 14 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index b6c135bb23219..709d72a7793ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -94,7 +94,7 @@ class OrcFileFormat conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, dataSchema.catalogString) - conf.set(COMPRESS.getAttribute, orcOptions.compressionCodecClassName) + conf.set(COMPRESS.getAttribute, orcOptions.compressionCodecName) conf.asInstanceOf[JobConf] .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala index 12930c8e3b971..b5ecea7880172 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala @@ -41,7 +41,7 @@ class OrcOptions( * Compression codec to use. * Acceptable values are defined in [[shortOrcCompressionCodecNames]]. 
*/ - val compressionCodecClassName: String = { + val compressionCodecName: String = { // `compression`, `orc.compress`(i.e., OrcConf.COMPRESS), and `spark.sql.orc.compression.codec` // are in order of precedence from highest to lowest. val orcCompressionConf = parameters.get(COMPRESS.getAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 45bedf70f975c..48d3f2f148669 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -123,7 +123,7 @@ class ParquetFileFormat sparkSession.sessionState.conf.parquetOutputTimestampType.toString) // Sets compression scheme - conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) + conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecName) // SPARK-15719: Disables writing Parquet summary files by default. if (conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index ef67ea7d17cea..d8d277c537528 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf /** * Options for the Parquet data source. */ -private[parquet] class ParquetOptions( +class ParquetOptions( @transient private val parameters: CaseInsensitiveMap[String], @transient private val sqlConf: SQLConf) extends Serializable { @@ -42,7 +42,7 @@ private[parquet] class ParquetOptions( * Compression codec to use. By default use the value specified in SQLConf. * Acceptable values are defined in [[shortParquetCompressionCodecNames]]. */ - val compressionCodecClassName: String = { + val compressionCodecName: String = { // `compression`, `parquet.compression`(i.e., ParquetOutputFormat.COMPRESSION), and // `spark.sql.parquet.compression.codec` // are in order of precedence from highest to lowest. 
@@ -76,7 +76,7 @@ object ParquetOptions { val MERGE_SCHEMA = "mergeSchema" // The parquet compression short names - private val shortParquetCompressionCodecNames = Map( + val shortParquetCompressionCodecNames = Map( "none" -> CompressionCodecName.UNCOMPRESSED, "uncompressed" -> CompressionCodecName.UNCOMPRESSED, "snappy" -> CompressionCodecName.SNAPPY, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index beef8c2f1c3b8..751416217d594 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -134,30 +134,29 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { val conf = spark.sessionState.conf val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf) - assert(option.compressionCodecClassName == "NONE") + assert(option.compressionCodecName == "NONE") } test("SPARK-21839: Add SQL config for ORC compression") { val conf = spark.sessionState.conf // Test if the default of spark.sql.orc.compression.codec is snappy - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "SNAPPY") + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecName == "SNAPPY") // OrcOptions's parameters have a higher priority than SQL configuration. // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec` withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") { - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "NONE") + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecName == "NONE") val map1 = Map(COMPRESS.getAttribute -> "zlib") val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo") - assert(new OrcOptions(map1, conf).compressionCodecClassName == "ZLIB") - assert(new OrcOptions(map2, conf).compressionCodecClassName == "LZO") + assert(new OrcOptions(map1, conf).compressionCodecName == "ZLIB") + assert(new OrcOptions(map2, conf).compressionCodecName == "LZO") } // Test all the valid options of spark.sql.orc.compression.codec Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c => withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) { val expected = if (c == "UNCOMPRESSED") "NONE" else c - assert( - new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == expected) + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecName == expected) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 44a8b25c61dfb..2b52bfd99b6c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -768,7 +768,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { val option = new ParquetOptions(Map("Compression" -> "uncompressed"), 
spark.sessionState.conf) - assert(option.compressionCodecClassName == "UNCOMPRESSED") + assert(option.compressionCodecName == "UNCOMPRESSED") } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 5715893920ac6..5c2f88d891a81 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -74,7 +74,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable val configuration = job.getConfiguration - configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodecClassName) + configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodecName) configuration match { case conf: JobConf => conf.setOutputFormat(classOf[OrcOutputFormat]) From 0c0f55d31547d8882f013b5be9d0a84fd7da808f Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 20:00:57 +0800 Subject: [PATCH 07/21] Revert a mistaken change. --- .../sql/execution/datasources/parquet/ParquetOptions.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index d8d277c537528..e6bfa86747f3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf /** * Options for the Parquet data source. */ -class ParquetOptions( +private[parquet] class ParquetOptions( @transient private val parameters: CaseInsensitiveMap[String], @transient private val sqlConf: SQLConf) extends Serializable { @@ -76,7 +76,7 @@ object ParquetOptions { val MERGE_SCHEMA = "mergeSchema" // The parquet compression short names - val shortParquetCompressionCodecNames = Map( + private val shortParquetCompressionCodecNames = Map( "none" -> CompressionCodecName.UNCOMPRESSED, "uncompressed" -> CompressionCodecName.UNCOMPRESSED, "snappy" -> CompressionCodecName.SNAPPY, From 4ab7ecb69e3bc6b48b3a1c57dbf5f3aba019c5ff Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 20:04:03 +0800 Subject: [PATCH 08/21] Change the compression description --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 13410870cbb0e..533658c6d65d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -323,7 +323,9 @@ object SQLConf { .createWithDefault(false) val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec") - .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " + + .doc("Sets the compression codec use when writing Parquet files. If other compression codec " + + "configuration was found through hive or parquet, the precedence would be `compression`, " + + "`parquet.compression`, `spark.sql.parquet.compression.codec`.
Acceptable values include: " + "none, uncompressed, snappy, gzip, lzo.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) @@ -364,7 +366,9 @@ object SQLConf { .createWithDefault(true) val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec") - .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " + + .doc("Sets the compression codec use when writing ORC files. If other compression codec " + + "configuration was found through hive or ORC, the precedence would be `compression`, " + + "`orc.compress`, `spark.sql.orc.compression.codec`. Acceptable values include: " + "none, uncompressed, snappy, zlib, lzo.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) From 3cf0c049961f76135653629d562eb01bf37f9457 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 20:06:40 +0800 Subject: [PATCH 09/21] update compressionCodecClassName to compressionCodecName update compressionCodecClassName to compressionCodecName --- .../org/apache/spark/sql/hive/CompressionCodecSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala index 8ce362b99e61a..c7eb4719814f4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala @@ -30,7 +30,7 @@ class CompressionCodecSuite extends TestHiveSingleton with SQLTestUtils { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { val expected = if (c == "NONE") "UNCOMPRESSED" else c val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf) - assert(option.compressionCodecClassName == expected) + assert(option.compressionCodecName == expected) } } } @@ -40,14 +40,14 @@ class CompressionCodecSuite extends TestHiveSingleton with SQLTestUtils { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip") val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "UNCOMPRESSED") + assert(option.compressionCodecName == "UNCOMPRESSED") } // When "compression" is not configured, "parquet.compression" should be the preferred choice. 
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip") val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "GZIP") + assert(option.compressionCodecName == "GZIP") } // When both "compression" and "parquet.compression" are not configured, @@ -55,7 +55,7 @@ class CompressionCodecSuite extends TestHiveSingleton with SQLTestUtils { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { val props = Map.empty[String, String] val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "SNAPPY") + assert(option.compressionCodecName == "SNAPPY") } } } From 10e546228e0b147ad26a7e23bae89a4d0a56c2c5 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 21:00:50 +0800 Subject: [PATCH 10/21] Use ParquetOptions in test, so change it to public Use ParquetOptions in test --- .../sql/execution/datasources/parquet/ParquetOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index e6bfa86747f3b..98bd5bec9351d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf /** * Options for the Parquet data source. */ -private[parquet] class ParquetOptions( +class ParquetOptions( @transient private val parameters: CaseInsensitiveMap[String], @transient private val sqlConf: SQLConf) extends Serializable { From 2ab2d293a0548b66070e840372e589eb2949a0ff Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Mon, 25 Dec 2017 23:22:01 +0800 Subject: [PATCH 11/21] Update test Fix test error --- .../scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala index c7eb4719814f4..5aaed99576f24 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala @@ -26,7 +26,7 @@ class CompressionCodecSuite extends TestHiveSingleton with SQLTestUtils { test("Test `spark.sql.parquet.compression.codec` config") { - Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c => + Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c => withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { val expected = if (c == "NONE") "UNCOMPRESSED" else c val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf) From 845dda7e62b34560b7d2e10d90dbe6fd60c573d6 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Tue, 26 Dec 2017 14:14:25 +0800 Subject: [PATCH 12/21] 1 Revert the renaming 2 Move the test case to sql/core --- .../execution/datasources/orc/OrcFileFormat.scala | 2 +- .../execution/datasources/orc/OrcOptions.scala | 2 +- .../datasources/parquet/ParquetFileFormat.scala | 2 +- .../datasources/parquet/ParquetOptions.scala | 2 +- .../sql/CompressionCodecPrecedenceSuite.scala} | 15 +++++++-------- .../datasources/orc/OrcSourceSuite.scala |
12 ++++++------ .../datasources/parquet/ParquetIOSuite.scala | 2 +- .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 2 +- 8 files changed, 19 insertions(+), 20 deletions(-) rename sql/{hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala => core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala} (84%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 709d72a7793ea..f7471cd7debce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -94,7 +94,7 @@ class OrcFileFormat conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, dataSchema.catalogString) - conf.set(COMPRESS.getAttribute, orcOptions.compressionCodecName) + conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec) conf.asInstanceOf[JobConf] .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala index b5ecea7880172..c866dd834a525 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala @@ -41,7 +41,7 @@ class OrcOptions( * Compression codec to use. * Acceptable values are defined in [[shortOrcCompressionCodecNames]]. */ - val compressionCodecName: String = { + val compressionCodec: String = { // `compression`, `orc.compress`(i.e., OrcConf.COMPRESS), and `spark.sql.orc.compression.codec` // are in order of precedence from highest to lowest. val orcCompressionConf = parameters.get(COMPRESS.getAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 48d3f2f148669..45bedf70f975c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -123,7 +123,7 @@ class ParquetFileFormat sparkSession.sessionState.conf.parquetOutputTimestampType.toString) // Sets compression scheme - conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecName) + conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) // SPARK-15719: Disables writing Parquet summary files by default. if (conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index 98bd5bec9351d..193ce8e9d8e63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -42,7 +42,7 @@ class ParquetOptions( * Compression codec to use. By default use the value specified in SQLConf. * Acceptable values are defined in [[shortParquetCompressionCodecNames]]. 
*/ - val compressionCodecName: String = { + val compressionCodecClassName: String = { // `compression`, `parquet.compression`(i.e., ParquetOutputFormat.COMPRESSION), and // `spark.sql.parquet.compression.codec` // are in order of precedence from highest to lowest. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala similarity index 84% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala index 5aaed99576f24..5cdc52d71a04f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala @@ -15,22 +15,21 @@ * limitations under the License. */ -package org.apache.spark.sql.hive +package org.apache.spark.sql import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions -import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} -class CompressionCodecSuite extends TestHiveSingleton with SQLTestUtils { +class CompressionCodecSuite extends SQLTestUtils with SharedSQLContext { test("Test `spark.sql.parquet.compression.codec` config") { Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c => withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { val expected = if (c == "NONE") "UNCOMPRESSED" else c val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf) - assert(option.compressionCodecName == expected) + assert(option.compressionCodecClassName == expected) } } } @@ -40,14 +39,14 @@ class CompressionCodecSuite extends TestHiveSingleton with SQLTestUtils { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip") val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecName == "UNCOMPRESSED") + assert(option.compressionCodecClassName == "UNCOMPRESSED") } // When "compression" is not configured, "parquet.compression" should be the preferred choice. 
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip") val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecName == "GZIP") + assert(option.compressionCodecClassName == "GZIP") } // When both "compression" and "parquet.compression" are not configured, @@ -55,7 +54,7 @@ class CompressionCodecSuite extends TestHiveSingleton with SQLTestUtils { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { val props = Map.empty[String, String] val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecName == "SNAPPY") + assert(option.compressionCodecClassName == "SNAPPY") } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 751416217d594..6f5f2fd795f74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -134,29 +134,29 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { val conf = spark.sessionState.conf val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf) - assert(option.compressionCodecName == "NONE") + assert(option.compressionCodec == "NONE") } test("SPARK-21839: Add SQL config for ORC compression") { val conf = spark.sessionState.conf // Test if the default of spark.sql.orc.compression.codec is snappy - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecName == "SNAPPY") + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY") // OrcOptions's parameters have a higher priority than SQL configuration. 
// `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec` withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") { - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecName == "NONE") + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE") val map1 = Map(COMPRESS.getAttribute -> "zlib") val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo") - assert(new OrcOptions(map1, conf).compressionCodecName == "ZLIB") - assert(new OrcOptions(map2, conf).compressionCodecName == "LZO") + assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB") + assert(new OrcOptions(map2, conf).compressionCodec == "LZO") } // Test all the valid options of spark.sql.orc.compression.codec Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c => withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) { val expected = if (c == "UNCOMPRESSED") "NONE" else c - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecName == expected) + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 2b52bfd99b6c0..44a8b25c61dfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -768,7 +768,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf) - assert(option.compressionCodecName == "UNCOMPRESSED") + assert(option.compressionCodecClassName == "UNCOMPRESSED") } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 5c2f88d891a81..95741c7b30289 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -74,7 +74,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable val configuration = job.getConfiguration - configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodecName) + configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec) configuration match { case conf: JobConf => conf.setOutputFormat(classOf[OrcOutputFormat]) From e510b486ab1cea2f2f4f855747c86cd8af73728c Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Tue, 26 Dec 2017 14:16:51 +0800 Subject: [PATCH 13/21] Rename the test file name and class name Rename the test file name and class name --- .../org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala index 5cdc52d71a04f..32c7a1abec03a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala @@ -23,7 +23,7 @@ import 
org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} -class CompressionCodecSuite extends SQLTestUtils with SharedSQLContext { +class CompressionCodecPrecedenceSuite extends SQLTestUtils with SharedSQLContext { test("Test `spark.sql.parquet.compression.codec` config") { Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c => withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { From 9229e6f1fa8f9fe58d279c6ab14cb1d20068a277 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Tue, 26 Dec 2017 14:22:46 +0800 Subject: [PATCH 14/21] Fix scala style Fix scala style --- .../org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala index 32c7a1abec03a..a289896503a45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala @@ -21,7 +21,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} class CompressionCodecPrecedenceSuite extends SQLTestUtils with SharedSQLContext { test("Test `spark.sql.parquet.compression.codec` config") { Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c => withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { From 37fe65e31c83cdb30856c1c9700ff6510ec06e1f Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Tue, 2 Jan 2018 20:29:09 +0800 Subject: [PATCH 15/21] Modify doc description --- docs/sql-programming-guide.md | 2 +- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index bd17d73683f22..24c8a2e615c2a 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -953,7 +953,7 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession spark.sql.parquet.compression.codec snappy - Sets the compression codec use when writing Parquet files. If other compression codec + Sets the compression codec used when writing Parquet files. If other compression codec configuration was found through hive or parquet, the precedence would be `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, snappy, gzip, lzo. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 533658c6d65d6..182effe52a5c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -323,7 +323,7 @@ object SQLConf { .createWithDefault(false) val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec") - .doc("Sets the compression codec use when writing Parquet files. If other compression codec " + + .doc("Sets the compression codec used when writing Parquet files.
If other compression codec " + "configuration was found through hive or parquet, the precedence would be `compression`, " + "`parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: " + "none, uncompressed, snappy, gzip, lzo.") @@ -366,7 +366,7 @@ object SQLConf { .createWithDefault(true) val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec") - .doc("Sets the compression codec use when writing ORC files. If other compression codec " + + .doc("Sets the compression codec used when writing ORC files. If other compression codec " + "configuration was found through hive or ORC, the precedence would be `compression`, " + "`orc.compress`, `spark.sql.orc.compression.codec`. Acceptable values include: " + "none, uncompressed, snappy, zlib, lzo.") From 253b2a2c57621bd049a7938fd1cb973ec643e947 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Tue, 2 Jan 2018 20:30:01 +0800 Subject: [PATCH 16/21] Add test case and change the package level --- .../parquet/ParquetCompressionCodecPrecedenceSuite.scala} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/{CompressionCodecPrecedenceSuite.scala => execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala} (100%) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala similarity index 100% rename from sql/core/src/test/scala/org/apache/spark/sql/CompressionCodecPrecedenceSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala From d60dcd121a64e460d0c2337afafdb8db40b4f862 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Tue, 2 Jan 2018 20:32:31 +0800 Subject: [PATCH 17/21] Add test case and change the package level --- ...rquetCompressionCodecPrecedenceSuite.scala | 65 +++++++++++++++++-- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala index a289896503a45..9d8c14e0f59c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala @@ -15,15 +15,19 @@ * limitations under the License. 
*/ -package org.apache.spark.sql +package org.apache.spark.sql.execution.datasources.parquet +import java.io.File + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} +import org.apache.spark.sql.test.SharedSQLContext -class CompressionCodecPrecedenceSuite extends SQLTestUtils with SharedSQLContext { +class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSQLContext { test("Test `spark.sql.parquet.compression.codec` config") { Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c => withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { @@ -57,4 +61,57 @@ class CompressionCodecPrecedenceSuite extends SQLTestUtils with SharedSQLContext assert(option.compressionCodecClassName == "SNAPPY") } } + + private def getTableCompressionCodec(path: String): Seq[String] = { + val hadoopConf = spark.sessionState.newHadoopConf() + val codecs = for { + footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf) + block <- footer.getParquetMetadata.getBlocks.asScala + column <- block.getColumns.asScala + } yield column.getCodec.name() + codecs.distinct + } + + private def createTableWithCompression( + tableName: String, + isPartitioned: Boolean, + compressionCodec: String, + rootDir: File): Unit = { + val options = + s"""OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName', + |'parquet.compression'='$compressionCodec')""".stripMargin + val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else "" + sql(s"""CREATE TABLE $tableName USING Parquet $options $partitionCreate + |as select 1 as col1, 2 as p""".stripMargin) + } + + private def checkCompressionCodec(compressionCodec: String, isPartitioned: Boolean): Unit = { + withTempDir { tmpDir => + val tempTableName = "TempParquetTable" + withTable(tempTableName) { + createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir) + val partitionPath = if (isPartitioned) "p=2" else "" + val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath" + val realCompressionCodecs = getTableCompressionCodec(path) + assert(realCompressionCodecs.forall(_ == compressionCodec)) + } + } + } + + test("Create parquet table with compression") { + Seq(true, false).foreach { isPartitioned => + Seq("UNCOMPRESSED", "SNAPPY", "GZIP").foreach { compressionCodec => + checkCompressionCodec(compressionCodec, isPartitioned) + } + } + } + + test("Create table with unknown compression") { + Seq(true, false).foreach { isPartitioned => + val exception = intercept[IllegalArgumentException] { + checkCompressionCodec("aa", isPartitioned) + } + assert(exception.getMessage.contains("Codec [aa] is not available")) + } + } } From b5cd809c680d089d4fa8da9bd43cd64ed1a3b138 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Fri, 5 Jan 2018 10:54:54 +0800 Subject: [PATCH 18/21] Revive `private[parquet]` --- .../sql/execution/datasources/parquet/ParquetOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index 193ce8e9d8e63..ef67ea7d17cea 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf /** * Options for the Parquet data source. */ -class ParquetOptions( +private[parquet] class ParquetOptions( @transient private val parameters: CaseInsensitiveMap[String], @transient private val sqlConf: SQLConf) extends Serializable { From 26c1c61ffd5742b71aefdec33ddcb69b2222a944 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Sat, 6 Jan 2018 11:08:39 +0800 Subject: [PATCH 19/21] Fix scala style Fix scala style --- .../ParquetCompressionCodecPrecedenceSuite.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala index 9d8c14e0f59c2..ed8fd2b453456 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala @@ -78,11 +78,16 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSQLC compressionCodec: String, rootDir: File): Unit = { val options = - s"""OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName', - |'parquet.compression'='$compressionCodec')""".stripMargin + s""" + |OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName', + |'parquet.compression'='$compressionCodec') + """.stripMargin val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else "" - sql(s"""CREATE TABLE $tableName USING Parquet $options $partitionCreate - |as select 1 as col1, 2 as p""".stripMargin) + sql( + s""" + |CREATE TABLE $tableName USING Parquet $options $partitionCreate + |AS SELECT 1 AS col1, 2 AS p + """.stripMargin) } From 946679745f16838932e74fabb70f2ad702fa4640 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Sat, 6 Jan 2018 11:17:01 +0800 Subject: [PATCH 20/21] Change the description of spark.sql.parquet.compression Change the description of spark.sql.parquet.compression --- docs/sql-programming-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 24c8a2e615c2a..f0f4b7646838a 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -953,9 +953,9 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession spark.sql.parquet.compression.codec snappy - Sets the compression codec used when writing Parquet files. If other compression codec - configuration was found through hive or parquet, the precedence would be `compression`, - `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: + Sets the compression codec used when writing Parquet files. If either `compression` or + `parquet.compression` is specified in the table-specific options/properties, the precedence would be + `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, snappy, gzip, lzo.
From 1a8c654805656d3e143c2e63355c7b6365dac471 Mon Sep 17 00:00:00 2001 From: fjh100456 Date: Sat, 6 Jan 2018 11:24:46 +0800 Subject: [PATCH 21/21] Change description Change description --- .../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 182effe52a5c4..00ac21d834ec1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -323,10 +323,10 @@ object SQLConf { .createWithDefault(false) val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec") - .doc("Sets the compression codec used when writing Parquet files. If other compression codec " + - "configuration was found through hive or parquet, the precedence would be `compression`, " + - "`parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: " + - "none, uncompressed, snappy, gzip, lzo.") + .doc("Sets the compression codec used when writing Parquet files. If either `compression` or " + "`parquet.compression` is specified in the table-specific options/properties, the precedence " + "would be `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. " + "Acceptable values include: none, uncompressed, snappy, gzip, lzo.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo")) @@ -366,10 +366,10 @@ object SQLConf { .createWithDefault(true) val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec") - .doc("Sets the compression codec used when writing ORC files. If other compression codec " + - "configuration was found through hive or ORC, the precedence would be `compression`, " + - "`orc.compress`, `spark.sql.orc.compression.codec`. Acceptable values include: " + - "none, uncompressed, snappy, zlib, lzo.") + .doc("Sets the compression codec used when writing ORC files. If either `compression` or " + "`orc.compress` is specified in the table-specific options/properties, the precedence " + "would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`. " + "Acceptable values include: none, uncompressed, snappy, zlib, lzo.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
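For reference, a minimal sketch of the resolution order asserted by the new `ParquetCompressionCodecPrecedenceSuite` — assuming the snippet lives in the `org.apache.spark.sql.execution.datasources.parquet` package (since `ParquetOptions` is `private[parquet]` again) and `spark` is a test session:

```scala
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.internal.SQLConf

// The session conf is the weakest setting...
spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
// ...`parquet.compression` (ParquetOutputFormat.COMPRESSION) overrides it...
val viaParquetKey =
  new ParquetOptions(Map(ParquetOutputFormat.COMPRESSION -> "gzip"), spark.sessionState.conf)
assert(viaParquetKey.compressionCodecClassName == "GZIP")
// ...and the DataSource `compression` option overrides both; "none" maps to UNCOMPRESSED.
val viaOption = new ParquetOptions(
  Map("compression" -> "none", ParquetOutputFormat.COMPRESSION -> "gzip"),
  spark.sessionState.conf)
assert(viaOption.compressionCodecClassName == "UNCOMPRESSED")
```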