Skip to content

Commit

Permalink
Correctly clean up staging directory.
Browse files Browse the repository at this point in the history
This change also avoids overriding the app's status with "SUCCEEDED"
in cluster mode when the shutdown hook runs, by signaling the AM that
the SparkContext was shut down (see YarnClusterScheduler.scala). That
way the AM can correctly expose its final status to the RM.
  • Loading branch information
Marcelo Vanzin committed Aug 20, 2014
1 parent 92770cc commit c0794be
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn

import java.io.IOException
import java.net.Socket
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.atomic.AtomicReference

import scala.collection.JavaConversions._
import scala.util.Try
Expand Down Expand Up @@ -57,6 +57,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

@volatile private var finished = false
@volatile private var finalStatus = FinalApplicationStatus.UNDEFINED

private var reporterThread: Thread = _
private var allocator: YarnAllocator = _

Expand All @@ -66,9 +68,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,

// Fields used in cluster mode.
private val sparkContextRef = new AtomicReference[SparkContext](null)
private val userResult = new AtomicBoolean(false)

final def run(): Unit = {
final def run(): Int = {
if (isDriver) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
Expand All @@ -80,43 +81,49 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,

logInfo("ApplicationAttemptId: " + client.getAttemptId())

// If this is the last attempt, register a shutdown hook to cleanup the staging dir
// after the app is finished, in case it does not exit through the expected means.
// Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
if (isLastAttempt()) {
val cleanupHook = new Runnable {
override def run() {
logInfo("AppMaster received a signal.")
if (!finished) {
cleanupStagingDir()
}
val cleanupHook = new Runnable {
override def run() {
// If the SparkContext is still registered, shut it down as a best-effort measure in case
// users do not call sc.stop() or call System.exit().
val sc = sparkContextRef.get()
if (sc != null) {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
finish(FinalApplicationStatus.SUCCEEDED)
}

// Cleanup the staging dir after the app is finished, or if it's the last attempt at
// running the AM.
val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
if (finished || isLastAttempt) {
cleanupStagingDir()
}
}
ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
}
// Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)

// Call this to force generation of secret so it gets populated into the
// Hadoop UGI. This has to happen before the startUserClass which does a
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)

val success =
try {
if (isDriver) runDriver() else runExecutorLauncher(securityMgr)
} catch {
case e: Exception =>
logError("Exception while running AM main loop.", e)
false
}
if (isDriver) {
runDriver()
} else {
runExecutorLauncher(securityMgr)
}

finish(if (success) FinalApplicationStatus.SUCCEEDED else FinalApplicationStatus.FAILED)
val shouldCleanup = success || isLastAttempt()
if (shouldCleanup) {
cleanupStagingDir()
if (finalStatus != FinalApplicationStatus.UNDEFINED) {
finish(finalStatus)
0
} else {
1
}
}

final def finish(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
if (!finished) {
logInfo(s"Finishing ApplicationMaster with $status" +
Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
Expand All @@ -127,33 +134,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
reporterThread.join()
}
} finally {
client.shutdown(status, diagnostics)
client.shutdown(status, Option(diagnostics).getOrElse(""))
}
}
}

private[spark] def sparkContextInitialized(sc: SparkContext) = {
var modified = false
private def sparkContextInitialized(sc: SparkContext) = {
sparkContextRef.synchronized {
modified = sparkContextRef.compareAndSet(null, sc)
sparkContextRef.compareAndSet(null, sc)
sparkContextRef.notifyAll()
}
}

// Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
// System.exit.
// Should not really have to do this, but it helps YARN to evict resources earlier.
// Not to mention, prevent the Client from declaring failure even though we exited properly.
// Note that this will unfortunately not properly clean up the staging files because it gets
// called too late, after the filesystem is already shutdown.
if (modified) {
Runtime.getRuntime().addShutdownHook(new Thread {
override def run() {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
finish(FinalApplicationStatus.SUCCEEDED)
}
})
}
/**
 * Unregisters `sc` once it has been stopped.
 *
 * The reference is cleared only if it still points at `sc`, so the shutdown hook, which stops
 * whatever SparkContext is still registered, finds nothing left to stop.
 *
 * @param sc the SparkContext that was stopped
 * @return true if `sc` was the registered context and the reference was cleared
 */
private def sparkContextStopped(sc: SparkContext) =
  sparkContextRef.compareAndSet(sc, null)

private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
Expand All @@ -168,7 +162,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
reporterThread = launchReporterThread()
}

private def runDriver(): Boolean = {
private def runDriver(): Unit = {
addAmIpFilter()
val userThread = startUserClass()

Expand All @@ -179,20 +173,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// If there is no SparkContext at this point, just fail the app.
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
false
} else {
registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
try {
userThread.join()
userResult.get()
} finally {
// In cluster mode, ask the reporter thread to stop since the user app is finished.
reporterThread.interrupt()
}
}
}

private def runExecutorLauncher(securityMgr: SecurityManager): Boolean = {
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf, securityManager = securityMgr)._1
actor = waitForSparkDriver()
Expand All @@ -201,12 +193,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,

// In client mode the actor will stop the reporter thread.
reporterThread.join()
true
}

private def isLastAttempt() = {
val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
client.getAttemptId().getAttemptId() >= maxAppAttempts
finalStatus = FinalApplicationStatus.SUCCEEDED
}

private def launchReporterThread(): Thread = {
Expand Down Expand Up @@ -361,7 +348,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
mainMethod.invoke(null, mainArgs)
// Some apps have "System.exit(0)" at the end. The user thread will stop here unless
// it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
userResult.set(true)
finalStatus = FinalApplicationStatus.SUCCEEDED
} finally {
logDebug("Finishing main")
}
Expand Down Expand Up @@ -408,14 +395,18 @@ object ApplicationMaster extends Logging {
val amArgs = new ApplicationMasterArguments(args)
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
master.run()
System.exit(master.run())
}
}

/**
 * Forwards a newly created SparkContext to the ApplicationMaster instance held in `master`.
 *
 * @param sc the SparkContext that was just initialized
 */
private[spark] def sparkContextInitialized(sc: SparkContext) =
  master.sparkContextInitialized(sc)

/**
 * Tells the ApplicationMaster instance held in `master` that `sc` has been stopped.
 *
 * @param sc the SparkContext that was stopped
 */
private[spark] def sparkContextStopped(sc: SparkContext) =
  master.sparkContextStopped(sc)

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration

/**
*
* This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
* This is a simple extension to ClusterScheduler that ensures the appropriate initialization of
* ApplicationMaster, etc. is done.
*/
private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
extends TaskSchedulerImpl(sc) {

logInfo("Created YarnClusterScheduler")

Expand All @@ -51,4 +52,10 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}

/**
 * Stops the scheduler, then notifies the ApplicationMaster that this scheduler's SparkContext
 * has been stopped.
 */
override def stop(): Unit = {
  super.stop()
  // Signal the AM only after the scheduler itself has been shut down.
  ApplicationMaster.sparkContextStopped(sc)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private[yarn] class YarnAllocationHandler(
}

override def allocateResources() = {
addResourceRequests(maxExecutors - numPendingAllocate.get())
addResourceRequests(maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get())

// We have already set the container request. Poll the ResourceManager for a response.
// This doubles as a heartbeat if there are no pending container requests.
Expand Down

0 comments on commit c0794be

Please sign in to comment.