diff --git a/src/main/java/io/lettuce/core/internal/Futures.java b/src/main/java/io/lettuce/core/internal/Futures.java index 8c3fcf69f5..f8276f8b3b 100644 --- a/src/main/java/io/lettuce/core/internal/Futures.java +++ b/src/main/java/io/lettuce/core/internal/Futures.java @@ -12,6 +12,7 @@ * without further notice. * * @author Mark Paluch + * @author jinkshower * @since 5.1 */ public abstract class Futures { @@ -21,7 +22,7 @@ private Futures() { } /** - * Create a composite {@link CompletableFuture} is composed from the given {@code stages}. + * Create a composite {@link CompletableFuture} that is composed of the given {@code stages}. * * @param stages must not be {@code null}. * @return the composed {@link CompletableFuture}. @@ -32,10 +33,11 @@ public static CompletableFuture allOf(Collection stage : stages) { + for (CompletionStage stage : copies) { futures[index++] = stage.toCompletableFuture(); } diff --git a/src/test/java/io/lettuce/core/internal/FuturesUnitTests.java b/src/test/java/io/lettuce/core/internal/FuturesUnitTests.java index 1e3ec41789..d7c1868d44 100644 --- a/src/test/java/io/lettuce/core/internal/FuturesUnitTests.java +++ b/src/test/java/io/lettuce/core/internal/FuturesUnitTests.java @@ -4,7 +4,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -16,6 +22,7 @@ * Unit tests for {@link Futures}. * * @author Mark Paluch + * @author Tihomir Mateev */ class FuturesUnitTests { @@ -56,4 +63,33 @@ void awaitAllShouldSetInterruptedBit() { assertThat(Thread.currentThread().isInterrupted()).isTrue(); } + @Test + void allOfShouldNotThrow() throws InterruptedException { + int threadCount = 100; + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + List issues = new ArrayList<>(); + List> futures = Collections.synchronizedList(new ArrayList<>()); + // Submit multiple threads to perform concurrent operations + CountDownLatch latch = new CountDownLatch(threadCount); + for (int i = 0; i < threadCount; i++) { + executorService.submit(() -> { + try { + for (int y = 0; y < 1000; y++) { + futures.add(new CompletableFuture<>()); + } + + Futures.allOf(futures); + } catch (Exception e) { + issues.add(e); + } finally { + latch.countDown(); + } + }); + } + + // wait for all threads to complete + latch.await(); + assertThat(issues).doesNotHaveAnyElementsOfTypes(ArrayIndexOutOfBoundsException.class); + } + }