KE-32010, SparkSession/SparkContext clean off (apache#348)
fishcus authored Nov 19, 2021 · 1 parent 00494f2 · commit c2609ec
Showing 55 changed files with 139 additions and 55 deletions.
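
Aside from the pom.xml version bump from 2.2.1-kylin-r26 to 2.2.1-kylin-r27, the changes follow one pattern: fields that hold long-lived, heavyweight resources (thread pools, the listener bus, the RPC environment, the broadcast factory) change from val to var, get shut down in stop(), and are then set to null so the JVM can reclaim them even while the stopped SparkContext/SparkEnv object itself remains reachable. A minimal sketch of that pattern, using a hypothetical ResourceOwner class that is not part of this commit:

import java.util.concurrent.{ExecutorService, Executors}

// Illustrative only; the class and field names are placeholders.
class ResourceOwner {
  // Held in a `var` (previously it would have been a `val`) so that stop()
  // can both shut the resource down and drop the reference to it.
  private var pool: ExecutorService = Executors.newFixedThreadPool(2)

  def submit(task: Runnable): Unit = pool.submit(task)

  def stop(): Unit = {
    if (pool != null) {
      pool.shutdownNow() // stop worker threads
      pool = null        // drop the reference so the pool can be garbage collected
    }
    println("ResourceOwner stopped")
  }
}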
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r26</version>
<version>2.2.1-kylin-r27</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r26</version>
<version>2.2.1-kylin-r27</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r26</version>
<version>2.2.1-kylin-r27</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r26</version>
<version>2.2.1-kylin-r27</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/sketch/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r26</version>
<version>2.2.1-kylin-r27</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/tags/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r26</version>
<version>2.2.1-kylin-r27</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r26</version>
<version>2.2.1-kylin-r27</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r26</version>
<version>2.2.1-kylin-r27</version>
<relativePath>../pom.xml</relativePath>
</parent>

8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -73,7 +73,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

private val periodicGCService: ScheduledExecutorService =
private var periodicGCService: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")

/**
@@ -119,7 +119,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val cleanupTaskThreads = sc.conf.getInt(
"spark.cleaner.referenceTracking.cleanupThreadNumber", 1)

private val cleanupExecutorPool: ExecutorService =
private var cleanupExecutorPool: ExecutorService =
ThreadUtils.newDaemonFixedThreadPool(cleanupTaskThreads, "cleanup")

@volatile private var stopped = false
@@ -152,7 +152,11 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.interrupt()
}
cleaningThread.join()
cleanupExecutorPool.shutdownNow()
cleanupExecutorPool = null
periodicGCService.shutdown()
periodicGCService = null
logInfo("ContextCleaner stopped")
}

/** Register an RDD for cleanup when it is garbage collected. */
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -308,7 +308,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,

// Thread pool used for handling map output status requests. This is a separate thread pool
// to ensure we don't block the normal dispatcher threads.
private val threadpool: ThreadPoolExecutor = {
private var threadpool: ThreadPoolExecutor = {
val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
for (i <- 0 until numThreads) {
@@ -579,12 +579,14 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
override def stop() {
mapOutputRequests.offer(PoisonPill)
threadpool.shutdown()
threadpool = null
sendTracker(StopMapOutputTracker)
mapStatuses.clear()
trackerEndpoint = null
cachedSerializedStatuses.clear()
clearCachedBroadcast()
shuffleIdLocks.clear()
logInfo("MapOutputTrackerMaster stopped")
}
}

14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -247,7 +247,7 @@ class SparkContext(config: SparkConf) extends Logging {
def isStopped: Boolean = stopped.get()

// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus(this)
private[spark] var listenerBus = new LiveListenerBus(this)

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
@@ -1899,6 +1899,8 @@ class SparkContext(config: SparkConf) extends Logging {
logInfo("SparkContext already stopped.")
return
}

logInfo("SparkContext is to stop.")
if (_shutdownHookRef != null) {
ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
}
@@ -1909,6 +1911,7 @@
Utils.tryLogNonFatalError {
_ui.foreach(_.stop())
}
_ui = null
if (env != null) {
Utils.tryLogNonFatalError {
env.metricsSystem.report()
@@ -1917,6 +1920,7 @@
Utils.tryLogNonFatalError {
_cleaner.foreach(_.stop())
}
_cleaner = null
Utils.tryLogNonFatalError {
_executorAllocationManager.foreach(_.stop())
}
@@ -1925,10 +1929,14 @@
listenerBus.stop()
_listenerBusStarted = false
}
listenerBus = null
_jobProgressListener.clear()
_jobProgressListener = null
}
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
_eventLogger = null
if (_dagScheduler != null) {
Utils.tryLogNonFatalError {
_dagScheduler.stop()
@@ -1939,16 +1947,20 @@
Utils.tryLogNonFatalError {
env.rpcEnv.stop(_heartbeatReceiver)
}
_heartbeatReceiver = null
}
Utils.tryLogNonFatalError {
_progressBar.foreach(_.stop())
}
_progressBar = null
_taskScheduler = null
_schedulerBackend = null
// TODO: Cache.stop()?
if (_env != null) {
Utils.tryLogNonFatalError {
_env.stop()
}
_env = null
SparkEnv.set(null)
}
// Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this
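
A driver-side sketch (not part of the commit) of the user-visible effect: once stop() has run, the nulled-out internals become unreachable, so the JVM can reclaim them even if the application keeps holding the SparkContext reference. The master and app name below are placeholder values.

import org.apache.spark.{SparkConf, SparkContext}

object StopDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("stop-demo"))
    sc.parallelize(1 to 10).count()
    sc.stop()   // runs the clean-up path above: internals are stopped and nulled out
    System.gc() // optional hint; the dropped references are now collectable
  }
}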
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -108,6 +108,7 @@ class SparkEnv (
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
}
logInfo("SparkEnv stopped")
}

private[spark]
@@ -54,6 +54,8 @@ private[spark] class BroadcastManager(

def stop() {
broadcastFactory.stop()
broadcastFactory = null
logInfo("BroadcastManager stopped")
}

private val nextBroadcastId = new AtomicLong(0)
@@ -110,6 +110,7 @@ private[spark] class MetricsSystem private (
logWarning("Stopping a MetricsSystem that is not running")
}
running = false
logInfo("MetricsSystem stopped")
}

def report() {
@@ -153,9 +153,11 @@ private[spark] class NettyBlockTransferService(
override def close(): Unit = {
if (server != null) {
server.close()
server = null
}
if (clientFactory != null) {
clientFactory.close()
clientFactory = null
}
}
}
@@ -177,10 +177,13 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
// Enqueue a message that tells the message loops to stop.
receivers.offer(PoisonPill)
threadpool.shutdown()
logInfo("Dispatcher stopped")
}

def awaitTermination(): Unit = {
threadpool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
threadpool.awaitTermination(1000 * 10, TimeUnit.MILLISECONDS)
threadpool = null
logInfo("Dispatcher awaitTermination finished")
}

/**
@@ -191,7 +194,7 @@
}

/** Thread pool used for dispatching messages. */
private val threadpool: ThreadPoolExecutor = {
private var threadpool: ThreadPoolExecutor = {
val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
math.max(2, Runtime.getRuntime.availableProcessors()))
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
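
The awaitTermination change above replaces an unbounded wait (Long.MaxValue) with a 10-second cap, so a stuck message loop can no longer hang shutdown indefinitely. A stand-alone sketch of the same idea using plain java.util.concurrent rather than Spark's Dispatcher:

import java.util.concurrent.{Executors, TimeUnit}

object BoundedShutdownDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    pool.shutdown()
    // Wait at most 10 seconds, mirroring the 1000 * 10 ms bound above.
    val terminated = pool.awaitTermination(1000L * 10, TimeUnit.MILLISECONDS)
    if (!terminated) {
      println("pool did not terminate within 10s; abandoning it")
    }
  }
}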
16 changes: 12 additions & 4 deletions core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -51,7 +51,7 @@ private[netty] class NettyRpcEnv(
"rpc",
conf.getInt("spark.rpc.io.threads", 0))

private val dispatcher: Dispatcher = new Dispatcher(this)
private var dispatcher: Dispatcher = new Dispatcher(this)

private val streamManager = new NettyStreamManager(this)

@@ -67,7 +67,7 @@
}
}

private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
private var clientFactory = transportContext.createClientFactory(createClientBootstraps())

/**
* A separate client factory for file downloads. This avoids using the same RPC handler as
@@ -79,12 +79,12 @@
*/
@volatile private var fileDownloadFactory: TransportClientFactory = _

val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")
var timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")

// Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool
// to implement non-blocking send/ask.
// TODO: a non-blocking TransportClientFactory.createClient in future
private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
private[netty] var clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
"netty-rpc-connection",
conf.getInt("spark.rpc.connect.threads", 64))

@@ -278,10 +278,13 @@

override def shutdown(): Unit = {
cleanup()
logInfo("NettyRpcEnv shutdown")
}

override def awaitTermination(): Unit = {
dispatcher.awaitTermination()
dispatcher = null
logInfo("NettyRpcEnv awaitTermination finished")
}

private def cleanup(): Unit = {
@@ -297,21 +300,26 @@
}
if (timeoutScheduler != null) {
timeoutScheduler.shutdownNow()
timeoutScheduler = null
}
if (dispatcher != null) {
dispatcher.stop()
}
if (server != null) {
server.close()
server = null
}
if (clientFactory != null) {
clientFactory.close()
clientFactory = null
}
if (clientConnectionExecutor != null) {
clientConnectionExecutor.shutdownNow()
clientConnectionExecutor = null
}
if (fileDownloadFactory != null) {
fileDownloadFactory.close()
fileDownloadFactory = null
}
}

@@ -194,7 +194,7 @@ class DAGScheduler(
sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)

private val messageScheduler =
private var messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
@@ -1666,8 +1666,10 @@

def stop() {
messageScheduler.shutdownNow()
messageScheduler = null
eventProcessLoop.stop()
taskScheduler.stop()
logInfo("DAGScheduler stopped")
}

eventProcessLoop.start()
@@ -248,6 +248,7 @@ private[spark] class EventLoggingListener(
} catch {
case e: Exception => logDebug(s"failed to set time of $target", e)
}
logInfo("EventLoggingListener stopped")
}

private[spark] def redactEvent(
@@ -197,9 +197,11 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
// `stop` is called.
eventLock.release()
listenerThread.join()
listeners.clear()
} else {
// Keep quiet
}
logInfo("LiveListenerBus stopped")
}

/**
@@ -159,5 +159,6 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul

def stop() {
getTaskResultExecutor.shutdownNow()
logInfo("TaskResultExecutor shutdown")
}
}