Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Better coalescing supplier (#4508)
Browse files Browse the repository at this point in the history
* Add generated changelog entries

* Faster coalescing supplier

This is a moderate improvement on the present coalescing supplier.
Won't describe present state of the world, but the new implementation
hopefully describes the algorithm better.

For each round, either we are the first to arrive (execute and return)
or we are not.

In the case we are not, we await the current round ending and then
perform the check again. If not this time, we wait for the executor to
finish and return their result.

The improvement is modest - with 16 threads looping and a task that takes
2ms (the benchmark) we see throughput of 6886 +- 73 operations per
second. With this change, we see a result of 7232 +- 89 operations per
second.

While the change is minimal, the result is closer to optimal;
16 / 0.002 = 8000 as perfect (which we can never really achieve in such
a benchmark).

* Make more simple

* Add generated changelog entries

* Cleaner still

* imports

* Fix the bugs

* Oops

* Fixes

* Make the benchmark parallel

* Remove the benchmark

* Add generated changelog entries

* PR comment
  • Loading branch information
j-baker authored and jeremyk-91 committed Jan 14, 2020
1 parent 49fc1e4 commit 0708514
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import com.google.common.base.Throwables;
Expand All @@ -29,52 +28,62 @@
* requested; requests will not receive results for computations that started prior to the request.
*/
public class CoalescingSupplier<T> implements Supplier<T> {

private final Supplier<T> delegate;
private volatile CompletableFuture<T> nextResult = new CompletableFuture<T>();
private final Lock fairLock = new ReentrantLock(true);
private volatile Round round = new Round();

public CoalescingSupplier(Supplier<T> delegate) {
this.delegate = delegate;
}

@Override
public T get() {
CompletableFuture<T> future = nextResult;

completeOrWaitForCompletion(future);

return getResult(future);
Round present = round;
if (present.isFirstToArrive()) {
present.execute();
return present.getResult();
}
Round next = present.awaitDone();
if (next.isFirstToArrive()) {
next.execute();
}
return next.getResult();
}

private void completeOrWaitForCompletion(CompletableFuture<T> future) {
fairLock.lock();
try {
resetAndCompleteIfNotCompleted(future);
} finally {
fairLock.unlock();
private final class Round {
private final AtomicBoolean hasStarted = new AtomicBoolean(false);
private final CompletableFuture<T> future = new CompletableFuture<>();
private volatile Round next;

boolean isFirstToArrive() {
// adding the get benchmarks as faster, expected because compareAndSet forces an exclusive cache line
return !hasStarted.get() && hasStarted.compareAndSet(false, true);
}
}

private void resetAndCompleteIfNotCompleted(CompletableFuture<T> future) {
if (future.isDone()) {
return;
Round awaitDone() {
try {
future.join();
} catch (CompletionException e) {
// ignore
}
return next;
}

nextResult = new CompletableFuture<T>();
try {
future.complete(delegate.get());
} catch (Throwable t) {
future.completeExceptionally(t);
void execute() {
next = new Round();
try {
future.complete(delegate.get());
} catch (Throwable t) {
future.completeExceptionally(t);
}
round = next;
}
}

private T getResult(CompletableFuture<T> future) {
try {
return future.getNow(null);
} catch (CompletionException ex) {
throw Throwables.propagate(ex.getCause());
T getResult() {
try {
return future.join();
} catch (CompletionException e) {
throw Throwables.propagate(e.getCause());
}
}
}

}
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-4508.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: More predictable coalescing supplier
links:
- https://github.com/palantir/atlasdb/pull/4508

0 comments on commit 0708514

Please sign in to comment.