diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index b575bcd3315..d8ee7a9eb1d 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -34,10 +34,11 @@ import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.LockSupport; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -627,32 +628,54 @@ public void onClose(Status status, Metadata trailers) { } } - private static final class ThreadlessExecutor implements Executor { + @SuppressWarnings("serial") + private static final class ThreadlessExecutor extends ConcurrentLinkedQueue + implements Executor { private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName()); - private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private volatile Thread waiter; // Non private to avoid synthetic class ThreadlessExecutor() {} /** * Waits until there is a Runnable, then executes it and all queued Runnables after it. + * Must only be called by one thread at a time. */ public void waitAndDrain() throws InterruptedException { - Runnable runnable = queue.take(); - while (runnable != null) { + final Thread currentThread = Thread.currentThread(); + throwIfInterrupted(currentThread); + Runnable runnable = poll(); + if (runnable == null) { + waiter = currentThread; + try { + while ((runnable = poll()) == null) { + LockSupport.park(this); + throwIfInterrupted(currentThread); + } + } finally { + waiter = null; + } + } + do { try { runnable.run(); } catch (Throwable t) { log.log(Level.WARNING, "Runnable threw exception", t); } - runnable = queue.poll(); + } while ((runnable = poll()) != null); + } + + private static void throwIfInterrupted(Thread currentThread) throws InterruptedException { + if (currentThread.isInterrupted()) { + throw new InterruptedException(); } } @Override public void execute(Runnable runnable) { - queue.add(runnable); + add(runnable); + LockSupport.unpark(waiter); // no-op if null } } }