Skip to content

Commit

Permalink
Use CompletableFuture<Void> in Exchange interfaces
Browse files Browse the repository at this point in the history
CompletableFuture<Void> should be generally preferred. With a capturing
CompletableFuture<?> in the interface it is easy to mistakenly call
CompletableFuture#apply instead of CompletableFuture#handle and return
a CompletableFuture<CompletableFuture<?>> that will still match the
capture.
  • Loading branch information
arhimondr authored and losipiuk committed Feb 10, 2022
1 parent 2dba9e8 commit eeace27
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void destroy(OutputBuffers.OutputBufferId bufferId)
@Override
public ListenableFuture<Void> isFull()
{
return asVoid(toListenableFuture(exchangeSink.isBlocked()));
return toListenableFuture(exchangeSink.isBlocked());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,9 +807,9 @@ public ListenableFuture<Void> isBlocked()
return nonCancellationPropagating(asVoid(exchangeSourceFuture));
}
if (exchangeSource != null) {
CompletableFuture<?> blocked = exchangeSource.isBlocked();
CompletableFuture<Void> blocked = exchangeSource.isBlocked();
if (!blocked.isDone()) {
return nonCancellationPropagating(asVoid(toListenableFuture(blocked)));
return nonCancellationPropagating(toListenableFuture(blocked));
}
}
return immediateVoidFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static io.airlift.concurrent.MoreFutures.asVoid;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -491,7 +490,7 @@ public boolean isFinished()
@Override
public ListenableFuture<Void> isBlocked()
{
return asVoid(toListenableFuture(exchangeSource.isBlocked()));
return toListenableFuture(exchangeSource.isBlocked());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSize
return new ExchangeSourceSplitter()
{
@Override
public CompletableFuture<?> isBlocked()
public CompletableFuture<Void> isBlocked()
{
return completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public LocalFileSystemExchangeSink(Path outputDirectory, int outputPartitionCoun
}

@Override
public CompletableFuture<?> isBlocked()
public CompletableFuture<Void> isBlocked()
{
return NOT_BLOCKED;
}
Expand Down Expand Up @@ -111,7 +111,7 @@ public synchronized long getMemoryUsage()
}

@Override
public synchronized CompletableFuture<?> finish()
public synchronized CompletableFuture<Void> finish()
{
if (closed) {
return completedFuture(null);
Expand Down Expand Up @@ -143,7 +143,7 @@ public synchronized CompletableFuture<?> finish()
}

@Override
public synchronized CompletableFuture<?> abort()
public synchronized CompletableFuture<Void> abort()
{
if (closed) {
return completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public LocalFileSystemExchangeSource(List<Path> files)
}

@Override
public CompletableFuture<?> isBlocked()
public CompletableFuture<Void> isBlocked()
{
return NOT_BLOCKED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testIsFull()
assertEquals(outputBuffer.getState(), NO_MORE_BUFFERS);
assertNotBlocked(outputBuffer.isFull());

CompletableFuture<?> blocked = new CompletableFuture<>();
CompletableFuture<Void> blocked = new CompletableFuture<>();
exchangeSink.setBlocked(blocked);

ListenableFuture<Void> full = outputBuffer.isFull();
Expand All @@ -68,7 +68,7 @@ public void testIsFull()
public void testFinishSuccess()
{
TestingExchangeSink exchangeSink = new TestingExchangeSink();
CompletableFuture<?> finish = new CompletableFuture<>();
CompletableFuture<Void> finish = new CompletableFuture<>();
exchangeSink.setFinish(finish);

OutputBuffer outputBuffer = createSpoolingExchangeOutputBuffer(exchangeSink);
Expand All @@ -87,7 +87,7 @@ public void testFinishSuccess()
public void testFinishFailure()
{
TestingExchangeSink exchangeSink = new TestingExchangeSink();
CompletableFuture<?> finish = new CompletableFuture<>();
CompletableFuture<Void> finish = new CompletableFuture<>();
exchangeSink.setFinish(finish);

OutputBuffer outputBuffer = createSpoolingExchangeOutputBuffer(exchangeSink);
Expand All @@ -108,7 +108,7 @@ public void testFinishFailure()
public void testDestroyAfterFinishCompletion()
{
TestingExchangeSink exchangeSink = new TestingExchangeSink();
CompletableFuture<?> finish = new CompletableFuture<>();
CompletableFuture<Void> finish = new CompletableFuture<>();
exchangeSink.setFinish(finish);

OutputBuffer outputBuffer = createSpoolingExchangeOutputBuffer(exchangeSink);
Expand All @@ -130,7 +130,7 @@ public void testDestroyAfterFinishCompletion()
public void testDestroyBeforeFinishCompletion()
{
TestingExchangeSink exchangeSink = new TestingExchangeSink();
CompletableFuture<?> finish = new CompletableFuture<>();
CompletableFuture<Void> finish = new CompletableFuture<>();
exchangeSink.setFinish(finish);

OutputBuffer outputBuffer = createSpoolingExchangeOutputBuffer(exchangeSink);
Expand Down Expand Up @@ -164,9 +164,9 @@ public void testAbortBeforeNoMorePages()
public void testAbortBeforeFinishCompletion()
{
TestingExchangeSink exchangeSink = new TestingExchangeSink();
CompletableFuture<?> finish = new CompletableFuture<>();
CompletableFuture<Void> finish = new CompletableFuture<>();
exchangeSink.setFinish(finish);
CompletableFuture<?> abort = new CompletableFuture<>();
CompletableFuture<Void> abort = new CompletableFuture<>();
exchangeSink.setAbort(abort);

OutputBuffer outputBuffer = createSpoolingExchangeOutputBuffer(exchangeSink);
Expand All @@ -190,9 +190,9 @@ public void testAbortBeforeFinishCompletion()
public void testAbortAfterFinishCompletion()
{
TestingExchangeSink exchangeSink = new TestingExchangeSink();
CompletableFuture<?> finish = new CompletableFuture<>();
CompletableFuture<Void> finish = new CompletableFuture<>();
exchangeSink.setFinish(finish);
CompletableFuture<?> abort = new CompletableFuture<>();
CompletableFuture<Void> abort = new CompletableFuture<>();
exchangeSink.setAbort(abort);

OutputBuffer outputBuffer = createSpoolingExchangeOutputBuffer(exchangeSink);
Expand All @@ -219,7 +219,7 @@ public void testAbortAfterFinishCompletion()
public void testEnqueueAfterFinish()
{
TestingExchangeSink exchangeSink = new TestingExchangeSink();
CompletableFuture<?> finish = new CompletableFuture<>();
CompletableFuture<Void> finish = new CompletableFuture<>();
exchangeSink.setFinish(finish);

OutputBuffer outputBuffer = createSpoolingExchangeOutputBuffer(exchangeSink);
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testEnqueueAfterFinish()
public void testEnqueueAfterAbort()
{
TestingExchangeSink exchangeSink = new TestingExchangeSink();
CompletableFuture<?> abort = new CompletableFuture<>();
CompletableFuture<Void> abort = new CompletableFuture<>();
exchangeSink.setAbort(abort);

OutputBuffer outputBuffer = createSpoolingExchangeOutputBuffer(exchangeSink);
Expand Down Expand Up @@ -304,20 +304,20 @@ private static class TestingExchangeSink
implements ExchangeSink
{
private final ListMultimap<Integer, Slice> dataBuffer = ArrayListMultimap.create();
private CompletableFuture<?> blocked = CompletableFuture.completedFuture(null);
private CompletableFuture<?> finish = CompletableFuture.completedFuture(null);
private CompletableFuture<?> abort = CompletableFuture.completedFuture(null);
private CompletableFuture<Void> blocked = CompletableFuture.completedFuture(null);
private CompletableFuture<Void> finish = CompletableFuture.completedFuture(null);
private CompletableFuture<Void> abort = CompletableFuture.completedFuture(null);

private boolean finishCalled;
private boolean abortCalled;

@Override
public CompletableFuture<?> isBlocked()
public CompletableFuture<Void> isBlocked()
{
return blocked;
}

public void setBlocked(CompletableFuture<?> blocked)
public void setBlocked(CompletableFuture<Void> blocked)
{
this.blocked = requireNonNull(blocked, "blocked is null");
}
Expand All @@ -341,28 +341,28 @@ public long getMemoryUsage()
}

@Override
public CompletableFuture<?> finish()
public CompletableFuture<Void> finish()
{
assertFalse(abortCalled);
assertFalse(finishCalled);
finishCalled = true;
return finish;
}

public void setFinish(CompletableFuture<?> finish)
public void setFinish(CompletableFuture<Void> finish)
{
this.finish = requireNonNull(finish, "finish is null");
}

@Override
public CompletableFuture<?> abort()
public CompletableFuture<Void> abort()
{
assertFalse(abortCalled);
abortCalled = true;
return abort;
}

public void setAbort(CompletableFuture<?> abort)
public void setAbort(CompletableFuture<Void> abort)
{
this.abort = requireNonNull(abort, "abort is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSize
return new ExchangeSourceSplitter()
{
@Override
public CompletableFuture<?> isBlocked()
public CompletableFuture<Void> isBlocked()
{
return completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
@ThreadSafe
public interface ExchangeSink
{
CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);
CompletableFuture<Void> NOT_BLOCKED = CompletableFuture.completedFuture(null);

/**
* Returns a future that will be completed when the exchange sink becomes
* unblocked. If the exchange sink is not blocked, this method should return
* {@code NOT_BLOCKED}
*/
CompletableFuture<?> isBlocked();
CompletableFuture<Void> isBlocked();

/**
* Appends arbitrary {@code data} to a partition specified by {@code partitionId}.
Expand All @@ -53,7 +53,7 @@ public interface ExchangeSink
*
* @return future that will be resolved when the finish operation either succeeds or fails
*/
CompletableFuture<?> finish();
CompletableFuture<Void> finish();

/**
* Notifies the exchange that the write operation has been aborted.
Expand All @@ -63,5 +63,5 @@ public interface ExchangeSink
*
* @return future that will be resolved when the abort operation either succeeds or fails
*/
CompletableFuture<?> abort();
CompletableFuture<Void> abort();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
public interface ExchangeSource
extends Closeable
{
CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);
CompletableFuture<Void> NOT_BLOCKED = CompletableFuture.completedFuture(null);

/**
* Returns a future that will be completed when the exchange source becomes
* unblocked. If the exchange source is not blocked, this method should return
* {@code NOT_BLOCKED}
*/
CompletableFuture<?> isBlocked();
CompletableFuture<Void> isBlocked();

/**
* Once isFinished returns true, {@link #read()} will never return a non-null result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface ExchangeSourceSplitter
/**
* Returns a future that will be completed when the splitter becomes unblocked.
*/
CompletableFuture<?> isBlocked();
CompletableFuture<Void> isBlocked();

/**
* Returns next sub partition, or {@link Optional#empty()} if the splitting process is finished.
Expand Down

0 comments on commit eeace27

Please sign in to comment.