[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', parquet.compression needs to be considered. #20076

Closed · 21 commits (showing changes from 17 commits)
6 changes: 4 additions & 2 deletions docs/sql-programming-guide.md
@@ -953,8 +953,10 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
<td><code>spark.sql.parquet.compression.codec</code></td>
<td>snappy</td>
<td>
-Sets the compression codec use when writing Parquet files. Acceptable values include:
-uncompressed, snappy, gzip, lzo.
+Sets the compression codec used when writing Parquet files. If other compression codec
+configuration was found through hive or parquet, the precedence would be `compression`,
+`parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
+none, uncompressed, snappy, gzip, lzo.
</td>
</tr>
<tr>
@@ -323,11 +323,13 @@ object SQLConf {
.createWithDefault(false)

val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
-    .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
-      "uncompressed, snappy, gzip, lzo.")
+    .doc("Sets the compression codec used when writing Parquet files. If other compression codec " +
+      "configuration was found through hive or parquet, the precedence would be `compression`, " +
+      "`parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: " +
+      "none, uncompressed, snappy, gzip, lzo.")

> **Member:** Sets the compression codec used when writing Parquet files. If either `compression` or `parquet.compression` is specified in the table-specific options/properties, the precedence would be `compression`, ...

     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
+    .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
.createWithDefault("snappy")
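The lowercase-then-validate behavior of this conf builder can be sketched outside Spark. This is a minimal, self-contained illustration; `normalizeParquetCodec` and `validParquetCodecs` are hypothetical names, not part of `SQLConf`:

```scala
import java.util.Locale

// Standalone sketch of what `.transform(_.toLowerCase(Locale.ROOT))` followed by
// `.checkValues(...)` enforces after this change: the value is lowercased first,
// then must be in the allowed set, which now includes "none".
val validParquetCodecs = Set("none", "uncompressed", "snappy", "gzip", "lzo")

def normalizeParquetCodec(value: String): String = {
  val v = value.toLowerCase(Locale.ROOT)
  require(validParquetCodecs.contains(v),
    s"The value of spark.sql.parquet.compression.codec should be one of " +
      s"${validParquetCodecs.mkString(", ")}, but was $value")
  v
}
```

So `GZIP`, `Gzip`, and `gzip` are all accepted and normalized, while anything outside the set fails eagerly rather than at write time.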

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
@@ -364,7 +366,9 @@
.createWithDefault(true)

val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
-    .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " +
+    .doc("Sets the compression codec used when writing ORC files. If other compression codec " +
+      "configuration was found through hive or ORC, the precedence would be `compression`, " +
+      "`orc.compress`, `spark.sql.orc.compression.codec`. Acceptable values include: " +
       "none, uncompressed, snappy, zlib, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.util.Locale

+import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -27,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
/**
* Options for the Parquet data source.
*/
-private[parquet] class ParquetOptions(
+class ParquetOptions(

> **Member:** Can we revive `private[parquet]`?

> **Contributor Author:** Yes, it should be revived. Thanks.
@transient private val parameters: CaseInsensitiveMap[String],
@transient private val sqlConf: SQLConf)
extends Serializable {
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
val compressionCodecClassName: String = {
> **Member:** Can we change `compressionCodecClassName` to `compressionCodec` instead?

> **Contributor Author:** @HyukjinKwon Seems you're right.
> @gatorsmile Are we mistaken? Shouldn't we change `ParquetOptions`'s `compressionCodec` to `compressionCodecClassName`? Because `OrcOptions` and `TextOptions` are all using `compressionCodec`.

> **Member:** `compressionCodecClassName` is a better name. We should change all the others to this.

> **Member:** We could alternatively say `compressionCodecName` here. It's rather names like `UNCOMPRESSED`, `LZO`, etc. in this case. For the text-based sources, they are canonical class names, so I am okay with `compressionCodecClassName`, but for ORC and Parquet these are not classes.

> **Member:** `compressionCodecName` is also fine to me.

> **Contributor Author:** So, change all `compressionCodecClassName` and `compressionCodec` to `compressionCodecName`? In `TextOptions`, `JSONOptions` and `CSVOptions` too?

> **Contributor Author:** @gatorsmile @HyukjinKwon In `TextOptions`, `JSONOptions` and `CSVOptions` it's an `Option[String]`, but in `OrcOptions` and `ParquetOptions` it's a `String`. Is it OK to just change `compressionCodecClassName` in `OrcOptions` and `ParquetOptions` to `compressionCodecName`?

> **Member:** Let's do the Parquet and ORC ones here for now, if that's also fine to @gatorsmile.

-    val codecName = parameters.getOrElse("compression",
-      sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+    // `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+    // `spark.sql.parquet.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+    val codecName = parameters
+      .get("compression")
+      .orElse(parquetCompressionConf)
+      .getOrElse(sqlConf.parquetCompressionCodec)
+      .toLowerCase(Locale.ROOT)
if (!shortParquetCompressionCodecNames.contains(codecName)) {
val availableCodecs =
shortParquetCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
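The precedence chain implemented in `ParquetOptions` above can be sketched as a small, self-contained resolver. `resolveParquetCodec` is an illustrative helper, not a Spark API, and it elides the case-insensitive option handling that `CaseInsensitiveMap` provides in the real code:

```scala
import java.util.Locale

// Standalone sketch of the precedence from the change above:
// the per-write `compression` option beats the `parquet.compression` table
// property, which beats the session-level spark.sql.parquet.compression.codec.
def resolveParquetCodec(
    options: Map[String, String],   // per-write options / table properties
    sessionDefault: String          // spark.sql.parquet.compression.codec value
  ): String = {
  options.get("compression")                    // highest precedence
    .orElse(options.get("parquet.compression")) // ParquetOutputFormat.COMPRESSION
    .getOrElse(sessionDefault)                  // lowest precedence
    .toLowerCase(Locale.ROOT)
}
```

These three cases mirror the ones exercised by the new test suite below: both keys set, only the table property set, and neither set.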
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import java.io.File

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSQLContext {
test("Test `spark.sql.parquet.compression.codec` config") {
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
val expected = if (c == "NONE") "UNCOMPRESSED" else c
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
assert(option.compressionCodecClassName == expected)
}
}
}

test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
// When "compression" is configured, it should be the first choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "UNCOMPRESSED")
}

// When "compression" is not configured, "parquet.compression" should be the preferred choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "GZIP")
}

// When both "compression" and "parquet.compression" are not configured,
// spark.sql.parquet.compression.codec should be the right choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map.empty[String, String]
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "SNAPPY")
}
}

private def getTableCompressionCodec(path: String): Seq[String] = {
val hadoopConf = spark.sessionState.newHadoopConf()
val codecs = for {
footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
block <- footer.getParquetMetadata.getBlocks.asScala
column <- block.getColumns.asScala
} yield column.getCodec.name()
codecs.distinct
}

private def createTableWithCompression(
tableName: String,
isPartitioned: Boolean,
compressionCodec: String,
rootDir: File): Unit = {
val options =
s"""OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
|'parquet.compression'='$compressionCodec')""".stripMargin
val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
sql(s"""CREATE TABLE $tableName USING Parquet $options $partitionCreate
|as select 1 as col1, 2 as p""".stripMargin)

> **Member:** Suggested formatting:

    val options =
      s"""
        |OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
        |'parquet.compression'='$compressionCodec')
       """.stripMargin
    val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
    sql(
      s"""
        |CREATE TABLE $tableName USING Parquet $options $partitionCreate
        |AS SELECT 1 AS col1, 2 AS p
      """.stripMargin)

}

private def checkCompressionCodec(compressionCodec: String, isPartitioned: Boolean): Unit = {
withTempDir { tmpDir =>
val tempTableName = "TempParquetTable"
withTable(tempTableName) {
createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
val partitionPath = if (isPartitioned) "p=2" else ""
val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
val realCompressionCodecs = getTableCompressionCodec(path)
assert(realCompressionCodecs.forall(_ == compressionCodec))
}
}
}

test("Create parquet table with compression") {
Seq(true, false).foreach { isPartitioned =>
Seq("UNCOMPRESSED", "SNAPPY", "GZIP").foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
}
}

test("Create table with unknown compression") {
Seq(true, false).foreach { isPartitioned =>
val exception = intercept[IllegalArgumentException] {
checkCompressionCodec("aa", isPartitioned)
}
assert(exception.getMessage.contains("Codec [aa] is not available"))
}
}
}