This repository has been archived by the owner on Nov 14, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
Better coalescing supplier #4508
Merged
Merged
Changes from 5 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
1d01ca9
Add generated changelog entries
j-baker 7c45509
Faster coalescing supplier
j-baker ccf67f1
Make more simple
j-baker 129ffe7
Merge branch 'jbaker/faster_coalescing_supplier' of github.com:palant…
j-baker 04f70e6
Add generated changelog entries
j-baker 6740acc
Cleaner still
j-baker c34fb12
Merge branch 'jbaker/faster_coalescing_supplier' of github.com:palant…
j-baker a973bab
imports
j-baker e901538
Fix the bugs
j-baker f508977
Oops
j-baker 654ef39
Fixes
j-baker f1d89f3
Make the benchmark parallel
j-baker 8f46444
Remove the benchmark
j-baker 14c90c2
Add generated changelog entries
j-baker 9b509f8
Merge branch 'develop' into jbaker/faster_coalescing_supplier
j-baker fc24f04
Merge branch 'jbaker/faster_coalescing_supplier' of github.com:palant…
j-baker 8a3ab37
PR comment
j-baker File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,8 +17,8 @@ | |
|
||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionException; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; | ||
import java.util.function.Supplier; | ||
|
||
import com.google.common.base.Throwables; | ||
|
@@ -29,52 +29,61 @@ | |
* requested; requests will not receive results for computations that started prior to the request. | ||
*/ | ||
public class CoalescingSupplier<T> implements Supplier<T> { | ||
|
||
private final Supplier<T> delegate; | ||
private volatile CompletableFuture<T> nextResult = new CompletableFuture<T>(); | ||
private final Lock fairLock = new ReentrantLock(true); | ||
private volatile Round nextResult = new Round(); | ||
|
||
public CoalescingSupplier(Supplier<T> delegate) { | ||
this.delegate = delegate; | ||
} | ||
|
||
@Override | ||
public T get() { | ||
CompletableFuture<T> future = nextResult; | ||
|
||
completeOrWaitForCompletion(future); | ||
|
||
return getResult(future); | ||
Round present = nextResult; | ||
if (present.isFirstToArrive()) { | ||
present.execute(); | ||
return present.getResult(); | ||
} | ||
awaitDone(present.future); | ||
Round next = present.next; | ||
if (next.isFirstToArrive()) { | ||
next.execute(); | ||
} | ||
return next.getResult(); | ||
} | ||
|
||
private void completeOrWaitForCompletion(CompletableFuture<T> future) { | ||
fairLock.lock(); | ||
try { | ||
resetAndCompleteIfNotCompleted(future); | ||
} finally { | ||
fairLock.unlock(); | ||
private final class Round { | ||
private final AtomicBoolean hasStarted = new AtomicBoolean(false); | ||
private final CompletableFuture<T> future = new CompletableFuture<>(); | ||
private volatile Round next; | ||
|
||
boolean isFirstToArrive() { | ||
return !hasStarted.get() && hasStarted.compareAndSet(false, true); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is much faster than the equivalent 'hasStarted.compareAndSet(false, true)'. I believe this is because the proposed solution can do the read in a MESI shared state, whereas the CAS will always make it exclusive. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. I think we should document this in a comment here. |
||
} | ||
} | ||
|
||
private void resetAndCompleteIfNotCompleted(CompletableFuture<T> future) { | ||
if (future.isDone()) { | ||
return; | ||
void execute() { | ||
next = new Round(); | ||
nextResult = next; | ||
try { | ||
future.complete(delegate.get()); | ||
} catch (Throwable t) { | ||
future.completeExceptionally(t); | ||
} | ||
} | ||
|
||
nextResult = new CompletableFuture<T>(); | ||
try { | ||
future.complete(delegate.get()); | ||
} catch (Throwable t) { | ||
future.completeExceptionally(t); | ||
T getResult() { | ||
try { | ||
return future.join(); | ||
} catch (CompletionException e) { | ||
throw Throwables.propagate(e.getCause()); | ||
} | ||
} | ||
} | ||
|
||
private T getResult(CompletableFuture<T> future) { | ||
private static void awaitDone(CompletableFuture<?> future) { | ||
try { | ||
return future.getNow(null); | ||
} catch (CompletionException ex) { | ||
throw Throwables.propagate(ex.getCause()); | ||
future.join(); | ||
} catch (CompletionException e) { | ||
// ignore | ||
} | ||
} | ||
|
||
} |
52 changes: 52 additions & 0 deletions
52
...rf/src/main/java/com/palantir/atlasdb/performance/benchmarks/CoalescingSupplierTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.atlasdb.performance.benchmarks; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Supplier; | ||
|
||
import org.openjdk.jmh.annotations.Benchmark; | ||
import org.openjdk.jmh.annotations.Fork; | ||
import org.openjdk.jmh.annotations.Measurement; | ||
import org.openjdk.jmh.annotations.Scope; | ||
import org.openjdk.jmh.annotations.State; | ||
import org.openjdk.jmh.annotations.Threads; | ||
import org.openjdk.jmh.annotations.Warmup; | ||
|
||
import com.palantir.common.concurrent.CoalescingSupplier; | ||
|
||
@State(Scope.Benchmark) | ||
public class CoalescingSupplierTests { | ||
private final Supplier<String> supplier = new CoalescingSupplier<>(() -> { | ||
try { | ||
Thread.sleep(2); | ||
return "result"; | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
|
||
@Benchmark | ||
@Threads(16) | ||
@Warmup(time = 1, timeUnit = TimeUnit.SECONDS, iterations = 4) | ||
@Measurement(time = 1, timeUnit = TimeUnit.SECONDS, iterations = 10) | ||
@Fork(1) | ||
public String benchmark() { | ||
return supplier.get(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
type: improvement | ||
improvement: | ||
description: Faster coalescing supplier | ||
links: | ||
- https://github.com/palantir/atlasdb/pull/4508 |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's unclear to me based on Java semantics whether this needs to be volatile or not. But I'm gonna leave it as such.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not, since completing a future does happen before a join on that future. Not 100% confident in this one so agree with leaving it as such.