Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-45484][SQL][3.5] Deprecate the incorrect parquet compression codec lz4raw #43330

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -995,12 +995,22 @@ object SQLConf {
"`parquet.compression` is specified in the table-specific options/properties, the " +
"precedence would be `compression`, `parquet.compression`, " +
"`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
"snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.")
"snappy, gzip, lzo, brotli, lz4, lz4raw, lz4_raw, zstd.")
.version("1.1.1")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(
Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"))
Set(
"none",
"uncompressed",
"snappy",
"gzip",
"lzo",
"brotli",
"lz4",
"lz4raw",
"lz4_raw",
"zstd"))
.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Locale
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -32,7 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
class ParquetOptions(
@transient private val parameters: CaseInsensitiveMap[String],
@transient private val sqlConf: SQLConf)
extends FileSourceOptions(parameters) {
extends FileSourceOptions(parameters) with Logging {

import ParquetOptions._

Expand All @@ -59,6 +60,9 @@ class ParquetOptions(
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
}
if (codecName == "lz4raw") {
log.warn("Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'")
}
shortParquetCompressionCodecNames(codecName).name()
}

Expand Down Expand Up @@ -96,7 +100,9 @@ object ParquetOptions extends DataSourceOptions {
"lzo" -> CompressionCodecName.LZO,
"brotli" -> CompressionCodecName.BROTLI,
"lz4" -> CompressionCodecName.LZ4,
// Deprecated, to be removed at Spark 4.0.0, please use 'lz4_raw' instead.
"lz4raw" -> CompressionCodecName.LZ4_RAW,
"lz4_raw" -> CompressionCodecName.LZ4_RAW,
"zstd" -> CompressionCodecName.ZSTD)

def getParquetCompressionCodecName(name: String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
// on Maven Central.
override protected def availableCodecs: Seq[String] = {
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw")
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw", "lz4_raw")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,23 @@ import org.apache.spark.sql.test.SharedSparkSession

class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
test("Test `spark.sql.parquet.compression.codec` config") {
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "BROTLI", "ZSTD").foreach { c =>
Seq(
"NONE",
"UNCOMPRESSED",
"SNAPPY",
"GZIP",
"LZO",
"LZ4",
"BROTLI",
"ZSTD",
"LZ4RAW",
"LZ4_RAW").foreach { c =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
val expected = if (c == "NONE") "UNCOMPRESSED" else c
val expected = c match {
case "NONE" => "UNCOMPRESSED"
case "LZ4RAW" => "LZ4_RAW"
case other => other
}
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
assert(option.compressionCodecClassName == expected)
}
Expand Down Expand Up @@ -97,15 +111,18 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
val partitionPath = if (isPartitioned) "p=2" else ""
val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
val realCompressionCodecs = getTableCompressionCodec(path)
val realCompressionCodecs = getTableCompressionCodec(path).map {
case "LZ4_RAW" if compressionCodec == "LZ4RAW" => "LZ4RAW"
case other => other
}
assert(realCompressionCodecs.forall(_ == compressionCodec))
}
}
}

test("Create parquet table with compression") {
Seq(true, false).foreach { isPartitioned =>
val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4")
val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4RAW", "LZ4_RAW")
codecs.foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
Expand Down