[GLUTEN-5341] Fix test write parquet with compression codec #5424

Merged · 3 commits · Apr 17, 2024
Changes from all commits:
@@ -182,10 +182,10 @@ object BackendSettings extends BackendSettingsApi {

     def validateCompressionCodec(): Option[String] = {
       // Velox doesn't support brotli and lzo.
-      val unSupportedCompressions = Set("brotli, lzo")
+      val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
       val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
       if (unSupportedCompressions.contains(compressionCodec)) {
-        Some("Brotli or lzo compression codec is unsupported in Velox backend.")
+        Some("Brotli, lzo, lz4raw and lz4_raw compression codec is unsupported in Velox backend.")
       } else {
         None
       }
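For context: this first hunk widens the Velox write-path validation so that parquet writes requesting lz4raw/lz4_raw (like brotli and lzo before) are rejected for offload and fall back to vanilla Spark instead of failing inside the native writer. Below is a minimal, hypothetical sketch of the check in isolation; the options map, the "compression" key, the snappy default, and the lowercasing are assumptions standing in for what WriteFilesExecTransformer.getCompressionCodec actually does.

import java.util.Locale

// Hypothetical standalone version of the validation: a write is offloaded
// to Velox only when this returns None; a Some(reason) makes Gluten fall
// back to vanilla Spark's parquet writer.
object CompressionValidationSketch {
  private val unSupportedCompressions =
    Set("brotli", "lzo", "lz4raw", "lz4_raw")

  def validateCompressionCodec(options: Map[String, String]): Option[String] = {
    // Assumption: the codec name is taken from the write options and
    // normalized to lower case, defaulting to snappy.
    val codec = options.getOrElse("compression", "snappy").toLowerCase(Locale.ROOT)
    if (unSupportedCompressions.contains(codec)) {
      Some(s"Compression codec $codec is unsupported in Velox backend.")
    } else {
      None
    }
  }
}

Called with Map("compression" -> "lz4_raw"), the sketch returns a Some(...), which in Gluten's validation flow marks the WriteFiles node as non-offloadable, so vanilla Spark performs the write.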
@@ -597,7 +597,6 @@ class VeloxTestSettings extends BackendTestSettings {
       // Rewrite by just removing test timestamp.
       .exclude("test reading unaligned pages - test all types")
     enableSuite[GlutenParquetCompressionCodecPrecedenceSuite]
-      .exclude("Create parquet table with compression")
     enableSuite[GlutenParquetDeltaByteArrayEncodingSuite]
     enableSuite[GlutenParquetDeltaEncodingInteger]
     enableSuite[GlutenParquetDeltaEncodingLong]
@@ -742,6 +741,9 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("nested column: Max(top level column) not push down")
.exclude("nested column: Count(nested sub-field) not push down")
enableSuite[GlutenParquetCodecSuite]
// codec not supported in native
.exclude("write and read - file source parquet - codec: lz4_raw")
.exclude("write and read - file source parquet - codec: lz4raw")
enableSuite[GlutenOrcCodecSuite]
enableSuite[GlutenFileSourceStrategySuite]
// Plan comparison.
@@ -1096,9 +1098,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Merge runtime bloom filters")
enableSuite[GlutenIntervalFunctionsSuite]
enableSuite[GlutenJoinSuite]
// Disable for Spark3.5.
.exclude(
"SPARK-36612: Support left outer join build left or right outer join build right in shuffled hash join")
// exclude as it check spark plan
.exclude("SPARK-36794: Ignore duplicated key when building relation for semi/anti hash join")
// exclude as it check for SMJ node
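The VeloxTestSettings edits above all go through Gluten's BackendTestSettings registry: a vanilla Spark suite is enabled wholesale, then individual test names are opted out (here, the parquet codecs the native writer cannot produce). The sketch below is a simplified, hypothetical rendering of that pattern; a string-keyed registry stands in for the real type-parameterized enableSuite[T].

import scala.collection.mutable

// Simplified sketch of a BackendTestSettings-style registry: suites are
// enabled as a whole and specific test names are excluded afterwards.
class TestSettingsSketch {
  private val excluded = mutable.Map.empty[String, mutable.Set[String]]

  final case class SuiteSettings(suiteName: String) {
    // Record test names that should not run for this suite; chainable.
    def exclude(testNames: String*): SuiteSettings = {
      excluded.getOrElseUpdate(suiteName, mutable.Set.empty) ++= testNames
      this
    }
  }

  def enableSuite(suiteName: String): SuiteSettings = SuiteSettings(suiteName)

  // A test runs unless its suite has explicitly excluded it.
  def shouldRun(suiteName: String, testName: String): Boolean =
    !excluded.get(suiteName).exists(_.contains(testName))
}

With this shape, settings.enableSuite("GlutenParquetCodecSuite").exclude("write and read - file source parquet - codec: lz4raw") makes shouldRun return false for exactly that test and true for the rest of the suite.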
@@ -39,6 +39,8 @@ class GlutenJoinSuite extends JoinSuite with GlutenSQLTestsTrait {
"SPARK-34593: Preserve broadcast nested loop join partitioning and ordering",
"SPARK-35984: Config to force applying shuffled hash join",
"test SortMergeJoin (with spill)",
"SPARK-36612: Support left outer join build left or right" +
" outer join build right in shuffled hash join",
// NaN is not supported currently, just skip.
"NaN and -0.0 in join keys"
)
@@ -17,71 +17,6 @@
 package org.apache.spark.sql.execution.datasources.parquet

 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
-
-import org.apache.hadoop.fs.Path
-
-import java.io.File
-
-import scala.collection.JavaConverters._

 class GlutenParquetCompressionCodecPrecedenceSuite
   extends ParquetCompressionCodecPrecedenceSuite
-  with GlutenSQLTestsBaseTrait {
-
-  private def getTableCompressionCodec(path: String): Seq[String] = {
-    val hadoopConf = spark.sessionState.newHadoopConf()
-    val codecs = for {
-      footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
-      block <- footer.getParquetMetadata.getBlocks.asScala
-      column <- block.getColumns.asScala
-    } yield column.getCodec.name()
-    codecs.distinct
-  }
-
-  private def createTableWithCompression(
-      tableName: String,
-      isPartitioned: Boolean,
-      compressionCodec: String,
-      rootDir: File): Unit = {
-    val options =
-      s"""
-         |OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
-         |'parquet.compression'='$compressionCodec')
-       """.stripMargin
-    val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
-    sql(s"""
-           |CREATE TABLE $tableName USING Parquet $options $partitionCreate
-           |AS SELECT 1 AS col1, 2 AS p
-         """.stripMargin)
-  }
-
-  private def checkCompressionCodec(compressionCodec: String, isPartitioned: Boolean): Unit = {
-    withTempDir {
-      tmpDir =>
-        val tempTableName = "TempParquetTable"
-        withTable(tempTableName) {
-          createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
-          val partitionPath = if (isPartitioned) "p=2" else ""
-          val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
-          val realCompressionCodecs = getTableCompressionCodec(path)
-          // Native parquet write currently not support LZ4_RAW
-          // reference here: https://github.com/facebookincubator/velox/blob/d796cfc8c2a3cc045f
-          // 1b33880c5839fec21a6b3b/velox/dwio/parquet/writer/Writer.cpp#L107C1-L120C17
-          if (compressionCodec == "LZ4_RAW" || compressionCodec == "LZ4RAW") {
-            assert(realCompressionCodecs.forall(_ == "SNAPPY"))
-          } else {
-            assert(realCompressionCodecs.forall(_ == compressionCodec))
-          }
-        }
-    }
-  }
-
-  testGluten("Create parquet table with compression") {
-    Seq(true, false).foreach {
-      isPartitioned =>
-        val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4RAW", "LZ4_RAW")
-        codecs.foreach {
-          compressionCodec => checkCompressionCodec(compressionCodec, isPartitioned)
-        }
-    }
-  }
-}
+  with GlutenSQLTestsBaseTrait {}
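The deleted Gluten override duplicated the upstream suite only to tolerate the SNAPPY substitution the native writer made for LZ4_RAW; now that such writes fall back to vanilla Spark, the plain upstream test applies and the override reduces to an empty class. For reference, the footer inspection the deleted getTableCompressionCodec helper performed can be reproduced with stock parquet-hadoop APIs; the sketch below is ours (fileCodecs is not a Gluten helper).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import scala.collection.JavaConverters._

object FooterCodecs {
  // Returns the distinct compression codecs recorded in one parquet file's
  // footer -- the same information the deleted helper gathered via
  // readAllFootersWithoutSummaryFiles.
  def fileCodecs(file: String, conf: Configuration = new Configuration()): Seq[String] = {
    val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(file), conf))
    try {
      reader.getFooter.getBlocks.asScala
        .flatMap(_.getColumns.asScala)
        .map(_.getCodec.name())
        .distinct
        .toSeq
    } finally reader.close()
  }
}

In the fallback path exercised above, a table written with parquet.compression=LZ4_RAW through vanilla Spark reports LZ4_RAW here, which is why the SNAPPY special case in the deleted test could be dropped.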