-
Notifications
You must be signed in to change notification settings - Fork 15
Better coalescing supplier #4508
Changes from all commits
1d01ca9
7c45509
ccf67f1
129ffe7
04f70e6
6740acc
c34fb12
a973bab
e901538
f508977
654ef39
f1d89f3
8f46444
14c90c2
9b509f8
fc24f04
8a3ab37
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,8 +17,7 @@ | |
|
||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionException; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Supplier; | ||
|
||
import com.google.common.base.Throwables; | ||
|
@@ -29,52 +28,62 @@ | |
* requested; requests will not receive results for computations that started prior to the request. | ||
*/ | ||
public class CoalescingSupplier<T> implements Supplier<T> { | ||
|
||
private final Supplier<T> delegate; | ||
private volatile CompletableFuture<T> nextResult = new CompletableFuture<T>(); | ||
private final Lock fairLock = new ReentrantLock(true); | ||
private volatile Round round = new Round(); | ||
|
||
public CoalescingSupplier(Supplier<T> delegate) { | ||
this.delegate = delegate; | ||
} | ||
|
||
@Override | ||
public T get() { | ||
CompletableFuture<T> future = nextResult; | ||
|
||
completeOrWaitForCompletion(future); | ||
|
||
return getResult(future); | ||
Round present = round; | ||
if (present.isFirstToArrive()) { | ||
present.execute(); | ||
return present.getResult(); | ||
} | ||
Round next = present.awaitDone(); | ||
if (next.isFirstToArrive()) { | ||
next.execute(); | ||
} | ||
return next.getResult(); | ||
} | ||
|
||
private void completeOrWaitForCompletion(CompletableFuture<T> future) { | ||
fairLock.lock(); | ||
try { | ||
resetAndCompleteIfNotCompleted(future); | ||
} finally { | ||
fairLock.unlock(); | ||
private final class Round { | ||
private final AtomicBoolean hasStarted = new AtomicBoolean(false); | ||
private final CompletableFuture<T> future = new CompletableFuture<>(); | ||
private volatile Round next; | ||
|
||
boolean isFirstToArrive() { | ||
// adding the get benchmarks as faster, expected because compareAndSet forces an exclusive cache line | ||
return !hasStarted.get() && hasStarted.compareAndSet(false, true); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is much faster than the equivalent 'hasStarted.compareAndSet(false, true)'. I believe this is because the proposed solution can do the read in a MESI shared state, whereas the CAS will always make it exclusive. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. I think we should document this in a comment here. |
||
} | ||
} | ||
|
||
private void resetAndCompleteIfNotCompleted(CompletableFuture<T> future) { | ||
if (future.isDone()) { | ||
return; | ||
Round awaitDone() { | ||
try { | ||
future.join(); | ||
} catch (CompletionException e) { | ||
// ignore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we eating this and not at least logging at lower level? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's intended behaviour - we're literally just awaiting the conclusion of the future There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we just want it to be complete. We shouldn't do anything with the exception because it's never relevant in this method. |
||
} | ||
return next; | ||
} | ||
|
||
nextResult = new CompletableFuture<T>(); | ||
try { | ||
future.complete(delegate.get()); | ||
} catch (Throwable t) { | ||
future.completeExceptionally(t); | ||
void execute() { | ||
next = new Round(); | ||
try { | ||
future.complete(delegate.get()); | ||
} catch (Throwable t) { | ||
future.completeExceptionally(t); | ||
} | ||
round = next; | ||
} | ||
} | ||
|
||
private T getResult(CompletableFuture<T> future) { | ||
try { | ||
return future.getNow(null); | ||
} catch (CompletionException ex) { | ||
throw Throwables.propagate(ex.getCause()); | ||
T getResult() { | ||
try { | ||
return future.join(); | ||
} catch (CompletionException e) { | ||
throw Throwables.propagate(e.getCause()); | ||
} | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
type: improvement | ||
improvement: | ||
description: More predictable coalescing supplier | ||
links: | ||
- https://github.com/palantir/atlasdb/pull/4508 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's unclear to me based on Java semantics whether this needs to be volatile or not. But I'm gonna leave it as such.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not, since completing a future does happen before a join on that future. Not 100% confident in this one so agree with leaving it as such.