diff --git a/src/main/java/io/vavr/concurrent/FutureImpl.java b/src/main/java/io/vavr/concurrent/FutureImpl.java
index f3b82f252..3f17dd55e 100644
--- a/src/main/java/io/vavr/concurrent/FutureImpl.java
+++ b/src/main/java/io/vavr/concurrent/FutureImpl.java
@@ -34,6 +34,7 @@
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
+import java.util.function.Supplier;
/**
* INTERNAL API - This class is subject to change.
@@ -100,7 +101,12 @@ private FutureImpl(Executor executor, Option> value, Queue {
+ // Synchronize as the future could be in the process of cancelling
+ synchronized (lock) {
+ return isCompleted();
+ }
+ });
} catch (Throwable x) {
tryComplete(Try.failure(x));
}
@@ -115,7 +121,7 @@ private FutureImpl(Executor executor, Option> value, Queue FutureImpl of(Executor executor) {
- return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) -> {});
+ return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) -> {});
}
/**
@@ -127,7 +133,7 @@ static FutureImpl of(Executor executor) {
* @return a new {@code FutureImpl} instance
*/
static FutureImpl of(Executor executor, Try extends T> value) {
- return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread) -> {});
+ return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread, isCompleted) -> {});
}
/**
@@ -140,7 +146,7 @@ static FutureImpl of(Executor executor, Try extends T> value) {
* @return a new {@code FutureImpl} instance
*/
static FutureImpl sync(Executor executor, Task extends T> task) {
- return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
+ return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) ->
task.run(complete::with)
);
}
@@ -156,8 +162,12 @@ static FutureImpl sync(Executor executor, Task extends T> task) {
*/
static FutureImpl async(Executor executor, Task extends T> task) {
// In a single-threaded context this Future may already have been completed during initialization.
- return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
+ return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) ->
executor.execute(() -> {
+ // Avoid performing work, if future is already complete (normally by cancellation)
+ if (isCompleted.get()) {
+ return;
+ }
updateThread.run();
try {
task.run(complete::with);
@@ -414,6 +424,6 @@ private void handleUncaughtException(Throwable x) {
}
private interface Computation {
- void execute(Task.Complete complete, Runnable updateThread) throws Throwable;
+ void execute(Task.Complete complete, Runnable updateThread, Supplier isCompleted) throws Throwable;
}
}
diff --git a/src/test/java/io/vavr/concurrent/FutureTest.java b/src/test/java/io/vavr/concurrent/FutureTest.java
index 8b2a63356..73e98291e 100644
--- a/src/test/java/io/vavr/concurrent/FutureTest.java
+++ b/src/test/java/io/vavr/concurrent/FutureTest.java
@@ -54,6 +54,7 @@
import static io.vavr.concurrent.Concurrent.waitUntil;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static io.vavr.concurrent.Concurrent.zZz;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -146,6 +147,31 @@ protected int getPeekNonNilPerformingAnAction() {
// -- static failed()
+
+ @Test
+ public void shouldNotExecuteFutureThatHasBeenCancelledBeforeItStarted() throws InterruptedException {
+ ExecutorService es = Executors.newSingleThreadExecutor();
+
+ AtomicBoolean future2Executed = new AtomicBoolean(false);
+
+ // Submit f1 to the executor first
+ Future f = Future.run(es, () -> Thread.sleep(1000));
+ // Submit f2 next, it will have to wait to be executed
+ Future f2 = Future.run(es, () -> {
+ // Should never run this
+ future2Executed.set(true);
+ });
+
+ // Cancel f2 BEFORE it runs on the executor
+ f2.cancel(true);
+ f.cancel(true);
+ es.shutdown();
+ boolean terminated = es.awaitTermination(2, SECONDS);
+ assertThat(terminated).isTrue();
+ // f2 should never have run
+ assertThat(future2Executed.get()).isFalse();
+ }
+
@Test
public void shouldCreateFailureThatFailsWithRuntimeException() {
final Future