Skip to content

Commit

Permalink
Added SortedOperation pattern to match *some* definitely sorted opera…
Browse files Browse the repository at this point in the history
…tions and avoid some sorting cost in HiveComparisonTest.
  • Loading branch information
liancheng committed May 28, 2014
1 parent 6d1c642 commit bf0e7dc
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,16 @@ object Unions {
case other => other :: Nil
}
}

/**
* A pattern that matches (some) sorted operations and returns corresponding sorting orders.
* Currently operations matched by this pattern are guaranteed to be sorted, but not all sorted
* operations are matched by this pattern.
*/
object SortedOperation {
// TODO (lian) detect more sorted operations
def unapply(plan: LogicalPlan): Option[Seq[SortOrder]] = plan match {
case FilteredOperation(_, Sort(order, _)) => Some(order)
case _ => None
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ case class Aggregate(
*/
@transient
private[this] lazy val resultMap =
(computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap
(computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap

/**
* Substituted version of aggregateExpressions expressions which are used to compute final
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package org.apache.spark.sql.hive.execution

import java.io._

import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}

import org.apache.spark.sql.Logging
import org.apache.spark.sql.catalyst.planning.SortedOperation
import org.apache.spark.sql.catalyst.plans.logical.{ExplainCommand, NativeCommand}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.Sort
import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
import org.apache.spark.sql.hive.test.TestHive

/**
Expand Down Expand Up @@ -131,14 +132,8 @@ abstract class HiveComparisonTest
val orderedAnswer = hiveQuery.logical match {
// Clean out non-deterministic time schema info.
case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
case _: ExplainCommand => answer
case _ =>
// TODO: Really we only care about the final total ordering here...
val isOrdered = hiveQuery.executedPlan.collect {
case s @ Sort(_, global, _) if global => s
}.nonEmpty
// If the query results aren't sorted, then sort them to ensure deterministic answers.
if (!isOrdered) answer.sorted else answer
case _: ExplainCommand | SortedOperation(_) => answer
case _ => answer.sorted
}
orderedAnswer.map(cleanPaths)
}
Expand All @@ -161,7 +156,7 @@ abstract class HiveComparisonTest
"minFileSize"
)
protected def nonDeterministicLine(line: String) =
nonDeterministicLineIndicators.map(line contains _).reduceLeft(_||_)
nonDeterministicLineIndicators.exists(line contains _)

/**
* Removes non-deterministic paths from `str` so cached answers will compare correctly.
Expand Down

0 comments on commit bf0e7dc

Please sign in to comment.