Skip to content

Commit

Permalink
chore: handle FlowControllerTest thread start variation by polling (#…
Browse files Browse the repository at this point in the history
…1586)

* chore: handle FlowControllerTest thread start variation by polling

* chore: add license to new files

* chore: use assertThrows and ensure original assertion is cause

* chore: simplify assertByPolling to method with 2 args

* chore: lint

* chore: fix javadoc
  • Loading branch information
burkedavison authored Apr 17, 2023
1 parent ca4de24 commit 0019c92
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2023 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.google.common.base.Stopwatch;
import java.time.Duration;
import java.util.Objects;

/**
* Blocks the current thread to poll the given assertion every 10ms until it's successful or the
* timeout is exceeded. Expected usage:
*
* <pre>{@code
* assertByPolling(Duration.ofSeconds(2), () -> assertThat(...));
* }</pre>
*/
public class AssertByPolling {

public static void assertByPolling(Duration timeout, Runnable assertion)
throws InterruptedException {
Objects.requireNonNull(timeout, "Timeout must not be null");
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
try {
assertion.run();
return; // Success

} catch (AssertionError err) {
if (stopwatch.elapsed(MILLISECONDS) < timeout.toMillis()) {
MILLISECONDS.sleep(10);
} else {
throw new AssertionError("Timeout waiting for successful assertion.", err);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2023 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import static com.google.api.gax.batching.AssertByPolling.assertByPolling;

import com.google.common.truth.Truth;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;

public class AssertByPollingTest {

@Test
public void testFailsWhenTimeoutExceeded() {
AssertionError error =
Assert.assertThrows(
AssertionError.class,
() -> assertByPolling(Duration.ofNanos(2), () -> Truth.assertThat(1).isAtLeast(2)));

Throwable cause = error.getCause();
Truth.assertThat(cause).isInstanceOf(AssertionError.class);
// Error provides original assertion failure that never came true.
Truth.assertThat(cause.getMessage()).contains("expected to be at least");
}

@Test
public void testImmediateSuccessSucceedsRegardlessOfTimeout() throws InterruptedException {
Runnable succeedsAfter1ms =
() -> {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
};
Duration timeout = Duration.ofNanos(0);
assertByPolling(timeout, succeedsAfter1ms);
}

@Test
public void testSucceedsThirdTime() throws InterruptedException {
AtomicInteger attempt = new AtomicInteger(1);
AtomicInteger numFailures = new AtomicInteger(0);
Runnable succeedsThirdTime =
() -> {
if (attempt.getAndIncrement() < 3) {
numFailures.incrementAndGet();
Assert.fail();
}
};

Duration timeout = Duration.ofMillis(100);
assertByPolling(timeout, succeedsThirdTime);
Truth.assertThat(numFailures.get()).isEqualTo(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.batching;

import static com.google.api.gax.batching.AssertByPolling.assertByPolling;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -41,6 +42,7 @@
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.common.util.concurrent.SettableFuture;
import java.lang.Thread.State;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -522,10 +524,10 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
final AtomicInteger totalDecreased = new AtomicInteger(0);
final AtomicInteger releasedCounter = new AtomicInteger(0);

List<Future> reserveThreads =
List<Future<?>> reserveThreads =
testConcurrentUpdates(
flowController, 100, 100, 10, totalIncreased, totalDecreased, releasedCounter);
for (Future t : reserveThreads) {
for (Future<?> t : reserveThreads) {
t.get(200, TimeUnit.MILLISECONDS);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
Expand Down Expand Up @@ -555,10 +557,10 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
AtomicInteger totalIncreased = new AtomicInteger(0);
AtomicInteger totalDecreased = new AtomicInteger(0);
AtomicInteger releasedCounter = new AtomicInteger(0);
List<Future> reserveThreads =
List<Future<?>> reserveThreads =
testConcurrentUpdates(
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
for (Future t : reserveThreads) {
for (Future<?> t : reserveThreads) {
t.get(200, TimeUnit.MILLISECONDS);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
Expand Down Expand Up @@ -596,19 +598,18 @@ public void testNumberOfBytesOutOfBoundaryWontDeadlock() throws Exception {
// will be blocked by reserve 10
Thread t =
new Thread(
new Runnable() {
@Override
public void run() {
try {
flowController.reserve(0, 100);
} catch (FlowControlException e) {
}
() -> {
try {
flowController.reserve(0, 100);
} catch (FlowControlException e) {
throw new AssertionError(e);
}
});
t.start();

// wait for thread to start, and check it should be blocked
Thread.sleep(50);
assertEquals(State.WAITING, t.getState());
assertByPolling(Duration.ofMillis(200), () -> assertEquals(State.WAITING, t.getState()));

// increase and decrease should not be blocked
int increase = 5, decrease = 20;
flowController.decreaseThresholds(0, decrease);
Expand Down Expand Up @@ -641,19 +642,18 @@ public void testElementCountsOutOfBoundaryWontDeadlock() throws Exception {
.build());
Thread t =
new Thread(
new Runnable() {
@Override
public void run() {
try {
flowController.reserve(initial + 10, 10);
} catch (FlowControlException e) {
}
() -> {
try {
flowController.reserve(initial + 10, 10);
} catch (FlowControlException e) {
throw new AssertionError(e);
}
});
t.start();

// wait for thread to start, and check it should be blocked
Thread.sleep(50);
assertEquals(State.WAITING, t.getState());
assertByPolling(Duration.ofMillis(200), () -> assertEquals(State.WAITING, t.getState()));

// increase and decrease should not be blocked
int increase = 5, decrease = 20;
flowController.decreaseThresholds(decrease, 0);
Expand Down Expand Up @@ -734,7 +734,7 @@ public void run() {
.isAtLeast(currentTime);
}

private List<Future> testConcurrentUpdates(
private List<Future<?>> testConcurrentUpdates(
final FlowController flowController,
final int increaseStepRange,
final int decreaseStepRange,
Expand Down Expand Up @@ -774,16 +774,16 @@ public void run() {
}
}
};
List<Future> updateFuture = new ArrayList<>();
List<Future> reserveReleaseFuture = new ArrayList<>();
List<Future<?>> updateFuture = new ArrayList<>();
List<Future<?>> reserveReleaseFuture = new ArrayList<>();
ExecutorService executors = Executors.newFixedThreadPool(10);
ExecutorService reserveExecutor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
updateFuture.add(executors.submit(increaseRunnable));
updateFuture.add(executors.submit(decreaseRunnable));
reserveReleaseFuture.add(reserveExecutor.submit(reserveReleaseRunnable));
}
for (Future t : updateFuture) {
for (Future<?> t : updateFuture) {
t.get(50, TimeUnit.MILLISECONDS);
}
executors.shutdown();
Expand Down

0 comments on commit 0019c92

Please sign in to comment.