From 4624a50f9ed6c8fcf24d1b40f877402378f1f5d3 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 12 Feb 2024 17:27:33 -0500 Subject: [PATCH 1/4] fix: Fix watchdog to start with WAITING state --- .../java/com/google/api/gax/rpc/Watchdog.java | 13 ++++- .../com/google/api/gax/rpc/WatchdogTest.java | 51 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index df975fd2a3..8e14df68cc 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -194,7 +194,7 @@ class WatchdogStream extends StateCheckingResponseObserver private volatile StreamController innerController; @GuardedBy("lock") - private State state = State.IDLE; + private State state = State.WAITING; @GuardedBy("lock") private int pendingCount = 0; @@ -220,6 +220,17 @@ public void onStartImpl(StreamController controller) { public void disableAutoInboundFlowControl() { Preconditions.checkState( !hasStarted, "Can't disable automatic flow control after the stream has started"); + + synchronized (lock) { + // WatchdogStream is created with waiting state. If auto flow control is disabled, + // set the state to be IDLE, which will be updated to WAITING when onRequest() is + // called. + if (state == State.WAITING) { + state = State.IDLE; + lastActivityAt = clock.millisTime(); + } + } + autoAutoFlowControl = false; innerController.disableAutoInboundFlowControl(); } diff --git a/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index e20218452c..d82fbeec49 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -154,6 +154,31 @@ public void testTimedOutBeforeStart() throws InterruptedException { assertThat(error).isInstanceOf(WatchdogTimeoutException.class); } + @Test + public void testTimedOutBeforeResponse() throws InterruptedException { + MockServerStreamingCallable autoFlowControlCallable = + new MockServerStreamingCallable<>(); + AutoFlowControlObserver downstreamObserver = new AutoFlowControlObserver<>(); + + autoFlowControlCallable.call("request", watchdog.watch(downstreamObserver, waitTime, idleTime)); + MockServerStreamingCall call1 = autoFlowControlCallable.popLastCall(); + + clock.incrementNanoTime(idleTime.toNanos() + 1); + watchdog.run(); + assertThat(downstreamObserver.done.isDone()).isFalse(); + assertThat(call1.getController().isCancelled()).isTrue(); + call1.getController().getObserver().onError(new CancellationException("cancelled")); + + Throwable actualError = null; + try { + downstreamObserver.done.get(); + } catch (ExecutionException e) { + actualError = e.getCause(); + } + assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); + assertThat(actualError.getMessage()).contains("waiting for next response"); + } + @Test public void testMultiple() throws Exception { // Start stream1 @@ -310,4 +335,30 @@ public void onComplete() { done.set(null); } } + + static class AutoFlowControlObserver implements ResponseObserver { + SettableApiFuture controller = SettableApiFuture.create(); + Queue responses = Queues.newLinkedBlockingDeque(); + SettableApiFuture done = SettableApiFuture.create(); + + @Override + public void onStart(StreamController controller) { + this.controller.set(controller); + } + + @Override + public void onResponse(T response) { + responses.add(response); + } + + @Override + public void onError(Throwable t) { + done.setException(t); + } + + @Override + public void onComplete() { + done.set(null); + } + } } From fc950691cfc2a777e9427e8eba76470f87b37ac9 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 13 Feb 2024 10:53:07 -0500 Subject: [PATCH 2/4] fix test --- gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java | 1 - 1 file changed, 1 deletion(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index 8e14df68cc..e5216e1979 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -227,7 +227,6 @@ public void disableAutoInboundFlowControl() { // called. if (state == State.WAITING) { state = State.IDLE; - lastActivityAt = clock.millisTime(); } } From aca70617c59fd52654226feb4b6ed4d6ff98319e Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 16 Feb 2024 13:40:53 -0500 Subject: [PATCH 3/4] address comments --- .../main/java/com/google/api/gax/rpc/Watchdog.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index e5216e1979..4e86373d1d 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -221,14 +221,10 @@ public void disableAutoInboundFlowControl() { Preconditions.checkState( !hasStarted, "Can't disable automatic flow control after the stream has started"); - synchronized (lock) { - // WatchdogStream is created with waiting state. If auto flow control is disabled, - // set the state to be IDLE, which will be updated to WAITING when onRequest() is - // called. - if (state == State.WAITING) { - state = State.IDLE; - } - } + // WatchdogStream is created with waiting state. If auto flow control is disabled, + // set the state to be IDLE, which will be updated to WAITING when onRequest() is + // called. + state = State.IDLE; autoAutoFlowControl = false; innerController.disableAutoInboundFlowControl(); From b0600695c12bbe4686acc08147dde72e8d70ef47 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 16 Feb 2024 14:11:46 -0500 Subject: [PATCH 4/4] update javadoc --- .../java/com/google/api/gax/rpc/Watchdog.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index 4e86373d1d..f28bb19dee 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -193,6 +193,9 @@ class WatchdogStream extends StateCheckingResponseObserver private final ResponseObserver outerResponseObserver; private volatile StreamController innerController; + // When a stream is created it has automatic inbound flow control enabled. The stream + // won't wait for the caller to request a message. Setting the default to WAITING + // to reflect this state. @GuardedBy("lock") private State state = State.WAITING; @@ -221,10 +224,14 @@ public void disableAutoInboundFlowControl() { Preconditions.checkState( !hasStarted, "Can't disable automatic flow control after the stream has started"); - // WatchdogStream is created with waiting state. If auto flow control is disabled, - // set the state to be IDLE, which will be updated to WAITING when onRequest() is - // called. - state = State.IDLE; + // Adding the lock only to satisfy the annotation. It doesn't matter because before + // the stream is started, this is only accessed by the caller. + synchronized (lock) { + // When auto flow control is disabled, caller needs to call onRequest() to request a + // message. Setting the state to IDLE because now we're waiting for caller to call + // onRequest(). + state = State.IDLE; + } autoAutoFlowControl = false; innerController.disableAutoInboundFlowControl();