Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sql] Rename execution/aggregates.scala Aggregate.scala, and added a bunch of private[this] to variables. #348

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ case class Aggregate(

// HACK: Generators don't correctly preserve their output through serializations so we grab
// out child's output attributes statically here.
val childOutput = child.output
private[this] val childOutput = child.output

def output = aggregateExpressions.map(_.toAttribute)
override def output = aggregateExpressions.map(_.toAttribute)

/**
* An aggregate that needs to be computed for each row in a group.
Expand All @@ -75,7 +75,7 @@ case class Aggregate(

/** A list of aggregates that need to be computed for each group. */
@transient
lazy val computedAggregates = aggregateExpressions.flatMap { agg =>
private[this] lazy val computedAggregates = aggregateExpressions.flatMap { agg =>
agg.collect {
case a: AggregateExpression =>
ComputedAggregate(
Expand All @@ -87,10 +87,10 @@ case class Aggregate(

/** The schema of the result of all aggregate evaluations */
@transient
lazy val computedSchema = computedAggregates.map(_.resultAttribute)
private[this] lazy val computedSchema = computedAggregates.map(_.resultAttribute)

/** Creates a new aggregate buffer for a group. */
def newAggregateBuffer(): Array[AggregateFunction] = {
private[this] def newAggregateBuffer(): Array[AggregateFunction] = {
val buffer = new Array[AggregateFunction](computedAggregates.length)
var i = 0
while (i < computedAggregates.length) {
Expand All @@ -102,7 +102,7 @@ case class Aggregate(

/** Named attributes used to substitute grouping attributes into the final result. */
@transient
lazy val namedGroups = groupingExpressions.map {
private[this] lazy val namedGroups = groupingExpressions.map {
case ne: NamedExpression => ne -> ne.toAttribute
case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
}
Expand All @@ -112,21 +112,21 @@ case class Aggregate(
* expression into the final result expression.
*/
@transient
lazy val resultMap =
private[this] lazy val resultMap =
(computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap

/**
* Substituted version of aggregateExpressions expressions which are used to compute final
* output rows given a group and the result of all aggregate computations.
*/
@transient
lazy val resultExpressions = aggregateExpressions.map { agg =>
private[this] lazy val resultExpressions = aggregateExpressions.map { agg =>
agg.transform {
case e: Expression if resultMap.contains(e) => resultMap(e)
}
}

def execute() = attachTree(this, "execute") {
override def execute() = attachTree(this, "execute") {
if (groupingExpressions.isEmpty) {
child.execute().mapPartitions { iter =>
val buffer = newAggregateBuffer()
Expand Down