[GLUTEN-5341] Fix some Spark 3.5 UTs #5445

Merged (1 commit, Apr 19, 2024)
@@ -76,8 +76,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
       }
     }
 
-  // Disable for Sparke3.5.
-  testWithSpecifiedSparkVersion("input row", Some("3.2"), Some("3.4")) {
+  testWithSpecifiedSparkVersion("input row", Some("3.2")) {
     withTable("t") {
       sql("CREATE TABLE t USING json AS SELECT * FROM values(1, 'a', (2, 'b'), (3, 'c'))")
       runQueryAndCompare("SELECT * FROM t", cache = true) {
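Dropping the Some("3.4") upper bound re-enables this test on Spark 3.5. For readers unfamiliar with the helper, here is a minimal sketch of what such version-gated test registration can look like; the range-check logic and the minSparkVersion/maxSparkVersion parameter names are assumptions for illustration, not Gluten's actual implementation:

```scala
import org.scalatest.funsuite.AnyFunSuite

// Sketch only: gates a test on an optional [min, max] Spark version range.
trait SparkVersionGatedTests extends AnyFunSuite {
  // Stand-in for SparkShimLoader.getSparkVersion, e.g. "3.5.1".
  def sparkVersion: String

  private def majorMinor(v: String): (Int, Int) = {
    val parts = v.split('.')
    (parts(0).toInt, parts(1).toInt)
  }

  def testWithSpecifiedSparkVersion(
      name: String,
      minSparkVersion: Option[String] = None,
      maxSparkVersion: Option[String] = None)(body: => Unit): Unit = {
    import scala.math.Ordering.Implicits._
    val cur = majorMinor(sparkVersion)
    val inRange = minSparkVersion.forall(v => cur >= majorMinor(v)) &&
      maxSparkVersion.forall(v => cur <= majorMinor(v))
    // Register a live test only when the running version is in range.
    if (inRange) test(name)(body) else ignore(name)(body)
  }
}
```

Under such semantics, passing only Some("3.2") means "3.2 and newer", so the test now also runs on 3.5.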
@@ -67,8 +67,7 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
       }
     }
 
-  // Disable for Spark3.5.
-  testWithSpecifiedSparkVersion("generate hash join plan - v2", Some("3.2"), Some("3.4")) {
+  testWithSpecifiedSparkVersion("generate hash join plan - v2", Some("3.2")) {
     withSQLConf(
       ("spark.sql.autoBroadcastJoinThreshold", "-1"),
       ("spark.sql.adaptive.enabled", "false"),
@@ -88,6 +87,8 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
       val wholeStages = plan.collect { case wst: WholeStageTransformer => wst }
       if (SparkShimLoader.getSparkVersion.startsWith("3.2.")) {
         assert(wholeStages.length == 1)
+      } else if (SparkShimLoader.getSparkVersion.startsWith("3.5.")) {
ayushi-agarwal (Contributor) commented:

Why did it increase to 5 in 3.5? I was also debugging this and saw two more exchanges coming in with 3.5. Shall we debug why there are more exchanges?

yma11 (Contributor, Author) commented:

The physical plan in Spark 3.5 seems to have changed:

*(9) Project [l_partkey#200L]
+- *(9) SortMergeJoin [l_suppkey#201L], [ps_suppkey#156L], Inner
   :- *(6) Sort [l_suppkey#201L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(l_suppkey#201L, 5), ENSURE_REQUIREMENTS, [plan_id=300]
   :     +- *(5) Project [l_partkey#200L, l_suppkey#201L]
   :        +- *(5) SortMergeJoin [l_partkey#200L], [p_partkey#123L], Inner
   :           :- *(2) Sort [l_partkey#200L ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(l_partkey#200L, 5), ENSURE_REQUIREMENTS, [plan_id=283]
   :           :     +- *(1) Filter (isnotnull(l_partkey#200L) AND isnotnull(l_suppkey#201L))
   :           :        +- *(1) ColumnarToRow
   :           :           +- BatchScan parquet file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-classes/tpch-data-parquet-velox/lineitem[l_partkey#200L, l_suppkey#201L] ParquetScan DataFilters: [isnotnull(l_partkey#200L), isnotnull(l_suppkey#201L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-cl..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(l_partkey), IsNotNull(l_suppkey)], PushedGroupBy: [], ReadSchema: struct<l_partkey:bigint,l_suppkey:bigint> RuntimeFilters: []
   :           +- *(4) Sort [p_partkey#123L ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(p_partkey#123L, 5), ENSURE_REQUIREMENTS, [plan_id=292]
   :                 +- *(3) Filter isnotnull(p_partkey#123L)
   :                    +- *(3) ColumnarToRow
   :                       +- BatchScan parquet file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-classes/tpch-data-parquet-velox/part[p_partkey#123L] ParquetScan DataFilters: [isnotnull(p_partkey#123L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-cl..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(p_partkey)], PushedGroupBy: [], ReadSchema: struct<p_partkey:bigint> RuntimeFilters: []
   +- *(8) Sort [ps_suppkey#156L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(ps_suppkey#156L, 5), ENSURE_REQUIREMENTS, [plan_id=309]
         +- *(7) Filter isnotnull(ps_suppkey#156L)
            +- *(7) ColumnarToRow
               +- BatchScan parquet file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-classes/tpch-data-parquet-velox/partsupp[ps_suppkey#156L] ParquetScan DataFilters: [isnotnull(ps_suppkey#156L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-cl..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(ps_suppkey)], PushedGroupBy: [], ReadSchema: struct<ps_suppkey:bigint> RuntimeFilters: []

My understanding is that it has 4 exchanges, and therefore 5 whole stages.
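The counting argument can be made explicit: each shuffle Exchange is a stage boundary, and removing n boundary nodes from a plan tree leaves n + 1 connected fragments; when every fragment is fully offloaded, each one becomes a WholeStageTransformer. A small sketch of that arithmetic (illustrative; expectedWholeStageCount is a hypothetical helper, and it assumes full offload as in this test):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.Exchange

object PlanStageCounter {
  // A tree with n Exchange boundaries decomposes into n + 1 pipeline
  // fragments. For the plan above: 4 exchanges => 5 whole stages.
  def expectedWholeStageCount(plan: SparkPlan): Int = {
    val exchanges = plan.collect { case e: Exchange => e }
    exchanges.length + 1
  }
}
```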

ayushi-agarwal (Contributor) commented on Apr 18, 2024:

Yes, in the 3.4 plan I saw only 2 exchanges; there was no exchange after the part and lineitem table scans for their join. It seems some regression in 3.5 results in the 2 extra exchanges.

yma11 (Contributor, Author) commented on Apr 19, 2024:

@ayushi-agarwal Do you want to dig further into the plan change? If so, you can open a dedicated ticket for it; it may be related to the data set or some configuration. Let's merge this PR first.

ayushi-agarwal (Contributor) commented:

Sure, I will create a ticket to investigate further. Thanks @yma11.

+        assert(wholeStages.length == 5)
       } else {
         assert(wholeStages.length == 3)
       }
@@ -942,7 +942,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenCTEInlineSuiteAEOff]
   enableSuite[GlutenCTEInlineSuiteAEOn]
   enableSuite[GlutenDataFrameAggregateSuite]
-    // Disable for Spark3.5.
+    // Test for vanilla spark codegen, not apply for Gluten
     .exclude("SPARK-43876: Enable fast hashmap for distinct queries")
     .exclude(
       "zero moments", // [velox does not return NaN]
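For context, the excluded case targets vanilla Spark's codegen fast hashmap, which Gluten replaces with Velox operators, so the assertion does not apply. The enableSuite/exclude names mirror the DSL visible in the diff; the registry body below is an assumed sketch of how such settings could be kept, not Gluten's actual BackendTestSettings code:

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Sketch: per-suite settings that record which vanilla tests to skip.
class SuiteSettings {
  private val excluded = mutable.Set[String]()
  def exclude(testNames: String*): SuiteSettings = {
    excluded ++= testNames
    this // chainable, as in .exclude(...).exclude(...)
  }
  def shouldRun(testName: String): Boolean = !excluded.contains(testName)
}

class BackendTestSettings {
  private val suites = mutable.Map[String, SuiteSettings]()
  def enableSuite[T](implicit tag: ClassTag[T]): SuiteSettings =
    suites.getOrElseUpdate(tag.runtimeClass.getName, new SuiteSettings)
}
```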