Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Refactor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
j-baker committed Jan 28, 2020
1 parent 4bcc3c4 commit d9ae980
Showing 1 changed file with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package com.palantir.common.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

/**
* A supplier that coalesces computation requests, such that only one computation is ever running at a time, and
Expand Down Expand Up @@ -53,7 +55,7 @@ public T get() {

private final class Round {
private final AtomicBoolean hasStarted = new AtomicBoolean(false);
private final CompletableFuture<T> future = new CompletableFuture<>();
private final SettableFuture<T> future = SettableFuture.create();
private volatile Round next;

boolean isFirstToArrive() {
Expand All @@ -63,29 +65,38 @@ boolean isFirstToArrive() {

Round awaitDone() {
try {
future.join();
} catch (CompletionException e) {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
// ignore
}
return next;
}

void execute() {
synchronized (lock) {
next = new Round();
try {
future.complete(delegate.get());
} catch (Throwable t) {
future.completeExceptionally(t);
}
round = next;
next = new Round();
ListenableFuture<T> result = compute();
round = next;
future.setFuture(result);
}

private ListenableFuture<T> compute() {
try {
return Futures.immediateFuture(delegate.get());
} catch (Throwable t) {
return Futures.immediateFailedFuture(t);
}
}

T getResult() {
try {
return future.join();
} catch (CompletionException e) {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
}
}
Expand Down

0 comments on commit d9ae980

Please sign in to comment.