[SPARK-8059] [yarn] Wake up allocation thread when new requests arrive. #6600

Closed · wants to merge 1 commit
@@ -67,6 +67,7 @@ private[spark] class ApplicationMaster(

   @volatile private var reporterThread: Thread = _
   @volatile private var allocator: YarnAllocator = _
+  private val allocatorLock = new Object()
 
   // Fields used in client mode.
   private var rpcEnv: RpcEnv = null
@@ -359,7 +360,9 @@ private[spark] class ApplicationMaster(
           }
           logDebug(s"Number of pending allocations is $numPendingAllocate. " +
             s"Sleeping for $sleepInterval.")
-          Thread.sleep(sleepInterval)
+          allocatorLock.synchronized {
+            allocatorLock.wait(sleepInterval)
Contributor:
In this case it might be OK, but a wait call should not be made outside of a while loop: http://stackoverflow.com/questions/1038007/why-should-wait-always-be-called-inside-a-loop
(In this case too, we'd still want to protect against spurious wakeups, so adding a loop is good.)

Contributor:
Yeah, I think it's not strictly necessary, but it is good practice. Though here we're technically in a loop already, just a bigger one. Since we intend to allocate periodically anyway, the worst thing that can happen is some additional latency, and that only occurs if there is a third thread somewhere (which does not exist here).

In other words, the spurious wakeups are benign, and not even possible given the changes here. I'm fine with this being merged as is.

+          }
         } catch {
           case e: InterruptedException =>
         }
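For reference, here is a minimal sketch of the loop-guarded wait the review thread above describes. This is a toy, not Spark code; `pendingRequest`, `awaitRequest`, and `signalRequest` are made-up names. The point is that the condition is re-tested around the wait, so a spurious wakeup cannot make the caller proceed before the condition holds or the timeout expires.

  // Hypothetical illustration of wait-inside-a-loop; all names are made up.
  object WaitLoopSketch {
    private val lock = new Object()
    private var pendingRequest = false // guarded by lock

    def awaitRequest(timeoutMs: Long): Unit = lock.synchronized {
      val deadline = System.currentTimeMillis() + timeoutMs
      // The while loop re-tests the condition, so a spurious wakeup
      // simply goes back to waiting instead of falling through.
      while (!pendingRequest && System.currentTimeMillis() < deadline) {
        lock.wait(math.max(1, deadline - System.currentTimeMillis()))
      }
      pendingRequest = false
    }

    def signalRequest(): Unit = lock.synchronized {
      pendingRequest = true
      lock.notifyAll()
    }
  }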
@@ -546,8 +549,15 @@ private[spark] class ApplicationMaster(
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case RequestExecutors(requestedTotal) =>
         Option(allocator) match {
-          case Some(a) => a.requestTotalExecutors(requestedTotal)
-          case None => logWarning("Container allocator is not ready to request executors yet.")
+          case Some(a) =>
+            allocatorLock.synchronized {
+              if (a.requestTotalExecutors(requestedTotal)) {
+                allocatorLock.notifyAll()
+              }
+            }
+
+          case None =>
+            logWarning("Container allocator is not ready to request executors yet.")
         }
         context.reply(true)
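Taken together, the two ApplicationMaster hunks set up a simple wait/notify handshake: the reporter thread sleeps on allocatorLock with a timeout instead of Thread.sleep, and the RPC handler wakes it early when the requested total actually changes. The following self-contained toy (simplified names, not the actual Spark classes) shows the effect and can be run as-is with Scala 2.12+:

  object AllocatorWakeupDemo {
    private val allocatorLock = new Object()
    private var targetNumExecutors = 0 // guarded by allocatorLock
    @volatile private var finished = false

    // Stand-in for the reporter thread: do an "allocation pass", then sleep
    // up to 5 seconds, waking early if notifyAll() is called.
    private val reporter = new Thread(() => {
      while (!finished) {
        allocatorLock.synchronized {
          println(s"allocation pass, target = $targetNumExecutors")
          allocatorLock.wait(5000)
        }
      }
    })

    // Mirrors the patched RPC handler: notify only when the total changed.
    def requestExecutors(requestedTotal: Int): Unit = allocatorLock.synchronized {
      if (requestedTotal != targetNumExecutors) {
        targetNumExecutors = requestedTotal
        allocatorLock.notifyAll()
      }
    }

    def main(args: Array[String]): Unit = {
      reporter.start()
      Thread.sleep(200)
      requestExecutors(3) // the reporter wakes immediately, not after 5s
      Thread.sleep(200)
      finished = true
      allocatorLock.synchronized(allocatorLock.notifyAll())
      reporter.join()
    }
  }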
@@ -146,11 +146,16 @@ private[yarn] class YarnAllocator(
    * Request as many executors from the ResourceManager as needed to reach the desired total. If
    * the requested total is smaller than the current number of running executors, no executors will
    * be killed.
+   *
+   * @return Whether the new requested total is different than the old value.
    */
-  def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+  def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
Contributor:
The only slightly scary thing is that we have another method with the exact same signature whose return value means something different. Since we document this clearly, it should be fine.

     if (requestedTotal != targetNumExecutors) {
       logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
       targetNumExecutors = requestedTotal
+      true
+    } else {
+      false
     }
   }

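With the Boolean return in place, the caller can skip notifyAll() when nothing changed, so duplicate requests do not wake the allocation thread. A hypothetical call sequence:

  allocator.requestTotalExecutors(5) // target was different: returns true, caller notifies
  allocator.requestTotalExecutors(5) // unchanged: returns false, no wakeup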