# [SPARK-22932][SQL] Refactor AnalysisContext
## What changes were proposed in this pull request?
Add a `reset` function to ensure the state in `AnalysisContext` is per-query.
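
To make the diff below easier to follow, here is a minimal, self-contained sketch of the pattern being introduced (the names mirror the patch, but this block is illustrative, not the Spark source): per-query analysis state lives in a `ThreadLocal` whose value can be dropped wholesale between queries.

```scala
// Illustrative sketch only; the real AnalysisContext lives in Analyzer.scala.
object AnalysisContextSketch {
  case class AnalysisContext(
      defaultDatabase: Option[String] = None,
      nestedViewDepth: Int = 0)

  private val value = new ThreadLocal[AnalysisContext]() {
    override def initialValue: AnalysisContext = AnalysisContext()
  }

  def get: AnalysisContext = value.get()

  // ThreadLocal.remove() discards this thread's copy; the next get() re-runs
  // initialValue, so each query starts from a fresh AnalysisContext.
  def reset(): Unit = value.remove()
}
```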

## How was this patch tested?
The existing test cases

Author: gatorsmile <[email protected]>

Closes #20127 from gatorsmile/refactorAnalysisContext.

gatorsmile committed on Jan 3, 2018 · 1 parent b962488 · commit 27c949d

Showing 1 changed file with 20 additions and 5 deletions:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -52,6 +52,7 @@ object SimpleAnalyzer extends Analyzer(
 /**
  * Provides a way to keep state during the analysis, this enables us to decouple the concerns
  * of analysis environment from the catalog.
+ * The state that is kept here is per-query.
  *
  * Note this is thread local.
  *
@@ -70,6 +71,8 @@ object AnalysisContext {
   }
 
   def get: AnalysisContext = value.get()
+  def reset(): Unit = value.remove()
+
   private def set(context: AnalysisContext): Unit = value.set(context)
 
   def withAnalysisContext[A](database: Option[String])(f: => A): A = {
@@ -95,6 +98,17 @@ class Analyzer(
     this(catalog, conf, conf.optimizerMaxIterations)
   }
 
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    AnalysisContext.reset()
+    try {
+      executeSameContext(plan)
+    } finally {
+      AnalysisContext.reset()
+    }
+  }
+
+  private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan)
+
   def resolver: Resolver = conf.resolver
 
   protected val fixedPoint = FixedPoint(maxIterations)
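
The pair of methods added above is the core of the refactoring: `execute` is the public entry point and now resets the context before and after every run, while `executeSameContext` is what all recursive analysis (CTEs, views, subqueries, aggregates; see the call sites in the hunks below) goes through, so nested runs inherit the state the enclosing query has built up. A runnable toy with invented names, sketching that contract under the assumption that per-query state is just a depth counter:

```scala
// Toy model (invented names, not Spark code) of the execute /
// executeSameContext split.
object ExecuteSketch extends App {
  private val depth = new ThreadLocal[Int]() {
    override def initialValue: Int = 0
  }

  // Top-level entry point: state is fresh on entry and cleared on exit,
  // even when the body throws.
  def execute(body: => Unit): Unit = {
    depth.remove()
    try executeSameContext(body)
    finally depth.remove()
  }

  // Recursive entry point: keeps whatever state this query has accumulated.
  def executeSameContext(body: => Unit): Unit = {
    depth.set(depth.get() + 1)
    body
  }

  execute {
    executeSameContext(println(s"nested depth = ${depth.get()}")) // prints 2
  }
  println(s"between queries: ${depth.get()}") // prints 0; nothing leaked
}
```
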
@@ -176,7 +190,7 @@
       case With(child, relations) =>
         substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
           case (resolved, (name, relation)) =>
-            resolved :+ name -> execute(substituteCTE(relation, resolved))
+            resolved :+ name -> executeSameContext(substituteCTE(relation, resolved))
         })
       case other => other
     }
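
Each CTE definition is analyzed eagerly here, in the middle of the enclosing query's analysis, so this recursive call must go through `executeSameContext`: routing it through the overridden `execute` would reset the thread-local context the outer query still depends on. The view, subquery, and aggregate call sites in the remaining hunks change for the same reason.
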
@@ -600,7 +614,7 @@
               "avoid errors. Increase the value of spark.sql.view.maxNestedViewDepth to work " +
               "aroud this.")
           }
-          execute(child)
+          executeSameContext(child)
         }
         view.copy(child = newChild)
       case p @ SubqueryAlias(_, view: View) =>
@@ -1269,7 +1283,7 @@
       do {
         // Try to resolve the subquery plan using the regular analyzer.
         previous = current
-        current = execute(current)
+        current = executeSameContext(current)
 
         // Use the outer references to resolve the subquery plan if it isn't resolved yet.
         val i = plans.iterator
@@ -1392,7 +1406,7 @@
             grouping,
             Alias(cond, "havingCondition")() :: Nil,
             child)
-          val resolvedOperator = execute(aggregatedCondition)
+          val resolvedOperator = executeSameContext(aggregatedCondition)
           def resolvedAggregateFilter =
             resolvedOperator
               .asInstanceOf[Aggregate]
@@ -1450,7 +1464,8 @@
           val aliasedOrdering =
             unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
           val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering)
-          val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate]
+          val resolvedAggregate: Aggregate =
+            executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate]
           val resolvedAliasedOrdering: Seq[Alias] =
             resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]]
 
