Skip to content

Commit

Permalink
[SPARK-45484][SQL][3.5] Deprecated the incorrect parquet compression …
Browse files Browse the repository at this point in the history
…codec lz4raw

### What changes were proposed in this pull request?
According to the discussion at #43310 (comment), this PR deprecates the incorrect Parquet compression codec `lz4raw` in Spark 3.5.1 and adds a warning log.

The warning log informs users that `lz4raw` will be removed in Apache Spark 4.0.0.

### Why are the changes needed?
To deprecate the incorrect Parquet compression codec `lz4raw` in favor of the correctly spelled `lz4_raw`.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Users will see the warning log below.
`Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'`

### How was this patch tested?
Existing test cases and new test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43330 from beliefer/SPARK-45484_3.5.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
  • Loading branch information
beliefer committed Oct 17, 2023
1 parent 0dc1962 commit b210373
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -995,12 +995,22 @@ object SQLConf {
"`parquet.compression` is specified in the table-specific options/properties, the " +
"precedence would be `compression`, `parquet.compression`, " +
"`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
"snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.")
"snappy, gzip, lzo, brotli, lz4, lz4raw, lz4_raw, zstd.")
.version("1.1.1")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(
Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"))
Set(
"none",
"uncompressed",
"snappy",
"gzip",
"lzo",
"brotli",
"lz4",
"lz4raw",
"lz4_raw",
"zstd"))
.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Locale
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -32,7 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
class ParquetOptions(
@transient private val parameters: CaseInsensitiveMap[String],
@transient private val sqlConf: SQLConf)
extends FileSourceOptions(parameters) {
extends FileSourceOptions(parameters) with Logging {

import ParquetOptions._

Expand All @@ -59,6 +60,9 @@ class ParquetOptions(
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
}
if (codecName == "lz4raw") {
log.warn("Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'")
}
shortParquetCompressionCodecNames(codecName).name()
}

Expand Down Expand Up @@ -96,7 +100,9 @@ object ParquetOptions extends DataSourceOptions {
"lzo" -> CompressionCodecName.LZO,
"brotli" -> CompressionCodecName.BROTLI,
"lz4" -> CompressionCodecName.LZ4,
// Deprecated, to be removed at Spark 4.0.0, please use 'lz4_raw' instead.
"lz4raw" -> CompressionCodecName.LZ4_RAW,
"lz4_raw" -> CompressionCodecName.LZ4_RAW,
"zstd" -> CompressionCodecName.ZSTD)

def getParquetCompressionCodecName(name: String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
// on Maven Central.
override protected def availableCodecs: Seq[String] = {
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw")
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw", "lz4_raw")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,23 @@ import org.apache.spark.sql.test.SharedSparkSession

class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
test("Test `spark.sql.parquet.compression.codec` config") {
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "BROTLI", "ZSTD").foreach { c =>
Seq(
"NONE",
"UNCOMPRESSED",
"SNAPPY",
"GZIP",
"LZO",
"LZ4",
"BROTLI",
"ZSTD",
"LZ4RAW",
"LZ4_RAW").foreach { c =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
val expected = if (c == "NONE") "UNCOMPRESSED" else c
val expected = c match {
case "NONE" => "UNCOMPRESSED"
case "LZ4RAW" => "LZ4_RAW"
case other => other
}
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
assert(option.compressionCodecClassName == expected)
}
Expand Down Expand Up @@ -97,15 +111,18 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
val partitionPath = if (isPartitioned) "p=2" else ""
val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
val realCompressionCodecs = getTableCompressionCodec(path)
val realCompressionCodecs = getTableCompressionCodec(path).map {
case "LZ4_RAW" if compressionCodec == "LZ4RAW" => "LZ4RAW"
case other => other
}
assert(realCompressionCodecs.forall(_ == compressionCodec))
}
}
}

test("Create parquet table with compression") {
Seq(true, false).foreach { isPartitioned =>
val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4")
val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4RAW", "LZ4_RAW")
codecs.foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
Expand Down

0 comments on commit b210373

Please sign in to comment.