diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
index df08174101f19..394c4e01651e8 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
@@ -371,8 +371,7 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu
     }
   }
 
-  // Disable for Spark3.5.
-  testWithSpecifiedSparkVersion("regr_r2", Some("3.3"), Some("3.4")) {
+  testWithSpecifiedSparkVersion("regr_r2", Some("3.3")) {
     runQueryAndCompare("""
                        |select regr_r2(l_partkey, l_suppkey) from lineitem;
                        |""".stripMargin) {
@@ -391,8 +390,7 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu
     }
   }
 
-  // Disable for Spark3.5.
-  testWithSpecifiedSparkVersion("regr_slope", Some("3.4"), Some("3.4")) {
+  testWithSpecifiedSparkVersion("regr_slope", Some("3.4")) {
     runQueryAndCompare("""
                        |select regr_slope(l_partkey, l_suppkey) from lineitem;
                        |""".stripMargin) {
@@ -411,8 +409,7 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu
     }
   }
 
-  // Disable for Sparke3.5.
-  testWithSpecifiedSparkVersion("regr_intercept", Some("3.4"), Some("3.4")) {
+  testWithSpecifiedSparkVersion("regr_intercept", Some("3.4")) {
     runQueryAndCompare("""
                        |select regr_intercept(l_partkey, l_suppkey) from lineitem;
                        |""".stripMargin) {
@@ -431,8 +428,7 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu
     }
   }
 
-  // Disable for Sparke3.5.
-  testWithSpecifiedSparkVersion("regr_sxy regr_sxx regr_syy", Some("3.4"), Some("3.4")) {
+  testWithSpecifiedSparkVersion("regr_sxy regr_sxx regr_syy", Some("3.4")) {
     runQueryAndCompare("""
                        |select regr_sxy(l_quantity, l_tax) from lineitem;
                        |""".stripMargin) {
diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
index f997693f82128..22a5b6b70be2c 100644
--- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
+++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
@@ -17,14 +17,12 @@
 package org.apache.gluten.execution
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.col
 
 import org.apache.iceberg.spark.SparkWriteOptions
-import org.scalatest.Ignore
 
 import java.io.File
 
-// Ignored due to failures, see https://github.com/apache/incubator-gluten/issues/5362
-@Ignore
 class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
 
   protected val tpchBasePath: String = new File(
@@ -97,21 +95,21 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
   }
 }
 
-// Ignored due to failures, see https://github.com/apache/incubator-gluten/issues/5362
-@Ignore
class VeloxPartitionedTableTPCHIcebergSuite extends VeloxTPCHIcebergSuite {
 
   override protected def createTPCHNotNullTables(): Unit = {
     TPCHTables.map { table =>
       val tablePath = new File(resourcePath, table.name).getAbsolutePath
-      val tableDF = spark.read.format(fileFormat).load(tablePath)
+      val tableDF = spark.read
+        .format(fileFormat)
+        .load(tablePath)
+        .repartition(table.partitionColumns.map(col): _*)
+        .sortWithinPartitions(table.partitionColumns.map(col): _*)
 
-      tableDF
-        .repartition(800)
-        .write
+      tableDF.write
         .format("iceberg")
         .partitionBy(table.partitionColumns: _*)
-        .option(SparkWriteOptions.FANOUT_ENABLED, "true")
+        .option(SparkWriteOptions.FANOUT_ENABLED, "false")
         .mode("overwrite")
         .saveAsTable(table.name)
       (table.name, tableDF)
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 70e0b73ed3ac2..1e23b49f9a527 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, InternalRow}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, TypedImperativeAggregate}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, RegrIntercept, RegrR2, RegrReplacement, RegrSlope, RegrSXY, TypedImperativeAggregate}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -71,7 +71,15 @@ class Spark35Shims extends SparkShims {
       Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
   }
 
-  override def aggregateExpressionMappings: Seq[Sig] = Seq.empty
+  override def aggregateExpressionMappings: Seq[Sig] = {
+    Seq(
+      Sig[RegrR2](ExpressionNames.REGR_R2),
+      Sig[RegrSlope](ExpressionNames.REGR_SLOPE),
+      Sig[RegrIntercept](ExpressionNames.REGR_INTERCEPT),
+      Sig[RegrSXY](ExpressionNames.REGR_SXY),
+      Sig[RegrReplacement](ExpressionNames.REGR_REPLACEMENT)
+    )
+  }
 
   override def convertPartitionTransforms(
       partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {