From 625f7abcde0a3dc2ea3a78c3ef7b88648ac1d69e Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 23 Feb 2024 18:21:59 -0500 Subject: [PATCH] fix: Fix watchdog to start with WAITING state (#2468) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Watchdog should start with WAITING state, and only switch to `idle` if auto flow control was disabled. Before the fix, when auto flow control was enabled, we wait for server to return a response without calling `onRequest()` and watchdog would report the timeout exception because of idle timeout, which is incorrect and causes confusion. Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/gapic-generator-java/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #2498 ☕️ --------- Co-authored-by: Igor Bernstein Co-authored-by: Lawrence Qiu --- .../java/com/google/api/gax/rpc/Watchdog.java | 15 +++++- .../com/google/api/gax/rpc/WatchdogTest.java | 51 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index df975fd2a3d..f28bb19deeb 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -193,8 +193,11 @@ class WatchdogStream extends StateCheckingResponseObserver private final ResponseObserver outerResponseObserver; private volatile StreamController innerController; + // When a stream is created it has automatic inbound flow control enabled. The stream + // won't wait for the caller to request a message. Setting the default to WAITING + // to reflect this state. @GuardedBy("lock") - private State state = State.IDLE; + private State state = State.WAITING; @GuardedBy("lock") private int pendingCount = 0; @@ -220,6 +223,16 @@ public void onStartImpl(StreamController controller) { public void disableAutoInboundFlowControl() { Preconditions.checkState( !hasStarted, "Can't disable automatic flow control after the stream has started"); + + // Adding the lock only to satisfy the annotation. It doesn't matter because before + // the stream is started, this is only accessed by the caller. + synchronized (lock) { + // When auto flow control is disabled, caller needs to call onRequest() to request a + // message. Setting the state to IDLE because now we're waiting for caller to call + // onRequest(). + state = State.IDLE; + } + autoAutoFlowControl = false; innerController.disableAutoInboundFlowControl(); } diff --git a/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index e20218452c6..d82fbeec49c 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -154,6 +154,31 @@ public void testTimedOutBeforeStart() throws InterruptedException { assertThat(error).isInstanceOf(WatchdogTimeoutException.class); } + @Test + public void testTimedOutBeforeResponse() throws InterruptedException { + MockServerStreamingCallable autoFlowControlCallable = + new MockServerStreamingCallable<>(); + AutoFlowControlObserver downstreamObserver = new AutoFlowControlObserver<>(); + + autoFlowControlCallable.call("request", watchdog.watch(downstreamObserver, waitTime, idleTime)); + MockServerStreamingCall call1 = autoFlowControlCallable.popLastCall(); + + clock.incrementNanoTime(idleTime.toNanos() + 1); + watchdog.run(); + assertThat(downstreamObserver.done.isDone()).isFalse(); + assertThat(call1.getController().isCancelled()).isTrue(); + call1.getController().getObserver().onError(new CancellationException("cancelled")); + + Throwable actualError = null; + try { + downstreamObserver.done.get(); + } catch (ExecutionException e) { + actualError = e.getCause(); + } + assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); + assertThat(actualError.getMessage()).contains("waiting for next response"); + } + @Test public void testMultiple() throws Exception { // Start stream1 @@ -310,4 +335,30 @@ public void onComplete() { done.set(null); } } + + static class AutoFlowControlObserver implements ResponseObserver { + SettableApiFuture controller = SettableApiFuture.create(); + Queue responses = Queues.newLinkedBlockingDeque(); + SettableApiFuture done = SettableApiFuture.create(); + + @Override + public void onStart(StreamController controller) { + this.controller.set(controller); + } + + @Override + public void onResponse(T response) { + responses.add(response); + } + + @Override + public void onError(Throwable t) { + done.setException(t); + } + + @Override + public void onComplete() { + done.set(null); + } + } }