[SPARK-45484][SQL] Fix the bug that uses incorrect parquet compression codec lz4raw #43310

Closed
wants to merge 2 commits
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
@@ -27,6 +27,7 @@ license: |
- Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set `spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`).
- Since Spark 4.0, any read of SQL tables takes into consideration the SQL configs `spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` instead of the core config `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
- Since Spark 4.0, `spark.sql.hive.metastore` drops the support of Hive prior to 2.0.0 as they require JDK 8 that Spark does not support anymore. Users should migrate to higher versions.
+ - Since Spark 4.0, `spark.sql.parquet.compression.codec` drops support for the codec name `lz4raw`; please use `lz4_raw` instead.

## Upgrading from Spark SQL 3.4 to 3.5

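For context on the rename above, here is a minimal sketch of switching a job to the new codec name. It assumes Spark 4.0+, an existing `SparkSession` named `spark`, and an illustrative output path:

```scala
// Illustrative only: the session, data, and path are assumptions, not PR code.
// Under Spark 4.0 the legacy name "lz4raw" is rejected; use "lz4_raw".
spark.conf.set("spark.sql.parquet.compression.codec", "lz4_raw")
spark.range(100).write.mode("overwrite").parquet("/tmp/lz4_raw_demo")
```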
@@ -1014,12 +1014,12 @@ object SQLConf {
"`parquet.compression` is specified in the table-specific options/properties, the " +
"precedence would be `compression`, `parquet.compression`, " +
"`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
"snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.")
"snappy, gzip, lzo, brotli, lz4, lz4_raw, zstd.")
Member: What is the suffix of the file after changing to LZ4_RAW?

Contributor Author:
Before this PR:
part-00000-fc07d464-03b2-42d6-adc1-68a3adca1752.c000.lz4raw.parquet
After this PR:
part-00000-07244014-f31a-4097-8878-dd3630e721ce.c000.lz4raw.parquet

Member: Since there is no change at the file name layer, it's good.

.version("1.1.1")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(
Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"))
Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4_raw", "zstd"))
Member: Well, unfortunately, we cannot do it like this because Apache Spark 3.5.0 has already been released.

Contributor Author: I know that. But since 3.5.0 was only just released, could we fix it?

Member: No. If we remove it here, production jobs with the existing configuration will fail with Spark 3.5.1.

.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
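The thread above settles on keeping the legacy spelling accepted rather than removing it. As a standalone sketch of that backward-compatible validation, mirroring the `transform`/`checkValues` pattern in the conf builder (the names here are illustrative, not the merged code):

```scala
import java.util.Locale

// Stand-in for the conf's checkValues set: accept both the legacy "lz4raw"
// spelling and the new "lz4_raw" so jobs written against 3.5.0 keep running.
val acceptedCodecs = Set("none", "uncompressed", "snappy", "gzip", "lzo",
  "brotli", "lz4", "lz4raw", "lz4_raw", "zstd")

def validateCodec(name: String): String = {
  val normalized = name.toLowerCase(Locale.ROOT) // mirrors transform(_.toLowerCase(Locale.ROOT))
  require(acceptedCodecs.contains(normalized),
    s"Unsupported parquet compression codec: $name")
  normalized
}
```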
@@ -96,7 +96,7 @@ object ParquetOptions extends DataSourceOptions {
"lzo" -> CompressionCodecName.LZO,
"brotli" -> CompressionCodecName.BROTLI,
"lz4" -> CompressionCodecName.LZ4,
"lz4raw" -> CompressionCodecName.LZ4_RAW,
Member: Ditto. We cannot delete it like this; we can only add a new name, like the next line, for backward compatibility.

Contributor Author: This is a good idea.

"lz4_raw" -> CompressionCodecName.LZ4_RAW,
"zstd" -> CompressionCodecName.ZSTD)

def getParquetCompressionCodecName(name: String): String = {
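Per the reviewer's point that entries can only be added, not deleted, a sketch of the backward-compatible mapping (only the two relevant entries shown; this is an assumed follow-up shape, not necessarily the merged code):

```scala
import java.util.Locale
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Both spellings resolve to the same Parquet codec; the legacy key remains
// so pre-rename configurations keep working. Map truncated for illustration.
val shortParquetCompressionCodecNames: Map[String, CompressionCodecName] = Map(
  "lz4raw"  -> CompressionCodecName.LZ4_RAW, // legacy spelling, kept for compatibility
  "lz4_raw" -> CompressionCodecName.LZ4_RAW)

def getParquetCompressionCodecName(name: String): String =
  shortParquetCompressionCodecNames(name.toLowerCase(Locale.ROOT)).name()
```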
@@ -59,7 +59,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
// on Maven Central.
override protected def availableCodecs: Seq[String] = {
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw")
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw")
Member: In this case, it currently succeeds. Do you know the difference, @beliefer?

}
}

@@ -29,7 +29,16 @@ import org.apache.spark.sql.test.SharedSparkSession

class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
test("Test `spark.sql.parquet.compression.codec` config") {
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "BROTLI", "ZSTD").foreach { c =>
+ Seq(
+   "NONE",
+   "UNCOMPRESSED",
+   "SNAPPY",
+   "GZIP",
+   "LZO",
+   "LZ4",
+   "BROTLI",
+   "ZSTD",
+   "LZ4_RAW").foreach { c =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
val expected = if (c == "NONE") "UNCOMPRESSED" else c
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
@@ -105,7 +114,7 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {

test("Create parquet table with compression") {
Seq(true, false).foreach { isPartitioned =>
- val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4")
+ val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW")
Contributor Author: Before this fix, if we used LZ4RAW here, the test case failed:

realCompressionCodecs.forall(((x$1: String) => x$1.==(compressionCodec))) was false
ScalaTestFailureLocation: org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite at (ParquetCompressionCodecPrecedenceSuite.scala:101)
org.scalatest.exceptions.TestFailedException: realCompressionCodecs.forall(((x$1: String) => x$1.==(compressionCodec))) was false
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
	at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
	at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$checkCompressionCodec$2(ParquetCompressionCodecPrecedenceSuite.scala:101)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:306)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:304)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.withTable(ParquetCompressionCodecPrecedenceSuite.scala:30)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$checkCompressionCodec$1(ParquetCompressionCodecPrecedenceSuite.scala:96)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$checkCompressionCodec$1$adapted(ParquetCompressionCodecPrecedenceSuite.scala:94)
	at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:79)
	at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:78)
	at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:245)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(ParquetCompressionCodecPrecedenceSuite.scala:30)
	at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:78)
	at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:77)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.withTempDir(ParquetCompressionCodecPrecedenceSuite.scala:30)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.checkCompressionCodec(ParquetCompressionCodecPrecedenceSuite.scala:94)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$10(ParquetCompressionCodecPrecedenceSuite.scala:110)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$10$adapted(ParquetCompressionCodecPrecedenceSuite.scala:109)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$9(ParquetCompressionCodecPrecedenceSuite.scala:109)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$9$adapted(ParquetCompressionCodecPrecedenceSuite.scala:107)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$8(ParquetCompressionCodecPrecedenceSuite.scala:107)

Member: Is this only a failure in the test utility method checkCompressionCodec, or is there any user-facing issue?

Contributor Author: Yes, checkCompressionCodec failed! It's a test utility method and unrelated to user-facing behavior.

codecs.foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
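To tie this back to the file-suffix exchange earlier in the review, a hedged way to reproduce the observation locally, assuming a `SparkSession` named `spark` with this fix applied and a hypothetical output directory:

```scala
import java.io.File

// Write with the renamed codec and list the part files; per the thread above,
// the suffix at the file-name layer stays ".lz4raw.parquet".
val dir = "/tmp/parquet_lz4_raw_suffix_check"
spark.range(10).write
  .option("compression", "lz4_raw")
  .mode("overwrite")
  .parquet(dir)

new File(dir).listFiles()
  .map(_.getName)
  .filter(_.endsWith(".parquet"))
  .foreach(println) // e.g. part-00000-....c000.lz4raw.parquet
```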