
[VL] Result mismatch found in FlushableAgg #6630

Open · jiangjiangtian opened this issue Jul 30, 2024 · 19 comments
Labels: bug (Something isn't working), triage

@jiangjiangtian (Contributor) commented Jul 30, 2024

Backend

VL (Velox)

Bug description

I have a SQL query that I run in both Gluten and vanilla Spark; its shape is as follows:

select count(*) from (
  (
    select *
    from test1
    where xxx
  ) a
  left join
  (
    select col_a, col_b, col_c, col_d, col_e
    from test2
    where xxx
    group by col_a, col_b, col_c, col_d, col_e
  ) b
  on a.col1 = b.col1
);

I get different numbers of rows. Looking at the Spark UI, I found that the row counts produced by the second subquery don't match.
Vanilla Spark:
[screenshot: vanilla Spark row count]

Gluten:
[screenshots: Gluten row counts]

Actually, I found that some rows are duplicated. But when I run the second subquery on its own, I get the right result.
[screenshots: plan and result of the second subquery run alone]
We can see the plan is different: the second hash aggregation is a regular one.

Besides, when I set spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation to false, I get the right result.
[screenshots: plan and result with flushablePartialAggregation disabled]

So I think there might be a bug in the flushable hash aggregation or in the plan conversion, but I haven't been able to find a small SQL query that demonstrates it. I'm sorry for not having a minimal example.
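
For reference, the workaround can be applied per session, e.g. from spark-shell; a minimal sketch using the property name above:

    // Work around the issue by disabling Velox flushable (partial) aggregation.
    spark.conf.set("spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation", "false")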

Spark version

3.0

Spark configurations

No response

System information

Velox System Info v0.0.2
Commit: 96712646c63bf4305cca4eaa7dfd26c2179547b1
CMake Version: 3.17.5
System: Linux-3.10.0-862.mt20190308.130.el7.x86_64
Arch: x86_64
CPU Name: Model name: Intel(R) Xeon(R) Platinum 8255C CPU @ 2.50GHz
C++ Compiler: /opt/rh/devtoolset-10/root/usr/bin/c++
C++ Compiler Version: 10.2.1
C Compiler: /opt/rh/devtoolset-10/root/usr/bin/cc
C Compiler Version: 10.2.1
CMake Prefix Path: /usr/local;/usr;/;/usr;/usr/local;/usr/X11R6;/usr/pkg;/opt

Relevant logs

No response

@jiangjiangtian added the "bug" and "triage" labels Jul 30, 2024
@jiangjiangtian changed the title from "[VL] result mismatch between gluten and spark" to "[VL] result mismatch of hash aggregation between gluten and spark" Jul 30, 2024
@Yohahaha (Contributor):

Could you post your Gluten version/commit?

A FlushableAgg with non-empty "number of flushed rows" metrics may lead to wrong results.

@jiangjiangtian (Contributor, Author):

> Could you post your Gluten version/commit?
>
> A FlushableAgg with non-empty "number of flushed rows" metrics may lead to wrong results.

Thanks for the reply. My Gluten version is v1.2.0-rc1.

@Yohahaha (Contributor):

@jiangjiangtian could you try with spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false?

@jiangjiangtian (Contributor, Author) commented Jul 31, 2024

> @jiangjiangtian could you try with spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false?

Yes, I have tried that. The result is correct.

@jiangjiangtian (Contributor, Author):

@Yohahaha I added two screenshots above; you can see that when I set spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation to false, both hash aggregations are regular. So I think we might need to always make the second hash aggregation a full aggregation. What do you think?
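
For clarity, in the failing Gluten plan both levels of the deduplicating aggregation for subquery b are flushable (this matches the physical plan posted later in this thread; expression IDs abridged):

    FlushableHashAggregateTransformer(keys=[aaa_col_user_id, ccc_name, ccc_bu_code, ccc_bu_name, dp_noi__id], functions=[], ...)   <- post-shuffle level
       ...
       FlushableHashAggregateTransformer(keys=[aaa_col_user_id, ccc_name, ccc_bu_code, ccc_bu_name, dp_noi__id], functions=[], ...)   <- pre-shuffle level

A flushable aggregation may flush and emit the same key more than once, so if the post-shuffle level is also flushable, a distinct-style aggregation can produce duplicate rows; a regular (full) HashAggregateTransformer at that level would not.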

@jiangjiangtian (Contributor, Author):

@kecookier

@kecookier (Contributor):

@zhztheplayer Can you help take a look at this problem?

@Yohahaha (Contributor):

> @Yohahaha I added two screenshots above; you can see that when I set spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation to false, both hash aggregations are regular. So I think we might need to always make the second hash aggregation a full aggregation. What do you think?

I see the new description.

I have submitted a similar issue: #4421.

@Yohahaha changed the title from "[VL] result mismatch of hash aggregation between gluten and spark" to "[VL] result mismatch found in FlushableAgg" Jul 31, 2024
@FelixYBW (Contributor):

#4421 is closed. Is the issue fixed?

@kecookier (Contributor):

> #4421 is closed. Is the issue fixed?

@jiangjiangtian Please check it.

@jiangjiangtian (Contributor, Author):

> > #4421 is closed. Is the issue fixed?
>
> @jiangjiangtian Please check it.

OK, I will check it.

@kecookier (Contributor):

@FelixYBW @Yohahaha I'm sure it's not caused by #4421. We tested with 1.2.0-rc1, which already contains that fix.

@Yohahaha (Contributor) commented Aug 1, 2024

> @FelixYBW @Yohahaha I'm sure it's not caused by #4421. We tested with 1.2.0-rc1, which already contains that fix.

I mean #4421 is similar to, but not the same as, the current issue.

@FelixYBW (Contributor) commented Aug 1, 2024

@kecookier we will take a look

@kecookier (Contributor):

SQL explain:

== Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [table_dst], Map(partition_date -> None), true, false
+- 'Aggregate ['a.col_main_noi__id, 'a.partition_date, 'b.aaa_col_user_id, 'b.ccc_name], ['a.col_main_noi__id AS main_noi__id#103, 'max('a.noi__name) AS noi__name#104, 'max('noi__close_status) AS noi__status#105, 'b.aaa_col_user_id, 'b.ccc_name, 'if(('sum('c.col_bal) > 0), 1, 0) AS is_has_col_bal#106, 'sum('coalesce('c.col_bal, 0)) AS aaa_col_bal#107, 'max('b.ccc_bu_code) AS ccc_bu_code#108, 'max('b.ccc_bu_name) AS ccc_bu_name#109, 'a.partition_date]
   +- 'Join LeftOuter, ('c.col_user_id = 'b.aaa_col_user_id)
      :- 'Join LeftOuter, ('a.noi__id = 'b.dp_noi__id)
      :  :- 'SubqueryAlias a
      :  :  +- 'Project ['noi__id, 'col_main_noi__id, 'noi__name, 'noi__close_status, 'partition_date]
      :  :     +- 'Filter ((('partition_date = 2024-07-28) AND ('partition_chain = dp)) AND (NOT ('noi__close_status = 1) AND (('col_noi__cate1_id = 226) OR ('col_noi__cate2_id = 380))))
      :  :        +- 'UnresolvedRelation [tmp_db, table_c]
      :  +- 'SubqueryAlias b
      :     +- 'Aggregate ['aaa_col_user_id, 'ccc_name, 'ccc_bu_code, 'ccc_bu_name, 'dp_noi__id], ['aaa_col_user_id, 'ccc_name, 'ccc_bu_code, 'ccc_bu_name, 'dp_noi__id]
      :        +- 'Filter (('partition_date = 2024-07-28) AND 'ccc_bu_code IN (2,18,28,41,51,56,59,66))
      :           +- 'UnresolvedRelation [tmp_db, table_b]
      +- 'SubqueryAlias c
         +- 'Aggregate ['col_user_id], ['col_user_id, 'sum('col_bal) AS col_bal#100, 'sum('cash_col_bal) AS cash_col_bal#101, 'sum('redpack_col_bal) AS redpack_col_bal#102]
            +- 'Filter (('partition_date = 2024-07-28) AND 'pool_type IN (0,6))
               +- 'UnresolvedRelation [tmp_db, table_a]

== Analyzed Logical Plan ==

InsertIntoHiveTable ebt_12.table_dst `ebt_12`.`table_dst`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(partition_date -> None), true, false, [main_noi__id, noi__name, noi__status, col_user_id, ccc_name, is_has_col_bal, aaa_col_bal, ccc_bu_code, ccc_bu_name, partition_date]
+- Project [cast(main_noi__id#103L as bigint) AS main_noi__id#403L, cast(noi__name#104 as string) AS noi__name#404, cast(noi__status#105L as int) AS noi__status#405, cast(aaa_col_user_id#205L as bigint) AS col_user_id#406L, ccc_name#206, cast(is_has_col_bal#106 as int) AS is_has_col_bal#407, cast(aaa_col_bal#107 as double) AS aaa_col_bal#408, cast(ccc_bu_code#108 as bigint) AS ccc_bu_code#409L, cast(ccc_bu_name#109 as string) AS ccc_bu_name#410, cast(partition_date#203 as string) AS partition_date#411]
   +- Aggregate [col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], [col_main_noi__id#113L AS main_noi__id#103L, max(noi__name#114) AS noi__name#104, max(noi__close_status#120L) AS noi__status#105L, aaa_col_user_id#205L, ccc_name#206, if ((sum(col_bal#100) > cast(0 as double))) 1 else 0 AS is_has_col_bal#106, sum(coalesce(col_bal#100, cast(0 as double))) AS aaa_col_bal#107, max(ccc_bu_code#207) AS ccc_bu_code#108, max(ccc_bu_name#208) AS ccc_bu_name#109, partition_date#203]
      +- Join LeftOuter, (col_user_id#222L = aaa_col_user_id#205L)
         :- Join LeftOuter, (noi__id#111L = dp_noi__id#209L)
         :  :- SubqueryAlias a
         :  :  +- Project [noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
         :  :     +- Filter (((partition_date#203 = 2024-07-28) AND (partition_chain#204 = dp)) AND (NOT (noi__close_status#120L = cast(1 as bigint)) AND ((col_noi__cate1_id#146L = cast(226 as bigint)) OR (col_noi__cate2_id#144L = cast(380 as bigint)))))
         :  :        +- SubqueryAlias spark_catalog.tmp_db.table_c
         :  :           +- Relation tmp_db.table_c[chain#110L,noi__id#111L,xmd_main_noi__id#112L,col_main_noi__id#113L,noi__name#114,noi__phone#115,address#116,barea_id#117L,brand_id#118L,brand_name#119,noi__close_status#120L,latitude#121L,longitude#122L,nation_code#123L,org_id#124L,org_name#125,org_rank#126,org_type#127,main_org_id#128L,main_org_name#129,col_city_id#130L,city_name#131,col_main_city_id#132L,main_city_name#133,... 71 more fields] orc
         :  +- SubqueryAlias b
         :     +- Aggregate [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
         :        +- Filter ((partition_date#221 = 2024-07-28) AND cast(ccc_bu_code#207 as string) IN (cast(2 as string),cast(18 as string),cast(28 as string),cast(41 as string),cast(51 as string),cast(56 as string),cast(59 as string),cast(66 as string)))
         :           +- SubqueryAlias spark_catalog.tmp_db.table_b
         :              +- Relation tmp_db.table_b[aaa_col_user_id#205L,ccc_name#206,ccc_bu_code#207,ccc_bu_name#208,dp_noi__id#209L,col_noi__id#210L,coop_date#211,customer_name#212,src_id#213L,src_name#214,operate_id#215L,operate_name#216,respons_bd_id#217L,respons_charge_name#218,ccc_status#219,last_promote_time#220,partition_date#221] orc
         +- SubqueryAlias c
            +- Aggregate [col_user_id#222L], [col_user_id#222L, sum(col_bal#224) AS col_bal#100, sum(cash_col_bal#225) AS cash_col_bal#101, sum(redpack_col_bal#226) AS redpack_col_bal#102]
               +- Filter ((partition_date#246 = 2024-07-28) AND cast(pool_type#223L as bigint) IN (cast(0 as bigint),cast(6 as bigint)))
                  +- SubqueryAlias spark_catalog.tmp_db.table_a
                     +- Relation tmp_db.table_a[col_user_id#222L,pool_type#223L,col_bal#224,cash_col_bal#225,redpack_col_bal#226,alliance_redpack_col_bal#227,normal_redpack_col_bal#228,free_room_col_bal#229,refund_frozen_cash#230,refund_frozen_redpack#231,recharge_frozen_cash#232,charge_amt#233,incloud_refund_recharge_amt#234,cash_recharge_amt#235,incloud_cash_recharge_amt#236,redpack_recharge_amt#237,incloud_redpack_recharge_amt#238,fst_cash_recharge_time#239,fst_cash_recharge_amt#240,fst_redpack_recharge_time#241,fst_redpack_recharge_amt#242,accu_charge_cnt#243L,accu_cash_recharge_cnt#244L,acc_redpack_recharge_cnt#245L,partition_date#246] orc

== Optimized Logical Plan ==
InsertIntoHiveTable ebt_12.table_dst `ebt_12`.`table_dst`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(partition_date -> None), true, false, [main_noi__id, noi__name, noi__status, col_user_id, ccc_name, is_has_col_bal, aaa_col_bal, ccc_bu_code, ccc_bu_name, partition_date]
+- Aggregate [col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], [col_main_noi__id#113L AS main_noi__id#403L, max(noi__name#114) AS noi__name#404, cast(max(noi__close_status#120L) as int) AS noi__status#405, aaa_col_user_id#205L AS col_user_id#406L, ccc_name#206, if ((sum(col_bal#100) > 0.0)) 1 else 0 AS is_has_col_bal#407, sum(coalesce(col_bal#100, 0.0)) AS aaa_col_bal#408, cast(max(ccc_bu_code#207) as bigint) AS ccc_bu_code#409L, max(ccc_bu_name#208) AS ccc_bu_name#410, partition_date#203 AS partition_date#411]
   +- Project [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, col_bal#100]
      +- Join LeftOuter, (col_user_id#222L = aaa_col_user_id#205L)
         :- Project [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208]
         :  +- Join LeftOuter, (noi__id#111L = dp_noi__id#209L)
         :     :- Project [noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
         :     :  +- Filter ((((((isnotnull(noi__close_status#120L) AND isnotnull(partition_date#203)) AND isnotnull(partition_chain#204)) AND (partition_date#203 = 2024-07-28)) AND (partition_chain#204 = dp)) AND NOT (noi__close_status#120L = 1)) AND ((col_noi__cate1_id#146L = 226) OR (col_noi__cate2_id#144L = 380)))
         :     :     +- Relation tmp_db.table_c[chain#110L,noi__id#111L,xmd_main_noi__id#112L,col_main_noi__id#113L,noi__name#114,noi__phone#115,address#116,barea_id#117L,brand_id#118L,brand_name#119,noi__close_status#120L,latitude#121L,longitude#122L,nation_code#123L,org_id#124L,org_name#125,org_rank#126,org_type#127,main_org_id#128L,main_org_name#129,col_city_id#130L,city_name#131,col_main_city_id#132L,main_city_name#133,... 71 more fields] orc
         :     +- Aggregate [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
         :        +- Project [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
         :           +- Filter (((isnotnull(partition_date#221) AND (partition_date#221 = 2024-07-28)) AND ccc_bu_code#207 IN (2,18,28,41,51,56,59,66)) AND isnotnull(dp_noi__id#209L))
         :              +- Relation tmp_db.table_b[aaa_col_user_id#205L,ccc_name#206,ccc_bu_code#207,ccc_bu_name#208,dp_noi__id#209L,col_noi__id#210L,coop_date#211,customer_name#212,src_id#213L,src_name#214,operate_id#215L,operate_name#216,respons_bd_id#217L,respons_charge_name#218,ccc_status#219,last_promote_time#220,partition_date#221] orc
         +- Aggregate [col_user_id#222L], [col_user_id#222L, sum(col_bal#224) AS col_bal#100]
            +- Project [col_user_id#222L, col_bal#224]
               +- Filter (((isnotnull(partition_date#246) AND (partition_date#246 = 2024-07-28)) AND pool_type#223L IN (0,6)) AND isnotnull(col_user_id#222L))
                  +- Relation tmp_db.table_a[col_user_id#222L,pool_type#223L,col_bal#224,cash_col_bal#225,redpack_col_bal#226,alliance_redpack_col_bal#227,normal_redpack_col_bal#228,free_room_col_bal#229,refund_frozen_cash#230,refund_frozen_redpack#231,recharge_frozen_cash#232,charge_amt#233,incloud_refund_recharge_amt#234,cash_recharge_amt#235,incloud_cash_recharge_amt#236,redpack_recharge_amt#237,incloud_redpack_recharge_amt#238,fst_cash_recharge_time#239,fst_cash_recharge_amt#240,fst_redpack_recharge_time#241,fst_redpack_recharge_amt#242,accu_charge_cnt#243L,accu_cash_recharge_cnt#244L,acc_redpack_recharge_cnt#245L,partition_date#246] orc

== Physical Plan ==
Execute InsertIntoHiveTable ebt_12.table_dst `ebt_12`.`table_dst`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(partition_date -> None), true, false, [main_noi__id, noi__name, noi__status, col_user_id, ccc_name, is_has_col_bal, aaa_col_bal, ccc_bu_code, ccc_bu_name, partition_date]
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      VeloxColumnarToRowExec
      +- ^(6) ProjectExecTransformer [col_main_noi__id#113L AS main_noi__id#403L, max(noi__name#114)#250 AS noi__name#404, cast(max(noi__close_status#120L)#251L as int) AS noi__status#405, aaa_col_user_id#205L AS col_user_id#406L, ccc_name#206, if ((sum(col_bal#100)#252 > 0.0)) 1 else 0 AS is_has_col_bal#407, sum(coalesce(col_bal#100, 0.0))#255 AS aaa_col_bal#408, cast(max(ccc_bu_code#207)#253 as bigint) AS ccc_bu_code#409L, max(ccc_bu_name#208)#254 AS ccc_bu_name#410, partition_date#203 AS partition_date#411]
         +- ^(6) HashAggregateTransformer(keys=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], functions=[max(noi__name#114), max(noi__close_status#120L), sum(col_bal#100), sum(coalesce(col_bal#100, 0.0)), max(ccc_bu_code#207), max(ccc_bu_name#208)], isStreamingAgg=false, output=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, max(noi__name#114)#250, max(noi__close_status#120L)#251L, sum(col_bal#100)#252, sum(coalesce(col_bal#100, 0.0))#255, max(ccc_bu_code#207)#253, max(ccc_bu_name#208)#254])
            +- ^(6) HashAggregateTransformer(keys=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], functions=[partial_max(noi__name#114), partial_max(noi__close_status#120L), partial_sum(col_bal#100), partial_sum(_pre_0#433), partial_max(ccc_bu_code#207), partial_max(ccc_bu_name#208)], isStreamingAgg=false, output=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, max#418, max#419L, sum#420, sum#421, max#422, max#423])
               +- ^(6) ProjectExecTransformer [ccc_bu_name#208, noi__name#114, ccc_bu_code#207, noi__close_status#120L, col_main_noi__id#113L, partition_date#203, ccc_name#206, aaa_col_user_id#205L, col_bal#100, coalesce(col_bal#100, 0.0) AS _pre_0#433]
                  +- ^(6) ShuffledHashJoinExecTransformer [aaa_col_user_id#205L], [col_user_id#222L], LeftOuter, BuildRight
                     :- ^(6) InputIteratorTransformer[col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208]
                     :  +- CustomShuffleReader coalesced
                     :     +- ShuffleQueryStage 4
                     :        +- ColumnarExchange hashpartitioning(aaa_col_user_id#205L, 2000), ENSURE_REQUIREMENTS, [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208], [id=#1555], [id=#1555], [OUTPUT] List(col_main_noi__id:LongType, noi__name:StringType, noi__close_status:LongType, partition_date:StringType, aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType), [OUTPUT] List(col_main_noi__id:LongType, noi__name:StringType, noi__close_status:LongType, partition_date:StringType, aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType)
                     :           +- VeloxAppendBatches 3276
                     :              +- ^(5) ProjectExecTransformer [hash(aaa_col_user_id#205L, 42) AS hash_partition_key#432, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208]
                     :                 +- ^(5) ShuffledHashJoinExecTransformer [noi__id#111L], [dp_noi__id#209L], LeftOuter, BuildRight
                     :                    :- ^(5) InputIteratorTransformer[noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
                     :                    :  +- CustomShuffleReader coalesced
                     :                    :     +- ShuffleQueryStage 0
                     :                    :        +- ColumnarExchange hashpartitioning(noi__id#111L, 2000), ENSURE_REQUIREMENTS, [noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203], [id=#1118], [id=#1118], [OUTPUT] List(noi__id:LongType, col_main_noi__id:LongType, noi__name:StringType, noi__close_status:LongType, partition_date:StringType), [OUTPUT] List(noi__id:LongType, col_main_noi__id:LongType, noi__name:StringType, noi__close_status:LongType, partition_date:StringType)
                     :                    :           +- VeloxAppendBatches 3276
                     :                    :              +- ^(1) ProjectExecTransformer [hash(noi__id#111L, 42) AS hash_partition_key#427, noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
                     :                    :                 +- ^(1) FilterExecTransformer ((isnotnull(noi__close_status#120L) AND NOT (noi__close_status#120L = 1)) AND ((col_noi__cate1_id#146L = 226) OR (col_noi__cate2_id#144L = 380)))
                     :                    :                    +- ^(1) NativeFileScan orc tmp_db.table_c[noi__id#111L,col_main_noi__id#113L,noi__name#114,noi__close_status#120L,col_noi__cate2_id#144L,col_noi__cate1_id#146L,partition_date#203,partition_chain#204] Batched: true, DataFilters: [isnotnull(noi__close_status#120L), NOT (noi__close_status#120L = 1), ((col_noi__cate1_id#146L = 226)..., Format: ORC, Location: InMemoryFileIndex(1 paths)[viewfs://hadoop-xxx, PartitionFilters: [isnotnull(partition_date#203), isnotnull(partition_chain#204), (partition_date#203 = 2024-07-28)..., PushedFilters: [IsNotNull(noi__close_status), Not(EqualTo(noi__close_status,1)), Or(EqualTo(col_noi__cate1_id,226),E..., ReadSchema: struct<noi__id:bigint,col_main_noi__id:bigint,noi__name:string,noi__close_status:bigint,col_noi__cate2_i...
                     :                    +- ^(5) InputIteratorTransformer[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                       +- CustomShuffleReader coalesced
                     :                          +- ShuffleQueryStage 3
                     :                             +- ColumnarExchange hashpartitioning(dp_noi__id#209L, 2000), ENSURE_REQUIREMENTS, [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], [id=#1373], [id=#1373], [OUTPUT] List(aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType, dp_noi__id:LongType), [OUTPUT] List(aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType, dp_noi__id:LongType)
                     :                                +- VeloxAppendBatches 3276
                     :                                   +- ^(4) ProjectExecTransformer [hash(dp_noi__id#209L, 42) AS hash_partition_key#431, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                                      +- ^(4) FlushableHashAggregateTransformer(keys=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], functions=[], isStreamingAgg=false, output=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L])
                     :                                         +- ^(4) InputIteratorTransformer[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                                            +- CustomShuffleReader coalesced
                     :                                               +- ShuffleQueryStage 1
                     :                                                  +- ColumnarExchange hashpartitioning(aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L, 2000), ENSURE_REQUIREMENTS, [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], [id=#1169], [id=#1169], [OUTPUT] List(aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType, dp_noi__id:LongType), [OUTPUT] List(aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType, dp_noi__id:LongType)
                     :                                                     +- VeloxAppendBatches 3276
                     :                                                        +- ^(2) ProjectExecTransformer [hash(aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L, 42) AS hash_partition_key#428, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                                                           +- ^(2) FlushableHashAggregateTransformer(keys=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], functions=[], isStreamingAgg=false, output=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L])
                     :                                                              +- ^(2) ProjectExecTransformer [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                                                                 +- ^(2) FilterExecTransformer (ccc_bu_code#207 IN (2,18,28,41,51,56,59,66) AND isnotnull(dp_noi__id#209L))
                     :                                                                    +- ^(2) NativeFileScan orc tmp_db.table_b[aaa_col_user_id#205L,ccc_name#206,ccc_bu_code#207,ccc_bu_name#208,dp_noi__id#209L,partition_date#221] Batched: true, DataFilters: [ccc_bu_code#207 IN (2,18,28,41,51,56,59,66), isnotnull(dp_noi__id#209L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[viewfs://hadoop-xxx/..., PartitionFilters: [isnotnull(partition_date#221), (partition_date#221 = 2024-07-28)], PushedFilters: [In(ccc_bu_code, [2,18,28,41,51,56,59,66]), IsNotNull(dp_noi__id)], ReadSchema: struct<aaa_col_user_id:bigint,ccc_name:string,ccc_bu_code:string,ccc_bu_name:string,dp...
                     +- ^(6) HashAggregateTransformer(keys=[col_user_id#222L], functions=[sum(col_bal#224)], isStreamingAgg=false, output=[col_user_id#222L, col_bal#100])
                        +- ^(6) InputIteratorTransformer[col_user_id#222L, sum#425]
                           +- CustomShuffleReader coalesced
                              +- ShuffleQueryStage 2
                                 +- ColumnarExchange hashpartitioning(col_user_id#222L, 2000), ENSURE_REQUIREMENTS, [col_user_id#222L, sum#425], [id=#1235], [id=#1235], [OUTPUT] List(col_user_id:LongType, sum:DoubleType), [OUTPUT] List(col_user_id:LongType, sum:DoubleType)
                                    +- VeloxAppendBatches 3276
                                       +- ^(3) ProjectExecTransformer [hash(col_user_id#222L, 42) AS hash_partition_key#430, col_user_id#222L, sum#425]
                                          +- ^(3) FlushableHashAggregateTransformer(keys=[col_user_id#222L], functions=[partial_sum(col_bal#224)], isStreamingAgg=false, output=[col_user_id#222L, sum#425])
                                             +- ^(3) ProjectExecTransformer [col_user_id#222L, col_bal#224]
                                                +- ^(3) FilterExecTransformer (pool_type#223L IN (0,6) AND isnotnull(col_user_id#222L))
                                                   +- ^(3) NativeFileScan orc tmp_db.table_a[col_user_id#222L,pool_type#223L,col_bal#224,partition_date#246] Batched: true, DataFilters: [pool_type#223L IN (0,6), isnotnull(col_user_id#222L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[viewfs://hadoop-xxx/..., PartitionFilters: [isnotnull(partition_date#246), (partition_date#246 = 2024-07-28)], PushedFilters: [In(pool_type, [0,6]), IsNotNull(col_user_id)], ReadSchema: struct<col_user_id:bigint,pool_type:bigint,col_bal:double>
   +- == Initial Plan ==
      SortAggregate(key=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], functions=[max(noi__name#114), max(noi__close_status#120L), sum(col_bal#100), sum(coalesce(col_bal#100, 0.0)), max(ccc_bu_code#207), max(ccc_bu_name#208)], output=[main_noi__id#403L, noi__name#404, noi__status#405, col_user_id#406L, ccc_name#206, is_has_col_bal#407, aaa_col_bal#408, ccc_bu_code#409L, ccc_bu_name#410, partition_date#411])
      +- SortAggregate(key=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], functions=[partial_max(noi__name#114), partial_max(noi__close_status#120L), partial_sum(col_bal#100), partial_sum(coalesce(col_bal#100, 0.0)), partial_max(ccc_bu_code#207), partial_max(ccc_bu_name#208)], output=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, max#418, max#419L, sum#420, sum#421, max#422, max#423])
         +- Sort [col_main_noi__id#113L ASC NULLS FIRST, partition_date#203 ASC NULLS FIRST, aaa_col_user_id#205L ASC NULLS FIRST, ccc_name#206 ASC NULLS FIRST], false, 0
            +- Project [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, col_bal#100]
               +- SortMergeJoin [aaa_col_user_id#205L], [col_user_id#222L], LeftOuter
                  :- Sort [aaa_col_user_id#205L ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(aaa_col_user_id#205L, 2000), ENSURE_REQUIREMENTS, [id=#1039]
                  :     +- Project [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208]
                  :        +- SortMergeJoin [noi__id#111L], [dp_noi__id#209L], LeftOuter
                  :           :- Sort [noi__id#111L ASC NULLS FIRST], false, 0
                  :           :  +- Exchange hashpartitioning(noi__id#111L, 2000), ENSURE_REQUIREMENTS, [id=#1029]
                  :           :     +- Project [noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
                  :           :        +- Filter ((isnotnull(noi__close_status#120L) AND NOT (noi__close_status#120L = 1)) AND ((col_noi__cate1_id#146L = 226) OR (col_noi__cate2_id#144L = 380)))
                  :           :           +- FileScan orc tmp_db.table_c[noi__id#111L,col_main_noi__id#113L,noi__name#114,noi__close_status#120L,col_noi__cate2_id#144L,col_noi__cate1_id#146L,partition_date#203,partition_chain#204] Batched: true, DataFilters: [isnotnull(noi__close_status#120L), NOT (noi__close_status#120L = 1), ((col_noi__cate1_id#146L = 226)..., Format: ORC, Location: InMemoryFileIndex[viewfs://hadoop-xxx/..., PartitionFilters: [isnotnull(partition_date#203), isnotnull(partition_chain#204), (partition_date#203 = 2024-07-28)..., PushedFilters: [IsNotNull(noi__close_status), Not(EqualTo(noi__close_status,1)), Or(EqualTo(col_noi__cate1_id,226),E..., ReadSchema: struct<noi__id:bigint,col_main_noi__id:bigint,noi__name:string,noi__close_status:bigint,col_noi__cate2_i...
                  :           +- Sort [dp_noi__id#209L ASC NULLS FIRST], false, 0
                  :              +- Exchange hashpartitioning(dp_noi__id#209L, 2000), ENSURE_REQUIREMENTS, [id=#1030]
                  :                 +- HashAggregate(keys=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], functions=[], output=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L])
                  :                    +- Exchange hashpartitioning(aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L, 2000), ENSURE_REQUIREMENTS, [id=#1025]
                  :                       +- HashAggregate(keys=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], functions=[], output=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L])
                  :                          +- Project [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                  :                             +- Filter (ccc_bu_code#207 IN (2,18,28,41,51,56,59,66) AND isnotnull(dp_noi__id#209L))
                  :                                +- FileScan orc tmp_db.table_b[aaa_col_user_id#205L,ccc_name#206,ccc_bu_code#207,ccc_bu_name#208,dp_noi__id#209L,partition_date#221] Batched: true, DataFilters: [ccc_bu_code#207 IN (2,18,28,41,51,56,59,66), isnotnull(dp_noi__id#209L)], Format: ORC, Location: InMemoryFileIndex[viewfs://hadoop-xxx/..., PartitionFilters: [isnotnull(partition_date#221), (partition_date#221 = 2024-07-28)], PushedFilters: [In(ccc_bu_code, [2,18,28,41,51,56,59,66]), IsNotNull(dp_noi__id)], ReadSchema: struct<aaa_col_user_id:bigint,ccc_name:string,ccc_bu_code:string,ccc_bu_name:string,dp...
                  +- Sort [col_user_id#222L ASC NULLS FIRST], false, 0
                     +- HashAggregate(keys=[col_user_id#222L], functions=[sum(col_bal#224)], output=[col_user_id#222L, col_bal#100])
                        +- Exchange hashpartitioning(col_user_id#222L, 2000), ENSURE_REQUIREMENTS, [id=#1035]
                           +- HashAggregate(keys=[col_user_id#222L], functions=[partial_sum(col_bal#224)], output=[col_user_id#222L, sum#425])
                              +- Project [col_user_id#222L, col_bal#224]
                                 +- Filter (pool_type#223L IN (0,6) AND isnotnull(col_user_id#222L))
                                    +- FileScan orc tmp_db.table_a[col_user_id#222L,pool_type#223L,col_bal#224,partition_date#246] Batched: true, DataFilters: [pool_type#223L IN (0,6), isnotnull(col_user_id#222L)], Format: ORC, Location: InMemoryFileIndex[viewfs://hadoop-xxx/_..., PartitionFilters: [isnotnull(partition_date#246), (partition_date#246 = 2024-07-28)], PushedFilters: [In(pool_type, [0,6]), IsNotNull(col_user_id)], ReadSchema: struct<col_user_id:bigint,pool_type:bigint,col_bal:double>

@FelixYBW (Contributor) commented Aug 6, 2024

> SQL explain:

@PHILO-HE Can you take a look?

@zhztheplayer changed the title from "[VL] result mismatch found in FlushableAgg" to "[VL] Result mismatch found in FlushableAgg" Aug 8, 2024
@zhztheplayer (Member) commented Aug 8, 2024

I used Gluten + Spark 3.4 to run a simple distinct query, and it doesn't seem to reproduce the issue.

[screenshot: query plan and result]

It is probably related to Spark 3.0 / 3.1, which still have CustomShuffleReaderExec in the code, though I am not sure. We should investigate further.

@zhztheplayer (Member):

I managed to construct a closer case and still could not reproduce the issue.

# Generate partitioned data:
tools/gluten-it/sbin/gluten-it.sh data-gen-only --local-cluster --auto-cluster-resource -s=100.0 --gen-partitioned-data
# Open a Spark shell:
tools/gluten-it/sbin/gluten-it.sh spark-shell --local-cluster --auto-cluster-resource -s=100.0 --data-gen=skip

// In the opened Spark shell, run:
spark.sql("set spark.sql.adaptive.coalescePartitions.minPartitionSize=500m").show  // force AQEShuffleReadExec
spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1").show  // disable broadcast hash join
val df = spark.sql("select * from (select distinct l_orderkey,l_partkey from lineitem) a inner join (select l_orderkey from lineitem limit 10) b on a.l_orderkey = b.l_orderkey limit 10")  // run the query
df.collect  // execute
df.explain  // explain

And the explained plan looks fine:

[screenshot: explained plan]

In the debugger, AQEShuffleReadExec has the correct outputPartitioning:

[screenshot: debugger view of AQEShuffleReadExec outputPartitioning]
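
For anyone who wants to check this without a debugger, the read node's partitioning can also be inspected from the shell; a minimal sketch, assuming Spark 3.2+ class names (on Spark 3.0/3.1 the node is CustomShuffleReaderExec instead):

    import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec}

    df.collect()  // run the query so AQE materializes the final plan
    val finalPlan = df.queryExecution.executedPlan match {
      case a: AdaptiveSparkPlanExec => a.executedPlan  // unwrap the adaptive plan
      case p                        => p
    }
    // Print the outputPartitioning of every AQE shuffle read in the final plan.
    finalPlan.collect { case r: AQEShuffleReadExec => r.outputPartitioning }.foreach(println)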

@NEUpanning (Contributor):

To reproduce this issue, ensure that the outputPartitioning of the AQEShuffleReadExec is UnknownPartitioning, i.e. the child's (AQEShuffleReadExec's) output is NOT partitioned by the aggregation keys. Under these conditions, the final aggregation will be transformed into a FlushableHashAggregate.
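
A minimal sketch of that condition (illustrative only, with a hypothetical helper name; not Gluten's actual rule code):

    import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}

    // If the aggregate's child reports UnknownPartitioning, the planner cannot
    // prove the input is clustered by the grouping keys, so the aggregation may
    // be planned as a FlushableHashAggregate instead of a regular final one.
    def mayBecomeFlushable(childOutputPartitioning: Partitioning): Boolean =
      childOutputPartitioning.isInstanceOf[UnknownPartitioning]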
