Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Better coalescing supplier #4508

Merged
merged 17 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import com.google.common.base.Throwables;
Expand All @@ -29,52 +28,61 @@
* requested; requests will not receive results for computations that started prior to the request.
*/
public class CoalescingSupplier<T> implements Supplier<T> {

private final Supplier<T> delegate;
private volatile CompletableFuture<T> nextResult = new CompletableFuture<T>();
private final Lock fairLock = new ReentrantLock(true);
private volatile Round nextResult = new Round();

public CoalescingSupplier(Supplier<T> delegate) {
this.delegate = delegate;
}

@Override
public T get() {
CompletableFuture<T> future = nextResult;

completeOrWaitForCompletion(future);

return getResult(future);
Round present = nextResult;
if (present.isFirstToArrive()) {
present.execute();
return present.getResult();
}
Round next = present.awaitDone();
if (next.isFirstToArrive()) {
next.execute();
}
return next.getResult();
}

private void completeOrWaitForCompletion(CompletableFuture<T> future) {
fairLock.lock();
try {
resetAndCompleteIfNotCompleted(future);
} finally {
fairLock.unlock();
private final class Round {
private final AtomicBoolean hasStarted = new AtomicBoolean(false);
private final CompletableFuture<T> future = new CompletableFuture<>();
private volatile Round next;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's unclear to me based on Java semantics whether this needs to be volatile or not. But I'm gonna leave it as such.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not, since completing a future does happen before a join on that future. Not 100% confident in this one so agree with leaving it as such.


boolean isFirstToArrive() {
return !hasStarted.get() && hasStarted.compareAndSet(false, true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is much faster than the equivalent 'hasStarted.compareAndSet(false, true)'. I believe this is because the proposed solution can do the read in a MESI shared state, whereas the CAS will always make it exclusive.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I think we should document this in a comment here.

}
}

private void resetAndCompleteIfNotCompleted(CompletableFuture<T> future) {
if (future.isDone()) {
return;
Round awaitDone() {
try {
future.join();
} catch (CompletionException e) {
// ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we eating this and not at least logging at lower level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's intended behaviour - we're literally just awaiting the conclusion of the future

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we just want it to be complete. We shouldn't do anything with the exception because it's never relevant in this method.

}
return next;
}

nextResult = new CompletableFuture<T>();
try {
future.complete(delegate.get());
} catch (Throwable t) {
future.completeExceptionally(t);
void execute() {
try {
future.complete(delegate.get());
} catch (Throwable t) {
future.completeExceptionally(t);
}
next = new Round();
nextResult = next;
}
}

private T getResult(CompletableFuture<T> future) {
try {
return future.getNow(null);
} catch (CompletionException ex) {
throw Throwables.propagate(ex.getCause());
T getResult() {
try {
return future.join();
} catch (CompletionException e) {
throw Throwables.propagate(e.getCause());
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.performance.benchmarks;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import com.palantir.common.concurrent.CoalescingSupplier;

@State(Scope.Benchmark)
public class CoalescingSupplierBenchmark {
private final Supplier<String> supplier = new CoalescingSupplier<>(() -> {
try {
Thread.sleep(2);
return "result";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});

@Benchmark
@Threads(16)
@Warmup(time = 1, timeUnit = TimeUnit.SECONDS, iterations = 4)
@Measurement(time = 1, timeUnit = TimeUnit.SECONDS, iterations = 10)
@Fork(1)
public String benchmark() {
return supplier.get();
}
}
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-4508.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Faster coalescing supplier
links:
- https://github.com/palantir/atlasdb/pull/4508