Skip to content

Commit

Permalink
Merge pull request alteryx#189 from tgravescs/sparkYarnErrorHandling
Browse files Browse the repository at this point in the history
Impove Spark on Yarn Error handling

Improve cli error handling and only allow a certain number of worker failures before failing the application.  This will help prevent users from doing foolish things and their jobs running forever.  For instance using 32 bit java but trying to allocate 8G containers. This loops forever without this change, now it errors out after a certain number of retries.  The number of tries is configurable.  Also increase the frequency we ping the RM to increase speed at which we get containers if they die. The Yarn MR app defaults to pinging the RM every 1 seconds, so the default of 5 seconds here is fine. But that is configurable as well in case people want to change it.

I do want to make sure there aren't any cases that calling stopExecutors in CoarseGrainedSchedulerBackend would cause problems?  I couldn't think of any and testing on standalone cluster as well as yarn.
  • Loading branch information
mateiz committed Nov 20, 2013
2 parents 5592580 + 4093e93 commit aa638ed
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
}

override def stop() {
stopExecutors()
try {
if (driverActor != null) {
val future = driverActor.ask(StopDriver)(timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ private[spark] class SimrSchedulerBackend(
val conf = new Configuration()
val fs = FileSystem.get(conf)
fs.delete(new Path(driverFilePath), false)
super.stopExecutors()
super.stop()
}
}
2 changes: 2 additions & 0 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ System Properties:
* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10.
* 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
* 'spark.yarn.preserve.staging.files', set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
* 'spark.yarn.scheduler.heartbeat.interval-ms', the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
* 'spark.yarn.max.worker.failures', the maximum number of worker failures before failing the application. Default is the number of workers requested times 2 with minimum of 3.

# Launching Spark on YARN

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true

// default to numWorkers * 2, with minimum of 3
private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3).toString()).toInt

def run() {
// setup the directories so things go to yarn approved directories rather
Expand Down Expand Up @@ -225,12 +227,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e

if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIAddress
this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
sparkContext.preferredNodeLocationData)
this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
appAttemptId, args, sparkContext.preferredNodeLocationData)
} else {
logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
", numTries = " + numTries)
this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
", numTries = " + numTries)
this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
appAttemptId, args)
}
}
} finally {
Expand All @@ -249,8 +252,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
// If user thread exists, then quit !
userThread.isAlive) {

this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of worker failures reached")
}
yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
Expand All @@ -266,21 +272,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.

val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// must be <= timeoutInterval/ 2.
// On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
// so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))

// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong

// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
launchReporterThread(interval)
}
}

// TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime

val t = new Thread {
override def run() {
while (userThread.isAlive) {
if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of worker failures reached")
}
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
Expand Down Expand Up @@ -319,7 +331,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
*/

def finishApplicationMaster(status: FinalApplicationStatus) {
def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {

synchronized {
if (isFinished) {
Expand All @@ -333,6 +345,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
finishReq.setDiagnostics(diagnostics)
// set tracking url to empty since we don't have a history server
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
Expand Down
32 changes: 20 additions & 12 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)

def run() {
validateArgs()

init(yarnConf)
start()
logClusterResourceDetails()
Expand All @@ -84,6 +86,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
System.exit(0)
}

def validateArgs() = {
Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
(args.userJar == null) -> "Error: You must specify a user jar!",
(args.userClass == null) -> "Error: You must specify a user class!",
(args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
(args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
(args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
.foreach { case(cond, errStr) =>
if (cond) {
logError(errStr)
args.printUsageAndExit(1)
}
}
}

def getAppStagingDir(appId: ApplicationId): String = {
SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
}
Expand All @@ -97,7 +116,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
", queueChildQueueCount=" + queueInfo.getChildQueues.size)
}


def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
Expand Down Expand Up @@ -215,11 +233,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl

val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()

if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
System.exit(1)
}

Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
Expand Down Expand Up @@ -334,7 +347,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "


// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
// node, spark gc effects all other containers performance (which can also be other spark containers)
Expand All @@ -360,11 +372,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}

if (args.userClass == null) {
logError("Error: You must specify a user class!")
System.exit(1)
}

val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
Expand Down Expand Up @@ -442,6 +449,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")

val args = new ClientArguments(argStrings)

new Client(args).run
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
// Used to generate a unique id per worker
private val workerIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
private val numWorkersFailed = new AtomicInteger()

def getNumWorkersRunning: Int = numWorkersRunning.intValue

def getNumWorkersFailed: Int = numWorkersFailed.intValue

def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
Expand Down Expand Up @@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
else {
// simply decrement count - next iteration of ReporterThread will take care of allocating !
numWorkersRunning.decrementAndGet()
logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
" httpaddress: " + completedContainer.getDiagnostics)
logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
" httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())

// Hadoop 2.2.X added a ContainerExitStatus we should switch to use
// there are some exit status' we shouldn't necessarily count against us, but for
// now I think its ok as none of the containers are expected to exit
if (completedContainer.getExitStatus() != 0) {
logInfo("Container marked as failed: " + containerId)
numWorkersFailed.incrementAndGet()
}
}

allocatedHostToContainersMap.synchronized {
Expand Down Expand Up @@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)



if (numWorkers > 0) {
logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
}
Expand Down

0 comments on commit aa638ed

Please sign in to comment.