diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index c2666b2ab9129..f4d016cb96711 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -87,7 +87,7 @@ object UnsupportedOperationChecker { * data. */ def containsCompleteData(subplan: LogicalPlan): Boolean = { - val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } + val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } // Either the subplan has no streaming source, or it has aggregation with Complete mode !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 58e69f9ebea05..dcdb1ae089328 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -199,12 +199,17 @@ class UnsupportedOperationsSuite extends SparkFunSuite { _.intersect(_), streamStreamSupported = false) - // Sort: supported only on batch subplans and on aggregation + complete output mode + // Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _)) assertSupportedInStreamingPlan( - "sort - sort over aggregated data in Complete output mode", + "sort - sort after aggregation in Complete output mode", streamRelation.groupBy()(Count("*")).sortBy(), Complete) + assertNotSupportedInStreamingPlan( + "sort - sort before aggregation in Complete output mode", + streamRelation.sortBy().groupBy()(Count("*")), + Complete, + Seq("sort", "aggregat", "complete")) assertNotSupportedInStreamingPlan( "sort - sort over aggregated data in Update output mode", streamRelation.groupBy()(Count("*")).sortBy(),