Skip to content

Commit

Permalink
Improve column pruning in the optimizer.
Browse files Browse the repository at this point in the history
  • Loading branch information
marmbrus committed Apr 10, 2014
1 parent eb5f2b6 commit 2f4e7b9
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,47 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
Batch("Filter Pushdown", Once,
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin) :: Nil
PushPredicateThroughInnerJoin,
ColumnPruning) :: Nil
}

/**
 * Attempts to eliminate the reading of unneeded columns from the query plan using the following
 * transformations:
 *
 *  - Inserting Projections beneath the following operators:
 *   - Aggregate
 *   - Project <- Join
 *  - Collapsing adjacent projections, performing alias substitution.
 */
object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // The child outputs attributes that are not in the aggregate's references:
    // insert a projection that keeps only the referenced ones.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
      a.copy(child = Project(a.references.toSeq, child))

    case Project(projectList, Join(left, right, joinType, condition)) =>
      // Every attribute referenced by the projection list or by the join condition.
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      /** Wraps `c` in a projection only when `c` outputs attributes that are not referenced. */
      def prunedChild(c: LogicalPlan): LogicalPlan = {
        // The subset of this child's output that is actually referenced above it.
        val required = allReferences.filter(c.outputSet.contains)
        // The difference must be taken as (output -- required). The reverse order is always
        // empty, because `required` is a subset of `c.outputSet`, and would disable pruning.
        if ((c.outputSet -- required).nonEmpty) {
          Project(required.toSeq, c)
        } else {
          c
        }
      }

      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Two adjacent projections: rewrite the outer one in terms of the inner one's child.
    case Project(project1, Project(project2, child)) =>
      // Maps each attribute produced by an alias in the inner projection to that alias.
      val aliasMap = project2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap
      // NOTE(review): the whole Alias is substituted wherever its attribute occurs, including
      // inside larger expressions, which can leave an Alias nested below the top level of a
      // projection entry -- confirm that later phases tolerate this.
      // TODO: Fix TransformBase.
      val substitutedProjection = project1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ case class Aggregate(
extends UnaryNode {

def output = aggregateExpressions.map(_.toAttribute)
def references = child.references
def references = (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}

case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode {
Expand Down

0 comments on commit 2f4e7b9

Please sign in to comment.