Skip to content

Commit

Permalink
Exit the watchdog thread after being idle for 60 seconds.
Browse files Browse the repository at this point in the history
This should make it possible for people doing class unloading to
unload class loaders containing Okio.

Closes #107
  • Loading branch information
squarejesse committed Aug 22, 2016
1 parent 838a215 commit 9e99b2d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 57 deletions.
122 changes: 66 additions & 56 deletions okio/src/main/java/okio/AsyncTimeout.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,45 @@

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.TimeUnit;

import static okio.Util.checkOffsetAndCount;

/**
* This timeout uses a background thread to take action exactly when the timeout
* occurs. Use this to implement timeouts where they aren't supported natively,
* such as to sockets that are blocked on writing.
* This timeout uses a background thread to take action exactly when the timeout occurs. Use this to
* implement timeouts where they aren't supported natively, such as to sockets that are blocked on
* writing.
*
* <p>Subclasses should override {@link #timedOut} to take action when a timeout
* occurs. This method will be invoked by the shared watchdog thread so it
* should not do any long-running operations. Otherwise we risk starving other
* timeouts from being triggered.
* <p>Subclasses should override {@link #timedOut} to take action when a timeout occurs. This method
* will be invoked by the shared watchdog thread so it should not do any long-running operations.
* Otherwise we risk starving other timeouts from being triggered.
*
* <p>Use {@link #sink} and {@link #source} to apply this timeout to a stream.
* The returned value will apply the timeout to each operation on the wrapped
* stream.
* <p>Use {@link #sink} and {@link #source} to apply this timeout to a stream. The returned value
* will apply the timeout to each operation on the wrapped stream.
*
* <p>Callers should call {@link #enter} before doing work that is subject to
* timeouts, and {@link #exit} afterwards. The return value of {@link #exit}
* indicates whether a timeout was triggered. Note that the call to {@link
* #timedOut} is asynchronous, and may be called after {@link #exit}.
* <p>Callers should call {@link #enter} before doing work that is subject to timeouts, and {@link
* #exit} afterwards. The return value of {@link #exit} indicates whether a timeout was triggered.
* Note that the call to {@link #timedOut} is asynchronous, and may be called after {@link #exit}.
*/
public class AsyncTimeout extends Timeout {
/**
* Don't write more than 64 KiB of data at a time, give or take a segment.
* Otherwise slow connections may suffer timeouts even when they're making
* (slow) progress. Without this, writing a single 1 MiB buffer may never
* succeed on a sufficiently slow connection.
* Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow
* connections may suffer timeouts even when they're making (slow) progress. Without this, writing
* a single 1 MiB buffer may never succeed on a sufficiently slow connection.
*/
private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;

/** Duration for the watchdog thread to be idle before it shuts itself down. */
private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
private static final long IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS);

/**
* The watchdog thread processes a linked list of pending timeouts, sorted in
* the order to be triggered. This class synchronizes on AsyncTimeout.class.
* This lock guards the queue.
* The watchdog thread processes a linked list of pending timeouts, sorted in the order to be
* triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue.
*
* <p>Head's 'next' points to the first element of the linked list. The first
* element is the next node to time out, or null if the queue is empty. The
* head is null until the watchdog thread is started.
* <p>Head's 'next' points to the first element of the linked list. The first element is the next
* node to time out, or null if the queue is empty. The head is null until the watchdog thread is
* started and also after being idle for {@link #IDLE_TIMEOUT_MILLIS}.
*/
private static AsyncTimeout head;

Expand Down Expand Up @@ -137,24 +137,23 @@ private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
}

/**
* Returns the amount of time left until the time out. This will be negative
* if the timeout has elapsed and the timeout should occur immediately.
* Returns the amount of time left until the time out. This will be negative if the timeout has
* elapsed and the timeout should occur immediately.
*/
private long remainingNanos(long now) {
return timeoutAt - now;
}

/**
* Invoked by the watchdog thread when the time between calls to {@link
* #enter()} and {@link #exit()} has exceeded the timeout.
* Invoked by the watchdog thread when the time between calls to {@link #enter()} and {@link
* #exit()} has exceeded the timeout.
*/
protected void timedOut() {
}

