Skip to content

Commit

Permalink
Rename execution/aggregates.scala Aggregate.scala, and added a bunch …
Browse files Browse the repository at this point in the history
…of private[this] to variables.
  • Loading branch information
rxin committed Apr 7, 2014
1 parent 0307db0 commit f4bc36f
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ case class Aggregate(

// HACK: Generators don't correctly preserve their output through serializations so we grab
// out child's output attributes statically here.
val childOutput = child.output
private[this] val childOutput = child.output

def output = aggregateExpressions.map(_.toAttribute)
override def output = aggregateExpressions.map(_.toAttribute)

/**
* An aggregate that needs to be computed for each row in a group.
Expand All @@ -75,7 +75,7 @@ case class Aggregate(

/** A list of aggregates that need to be computed for each group. */
@transient
lazy val computedAggregates = aggregateExpressions.flatMap { agg =>
private[this] lazy val computedAggregates = aggregateExpressions.flatMap { agg =>
agg.collect {
case a: AggregateExpression =>
ComputedAggregate(
Expand All @@ -87,10 +87,10 @@ case class Aggregate(

/** The schema of the result of all aggregate evaluations */
@transient
lazy val computedSchema = computedAggregates.map(_.resultAttribute)
private[this] lazy val computedSchema = computedAggregates.map(_.resultAttribute)

/** Creates a new aggregate buffer for a group. */
def newAggregateBuffer(): Array[AggregateFunction] = {
private[this] def newAggregateBuffer(): Array[AggregateFunction] = {
val buffer = new Array[AggregateFunction](computedAggregates.length)
var i = 0
while (i < computedAggregates.length) {
Expand All @@ -102,7 +102,7 @@ case class Aggregate(

/** Named attributes used to substitute grouping attributes into the final result. */
@transient
lazy val namedGroups = groupingExpressions.map {
private[this] lazy val namedGroups = groupingExpressions.map {
case ne: NamedExpression => ne -> ne.toAttribute
case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
}
Expand All @@ -112,21 +112,21 @@ case class Aggregate(
* expression into the final result expression.
*/
@transient
lazy val resultMap =
private[this] lazy val resultMap =
(computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap

/**
* Substituted version of aggregateExpressions expressions which are used to compute final
* output rows given a group and the result of all aggregate computations.
*/
@transient
lazy val resultExpressions = aggregateExpressions.map { agg =>
private[this] lazy val resultExpressions = aggregateExpressions.map { agg =>
agg.transform {
case e: Expression if resultMap.contains(e) => resultMap(e)
}
}

def execute() = attachTree(this, "execute") {
override def execute() = attachTree(this, "execute") {
if (groupingExpressions.isEmpty) {
child.execute().mapPartitions { iter =>
val buffer = newAggregateBuffer()
Expand Down

0 comments on commit f4bc36f

Please sign in to comment.