From c2609eccf303af202269474722c1a0b6c8310019 Mon Sep 17 00:00:00 2001
From: Feng Zhu
Date: Fri, 19 Nov 2021 14:09:48 +0800
Subject: [PATCH] KE-32010, SparkSession/SparkContext clean off (#348)

---
 assembly/pom.xml                               |  2 +-
 common/network-common/pom.xml                  |  2 +-
 common/network-shuffle/pom.xml                 |  2 +-
 common/network-yarn/pom.xml                    |  2 +-
 common/sketch/pom.xml                          |  2 +-
 common/tags/pom.xml                            |  2 +-
 common/unsafe/pom.xml                          |  2 +-
 core/pom.xml                                   |  2 +-
 .../org/apache/spark/ContextCleaner.scala      |  8 +++++--
 .../org/apache/spark/MapOutputTracker.scala    |  4 +++-
 .../scala/org/apache/spark/SparkContext.scala  | 14 ++++++++++++-
 .../scala/org/apache/spark/SparkEnv.scala      |  1 +
 .../spark/broadcast/BroadcastManager.scala     |  2 ++
 .../apache/spark/metrics/MetricsSystem.scala   |  1 +
 .../netty/NettyBlockTransferService.scala      |  2 ++
 .../apache/spark/rpc/netty/Dispatcher.scala    |  7 +++++--
 .../apache/spark/rpc/netty/NettyRpcEnv.scala   | 16 ++++++++++----
 .../apache/spark/scheduler/DAGScheduler.scala  |  4 +++-
 .../scheduler/EventLoggingListener.scala       |  1 +
 .../spark/scheduler/LiveListenerBus.scala      |  2 ++
 .../spark/scheduler/TaskResultGetter.scala     |  1 +
 .../spark/scheduler/TaskSchedulerImpl.scala    | 17 +++++++++++----
 .../CoarseGrainedSchedulerBackend.scala        |  5 ++++-
 .../storage/BlockManagerMasterEndpoint.scala   |  4 +++-
 .../spark/ui/jobs/JobProgressListener.scala    | 21 +++++++++++++++++++
 .../org/apache/spark/util/EventLoop.scala      |  3 ++-
 examples/pom.xml                               |  2 +-
 external/docker-integration-tests/pom.xml      |  2 +-
 external/flume-assembly/pom.xml                |  2 +-
 external/flume-sink/pom.xml                    |  2 +-
 external/flume/pom.xml                         |  2 +-
 external/kafka-0-10-assembly/pom.xml           |  2 +-
 external/kafka-0-10-sql/pom.xml                |  2 +-
 external/kafka-0-10/pom.xml                    |  2 +-
 external/kafka-0-8-assembly/pom.xml            |  2 +-
 external/kafka-0-8/pom.xml                     |  2 +-
 external/kinesis-asl-assembly/pom.xml          |  2 +-
 external/kinesis-asl/pom.xml                   |  2 +-
 external/spark-ganglia-lgpl/pom.xml            |  2 +-
 graphx/pom.xml                                 |  2 +-
 launcher/pom.xml                               |  2 +-
 mllib-local/pom.xml                            |  2 +-
 mllib/pom.xml                                  |  2 +-
 pom.xml                                        |  2 +-
 repl/pom.xml                                   |  2 +-
 resource-managers/mesos/pom.xml                |  2 +-
 resource-managers/yarn/pom.xml                 |  2 +-
 .../cluster/YarnClientSchedulerBackend.scala   |  8 +++++--
 sql/catalyst/pom.xml                           |  2 +-
 sql/core/pom.xml                               |  2 +-
 .../org/apache/spark/sql/SparkSession.scala    |  3 +++
 sql/hive-thriftserver/pom.xml                  |  2 +-
 sql/hive/pom.xml                               |  2 +-
 streaming/pom.xml                              |  2 +-
 tools/pom.xml                                  |  2 +-
 55 files changed, 139 insertions(+), 55 deletions(-)

diff --git a/assembly/pom.xml b/assembly/pom.xml
index 6c179a9fa2b70..7b3898106d0a3 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 84612cec915bb..713f2afed24d6 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index dbbb04d0ce92b..473e42ce213f3 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 768576c9e990e..029ce9f3963ab 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index 604f6ce5d2256..a2d181131e5f3 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/common/tags/pom.xml b/common/tags/pom.xml
index ea8b22faf9190..acaf6c04e6c73 100644
--- a/common/tags/pom.xml
+++ b/common/tags/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index bb72318d4f47c..926dba39c766a 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/core/pom.xml b/core/pom.xml
index 084f7de7f1c33..7b7df1fe946f5 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 63bd1ce254df9..5ddda3fea2ba2 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -73,7 +73,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
-  private val periodicGCService: ScheduledExecutorService =
+  private var periodicGCService: ScheduledExecutorService =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
 
   /**
@@ -119,7 +119,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val cleanupTaskThreads = sc.conf.getInt(
     "spark.cleaner.referenceTracking.cleanupThreadNumber", 1)
 
-  private val cleanupExecutorPool: ExecutorService =
+  private var cleanupExecutorPool: ExecutorService =
     ThreadUtils.newDaemonFixedThreadPool(cleanupTaskThreads, "cleanup")
 
   @volatile private var stopped = false
@@ -152,7 +152,11 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       cleaningThread.interrupt()
     }
     cleaningThread.join()
+    cleanupExecutorPool.shutdownNow()
+    cleanupExecutorPool = null
     periodicGCService.shutdown()
+    periodicGCService = null
+    logInfo("ContextCleaner stopped")
   }
 
   /** Register an RDD for cleanup when it is garbage collected. */
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index dc6c42d40a2d3..cbc291e67f413 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -308,7 +308,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
 
   // Thread pool used for handling map output status requests. This is a separate thread pool
   // to ensure we don't block the normal dispatcher threads.
-  private val threadpool: ThreadPoolExecutor = {
+  private var threadpool: ThreadPoolExecutor = {
     val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
     for (i <- 0 until numThreads) {
@@ -579,12 +579,14 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
 
   override def stop() {
     mapOutputRequests.offer(PoisonPill)
     threadpool.shutdown()
+    threadpool = null
    sendTracker(StopMapOutputTracker)
     mapStatuses.clear()
     trackerEndpoint = null
     cachedSerializedStatuses.clear()
     clearCachedBroadcast()
     shuffleIdLocks.clear()
+    logInfo("MapOutputTrackerMaster stopped")
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c47107699c26c..26810bad64240 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -247,7 +247,7 @@ class SparkContext(config: SparkConf) extends Logging {
   def isStopped: Boolean = stopped.get()
 
   // An asynchronous listener bus for Spark events
-  private[spark] val listenerBus = new LiveListenerBus(this)
+  private[spark] var listenerBus = new LiveListenerBus(this)
 
   // This function allows components created by SparkEnv to be mocked in unit tests:
   private[spark] def createSparkEnv(
@@ -1899,6 +1899,8 @@ class SparkContext(config: SparkConf) extends Logging {
       logInfo("SparkContext already stopped.")
       return
     }
+
+    logInfo("SparkContext is to stop.")
     if (_shutdownHookRef != null) {
       ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
     }
@@ -1909,6 +1911,7 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       _ui.foreach(_.stop())
     }
+    _ui = null
     if (env != null) {
       Utils.tryLogNonFatalError {
         env.metricsSystem.report()
@@ -1917,6 +1920,7 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       _cleaner.foreach(_.stop())
     }
+    _cleaner = null
     Utils.tryLogNonFatalError {
       _executorAllocationManager.foreach(_.stop())
     }
@@ -1925,10 +1929,14 @@ class SparkContext(config: SparkConf) extends Logging {
         listenerBus.stop()
         _listenerBusStarted = false
       }
+      listenerBus = null
+      _jobProgressListener.clear()
+      _jobProgressListener = null
     }
     Utils.tryLogNonFatalError {
       _eventLogger.foreach(_.stop())
     }
+    _eventLogger = null
     if (_dagScheduler != null) {
       Utils.tryLogNonFatalError {
         _dagScheduler.stop()
@@ -1939,16 +1947,20 @@ class SparkContext(config: SparkConf) extends Logging {
       Utils.tryLogNonFatalError {
         env.rpcEnv.stop(_heartbeatReceiver)
       }
+      _heartbeatReceiver = null
     }
     Utils.tryLogNonFatalError {
       _progressBar.foreach(_.stop())
     }
+    _progressBar = null
     _taskScheduler = null
+    _schedulerBackend = null
     // TODO: Cache.stop()?
     if (_env != null) {
       Utils.tryLogNonFatalError {
         _env.stop()
       }
+      _env = null
       SparkEnv.set(null)
     }
     // Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f4a59f069a5f9..6a5bb454b1d06 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -108,6 +108,7 @@ class SparkEnv (
         case None => // We just need to delete tmp dir created by driver, so do nothing on executor
       }
     }
+    logInfo("SparkEnv stopped")
   }
 
   private[spark]
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index eea10474ac764..7bf18111d974e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -54,6 +54,8 @@ private[spark] class BroadcastManager(
 
   def stop() {
     broadcastFactory.stop()
+    broadcastFactory = null
+    logInfo("BroadcastManager stopped")
   }
 
   private val nextBroadcastId = new AtomicLong(0)
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 1d494500cdb5c..37b1c371eeea4 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -110,6 +110,7 @@ private[spark] class MetricsSystem private (
       logWarning("Stopping a MetricsSystem that is not running")
     }
     running = false
+    logInfo("MetricsSystem stopped")
   }
 
   def report() {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 30ff93897f98a..0a2c10ba0a01b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -153,9 +153,11 @@ private[spark] class NettyBlockTransferService(
   override def close(): Unit = {
     if (server != null) {
       server.close()
+      server = null
     }
     if (clientFactory != null) {
       clientFactory.close()
+      clientFactory = null
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index e94babb846128..d0a90ee0f7445 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -177,10 +177,13 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     // Enqueue a message that tells the message loops to stop.
     receivers.offer(PoisonPill)
     threadpool.shutdown()
+    logInfo("Dispatcher stopped")
   }
 
   def awaitTermination(): Unit = {
-    threadpool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
+    threadpool.awaitTermination(1000 * 10, TimeUnit.MILLISECONDS)
+    threadpool = null
+    logInfo("Dispatcher awaitTermination finished")
   }
 
   /**
@@ -191,7 +194,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   }
 
   /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  private var threadpool: ThreadPoolExecutor = {
     val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
       math.max(2, Runtime.getRuntime.availableProcessors()))
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 64898499246ac..21a3ff2fb5376 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -51,7 +51,7 @@ private[netty] class NettyRpcEnv(
     "rpc",
     conf.getInt("spark.rpc.io.threads", 0))
 
-  private val dispatcher: Dispatcher = new Dispatcher(this)
+  private var dispatcher: Dispatcher = new Dispatcher(this)
 
   private val streamManager = new NettyStreamManager(this)
 
@@ -67,7 +67,7 @@ private[netty] class NettyRpcEnv(
     }
   }
 
-  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
+  private var clientFactory = transportContext.createClientFactory(createClientBootstraps())
 
   /**
    * A separate client factory for file downloads. This avoids using the same RPC handler as
@@ -79,12 +79,12 @@ private[netty] class NettyRpcEnv(
    */
   @volatile private var fileDownloadFactory: TransportClientFactory = _
 
-  val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")
+  var timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")
 
   // Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool
   // to implement non-blocking send/ask.
   // TODO: a non-blocking TransportClientFactory.createClient in future
-  private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
+  private[netty] var clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
     "netty-rpc-connection",
     conf.getInt("spark.rpc.connect.threads", 64))
 
@@ -278,10 +278,13 @@ private[netty] class NettyRpcEnv(
 
   override def shutdown(): Unit = {
     cleanup()
+    logInfo("NettyRpcEnv shutdown")
   }
 
   override def awaitTermination(): Unit = {
     dispatcher.awaitTermination()
+    dispatcher = null
+    logInfo("NettyRpcEnv awaitTermination finished")
   }
 
   private def cleanup(): Unit = {
@@ -297,21 +300,26 @@ private[netty] class NettyRpcEnv(
     }
     if (timeoutScheduler != null) {
       timeoutScheduler.shutdownNow()
+      timeoutScheduler = null
     }
     if (dispatcher != null) {
       dispatcher.stop()
     }
     if (server != null) {
       server.close()
+      server = null
     }
     if (clientFactory != null) {
       clientFactory.close()
+      clientFactory = null
     }
     if (clientConnectionExecutor != null) {
       clientConnectionExecutor.shutdownNow()
+      clientConnectionExecutor = null
     }
     if (fileDownloadFactory != null) {
       fileDownloadFactory.close()
+      fileDownloadFactory = null
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0138063ff335e..7f6b31ae2ddf7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -194,7 +194,7 @@ class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 
-  private val messageScheduler =
+  private var messageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
 
   private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
@@ -1666,8 +1666,10 @@ class DAGScheduler(
 
   def stop() {
     messageScheduler.shutdownNow()
+    messageScheduler = null
     eventProcessLoop.stop()
     taskScheduler.stop()
+    logInfo("DAGScheduler stopped")
   }
 
   eventProcessLoop.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a7dbf87915b27..ec5592f43b5e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -248,6 +248,7 @@ private[spark] class EventLoggingListener(
     } catch {
       case e: Exception => logDebug(s"failed to set time of $target", e)
     }
+    logInfo("EventLoggingListener stopped")
   }
 
   private[spark] def redactEvent(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 73e9141d344bc..4ae5c344041be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -197,9 +197,11 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
       // `stop` is called.
       eventLock.release()
       listenerThread.join()
+      listeners.clear()
     } else {
       // Keep quiet
     }
+    logInfo("LiveListenerBus stopped")
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index a284f7956cd31..f0985a37fc6d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -159,5 +159,6 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
 
   def stop() {
     getTaskResultExecutor.shutdownNow()
+    logInfo("TaskResultExecutor shutdown")
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1b6bc9139f9c9..129155078f437 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -85,7 +85,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
   // of tasks that are very short.
   val MIN_TIME_TO_SPECULATION = 100
 
-  private val speculationScheduler =
+  private var speculationScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
 
   // Threshold above which we warn user initial TaskSet may be starved
@@ -104,7 +104,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
   @volatile private var hasReceivedTask = false
   @volatile private var hasLaunchedTask = false
 
-  private val starvationTimer = new Timer(true)
+  private var starvationTimer = new Timer(true)
 
   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
@@ -513,13 +513,22 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
   override def stop() {
     speculationScheduler.shutdown()
+    speculationScheduler = null
     if (backend != null) {
-      backend.stop()
+      Utils.tryLogNonFatalError {
+        backend.stop()
+        backend = null
+      }
     }
     if (taskResultGetter != null) {
-      taskResultGetter.stop()
+      Utils.tryLogNonFatalError {
+        taskResultGetter.stop()
+        taskResultGetter = null
+      }
     }
     starvationTimer.cancel()
+    starvationTimer = null
+    logInfo("TaskSchedulerImpl stopped")
   }
 
   override def defaultParallelism(): Int = backend.defaultParallelism()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index dc82bb7704727..7d7fbe090842f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -104,7 +104,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     protected val addressToExecutorId = new HashMap[RpcAddress, String]
 
-    private val reviveThread =
+    private var reviveThread =
       ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
     override def onStart() {
@@ -359,6 +359,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def onStop() {
       reviveThread.shutdownNow()
+      reviveThread = null
     }
   }
 
@@ -408,6 +409,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     } catch {
       case e: Exception =>
         throw new SparkException("Error stopping standalone scheduler's driver endpoint", e)
+    } finally {
+      executorDataMap.clear()
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 84a99400060ad..03f3bcf90e36c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -55,7 +55,7 @@ class BlockManagerMasterEndpoint(
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
   private val rddId2BlockIdIndex = new mutable.HashMap[Int, mutable.HashSet[BlockId]]
 
-  private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
+  private var askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
   private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
 
   private val topologyMapper = {
@@ -512,6 +512,8 @@ class BlockManagerMasterEndpoint(
 
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
+    askThreadPool = null
+    logInfo("AskThreadPool shutdownNow")
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 7370f9feb68cd..b3a6ff7c5df67 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -595,4 +595,25 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     throw new TimeoutException(
       s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
   }
+
+  private[spark] def clear(): Unit = {
+    activeJobs.clear()
+    completedJobs.clear()
+    failedJobs.clear()
+    jobIdToData.clear()
+    jobGroupToJobIds.clear()
+
+    pendingStages.clear()
+    activeStages.clear()
+    completedStages.clear()
+    skippedStages.clear()
+    failedStages.clear()
+    stageIdToData.clear()
+    stageIdToInfo.clear()
+    stageIdToActiveJobIds.clear()
+    poolToActiveStages.clear()
+
+    executorIdToBlockManagerId.clear()
+
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
index 8bb02d1f7d4a1..7b31879c6655b 100644
--- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala
+++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
@@ -37,7 +37,7 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
 
   private val stopped = new AtomicBoolean(false)
 
-  private[spark] val eventThread = new Thread(name) {
+  private[spark] var eventThread = new Thread(name) {
     setDaemon(true)
 
     override def run(): Unit = {
@@ -81,6 +81,7 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
         // Call onStop after the event thread exits to make sure onReceive happens before onStop
         onStopCalled = true
         onStop()
+        eventThread = null
       } catch {
         case ie: InterruptedException =>
           Thread.currentThread().interrupt()
diff --git a/examples/pom.xml b/examples/pom.xml
index 7f8d149c07b84..f82ca00b346ac 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml
index c7af6817d4a66..60159a4dcde8d 100644
--- a/external/docker-integration-tests/pom.xml
+++ b/external/docker-integration-tests/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 387aadd173347..b038c2a35026e 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index df7c6a98394d8..aed268934f941 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 247f69258c437..25ce2bfe4c321 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml
index 8d66c438da0f4..c95cf9ff32e72 100644
--- a/external/kafka-0-10-assembly/pom.xml
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index f009df7522e0b..252e838f0f0a2 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 6b3278d052688..f40c0423abe92 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/kafka-0-8-assembly/pom.xml b/external/kafka-0-8-assembly/pom.xml
index d098e375d1320..e1262390e04f1 100644
--- a/external/kafka-0-8-assembly/pom.xml
+++ b/external/kafka-0-8-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml
index 02fd738fd7f99..946bc143d3869 100644
--- a/external/kafka-0-8/pom.xml
+++ b/external/kafka-0-8/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/kinesis-asl-assembly/pom.xml b/external/kinesis-asl-assembly/pom.xml
index d02adc5ccdcf3..5173827bb1701 100644
--- a/external/kinesis-asl-assembly/pom.xml
+++ b/external/kinesis-asl-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/kinesis-asl/pom.xml b/external/kinesis-asl/pom.xml
index 6acfca437f696..a2e270eb2d883 100644
--- a/external/kinesis-asl/pom.xml
+++ b/external/kinesis-asl/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/external/spark-ganglia-lgpl/pom.xml b/external/spark-ganglia-lgpl/pom.xml
index 525c7197512d4..71128aba00540 100644
--- a/external/spark-ganglia-lgpl/pom.xml
+++ b/external/spark-ganglia-lgpl/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 3568f1251f540..2ac42e2b9fedc 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/launcher/pom.xml b/launcher/pom.xml
index 0f56c93e340d5..c7d8e17fd350e 100644
--- a/launcher/pom.xml
+++ b/launcher/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/mllib-local/pom.xml b/mllib-local/pom.xml
index 6396dfa15129a..ead39ea13866b 100644
--- a/mllib-local/pom.xml
+++ b/mllib-local/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 5caf7f7a80d9b..6848d437e91d2 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/pom.xml b/pom.xml
index dbac58ed2d31e..fb79486e765e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.11</artifactId>
-  <version>2.2.1-kylin-r26</version>
+  <version>2.2.1-kylin-r27</version>
   <packaging>pom</packaging>
   <name>Spark Project Parent POM</name>
   <url>http://spark.apache.org/</url>
diff --git a/repl/pom.xml b/repl/pom.xml
index b3763c078b3f8..469516e917118 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index faa2554cbce05..2956a3000fe96 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
index d4a42217550eb..118e4a4bc1684 100644
--- a/resource-managers/yarn/pom.xml
+++ b/resource-managers/yarn/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 60da356ad14aa..69ec3ec5d6406 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUti
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.Utils
 
 private[spark] class YarnClientSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -138,6 +139,7 @@ private[spark] class YarnClientSchedulerBackend(
     assert(client != null, "Attempted to stop this scheduler before starting it!")
     if (monitorThread != null) {
       monitorThread.stopMonitor()
+      monitorThread = null
     }
 
     // Report a final state to the launcher if one is connected. This is needed since in client
@@ -148,10 +150,12 @@ private[spark] class YarnClientSchedulerBackend(
     // so assume the application was successful.
     client.reportLauncherState(SparkAppHandle.State.FINISHED)
 
-    super.stop()
+    Utils.tryLogNonFatalError {
+      super.stop()
+    }
     YarnSparkHadoopUtil.get.stopCredentialUpdater()
     client.stop()
-    logInfo("Stopped")
+    logInfo("YarnClientSchedulerBackend stopped")
   }
 }
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 9449d9fc0cb3b..b6b73cf10ce63 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 75dc95a1ff823..b24fd5c3dd19a 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 96882c62c2d67..15da7a3b7bcf8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -700,6 +700,9 @@ class SparkSession private(
    */
   def stop(): Unit = {
     sparkContext.stop()
+    SparkSession.clearDefaultSession()
+    SparkSession.clearActiveSession()
+    logInfo("SparkSession stopped")
   }
 
   /**
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 0b90b426c3fcf..82e7d3e8b6f56 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 21d2fd3c9b6c6..1ff1d8bafc93a 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 4c28f87a22e3a..87881a9634c44 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/tools/pom.xml b/tools/pom.xml
index 779e1a0388d9e..4e757ca9a25eb 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.1-kylin-r26</version>
+    <version>2.2.1-kylin-r27</version>
     <relativePath>../pom.xml</relativePath>
   </parent>