[GLUTEN-5341] Fix some Spark 3.5 UTs #5445
Conversation
Run Gluten Clickhouse CI
@@ -88,6 +87,8 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
    val wholeStages = plan.collect { case wst: WholeStageTransformer => wst }
    if (SparkShimLoader.getSparkVersion.startsWith("3.2.")) {
      assert(wholeStages.length == 1)
    } else if (SparkShimLoader.getSparkVersion.startsWith("3.5.")) {
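For context, the new branch presumably asserts the larger stage count for Spark 3.5. A minimal sketch of the version-gated check, where the count 5 is inferred from the discussion below rather than shown in the diff itself:

```scala
// Sketch only: the expected count for the 3.5 branch (5) comes from the
// review discussion ("4 exchanges so 5 stages"), not from the visible diff.
val sparkVersion = SparkShimLoader.getSparkVersion
if (sparkVersion.startsWith("3.2.")) {
  assert(wholeStages.length == 1)
} else if (sparkVersion.startsWith("3.5.")) {
  // Spark 3.5 produces two extra exchanges for this query,
  // splitting the plan into more whole-stage regions.
  assert(wholeStages.length == 5)
}
```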
Why did it increase to 5 in 3.5? I was also debugging this and saw two more exchanges appearing in 3.5. Shall we investigate why there are more exchanges?
The physical plan in Spark 3.5 seems to have changed:
*(9) Project [l_partkey#200L]
+- *(9) SortMergeJoin [l_suppkey#201L], [ps_suppkey#156L], Inner
:- *(6) Sort [l_suppkey#201L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(l_suppkey#201L, 5), ENSURE_REQUIREMENTS, [plan_id=300]
: +- *(5) Project [l_partkey#200L, l_suppkey#201L]
: +- *(5) SortMergeJoin [l_partkey#200L], [p_partkey#123L], Inner
: :- *(2) Sort [l_partkey#200L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(l_partkey#200L, 5), ENSURE_REQUIREMENTS, [plan_id=283]
: : +- *(1) Filter (isnotnull(l_partkey#200L) AND isnotnull(l_suppkey#201L))
: : +- *(1) ColumnarToRow
: : +- BatchScan parquet file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-classes/tpch-data-parquet-velox/lineitem[l_partkey#200L, l_suppkey#201L] ParquetScan DataFilters: [isnotnull(l_partkey#200L), isnotnull(l_suppkey#201L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-cl..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(l_partkey), IsNotNull(l_suppkey)], PushedGroupBy: [], ReadSchema: struct<l_partkey:bigint,l_suppkey:bigint> RuntimeFilters: []
: +- *(4) Sort [p_partkey#123L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(p_partkey#123L, 5), ENSURE_REQUIREMENTS, [plan_id=292]
: +- *(3) Filter isnotnull(p_partkey#123L)
: +- *(3) ColumnarToRow
: +- BatchScan parquet file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-classes/tpch-data-parquet-velox/part[p_partkey#123L] ParquetScan DataFilters: [isnotnull(p_partkey#123L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-cl..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(p_partkey)], PushedGroupBy: [], ReadSchema: struct<p_partkey:bigint> RuntimeFilters: []
+- *(8) Sort [ps_suppkey#156L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ps_suppkey#156L, 5), ENSURE_REQUIREMENTS, [plan_id=309]
+- *(7) Filter isnotnull(ps_suppkey#156L)
+- *(7) ColumnarToRow
+- BatchScan parquet file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-classes/tpch-data-parquet-velox/partsupp[ps_suppkey#156L] ParquetScan DataFilters: [isnotnull(ps_suppkey#156L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/root/workspace/apache_1/backends-velox/target/scala-2.12/test-cl..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(ps_suppkey)], PushedGroupBy: [], ReadSchema: struct<ps_suppkey:bigint> RuntimeFilters: []
My understanding is that the plan has 4 exchanges, so 5 whole stages.
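One way to sanity-check this count is to collect the exchange nodes from the executed plan. A minimal sketch, assuming a `DataFrame` named `df` built from the same three-way join (`ShuffleExchangeExec` is Spark's physical shuffle-exchange node):

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Count the shuffle exchanges in the physical plan; N exchanges split
// the plan into N + 1 whole-stage regions for a linear pipeline like this.
val plan = df.queryExecution.executedPlan
val exchanges = plan.collect { case e: ShuffleExchangeExec => e }
println(s"exchanges = ${exchanges.length}")  // 4 in the plan above
```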
Yes, in the 3.4 plan I saw only 2 exchanges; there was no exchange after the part and lineitem table scans for their join. It seems some regression in 3.5 results in 2 more exchanges.
@ayushi-agarwal Do you want to dig further into the plan change? If so, you can open a dedicated ticket for it. It may be related to the data set or some configurations. Let's merge this PR first.
Sure, I will create a ticket to investigate further. Thanks @yma11
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
enable VeloxCacheSuite, VeloxHashJoinSuite
What changes were proposed in this pull request?
Fix some Spark 3.5 UTs.
How was this patch tested?
CI