diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index e2e0a11e6..3fae4dd30 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -30,6 +30,7 @@ + diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java index ee0371efc..543683431 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import io.aiven.kafka.tieredstorage.Chunk; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; @@ -43,13 +45,13 @@ public class FetchChunkEnumeration implements Enumeration { private final ChunkIndex chunkIndex; int currentChunkId; public boolean closed; + CompletableFuture nextChunk = null; /** - * - * @param chunkManager provides chunk input to fetch from + * @param chunkManager provides chunk input to fetch from * @param objectKeyPath required by chunkManager - * @param manifest provides to index to build response from - * @param range original offset range start/end position + * @param manifest provides to index to build response from + * @param range original offset range start/end position */ public FetchChunkEnumeration(final ChunkManager chunkManager, final String objectKeyPath, @@ -99,7 +101,17 @@ public InputStream nextElement() { throw new NoSuchElementException(); } - InputStream chunkContent = getChunkContent(currentChunkId); + InputStream chunkContent; + if (nextChunk == null) { + chunkContent = getChunkContent(currentChunkId); + } else { + try { // continue fetching + chunkContent = nextChunk.get(); + nextChunk = null; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } final Chunk currentChunk = chunkIndex.chunks().get(currentChunkId); final int chunkStartPosition = currentChunk.originalPosition; @@ -131,6 +143,12 @@ public InputStream nextElement() { } currentChunkId += 1; + + // eagerly fetching next chunk for caching + if (currentChunkId <= lastChunkId) { + nextChunk = CompletableFuture.supplyAsync(() -> getChunkContent(currentChunkId)); + } + return chunkContent; } @@ -147,6 +165,7 @@ public InputStream toInputStream() { } public void close() { + nextChunk = null; closed = true; } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java index 1749656c3..5dd529d92 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java @@ -133,7 +133,7 @@ private static class TestObjectFetcher implements ObjectFetcher { @Override public InputStream fetch(final String key) throws StorageBackendException { - throw new RuntimeException("Should not be called"); + throw new StorageBackendException("Should not be called"); } @Override @@ -162,7 +162,7 @@ public void assertAllStreamsWereClosed(final boolean readFully) throws IOExcepti verify(is).close(); } } else { - assertThat(openInputStreams).hasSize(1); + assertThat(openInputStreams).hasSizeBetween(1, 2); verify(openInputStreams.get(0)).close(); } }