Skip to content

Commit

Permalink
[SPARK-45484][SQL] Fix the bug that uses incorrect parquet compressio…
Browse files Browse the repository at this point in the history
…n codec lz4raw

### What changes were proposed in this pull request?
apache#41507 supported the new parquet compression codec `lz4raw`. But `lz4raw` is not a correct parquet compression codec name.

This mistake causes error. Please refer https://github.com/apache/spark/pull/43310/files#r1352405312

The root cause is parquet uses `lz4_raw` as its name and store it into the metadata of parquet file. Please refer https://github.com/apache/spark/blob/6373f19f537f69c6460b2e4097f19903c01a608f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala#L65

We should use `lz4_raw` as its name.

### Why are the changes needed?
Fix the bug that uses incorrect parquet compression codec `lz4raw`.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Fix a bug.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes apache#43310 from beliefer/SPARK-45484.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
  • Loading branch information
beliefer committed Oct 20, 2023
1 parent cd827d1 commit b9ac7d3
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ license: |
- Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set `spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`).
- Since Spark 4.0, any read of SQL tables takes into consideration the SQL configs `spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` instead of the core config `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
- Since Spark 4.0, `spark.sql.hive.metastore` drops the support of Hive prior to 2.0.0 as they require JDK 8 that Spark does not support anymore. Users should migrate to higher versions.
- Since Spark 4.0, `spark.sql.parquet.compression.codec` drops the support of codec name `lz4raw`, please use `lz4_raw` instead.

## Upgrading from Spark SQL 3.4 to 3.5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1015,12 +1015,12 @@ object SQLConf {
"`parquet.compression` is specified in the table-specific options/properties, the " +
"precedence would be `compression`, `parquet.compression`, " +
"`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
"snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.")
"snappy, gzip, lzo, brotli, lz4, lz4_raw, zstd.")
.version("1.1.1")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(
Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"))
Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4_raw", "zstd"))
.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ object ParquetOptions extends DataSourceOptions {
"lzo" -> CompressionCodecName.LZO,
"brotli" -> CompressionCodecName.BROTLI,
"lz4" -> CompressionCodecName.LZ4,
"lz4raw" -> CompressionCodecName.LZ4_RAW,
"lz4_raw" -> CompressionCodecName.LZ4_RAW,
"zstd" -> CompressionCodecName.ZSTD)

def getParquetCompressionCodecName(name: String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
// on Maven Central.
override protected def availableCodecs: Seq[String] = {
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw")
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,16 @@ import org.apache.spark.sql.test.SharedSparkSession

class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
test("Test `spark.sql.parquet.compression.codec` config") {
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "BROTLI", "ZSTD").foreach { c =>
Seq(
"NONE",
"UNCOMPRESSED",
"SNAPPY",
"GZIP",
"LZO",
"LZ4",
"BROTLI",
"ZSTD",
"LZ4_RAW").foreach { c =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
val expected = if (c == "NONE") "UNCOMPRESSED" else c
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
Expand Down Expand Up @@ -105,7 +114,7 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar

test("Create parquet table with compression") {
Seq(true, false).foreach { isPartitioned =>
val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4")
val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW")
codecs.foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
Expand Down

0 comments on commit b9ac7d3

Please sign in to comment.