diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index df975fd2a3d..f28bb19deeb 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -193,8 +193,11 @@ class WatchdogStream extends StateCheckingResponseObserver private final ResponseObserver outerResponseObserver; private volatile StreamController innerController; + // When a stream is created it has automatic inbound flow control enabled. The stream + // won't wait for the caller to request a message. Setting the default to WAITING + // to reflect this state. @GuardedBy("lock") - private State state = State.IDLE; + private State state = State.WAITING; @GuardedBy("lock") private int pendingCount = 0; @@ -220,6 +223,16 @@ public void onStartImpl(StreamController controller) { public void disableAutoInboundFlowControl() { Preconditions.checkState( !hasStarted, "Can't disable automatic flow control after the stream has started"); + + // Adding the lock only to satisfy the annotation. It doesn't matter because before + // the stream is started, this is only accessed by the caller. + synchronized (lock) { + // When auto flow control is disabled, caller needs to call onRequest() to request a + // message. Setting the state to IDLE because now we're waiting for caller to call + // onRequest(). + state = State.IDLE; + } + autoAutoFlowControl = false; innerController.disableAutoInboundFlowControl(); } diff --git a/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index e20218452c6..d82fbeec49c 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -154,6 +154,31 @@ public void testTimedOutBeforeStart() throws InterruptedException { assertThat(error).isInstanceOf(WatchdogTimeoutException.class); } + @Test + public void testTimedOutBeforeResponse() throws InterruptedException { + MockServerStreamingCallable autoFlowControlCallable = + new MockServerStreamingCallable<>(); + AutoFlowControlObserver downstreamObserver = new AutoFlowControlObserver<>(); + + autoFlowControlCallable.call("request", watchdog.watch(downstreamObserver, waitTime, idleTime)); + MockServerStreamingCall call1 = autoFlowControlCallable.popLastCall(); + + clock.incrementNanoTime(idleTime.toNanos() + 1); + watchdog.run(); + assertThat(downstreamObserver.done.isDone()).isFalse(); + assertThat(call1.getController().isCancelled()).isTrue(); + call1.getController().getObserver().onError(new CancellationException("cancelled")); + + Throwable actualError = null; + try { + downstreamObserver.done.get(); + } catch (ExecutionException e) { + actualError = e.getCause(); + } + assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); + assertThat(actualError.getMessage()).contains("waiting for next response"); + } + @Test public void testMultiple() throws Exception { // Start stream1 @@ -310,4 +335,30 @@ public void onComplete() { done.set(null); } } + + static class AutoFlowControlObserver implements ResponseObserver { + SettableApiFuture controller = SettableApiFuture.create(); + Queue responses = Queues.newLinkedBlockingDeque(); + SettableApiFuture done = SettableApiFuture.create(); + + @Override + public void onStart(StreamController controller) { + this.controller.set(controller); + } + + @Override + public void onResponse(T response) { + responses.add(response); + } + + @Override + public void onError(Throwable t) { + done.setException(t); + } + + @Override + public void onComplete() { + done.set(null); + } + } }