Describe the issue
When aggregating by a column that does not exist, Spark converts the grouping key into a null Literal. Column pruning then removes every column reference, so the Project operator in the optimized plan selects no columns at all. In vanilla Spark's logic, such a Project still returns N rows, just with zero columns. In the CH backend, however, all rows are discarded in ExpressionActions and an empty dataset is returned, making the results of the aggregate operator inconsistent with vanilla Spark.
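For context, vanilla Spark preserves the row count even when a projection selects no columns. The following standalone sketch (illustrative, not taken from the issue) shows that behavior:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch: a Project with an empty column list still
// reports the input row count in vanilla Spark.
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.range(3)        // 3 rows, one column "id"
val empty = df.select()        // Project with no columns
assert(empty.columns.isEmpty)  // zero columns...
assert(empty.count() == 3)     // ...but the 3 rows survive
```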
The physical plan:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   HashAggregate(keys=[[null]#18], functions=[count(1)], output=[count#10L])
   +- CHNativeColumnarToRow
      +- ShuffleQueryStage 0
         +- ColumnarExchangeAdaptor hashpartitioning([null]#18, 1), ENSURE_REQUIREMENTS, false, [plan_id=145], [id=#145], [OUTPUT] Vector([null]:StructType(StructField(a,IntegerType,true)), count:LongType), [OUTPUT] Vector([null]:StructType(StructField(a,IntegerType,true)), count:LongType)
            +- RowToCHNativeColumnar
               +- HashAggregate(keys=[[null] AS [null]#18], functions=[partial_count(1)], output=[[null]#18, count#20L])
                  +- CHNativeColumnarToRow
                     +- *(3) ProjectExecTransformer
                        +- RowToCHNativeColumnar
                           +- LocalTableScan [_1#1]
```
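Note how the ProjectExecTransformer above feeds the partial HashAggregate with no column references. One possible mitigation (a hedged sketch only, not the actual Gluten patch; `patchEmptyProjectList` is a hypothetical helper) is to keep a placeholder literal in an otherwise empty project list so the native engine still has a column from which to derive the row count:

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression}

// Hypothetical helper (assumed, not from the Gluten codebase): if column
// pruning left the project list empty, emit a constant column so the
// native block still carries the N input rows.
def patchEmptyProjectList(projectList: Seq[NamedExpression]): Seq[NamedExpression] =
  if (projectList.isEmpty) Seq(Alias(Literal(1), "placeholder")())
  else projectList
```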
How to reproduce
The Spark unit test "SPARK-34713: group by CreateStruct with ExtractValue" in DataFrameAggregateSuite reproduces it:
test("SPARK-347133: group by CreateStruct with ExtractValue") {
spark.conf.set("spark.sql.codegen.wholeStage", false)
val nonStringMapDF = Seq(Tuple1(Map(1 -> 1))).toDF("col")
// Spark implicit casts string literal "a" to int to match the key type.
checkAnswer(nonStringMapDF.groupBy(struct($"col.a")).count().select("count"), Row(1))
val arrayDF = Seq(Tuple1(Seq(1))).toDF("col")
val e = interceptAnalysisException
assert(e.message.contains("requires integral type"))
}
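To observe the null-literal folding directly outside the test suite, printing the plans shows the grouping key collapsing to null and the Project selecting no columns (a standalone snippet; the session setup is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val df = Seq(Tuple1(Map(1 -> 1))).toDF("col")
// The optimizer folds the grouping key struct($"col.a") to a null
// literal; the optimized plan's Project then selects no columns.
df.groupBy(struct($"col.a")).count().explain(true)
```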