Skip to content

Commit

Permalink
Fixed bug
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Jan 20, 2017
1 parent 0c92318 commit c5c2e79
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ object UnsupportedOperationChecker {
* data.
*/
def containsCompleteData(subplan: LogicalPlan): Boolean = {
val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
// Either the subplan has no streaming source, or it has aggregation with Complete mode
!subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,17 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.intersect(_),
streamStreamSupported = false)

// Sort: supported only on batch subplans and on aggregation + complete output mode
// Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode
testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
assertSupportedInStreamingPlan(
"sort - sort over aggregated data in Complete output mode",
"sort - sort after aggregation in Complete output mode",
streamRelation.groupBy()(Count("*")).sortBy(),
Complete)
assertNotSupportedInStreamingPlan(
"sort - sort before aggregation in Complete output mode",
streamRelation.sortBy().groupBy()(Count("*")),
Complete,
Seq("sort", "aggregat", "complete"))
assertNotSupportedInStreamingPlan(
"sort - sort over aggregated data in Update output mode",
streamRelation.groupBy()(Count("*")).sortBy(),
Expand Down

0 comments on commit c5c2e79

Please sign in to comment.