/**
* Returns a new sink that delegates to {@code sink}, using this to implement
* timeouts. This works best if {@link #timedOut} is overridden to interrupt
* {@code sink}'s current operation.
* Returns a new sink that delegates to {@code sink}, using this to implement timeouts. This works
* best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.
*/
public final Sink sink(final Sink sink) {
return new Sink() {
Expand Down Expand Up @@ -225,9 +224,8 @@ public final Sink sink(final Sink sink) {
}

/**
* Returns a new source that delegates to {@code source}, using this to
* implement timeouts. This works best if {@link #timedOut} is overridden to
* interrupt {@code sink}'s current operation.
* Returns a new source that delegates to {@code source}, using this to implement timeouts. This
* works best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.
*/
public final Source source(final Source source) {
return new Source() {
Expand Down Expand Up @@ -268,30 +266,28 @@ public final Source source(final Source source) {
}

/**
* Throws an IOException if {@code throwOnTimeout} is {@code true} and a
* timeout occurred. See {@link #newTimeoutException(java.io.IOException)}
* for the type of exception thrown.
* Throws an IOException if {@code throwOnTimeout} is {@code true} and a timeout occurred. See
* {@link #newTimeoutException(java.io.IOException)} for the type of exception thrown.
*/
final void exit(boolean throwOnTimeout) throws IOException {
boolean timedOut = exit();
if (timedOut && throwOnTimeout) throw newTimeoutException(null);
}

/**
* Returns either {@code cause} or an IOException that's caused by
* {@code cause} if a timeout occurred. See
* {@link #newTimeoutException(java.io.IOException)} for the type of
* exception returned.
* Returns either {@code cause} or an IOException that's caused by {@code cause} if a timeout
* occurred. See {@link #newTimeoutException(java.io.IOException)} for the type of exception
* returned.
*/
final IOException exit(IOException cause) throws IOException {
if (!exit()) return cause;
return newTimeoutException(cause);
}

/**
* Returns an {@link IOException} to represent a timeout. By default this method returns
* {@link java.io.InterruptedIOException}. If {@code cause} is non-null it is set as the cause of
* the returned exception.
* Returns an {@link IOException} to represent a timeout. By default this method returns {@link
* java.io.InterruptedIOException}. If {@code cause} is non-null it is set as the cause of the
* returned exception.
*/
protected IOException newTimeoutException(IOException cause) {
InterruptedIOException e = new InterruptedIOException("timeout");
Expand All @@ -310,10 +306,20 @@ public Watchdog() {
public void run() {
while (true) {
try {
AsyncTimeout timedOut = awaitTimeout();

// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();

// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;

// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}

// Close the timed out node.
timedOut.timedOut();
Expand All @@ -324,19 +330,23 @@ public void run() {
}

/**
* Removes and returns the node at the head of the list, waiting for it to
* time out if necessary. Returns null if the situation changes while waiting:
* either a newer node is inserted at the head, or the node being waited on
* has been removed.
* Removes and returns the node at the head of the list, waiting for it to time out if necessary.
* This returns {@link #head} if there was no node at the head of the list when starting, and
* there continues to be no node after waiting {@code IDLE_TIMEOUT_NANOS}. It returns null if a
* new node was inserted while waiting. Otherwise this returns the node being waited on that has
* been removed.
*/
static synchronized AsyncTimeout awaitTimeout() throws InterruptedException {
static AsyncTimeout awaitTimeout() throws InterruptedException {
// Get the next eligible node.
AsyncTimeout node = head.next;

// The queue is empty. Wait for something to be enqueued.
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {
AsyncTimeout.class.wait();
return null;
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}

long waitNanos = node.remainingNanos(System.nanoTime());
Expand Down
2 changes: 1 addition & 1 deletion okio/src/test/java/okio/AsyncTimeoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* 1000ms, named 'a', 'b', 'c' and 'd'.
*/
public final class AsyncTimeoutTest {
private final List<Timeout> timedOut = new CopyOnWriteArrayList<Timeout>();
private final List<Timeout> timedOut = new CopyOnWriteArrayList<>();
private final AsyncTimeout a = new RecordingAsyncTimeout();
private final AsyncTimeout b = new RecordingAsyncTimeout();
private final AsyncTimeout c = new RecordingAsyncTimeout();
Expand Down

0 comments on commit 9e99b2d

Please sign in to comment.