diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPolling.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPolling.java new file mode 100644 index 0000000000..d392a7393d --- /dev/null +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPolling.java @@ -0,0 +1,66 @@ +/* + * Copyright 2023 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import com.google.common.base.Stopwatch; +import java.time.Duration; +import java.util.Objects; + +/** + * Blocks the current thread to poll the given assertion every 10ms until it's successful or the + * timeout is exceeded. Expected usage: + * + *
{@code
+ * assertByPolling(Duration.ofSeconds(2), () -> assertThat(...));
+ * }
+ */ +public class AssertByPolling { + + public static void assertByPolling(Duration timeout, Runnable assertion) + throws InterruptedException { + Objects.requireNonNull(timeout, "Timeout must not be null"); + Stopwatch stopwatch = Stopwatch.createStarted(); + while (true) { + try { + assertion.run(); + return; // Success + + } catch (AssertionError err) { + if (stopwatch.elapsed(MILLISECONDS) < timeout.toMillis()) { + MILLISECONDS.sleep(10); + } else { + throw new AssertionError("Timeout waiting for successful assertion.", err); + } + } + } + } +} diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPollingTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPollingTest.java new file mode 100644 index 0000000000..eabf5ede24 --- /dev/null +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/AssertByPollingTest.java @@ -0,0 +1,86 @@ +/* + * Copyright 2023 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import static com.google.api.gax.batching.AssertByPolling.assertByPolling; + +import com.google.common.truth.Truth; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Test; + +public class AssertByPollingTest { + + @Test + public void testFailsWhenTimeoutExceeded() { + AssertionError error = + Assert.assertThrows( + AssertionError.class, + () -> assertByPolling(Duration.ofNanos(2), () -> Truth.assertThat(1).isAtLeast(2))); + + Throwable cause = error.getCause(); + Truth.assertThat(cause).isInstanceOf(AssertionError.class); + // Error provides original assertion failure that never came true. + Truth.assertThat(cause.getMessage()).contains("expected to be at least"); + } + + @Test + public void testImmediateSuccessSucceedsRegardlessOfTimeout() throws InterruptedException { + Runnable succeedsAfter1ms = + () -> { + try { + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + }; + Duration timeout = Duration.ofNanos(0); + assertByPolling(timeout, succeedsAfter1ms); + } + + @Test + public void testSucceedsThirdTime() throws InterruptedException { + AtomicInteger attempt = new AtomicInteger(1); + AtomicInteger numFailures = new AtomicInteger(0); + Runnable succeedsThirdTime = + () -> { + if (attempt.getAndIncrement() < 3) { + numFailures.incrementAndGet(); + Assert.fail(); + } + }; + + Duration timeout = Duration.ofMillis(100); + assertByPolling(timeout, succeedsThirdTime); + Truth.assertThat(numFailures.get()).isEqualTo(2); + } +} diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java index cb82d28db3..534ad3c137 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java @@ -29,6 +29,7 @@ */ package com.google.api.gax.batching; +import static com.google.api.gax.batching.AssertByPolling.assertByPolling; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -41,6 +42,7 @@ import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.common.util.concurrent.SettableFuture; import java.lang.Thread.State; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -522,10 +524,10 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception { final AtomicInteger totalDecreased = new AtomicInteger(0); final AtomicInteger releasedCounter = new AtomicInteger(0); - List reserveThreads = + List> reserveThreads = testConcurrentUpdates( flowController, 100, 100, 10, totalIncreased, totalDecreased, releasedCounter); - for (Future t : reserveThreads) { + for (Future t : reserveThreads) { t.get(200, TimeUnit.MILLISECONDS); } assertEquals(reserveThreads.size(), releasedCounter.get()); @@ -555,10 +557,10 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception { AtomicInteger totalIncreased = new AtomicInteger(0); AtomicInteger totalDecreased = new AtomicInteger(0); AtomicInteger releasedCounter = new AtomicInteger(0); - List reserveThreads = + List> reserveThreads = testConcurrentUpdates( flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter); - for (Future t : reserveThreads) { + for (Future t : reserveThreads) { t.get(200, TimeUnit.MILLISECONDS); } assertEquals(reserveThreads.size(), releasedCounter.get()); @@ -596,19 +598,18 @@ public void testNumberOfBytesOutOfBoundaryWontDeadlock() throws Exception { // will be blocked by reserve 10 Thread t = new Thread( - new Runnable() { - @Override - public void run() { - try { - flowController.reserve(0, 100); - } catch (FlowControlException e) { - } + () -> { + try { + flowController.reserve(0, 100); + } catch (FlowControlException e) { + throw new AssertionError(e); } }); t.start(); + // wait for thread to start, and check it should be blocked - Thread.sleep(50); - assertEquals(State.WAITING, t.getState()); + assertByPolling(Duration.ofMillis(200), () -> assertEquals(State.WAITING, t.getState())); + // increase and decrease should not be blocked int increase = 5, decrease = 20; flowController.decreaseThresholds(0, decrease); @@ -641,19 +642,18 @@ public void testElementCountsOutOfBoundaryWontDeadlock() throws Exception { .build()); Thread t = new Thread( - new Runnable() { - @Override - public void run() { - try { - flowController.reserve(initial + 10, 10); - } catch (FlowControlException e) { - } + () -> { + try { + flowController.reserve(initial + 10, 10); + } catch (FlowControlException e) { + throw new AssertionError(e); } }); t.start(); + // wait for thread to start, and check it should be blocked - Thread.sleep(50); - assertEquals(State.WAITING, t.getState()); + assertByPolling(Duration.ofMillis(200), () -> assertEquals(State.WAITING, t.getState())); + // increase and decrease should not be blocked int increase = 5, decrease = 20; flowController.decreaseThresholds(decrease, 0); @@ -734,7 +734,7 @@ public void run() { .isAtLeast(currentTime); } - private List testConcurrentUpdates( + private List> testConcurrentUpdates( final FlowController flowController, final int increaseStepRange, final int decreaseStepRange, @@ -774,8 +774,8 @@ public void run() { } } }; - List updateFuture = new ArrayList<>(); - List reserveReleaseFuture = new ArrayList<>(); + List> updateFuture = new ArrayList<>(); + List> reserveReleaseFuture = new ArrayList<>(); ExecutorService executors = Executors.newFixedThreadPool(10); ExecutorService reserveExecutor = Executors.newFixedThreadPool(10); for (int i = 0; i < 5; i++) { @@ -783,7 +783,7 @@ public void run() { updateFuture.add(executors.submit(decreaseRunnable)); reserveReleaseFuture.add(reserveExecutor.submit(reserveReleaseRunnable)); } - for (Future t : updateFuture) { + for (Future t : updateFuture) { t.get(50, TimeUnit.MILLISECONDS); } executors.shutdown();