[SPARK-40235][CORE] Use interruptible lock instead of synchronized in Executor.updateDependencies()

### What changes were proposed in this pull request?

This patch modifies the synchronization in `Executor.updateDependencies()` in order to allow tasks to be interrupted while they are blocked and waiting on other tasks to finish downloading dependencies.

This synchronization was added years ago in mesos/spark@7b9e96c in order to prevent concurrently-launching tasks from performing concurrent dependency updates. If one task is downloading dependencies, all other newly-launched tasks will block until the original dependency download is complete.

Let's say that a Spark task launches, becomes blocked on an `updateDependencies()` call, then is canceled while it is blocked. Although Spark will send a `Thread.interrupt()` to the canceled task, the task will continue waiting: a thread that is blocked waiting to enter a `synchronized` block does not throw an InterruptedException in response to the interrupt. As a result, the blocked thread will continue to wait until the other thread exits the synchronized block.
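
For illustration, here is a minimal standalone sketch (not part of this PR) of that JVM behavior: the waiting thread stays `BLOCKED` on the monitor even after it has been interrupted; only its interrupt flag is set.

```scala
// Sketch only: a thread blocked while entering a synchronized block
// does not throw InterruptedException when it is interrupted.
object SynchronizedIgnoresInterrupt {
  private val monitor = new Object

  def main(args: Array[String]): Unit = {
    monitor.synchronized {
      // Another thread tries to enter the same monitor while we hold it.
      val waiter = new Thread(() => monitor.synchronized { println("entered") })
      waiter.start()
      Thread.sleep(200)              // let it reach the BLOCKED state

      waiter.interrupt()             // only sets the interrupt flag...
      Thread.sleep(200)
      println(waiter.getState)       // ...the thread is still BLOCKED
      println(waiter.isInterrupted)  // true, but it keeps waiting for the monitor
    }
    // Only once the monitor is released here does the waiter enter and print.
  }
}
```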

This PR aims to fix this problem by replacing the `synchronized` with a `ReentrantLock`, which has a `lockInterruptibly` method.
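
By contrast, a thread waiting in `ReentrantLock.lockInterruptibly()` throws `InterruptedException` as soon as it is interrupted. Below is a standalone sketch of that behavior (illustrative only; the actual change to `Executor.updateDependencies()` is in the diff further down):

```scala
import java.util.concurrent.locks.ReentrantLock

// Sketch only: an interrupted waiter exits immediately instead of waiting for the lock.
object LockInterruptiblySketch {
  private val lock = new ReentrantLock()

  def main(args: Array[String]): Unit = {
    lock.lock() // the main thread holds the lock, simulating a slow dependency download
    try {
      val waiter = new Thread(() => {
        try {
          lock.lockInterruptibly() // throws InterruptedException if interrupted while waiting
          try {
            println("acquired")
          } finally {
            lock.unlock()
          }
        } catch {
          case _: InterruptedException =>
            println("interrupted while waiting; exiting promptly")
        }
      })
      waiter.start()
      Thread.sleep(200)
      waiter.interrupt() // the waiter unblocks right away with InterruptedException
      waiter.join()
    } finally {
      lock.unlock()
    }
  }
}
```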

### Why are the changes needed?

In a real-world scenario, we hit a case where a task was canceled right after being launched while another task was performing a slow library download. The canceled task remained blocked in `updateDependencies()` waiting for that download, which took so long that the TaskReaper killed the executor because the canceled task could not exit in a timely fashion. This patch's fix prevents that issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A new unit test case in `ExecutorSuite`.

Closes #37681 from JoshRosen/SPARK-40235-update-dependencies-lock.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Josh Rosen <[email protected]>
JoshRosen committed Aug 29, 2022
1 parent c95ed82 commit 295dd57
Showing 2 changed files with 72 additions and 3 deletions.
core/src/main/scala/org/apache/spark/executor/Executor.scala: 19 additions & 3 deletions
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer
 import java.util.{Locale, Properties}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
 import javax.ws.rs.core.UriBuilder

@@ -85,6 +86,11 @@ private[spark] class Executor(

   private[executor] val conf = env.conf

+  // SPARK-40235: updateDependencies() uses a ReentrantLock instead of the `synchronized` keyword
+  // so that tasks can exit quickly if they are interrupted while waiting on another task to
+  // finish downloading dependencies.
+  private val updateDependenciesLock = new ReentrantLock()
+
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
   // must not have port specified.
@@ -978,13 +984,19 @@ private[spark] class Executor(
   /**
    * Download any missing dependencies if we receive a new set of files and JARs from the
    * SparkContext. Also adds any new JARs we fetched to the class loader.
+   * Visible for testing.
    */
-  private def updateDependencies(
+  private[executor] def updateDependencies(
       newFiles: Map[String, Long],
       newJars: Map[String, Long],
-      newArchives: Map[String, Long]): Unit = {
+      newArchives: Map[String, Long],
+      testStartLatch: Option[CountDownLatch] = None,
+      testEndLatch: Option[CountDownLatch] = None): Unit = {
     lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-    synchronized {
+    updateDependenciesLock.lockInterruptibly()
+    try {
+      // For testing, so we can simulate a slow file download:
+      testStartLatch.foreach(_.countDown())
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo(s"Fetching $name with timestamp $timestamp")
@@ -1027,6 +1039,10 @@ private[spark] class Executor(
           }
         }
       }
+      // For testing, so we can simulate a slow file download:
+      testEndLatch.foreach(_.await())
+    } finally {
+      updateDependenciesLock.unlock()
     }
   }

core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala: 53 additions & 0 deletions
@@ -514,6 +514,59 @@ class ExecutorSuite extends SparkFunSuite
     }
   }

+  test("SPARK-40235: updateDependencies is interruptible when waiting on lock") {
+    val conf = new SparkConf
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    withExecutor("id", "localhost", env) { executor =>
+      val startLatch = new CountDownLatch(1)
+      val endLatch = new CountDownLatch(1)
+
+      // Start a thread to simulate a task that begins executing updateDependencies()
+      // and takes a long time to finish because file download is slow:
+      val slowLibraryDownloadThread = new Thread(() => {
+        executor.updateDependencies(
+          Map.empty,
+          Map.empty,
+          Map.empty,
+          Some(startLatch),
+          Some(endLatch))
+      })
+      slowLibraryDownloadThread.start()
+
+      // Wait for that thread to acquire the lock:
+      startLatch.await()
+
+      // Start a second thread to simulate a task that blocks on the other task's
+      // dependency update:
+      val blockedLibraryDownloadThread = new Thread(() => {
+        executor.updateDependencies(
+          Map.empty,
+          Map.empty,
+          Map.empty)
+      })
+      blockedLibraryDownloadThread.start()
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        val threadState = blockedLibraryDownloadThread.getState
+        assert(Set(Thread.State.BLOCKED, Thread.State.WAITING).contains(threadState))
+      }
+
+      // Interrupt the blocked thread:
+      blockedLibraryDownloadThread.interrupt()
+
+      // The thread should exit:
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        assert(blockedLibraryDownloadThread.getState == Thread.State.TERMINATED)
+      }
+
+      // Allow the first thread to finish and exit:
+      endLatch.countDown()
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        assert(slowLibraryDownloadThread.getState == Thread.State.TERMINATED)
+      }
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]
