From 56eb839b93b726130250baa40b959a87d6da1fc8 Mon Sep 17 00:00:00 2001
From: wuzhim <596361258@qq.com>
Date: Thu, 17 Sep 2020 16:07:44 +0800
Subject: [PATCH] KE-17215 Clean up SQLExecution broadcasts eagerly
Cherry-picked from https://github.com/Kyligence/spark/commit/8963cb1e216409b992f40900134175accff96e67
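
This patch ties driver-side broadcasts to the SQL execution that created them,
so they can be released as soon as the execution finishes instead of waiting
for the reference-tracking ContextCleaner:

* BroadcastManager.newBroadcast() now takes an execution id and records every
  broadcast id created under it; cleanBroadcast(executionId) unbroadcasts them
  all when SQLExecution.withNewExecutionId() completes.
* SparkContext.broadcast() skips ContextCleaner registration for broadcasts
  that belong to an execution, since those are now released explicitly.
* ContextCleaner runs cleanup tasks on a fixed-size daemon thread pool
  ("spark.cleaner.referenceTracking.cleanupThreadNumber", default 100) rather
  than doing all cleanup on the single cleaning thread.
* TaskSetManager releases the task-binary broadcast once its task set finishes.
* BlockManagerMasterEndpoint tolerates IOExceptions from slaves while removing
  RDDs and broadcasts, and drops removed broadcast blocks from blockLocations.
* The build version is bumped from 2.2.1-kylin-r14 to 2.2.1-kylin-r15.

SQLExecution drives the new lifecycle automatically; a minimal sketch of the
mechanism (executionId and lookupTable are illustrative values, not part of
this patch):

    // Broadcasts created while "spark.sql.execution.id" is set are recorded
    // under that id and are skipped by the ContextCleaner.
    sc.setLocalProperty("spark.sql.execution.id", executionId)
    val bc = sc.broadcast(lookupTable)
    // ... run the job ...
    sc.setLocalProperty("spark.sql.execution.id", null)
    // Releasing the execution unbroadcasts everything recorded under it.
    SparkEnv.get.broadcastManager.cleanBroadcast(executionId)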
---
assembly/pom.xml | 2 +-
common/network-common/pom.xml | 2 +-
common/network-shuffle/pom.xml | 2 +-
common/network-yarn/pom.xml | 2 +-
common/sketch/pom.xml | 2 +-
common/tags/pom.xml | 2 +-
common/unsafe/pom.xml | 2 +-
core/pom.xml | 2 +-
.../org/apache/spark/ContextCleaner.scala | 67 ++++++++++++-------
.../org/apache/spark/MapOutputTracker.scala | 2 +-
.../scala/org/apache/spark/SparkContext.scala | 7 +-
.../spark/broadcast/BroadcastManager.scala | 29 +++++++-
.../apache/spark/scheduler/ResultTask.scala | 2 +-
.../spark/scheduler/ShuffleMapTask.scala | 2 +-
.../spark/scheduler/TaskSetManager.scala | 7 ++
.../storage/BlockManagerMasterEndpoint.scala | 39 ++++++++---
examples/pom.xml | 2 +-
external/docker-integration-tests/pom.xml | 2 +-
external/flume-assembly/pom.xml | 2 +-
external/flume-sink/pom.xml | 2 +-
external/flume/pom.xml | 2 +-
external/kafka-0-10-assembly/pom.xml | 2 +-
external/kafka-0-10-sql/pom.xml | 2 +-
external/kafka-0-10/pom.xml | 2 +-
external/kafka-0-8-assembly/pom.xml | 2 +-
external/kafka-0-8/pom.xml | 2 +-
external/kinesis-asl-assembly/pom.xml | 2 +-
external/kinesis-asl/pom.xml | 2 +-
external/spark-ganglia-lgpl/pom.xml | 2 +-
graphx/pom.xml | 2 +-
launcher/pom.xml | 2 +-
mllib-local/pom.xml | 2 +-
mllib/pom.xml | 2 +-
pom.xml | 2 +-
repl/pom.xml | 2 +-
resource-managers/mesos/pom.xml | 2 +-
resource-managers/yarn/pom.xml | 2 +-
sql/catalyst/pom.xml | 2 +-
sql/core/pom.xml | 2 +-
.../spark/sql/execution/SQLExecution.scala | 2 +
sql/hive-thriftserver/pom.xml | 2 +-
sql/hive/pom.xml | 2 +-
streaming/pom.xml | 2 +-
tools/pom.xml | 2 +-
44 files changed, 150 insertions(+), 77 deletions(-)
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 8c8045e8cd2f0..80f26fcaba7a7 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 1bf3f00b2cc93..d2a90340519f7 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 842cd59e3468f..a48a788f7b661 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 00e8a0c7d5689..9b5e6ed3a5418 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index 88aa6eacaec8c..f66359af4e5b6 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/common/tags/pom.xml b/common/tags/pom.xml
index ac290ee46b0f5..0ea37e69c7884 100644
--- a/common/tags/pom.xml
+++ b/common/tags/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index 3527bf6de34c7..ec1fde2c10a3f 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/core/pom.xml b/core/pom.xml
index 9c39470892a85..3faeccc500f7c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4d884dec07916..8d5718750098c 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ExecutorService, ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
+
/**
* Classes that represent cleaning tasks.
*/
@@ -112,6 +113,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking.shuffle", false)
+  /**
+   * Number of threads used to run cleanup tasks.
+   */
+  private val cleanupTaskThreads = sc.conf.getInt(
+    "spark.cleaner.referenceTracking.cleanupThreadNumber", 100)
+
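+  // Cleanup tasks run on this pool so a slow or blocking task cannot stall the reference queue.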
+ private val cleanupExecutorPool: ExecutorService =
+ ThreadUtils.newDaemonFixedThreadPool(cleanupTaskThreads, "cleanup")
+
@volatile private var stopped = false
/** Attach a listener object to get information of when objects are cleaned. */
@@ -178,32 +188,41 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
- val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
- .map(_.asInstanceOf[CleanupTaskWeakReference])
- // Synchronize here to avoid being interrupted on stop()
- synchronized {
- reference.foreach { ref =>
- logDebug("Got cleaning task " + ref.task)
- referenceBuffer.remove(ref)
- ref.task match {
- case CleanRDD(rddId) =>
- doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
- case CleanShuffle(shuffleId) =>
- doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
- case CleanBroadcast(broadcastId) =>
- doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
- case CleanAccum(accId) =>
- doCleanupAccum(accId, blocking = blockOnCleanupTasks)
- case CleanCheckpoint(rddId) =>
- doCleanCheckpoint(rddId)
- }
+        Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
+          .map(_.asInstanceOf[CleanupTaskWeakReference])
+          .foreach { ref =>
+            referenceBuffer.remove(ref)
+            runCleanTask(ref)
+          }
+ } catch {
+ case ie: InterruptedException if stopped => // ignore
+ case e: Exception => logError("Error in cleaning main thread", e)
+ }
+ }
+ }
+
+  private def runCleanTask(ref: CleanupTaskWeakReference): Unit = {
+ cleanupExecutorPool.submit(new Runnable {
+ override def run(): Unit = {
+ try {
+ ref.task match {
+ case CleanRDD(rddId) =>
+ doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+ case CleanShuffle(shuffleId) =>
+ doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+ case CleanBroadcast(broadcastId) =>
+ doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+ case CleanAccum(accId) =>
+ doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+ case CleanCheckpoint(rddId) =>
+ doCleanCheckpoint(rddId)
}
+ } catch {
+ case ie: InterruptedException if stopped => // ignore
+ case e: Exception => logError("Error in cleaning thread", e)
}
- } catch {
- case ie: InterruptedException if stopped => // ignore
- case e: Exception => logError("Error in cleaning thread", e)
}
- }
+ })
}
/** Perform RDD cleanup. */
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4ef6656222455..dc6c42d40a2d3 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -623,7 +623,7 @@ private[spark] object MapOutputTracker extends Logging {
if (arr.length >= minBroadcastSize) {
// Use broadcast instead.
// Important arr(0) is the tag == DIRECT, ignore that while deserializing !
- val bcast = broadcastManager.newBroadcast(arr, isLocal)
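+      // This broadcast is not tied to any SQL execution, so pass a null execution id.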
+ val bcast = broadcastManager.newBroadcast(arr, isLocal, null)
// toByteArray creates copy, so we can reuse out
out.reset()
out.write(BROADCAST)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ca033868b5801..861706fe70346 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1485,10 +1485,13 @@ class SparkContext(config: SparkConf) extends Logging {
assertNotStopped()
require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
- val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
+ val executionId = getLocalProperty("spark.sql.execution.id")
+ val bc = env.broadcastManager.newBroadcast[T](value, isLocal, executionId)
val callSite = getCallSite
logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
- cleaner.foreach(_.registerBroadcastForCleanup(bc))
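+    // Execution-scoped broadcasts are released via BroadcastManager.cleanBroadcast()
+    // when their SQL execution ends, so only register execution-less broadcasts
+    // with the ContextCleaner.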
+ if (executionId == null) {
+ cleaner.foreach(_.registerBroadcastForCleanup(bc))
+ }
bc
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index e88988fe03b2e..eea10474ac764 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -17,13 +17,18 @@
package org.apache.spark.broadcast
+import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
+import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}
+
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
+
private[spark] class BroadcastManager(
val isDriver: Boolean,
conf: SparkConf,
@@ -32,6 +37,7 @@ private[spark] class BroadcastManager(
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
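+  /** Broadcast ids recorded per SQL execution id; released by cleanBroadcast(). */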
+  private[spark] val cachedBroadcast = new ConcurrentHashMap[String, ListBuffer[Long]]()
initialize()
@@ -52,8 +58,27 @@ private[spark] class BroadcastManager(
private val nextBroadcastId = new AtomicLong(0)
- def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
- broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
+ private[spark] def currentBroadcastId: Long = nextBroadcastId.get()
+
+  def cleanBroadcast(executionId: String): Unit = {
+    // remove() is atomic, so concurrent callers cannot release the same ids twice.
+    val broadcastIds = cachedBroadcast.remove(executionId)
+    if (broadcastIds != null) {
+      broadcastIds.foreach(id => unbroadcast(id, removeFromDriver = true, blocking = false))
+    }
+  }
+
+ def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, executionId: String): Broadcast[T] = {
+ val broadcastId = nextBroadcastId.getAndIncrement()
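+    // Record the id under its execution so cleanBroadcast() can release it later.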
+ if (executionId != null) {
+ if (cachedBroadcast.containsKey(executionId)) {
+ cachedBroadcast.get(executionId) += broadcastId
+ } else {
+        cachedBroadcast.put(executionId, ListBuffer(broadcastId))
+ }
+ }
+ broadcastFactory.newBroadcast[T](value_, isLocal, broadcastId)
}
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index e36c759a42556..53cc429302f9f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -52,7 +52,7 @@ import org.apache.spark.rdd.RDD
private[spark] class ResultTask[T, U](
stageId: Int,
stageAttemptId: Int,
- taskBinary: Broadcast[Array[Byte]],
+ val taskBinary: Broadcast[Array[Byte]],
partition: Partition,
locs: Seq[TaskLocation],
val outputId: Int,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 7a25c47e2cab3..1260a9225af6f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -53,7 +53,7 @@ import org.apache.spark.shuffle.ShuffleWriter
private[spark] class ShuffleMapTask(
stageId: Int,
stageAttemptId: Int,
- taskBinary: Broadcast[Array[Byte]],
+ val taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation],
localProperties: Properties,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2f4e46c7ec8f1..1c8c8bc42efec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -519,6 +519,13 @@ private[spark] class TaskSetManager(
private def maybeFinishTaskSet() {
if (isZombie && runningTasks == 0) {
sched.taskSetFinished(this)
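+      // No task in this set can still need the task binary, so release its broadcast now.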
+      val broadcastId = taskSet.tasks.head match {
+        case resultTask: ResultTask[_, _] => resultTask.taskBinary.id
+        case shuffleMapTask: ShuffleMapTask => shuffleMapTask.taskBinary.id
+      }
+ SparkEnv.get.broadcastManager.unbroadcast(broadcastId, true, false)
if (tasksSuccessful == numTasks) {
blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(
taskSet.stageId,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 626b8367a2243..84a99400060ad 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -17,10 +17,11 @@
package org.apache.spark.storage
+import java.io.IOException
import java.util.{HashMap => JHashMap}
-import scala.collection.mutable
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
@@ -197,11 +198,17 @@ class BlockManagerMasterEndpoint(
// Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
// The dispatcher is used as an implicit argument into the Future sequence construction.
val removeMsg = RemoveRdd(rddId)
- Future.sequence(
- blockManagerInfo.values.map { bm =>
- bm.slaveEndpoint.ask[Int](removeMsg)
- }.toSeq
- )
+
+ val futures = blockManagerInfo.values.map { bm =>
+ bm.slaveEndpoint.ask[Int](removeMsg).recover {
+ case e: IOException =>
+ logWarning(s"Error trying to remove RDD $rddId from block manager ${bm.blockManagerId}",
+ e)
+ 0 // zero blocks were removed
+ }
+ }.toSeq
+
+ Future.sequence(futures)
}
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
@@ -224,11 +231,21 @@ class BlockManagerMasterEndpoint(
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || !info.blockManagerId.isDriver
}
- Future.sequence(
- requiredBlockManagers.map { bm =>
- bm.slaveEndpoint.ask[Int](removeMsg)
- }.toSeq
- )
+ val futures = requiredBlockManagers.map { bm =>
+ bm.slaveEndpoint.ask[Int](removeMsg).recover {
+ case e: IOException =>
+ logWarning(s"Error trying to remove broadcast $broadcastId from block manager " +
+ s"${bm.blockManagerId}", e)
+ 0 // zero blocks were removed
+ }
+ }.toSeq
+    // Also drop the removed broadcast's blocks from the master's bookkeeping.
+    val blocksToRemove = blockLocations.keySet().asScala.collect {
+      case bid @ BroadcastBlockId(`broadcastId`, _) => bid
+    }
+    blocksToRemove.foreach(blockLocations.remove)
+ Future.sequence(futures)
}
private def removeBlockManager(blockManagerId: BlockManagerId) {
diff --git a/examples/pom.xml b/examples/pom.xml
index 4f9538063e1b9..56c4075e45d90 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml
diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml
index b11079169baba..787750d0377d6 100644
--- a/external/docker-integration-tests/pom.xml
+++ b/external/docker-integration-tests/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index f3c846afb6c5f..77dd7ab9f4aa8 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 50d2d963ca644..6355bb616804a 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 5caeea40516b0..82123e4269d5b 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml
index ecde9819bb91c..6483c07bd27c5 100644
--- a/external/kafka-0-10-assembly/pom.xml
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index c8841440280e8..b7bf97e348cdf 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 4ab76faae69ff..10b1762701faa 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/kafka-0-8-assembly/pom.xml b/external/kafka-0-8-assembly/pom.xml
index 1f4e7ce624b86..0f32ad68c23fc 100644
--- a/external/kafka-0-8-assembly/pom.xml
+++ b/external/kafka-0-8-assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml
index f05d01b95e361..3bc493c03f5ff 100644
--- a/external/kafka-0-8/pom.xml
+++ b/external/kafka-0-8/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/kinesis-asl-assembly/pom.xml b/external/kinesis-asl-assembly/pom.xml
index 0befa4bae663b..02ab5cf4e71cd 100644
--- a/external/kinesis-asl-assembly/pom.xml
+++ b/external/kinesis-asl-assembly/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/kinesis-asl/pom.xml b/external/kinesis-asl/pom.xml
index 44e95ec3b0b68..8c0d644d87538 100644
--- a/external/kinesis-asl/pom.xml
+++ b/external/kinesis-asl/pom.xml
@@ -20,7 +20,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/external/spark-ganglia-lgpl/pom.xml b/external/spark-ganglia-lgpl/pom.xml
index 84fd6106bf4bf..ad113f78f479e 100644
--- a/external/spark-ganglia-lgpl/pom.xml
+++ b/external/spark-ganglia-lgpl/pom.xml
@@ -20,7 +20,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 715efc4c29776..5020229dd363e 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml
diff --git a/launcher/pom.xml b/launcher/pom.xml
index cc0a8343b48cc..9f8c056f4073e 100644
--- a/launcher/pom.xml
+++ b/launcher/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml
diff --git a/mllib-local/pom.xml b/mllib-local/pom.xml
index 41fd33510fd35..fc6e02653a456 100644
--- a/mllib-local/pom.xml
+++ b/mllib-local/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml
diff --git a/mllib/pom.xml b/mllib/pom.xml
index d409128a92616..3d9fd70e1f7a7 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml
diff --git a/pom.xml b/pom.xml
index bdb10abe82d73..82270aee6f1d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
pom
Spark Project Parent POM
http://spark.apache.org/
diff --git a/repl/pom.xml b/repl/pom.xml
index 3e5190a7625a7..7e83d3f5eee53 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index ce371d58037b3..0cc5f5dc37e22 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -20,7 +20,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
index a43f8682e0ae3..84f8626806311 100644
--- a/resource-managers/yarn/pom.xml
+++ b/resource-managers/yarn/pom.xml
@@ -20,7 +20,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index b885713271494..e956986b46a98 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index d2536dff4a00d..08dfe37205554 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index be35916e3447e..6624a5f76dfb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkContext
+import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
SparkListenerSQLExecutionStart}
@@ -70,6 +71,7 @@ object SQLExecution {
} finally {
executionIdToQueryExecution.remove(executionId)
sc.setLocalProperty(EXECUTION_ID_KEY, null)
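+        // Release every broadcast recorded under this execution id.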
+        SparkEnv.get.broadcastManager.cleanBroadcast(executionId.toString)
}
r
} else {
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index f5bfcf6f586ec..6f630d970a833 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 0019c5401aa8e..193bf142f355a 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -22,7 +22,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../../pom.xml
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 9594fa61999de..44249b001fa1a 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -21,7 +21,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml
diff --git a/tools/pom.xml b/tools/pom.xml
index 7bc83881792fe..9657ba4bbdbe2 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -20,7 +20,7 @@
org.apache.spark
spark-parent_2.11
- 2.2.1-kylin-r14
+ 2.2.1-kylin-r15
../pom.xml