From 070851464f66a3214f67fb5b31ab230fc364145a Mon Sep 17 00:00:00 2001 From: James Baker Date: Tue, 14 Jan 2020 18:48:18 +0000 Subject: [PATCH] Better coalescing supplier (#4508) * Add generated changelog entries * Faster coalescing supplier This is a moderate improvement on the present coalescing supplier. Won't describe present state of the world, but the new implementation hopefully describes the algorithm better. For each round, either we are the first to arrive (execute and return) or we are not. In the case we are not, we await the current round ending and then perform the check again. If not this time, we wait for the executor to finish and return their result. The improvement is modest - with 16 threads looping and a task that takes 2ms (the benchmark) we see throughput of 6886 +- 73 operations per second. With this change, we see a result of 7232 +- 89 operations per second. While the change is minimal, the result is closer to optimal; 16 / 0.002 = 8000 as perfect (which we can never really achieve in such a benchmark). * Make more simple * Add generated changelog entries * Cleaner still * imports * Fix the bugs * Oops * Fixes * Make the benchmark parallel * Remove the benchmark * Add generated changelog entries * PR comment --- .../common/concurrent/CoalescingSupplier.java | 73 +++++++++++-------- changelog/@unreleased/pr-4508.v2.yml | 5 ++ 2 files changed, 46 insertions(+), 32 deletions(-) create mode 100644 changelog/@unreleased/pr-4508.v2.yml diff --git a/atlasdb-commons/src/main/java/com/palantir/common/concurrent/CoalescingSupplier.java b/atlasdb-commons/src/main/java/com/palantir/common/concurrent/CoalescingSupplier.java index 052bc06536d..d4ab56d6a34 100644 --- a/atlasdb-commons/src/main/java/com/palantir/common/concurrent/CoalescingSupplier.java +++ b/atlasdb-commons/src/main/java/com/palantir/common/concurrent/CoalescingSupplier.java @@ -17,8 +17,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import com.google.common.base.Throwables; @@ -29,10 +28,8 @@ * requested; requests will not receive results for computations that started prior to the request. */ public class CoalescingSupplier implements Supplier { - private final Supplier delegate; - private volatile CompletableFuture nextResult = new CompletableFuture(); - private final Lock fairLock = new ReentrantLock(true); + private volatile Round round = new Round(); public CoalescingSupplier(Supplier delegate) { this.delegate = delegate; @@ -40,41 +37,53 @@ public CoalescingSupplier(Supplier delegate) { @Override public T get() { - CompletableFuture future = nextResult; - - completeOrWaitForCompletion(future); - - return getResult(future); + Round present = round; + if (present.isFirstToArrive()) { + present.execute(); + return present.getResult(); + } + Round next = present.awaitDone(); + if (next.isFirstToArrive()) { + next.execute(); + } + return next.getResult(); } - private void completeOrWaitForCompletion(CompletableFuture future) { - fairLock.lock(); - try { - resetAndCompleteIfNotCompleted(future); - } finally { - fairLock.unlock(); + private final class Round { + private final AtomicBoolean hasStarted = new AtomicBoolean(false); + private final CompletableFuture future = new CompletableFuture<>(); + private volatile Round next; + + boolean isFirstToArrive() { + // adding the get benchmarks as faster, expected because compareAndSet forces an exclusive cache line + return !hasStarted.get() && hasStarted.compareAndSet(false, true); } - } - private void resetAndCompleteIfNotCompleted(CompletableFuture future) { - if (future.isDone()) { - return; + Round awaitDone() { + try { + future.join(); + } catch (CompletionException e) { + // ignore + } + return next; } - nextResult = new CompletableFuture(); - try { - future.complete(delegate.get()); - } catch (Throwable t) { - future.completeExceptionally(t); + void execute() { + next = new Round(); + try { + future.complete(delegate.get()); + } catch (Throwable t) { + future.completeExceptionally(t); + } + round = next; } - } - private T getResult(CompletableFuture future) { - try { - return future.getNow(null); - } catch (CompletionException ex) { - throw Throwables.propagate(ex.getCause()); + T getResult() { + try { + return future.join(); + } catch (CompletionException e) { + throw Throwables.propagate(e.getCause()); + } } } - } diff --git a/changelog/@unreleased/pr-4508.v2.yml b/changelog/@unreleased/pr-4508.v2.yml new file mode 100644 index 00000000000..de60c7932b2 --- /dev/null +++ b/changelog/@unreleased/pr-4508.v2.yml @@ -0,0 +1,5 @@ +type: improvement +improvement: + description: More predictable coalescing supplier + links: + - https://github.com/palantir/atlasdb/pull/4508