diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index f02f46236e2b0..f0f4b7646838a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -953,8 +953,10 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
spark.sql.parquet.compression.codec |
snappy |
- Sets the compression codec use when writing Parquet files. Acceptable values include:
- uncompressed, snappy, gzip, lzo.
+ Sets the compression codec used when writing Parquet files. If either `compression` or
+ `parquet.compression` is specified in the table-specific options/properties, the precedence is
+ `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
+ none, uncompressed, snappy, gzip, lzo.
|
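To make the documented precedence concrete, here is a minimal sketch of how the three settings interact at write time. It is illustrative only and not part of the patch: the local master, output paths under `/tmp/compression_example`, and the object name are assumptions; the order it demonstrates is the one stated above (`compression` over `parquet.compression` over `spark.sql.parquet.compression.codec`).

```scala
import org.apache.spark.sql.SparkSession

object ParquetCompressionPrecedenceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parquet-compression-precedence")
      .getOrCreate()
    import spark.implicits._

    // Session-level default: lowest precedence of the three.
    spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    // `parquet.compression` overrides the session conf ...
    df.write
      .option("parquet.compression", "gzip")
      .parquet("/tmp/compression_example/gzip")

    // ... and `compression` overrides both.
    df.write
      .option("compression", "uncompressed")
      .option("parquet.compression", "gzip")
      .parquet("/tmp/compression_example/uncompressed")

    spark.stop()
  }
}
```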
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 84fe4bb711a4e..00ac21d834ec1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -323,11 +323,13 @@ object SQLConf {
.createWithDefault(false)
val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
- .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
- "uncompressed, snappy, gzip, lzo.")
+ .doc("Sets the compression codec used when writing Parquet files. If either `compression` or" +
+ "`parquet.compression` is specified in the table-specific options/properties, the precedence" +
+ "would be `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`." +
+ "Acceptable values include: none, uncompressed, snappy, gzip, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
- .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
+ .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
.createWithDefault("snappy")
val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
@@ -364,8 +366,10 @@ object SQLConf {
.createWithDefault(true)
val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
- .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " +
- "none, uncompressed, snappy, zlib, lzo.")
+ .doc("Sets the compression codec used when writing ORC files. If either `compression` or" +
+ "`orc.compress` is specified in the table-specific options/properties, the precedence" +
+ "would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
+ "Acceptable values include: none, uncompressed, snappy, zlib, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
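The ORC conf follows the same pattern as the Parquet one. A brief sketch of the documented order, intended for spark-shell where a `spark` session is predefined; the output path is hypothetical, and this patch only updates the ORC documentation, not the ORC write path itself.

```scala
// Session default (lowest precedence of the three).
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

// `orc.compress` set as a write option takes precedence over the session conf;
// an explicit `compression` option would override both.
spark.range(10).write
  .option("orc.compress", "zlib")
  .orc("/tmp/orc_compression_example")
```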
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 772d4565de548..ef67ea7d17cea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.util.Locale
+import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
val compressionCodecClassName: String = {
- val codecName = parameters.getOrElse("compression",
- sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+ // `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+ // `spark.sql.parquet.compression.codec`
+ // are in order of precedence from highest to lowest.
+ val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+ val codecName = parameters
+ .get("compression")
+ .orElse(parquetCompressionConf)
+ .getOrElse(sqlConf.parquetCompressionCodec)
+ .toLowerCase(Locale.ROOT)
if (!shortParquetCompressionCodecNames.contains(codecName)) {
val availableCodecs =
shortParquetCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
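The `orElse` chain above is what establishes the precedence. As a standalone illustration of the same resolution pattern, the sketch below reproduces it outside of Spark; `resolveCodec`, `CodecResolutionSketch`, and `sessionDefault` are made-up names for illustration, not Spark API. The real behavior is verified by the new test suite that follows.

```scala
import java.util.Locale

object CodecResolutionSketch {
  // First defined value in the chain wins, falling back to the session default.
  def resolveCodec(options: Map[String, String], sessionDefault: String): String = {
    options.get("compression")
      .orElse(options.get("parquet.compression"))
      .getOrElse(sessionDefault)
      .toLowerCase(Locale.ROOT)
  }

  def main(args: Array[String]): Unit = {
    // `parquet.compression` wins over the session default.
    println(resolveCodec(Map("parquet.compression" -> "gzip"), "snappy"))  // gzip
    // `compression` wins over both.
    println(resolveCodec(
      Map("compression" -> "uncompressed", "parquet.compression" -> "gzip"), "snappy"))  // uncompressed
    // Nothing specified: the session default applies.
    println(resolveCodec(Map.empty, "snappy"))  // snappy
  }
}
```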
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
new file mode 100644
index 0000000000000..ed8fd2b453456
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSQLContext {
+ test("Test `spark.sql.parquet.compression.codec` config") {
+ Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c =>
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
+ val expected = if (c == "NONE") "UNCOMPRESSED" else c
+ val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
+ assert(option.compressionCodecClassName == expected)
+ }
+ }
+ }
+
+ test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
+ // When "compression" is configured, it should be the first choice.
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+ val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
+ val option = new ParquetOptions(props, spark.sessionState.conf)
+ assert(option.compressionCodecClassName == "UNCOMPRESSED")
+ }
+
+ // When "compression" is not configured, "parquet.compression" should be the preferred choice.
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+ val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
+ val option = new ParquetOptions(props, spark.sessionState.conf)
+ assert(option.compressionCodecClassName == "GZIP")
+ }
+
+ // When neither "compression" nor "parquet.compression" is configured,
+ // `spark.sql.parquet.compression.codec` should be the right choice.
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+ val props = Map.empty[String, String]
+ val option = new ParquetOptions(props, spark.sessionState.conf)
+ assert(option.compressionCodecClassName == "SNAPPY")
+ }
+ }
+
+ private def getTableCompressionCodec(path: String): Seq[String] = {
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val codecs = for {
+ footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
+ block <- footer.getParquetMetadata.getBlocks.asScala
+ column <- block.getColumns.asScala
+ } yield column.getCodec.name()
+ codecs.distinct
+ }
+
+ private def createTableWithCompression(
+ tableName: String,
+ isPartitioned: Boolean,
+ compressionCodec: String,
+ rootDir: File): Unit = {
+ val options =
+ s"""
+ |OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
+ |'parquet.compression'='$compressionCodec')
+ """.stripMargin
+ val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
+ sql(
+ s"""
+ |CREATE TABLE $tableName USING Parquet $options $partitionCreate
+ |AS SELECT 1 AS col1, 2 AS p
+ """.stripMargin)
+ }
+
+ private def checkCompressionCodec(compressionCodec: String, isPartitioned: Boolean): Unit = {
+ withTempDir { tmpDir =>
+ val tempTableName = "TempParquetTable"
+ withTable(tempTableName) {
+ createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
+ val partitionPath = if (isPartitioned) "p=2" else ""
+ val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
+ val realCompressionCodecs = getTableCompressionCodec(path)
+ assert(realCompressionCodecs.forall(_ == compressionCodec))
+ }
+ }
+ }
+
+ test("Create parquet table with compression") {
+ Seq(true, false).foreach { isPartitioned =>
+ Seq("UNCOMPRESSED", "SNAPPY", "GZIP").foreach { compressionCodec =>
+ checkCompressionCodec(compressionCodec, isPartitioned)
+ }
+ }
+ }
+
+ test("Create table with unknown compression") {
+ Seq(true, false).foreach { isPartitioned =>
+ val exception = intercept[IllegalArgumentException] {
+ checkCompressionCodec("aa", isPartitioned)
+ }
+ assert(exception.getMessage.contains("Codec [aa] is not available"))
+ }
+ }
+}