diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 73d3756ef6b93..427d04801902f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -995,12 +995,22 @@ object SQLConf {
       "`parquet.compression` is specified in the table-specific options/properties, the " +
       "precedence would be `compression`, `parquet.compression`, " +
       "`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
-      "snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.")
+      "snappy, gzip, lzo, brotli, lz4, lz4raw, lz4_raw, zstd.")
     .version("1.1.1")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
     .checkValues(
-      Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"))
+      Set(
+        "none",
+        "uncompressed",
+        "snappy",
+        "gzip",
+        "lzo",
+        "brotli",
+        "lz4",
+        "lz4raw",
+        "lz4_raw",
+        "zstd"))
     .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 023d2460959cd..95869b6fbb9d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
@@ -32,7 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
 class ParquetOptions(
     @transient private val parameters: CaseInsensitiveMap[String],
     @transient private val sqlConf: SQLConf)
-  extends FileSourceOptions(parameters) {
+  extends FileSourceOptions(parameters) with Logging {
 
   import ParquetOptions._
 
@@ -59,6 +60,9 @@
       throw new IllegalArgumentException(s"Codec [$codecName] " +
         s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
     }
+    if (codecName == "lz4raw") {
+      log.warn("Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'")
+    }
     shortParquetCompressionCodecNames(codecName).name()
   }
 
@@ -96,7 +100,9 @@
     "lzo" -> CompressionCodecName.LZO,
     "brotli" -> CompressionCodecName.BROTLI,
     "lz4" -> CompressionCodecName.LZ4,
+    // Deprecated, to be removed in Spark 4.0.0, please use 'lz4_raw' instead.
"lz4raw" -> CompressionCodecName.LZ4_RAW, + "lz4_raw" -> CompressionCodecName.LZ4_RAW, "zstd" -> CompressionCodecName.ZSTD) def getParquetCompressionCodecName(name: String): String = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala index 09a348cd29451..9f3d6ff48d477 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala @@ -59,7 +59,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite { // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available // on Maven Central. override protected def availableCodecs: Seq[String] = { - Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw") + Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw", "lz4_raw") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala index ac0aad16f1eba..27e2816ce9d94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala @@ -29,9 +29,23 @@ import org.apache.spark.sql.test.SharedSparkSession class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession { test("Test `spark.sql.parquet.compression.codec` config") { - Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "BROTLI", "ZSTD").foreach { c => + Seq( + "NONE", + "UNCOMPRESSED", + "SNAPPY", + "GZIP", + "LZO", + "LZ4", + "BROTLI", + "ZSTD", + "LZ4RAW", + "LZ4_RAW").foreach { c => withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { - val expected = if (c == "NONE") "UNCOMPRESSED" else c + val expected = c match { + case "NONE" => "UNCOMPRESSED" + case "LZ4RAW" => "LZ4_RAW" + case other => other + } val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf) assert(option.compressionCodecClassName == expected) } @@ -97,7 +111,10 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir) val partitionPath = if (isPartitioned) "p=2" else "" val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath" - val realCompressionCodecs = getTableCompressionCodec(path) + val realCompressionCodecs = getTableCompressionCodec(path).map { + case "LZ4_RAW" if compressionCodec == "LZ4RAW" => "LZ4RAW" + case other => other + } assert(realCompressionCodecs.forall(_ == compressionCodec)) } } @@ -105,7 +122,7 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar test("Create parquet table with compression") { Seq(true, false).foreach { isPartitioned => - val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4") + val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4RAW", "LZ4_RAW") codecs.foreach { compressionCodec => checkCompressionCodec(compressionCodec, isPartitioned) }