Skip to content

Commit

Permalink
Fix crash on repeated MockWebServer shutdown
Browse files Browse the repository at this point in the history
The problem was the awaitIdle() call was scheduling executor jobs after
the ExecutorService had been shut down. This fixes that and defends
against other races.
  • Loading branch information
squarejesse committed Dec 31, 2019
1 parent 975be25 commit edb5865
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ class MockWebServer : ExternalResource(), Closeable {

// Await shutdown.
for (queue in taskRunner.activeQueues()) {
if (!queue.awaitIdle(TimeUnit.SECONDS.toNanos(5))) {
if (!queue.idleLatch().await(5, TimeUnit.SECONDS)) {
throw IOException("Gave up waiting for queue to shut down")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class OkHttpClientTestRule : TestRule {

private fun ensureAllTaskQueuesIdle() {
for (queue in TaskRunner.INSTANCE.activeQueues()) {
assertThat(queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(1000L)))
assertThat(queue.idleLatch().await(1_000L, TimeUnit.MILLISECONDS))
.withFailMessage("Queue still active after 1000 ms")
.isTrue()
}
Expand Down
42 changes: 29 additions & 13 deletions okhttp/src/main/java/okhttp3/internal/concurrent/TaskQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package okhttp3.internal.concurrent
import okhttp3.internal.assertThreadDoesntHoldLock
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

/**
* A set of tasks that are executed in sequential order.
Expand Down Expand Up @@ -101,25 +100,42 @@ class TaskQueue internal constructor(
}, delayNanos)
}

/** Returns true if this queue became idle before the timeout elapsed. */
fun awaitIdle(delayNanos: Long): Boolean {
val latch = CountDownLatch(1)
/** Returns a latch that reaches 0 when the queue is next idle. */
fun idleLatch(): CountDownLatch {
synchronized(taskRunner) {
// If the queue is already idle, that's easy.
if (activeTask == null && futureTasks.isEmpty()) {
return CountDownLatch(0)
}

val task = object : Task("OkHttp awaitIdle", cancelable = false) {
override fun runOnce(): Long {
latch.countDown()
return -1L
// If there's an existing AwaitIdleTask, use it. This is necessary when the executor is
// shutdown but still busy as we can't enqueue in that case.
val existingTask = activeTask
if (existingTask is AwaitIdleTask) {
return existingTask.latch
}
for (futureTask in futureTasks) {
if (futureTask is AwaitIdleTask) {
return futureTask.latch
}
}
}

// Don't delegate to schedule because that has to honor shutdown rules.
synchronized(taskRunner) {
if (scheduleAndDecide(task, 0L)) {
// Don't delegate to schedule() because that enforces shutdown rules.
val newTask = AwaitIdleTask()
if (scheduleAndDecide(newTask, 0L)) {
taskRunner.kickCoordinator(this)
}
return newTask.latch
}
}

return latch.await(delayNanos, TimeUnit.NANOSECONDS)
private class AwaitIdleTask : Task("OkHttp awaitIdle", cancelable = false) {
val latch = CountDownLatch(1)

override fun runOnce(): Long {
latch.countDown()
return -1L
}
}

/** Adds [task] to run in [delayNanos]. Returns true if the coordinator is impacted. */
Expand Down
4 changes: 2 additions & 2 deletions okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,14 @@ class RealWebSocket(
/** For testing: wait until the web socket's executor has terminated. */
@Throws(InterruptedException::class)
fun awaitTermination(timeout: Long, timeUnit: TimeUnit) {
taskQueue.awaitIdle(timeUnit.toNanos(timeout))
taskQueue.idleLatch().await(timeout, timeUnit)
}

/** For testing: force this web socket to release its threads. */
@Throws(InterruptedException::class)
fun tearDown() {
taskQueue.shutdown()
taskQueue.awaitIdle(TimeUnit.SECONDS.toNanos(10L))
taskQueue.idleLatch().await(10, TimeUnit.SECONDS)
}

@Synchronized fun sentPingCount(): Int = sentPingCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,22 @@ class TaskRunnerRealBackendTest {
return@schedule -1L
}

queue.awaitIdle(TimeUnit.MILLISECONDS.toNanos(500))
queue.idleLatch().await(500, TimeUnit.MILLISECONDS)

assertThat(log.take()).isEqualTo("failing task running")
assertThat(log.take()).isEqualTo("uncaught exception: java.lang.RuntimeException: boom!")
assertThat(log.take()).isEqualTo("normal task running")
assertThat(log).isEmpty()
}

@Test fun idleLatchAfterShutdown() {
queue.schedule("task") {
Thread.sleep(250)
backend.shutdown()
return@schedule -1L
}

assertThat(queue.idleLatch().await(500L, TimeUnit.MILLISECONDS)).isTrue()
assertThat(queue.idleLatch().count).isEqualTo(0)
}
}
24 changes: 24 additions & 0 deletions okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,30 @@ class TaskRunnerTest {
)
}

@Test fun idleLatch() {
redQueue.execute("task") {
log += "run@${taskFaker.nanoTime}"
}

val idleLatch = redQueue.idleLatch()
assertThat(idleLatch.count).isEqualTo(1)

taskFaker.advanceUntil(0.µs)
assertThat(log).containsExactly("run@0")

assertThat(idleLatch.count).isEqualTo(0)
}

@Test fun multipleCallsToIdleLatchReturnSameInstance() {
redQueue.execute("task") {
log += "run@${taskFaker.nanoTime}"
}

val idleLatch1 = redQueue.idleLatch()
val idleLatch2 = redQueue.idleLatch()
assertThat(idleLatch2).isSameAs(idleLatch1)
}

private val Int.µs: Long
get() = this * 1_000L
}

0 comments on commit edb5865

Please sign in to comment.