[SPARK-19650] Commands should not trigger a Spark job
This is a backport of apache@8f0511e.

## What changes were proposed in this pull request?
Spark executes SQL commands eagerly, by creating an RDD that contains the command's results. The downside is that any action on this RDD triggers a Spark job, which is expensive and unnecessary.

This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it simply materializes the results and puts them in a `LocalRelation`.
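
For illustration, here is a minimal sketch of the user-visible effect (a hypothetical demo assuming Spark 2.1 with this patch on the classpath; it is not part of the patch):

```scala
import org.apache.spark.sql.SparkSession

object Spark19650Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-19650 demo")
      .getOrCreate()

    // The command still runs eagerly when the Dataset is created; with this
    // patch its rows are collected into a driver-side LocalRelation.
    val df = spark.sql("SHOW DATABASES")

    // Actions on the result therefore no longer start a Spark job, since the
    // rows are already local to the driver.
    df.show()

    spark.stop()
  }
}
```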

## How was this patch tested?
Added a regression test to `SQLQuerySuite`.

Author: Herman van Hovell <[email protected]>

Closes apache#253 from hvanhovell/SPARK-19650.
hvanhovell authored and yhuai committed Feb 27, 2017
1 parent 241e4d8 commit 95a9b4b
Showing 2 changed files with 33 additions and 7 deletions.
14 changes: 7 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.DataStreamWriter
@@ -178,13 +178,13 @@ class Dataset[T] private[sql](
       case _ => false
     }
 
-    // For various commands (like DDL) and queries with side effects, we force query execution
-    // to happen right away to let these side effects take place eagerly.
     queryExecution.analyzed match {
-      case p if hasSideEffects(p) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
-      case Union(children) if children.forall(hasSideEffects) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
+      // For various commands (like DDL) and queries with side effects, we force query execution
+      // to happen right away to let these side effects take place eagerly.
+      case c if hasSideEffects(c) =>
+        LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+      case u @ Union(children) if children.forall(hasSideEffects) =>
+        LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
       case _ =>
         queryExecution.analyzed
     }
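
Worth noting about the replacement: `executeCollect()` runs the already-planned command once and returns its rows to the driver, and `LocalRelation` wraps those rows, so downstream operators read driver-local data instead of an RDD. A quick way to observe this in a Spark shell (a sketch assuming a 2.x build with this patch applied; `spark` is the session):

```scala
// Plans built on top of a command's result should now bottom out in a
// LocalTableScan (the physical form of LocalRelation) rather than a
// scan over an RDD of command results.
val df = spark.sql("SHOW DATABASES")
df.select("databaseName").explain()
```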
26 changes: 26 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql
 import java.io.File
 import java.math.MathContext
 import java.sql.Timestamp
+import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.spark.{AccumulatorSuite, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.expressions.PlanExpression
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.{aggregate, FileSourceScanExec}
@@ -2506,4 +2508,28 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(sql(badQuery), Row(1) :: Nil)
   }
+
+  test("SPARK-19650: An action on a Command should not trigger a Spark job") {
+    // Create a listener that checks if new jobs have started.
+    val jobStarted = new AtomicBoolean(false)
+    val listener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobStarted.set(true)
+      }
+    }
+
+    // Make sure no spurious job starts are pending in the listener bus.
+    sparkContext.listenerBus.waitUntilEmpty(500)
+    sparkContext.addSparkListener(listener)
+    try {
+      // Execute the command.
+      sql("show databases").head()
+
+      // Make sure we have seen all events triggered by DataFrame.head().
+      sparkContext.listenerBus.waitUntilEmpty(500)
+    } finally {
+      sparkContext.removeSparkListener(listener)
+    }
+    assert(!jobStarted.get(), "Command should not trigger a Spark job.")
+  }
 }
