Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Fix coalescing supplier
Browse files Browse the repository at this point in the history
Let's just take out a lock - and write a test that covers it. It's more
obviously correct.
  • Loading branch information
j-baker committed Jan 28, 2020
1 parent 451e0aa commit 7e600b7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*/
public class CoalescingSupplier<T> implements Supplier<T> {
private final Supplier<T> delegate;

private final Object lock = new Object();
private volatile Round round = new Round();

public CoalescingSupplier(Supplier<T> delegate) {
Expand Down Expand Up @@ -69,13 +71,15 @@ Round awaitDone() {
}

void execute() {
next = new Round();
try {
future.complete(delegate.get());
} catch (Throwable t) {
future.completeExceptionally(t);
synchronized (lock) {
next = new Round();
try {
future.complete(delegate.get());
} catch (Throwable t) {
future.completeExceptionally(t);
}
round = next;
}
round = next;
}

T getResult() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,26 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import static com.google.common.base.Preconditions.checkState;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.Before;
import org.junit.Test;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;

public class CoalescingSupplierTest {
Expand Down Expand Up @@ -114,6 +121,34 @@ public void exceptionsArePropagatedForCoalescedCalls() {
tasks.assertAllFailed(expected);
}

@Test
public void stressTest() {
int poolSize = 1024;
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(PTExecutors.newFixedThreadPool(poolSize));
AtomicLong counter = new AtomicLong(0);
Supplier<Long> supplier = new CoalescingSupplier<>(() -> {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return counter.incrementAndGet();
});
List<ListenableFuture<?>> futures = IntStream.range(0, poolSize)
.mapToObj(index -> executorService.submit(() -> {
long last = supplier.get();
for (int i = 0; i < 128; i++) {
long current = supplier.get();
checkState(current > last, "current > last");
last = current;
}
})).collect(Collectors.toList());
executorService.shutdown();
Futures.getUnchecked(Futures.allAsList(futures));
}

private AsyncTasks getConcurrently(int count) {
return AsyncTasks.runInParallel(supplier::get, count);
}
Expand Down

0 comments on commit 7e600b7

Please sign in to comment.