Skip to content

Commit

Permalink
Finish app if SparkContext initialization times out.
Browse files Browse the repository at this point in the history
This avoids the NPEs that would happen if code just kept going.
  • Loading branch information
Marcelo Vanzin committed Aug 18, 2014
1 parent 0e4be3d commit 5657c7d
Showing 1 changed file with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private val sparkContextRef = new AtomicReference[SparkContext](null)
private val userResult = new AtomicBoolean(false)

final def run() = {
final def run(): Unit = {
// Setup the directories so things go to YARN approved directories rather
// than user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())
Expand All @@ -83,8 +83,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
System.setProperty("spark.master", "yarn-cluster")
}

val attemptId = client.getAttemptId()
logInfo("ApplicationAttemptId: " + attemptId)
logInfo("ApplicationAttemptId: " + client.getAttemptId())

// Call this to force generation of secret so it gets populated into the
// Hadoop UGI. This has to happen before the startUserClass which does a
Expand All @@ -102,6 +101,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
waitForSparkContextInitialized()

val sc = sparkContextRef.get()

// If there is no SparkContext at this point, just fail the app.
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
if (isLastAttempt()) {
cleanupStagingDir()
}
return
}

(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
} else {
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
Expand Down Expand Up @@ -146,14 +155,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,

finish(if (success) FinalApplicationStatus.SUCCEEDED else FinalApplicationStatus.FAILED)

val shouldCleanup =
if (success) {
true
} else {
val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
attemptId.getAttemptId() >= maxAppAttempts
}

val shouldCleanup = success || isLastAttempt()
if (shouldCleanup) {
cleanupStagingDir()
}
Expand Down Expand Up @@ -197,6 +199,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
}
}

/**
 * Returns true when the current application attempt is the last one YARN
 * will make, i.e. its attempt id has reached the configured maximum number
 * of registration attempts. Used to decide whether the staging directory
 * should be cleaned up on failure.
 */
private def isLastAttempt(): Boolean =
  client.getAttemptId().getAttemptId() >= client.getMaxRegAttempts(yarnConf)

/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
Expand Down Expand Up @@ -316,16 +323,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
}

private def allocateExecutors() = {
logInfo("Requesting" + args.numExecutors + " executors.")
try {
logInfo("Requesting" + args.numExecutors + " executors.")
allocator.allocateResources()

var iters = 0
while (allocator.getNumExecutorsRunning < args.numExecutors && !finished) {
checkNumExecutorsFailed()
allocator.allocateResources()
Thread.sleep(ALLOCATE_HEARTBEAT_INTERVAL)
iters += 1
}
}
logInfo("All executors have launched.")
Expand Down

0 comments on commit 5657c7d

Please sign in to comment.