diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 84582c27d..5ab057fd6 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -49,7 +50,9 @@ import io.aiven.kafka.tieredstorage.fetch.FetchEnumeration; import io.aiven.kafka.tieredstorage.fetch.FetchManager; import io.aiven.kafka.tieredstorage.fetch.FetchManagerFactory; +import io.aiven.kafka.tieredstorage.fetch.FetchPart; import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException; +import io.aiven.kafka.tieredstorage.fetch.cache.FetchCache; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1; import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1; @@ -432,6 +435,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata); final var key = objectKey(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG); final var fetchEnumeration = new FetchEnumeration(fetchManager, key, segmentManifest, range, partSize); + maybePreFetch(key, segmentManifest, fetchEnumeration); return fetchEnumeration.toInputStream(); } catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) { throw new RemoteResourceNotFoundException(e); @@ -440,6 +444,28 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme } } + private void maybePreFetch(final ObjectKey key, + final SegmentManifest segmentManifest, + final FetchEnumeration fetchEnumeration) { + if (fetchManager instanceof FetchCache) { + // when cache is available, prepare beforehand to avoid retries to fail on remote tiered fetch + // this also impacts latency as all fetches happen async without waiting for consumption to progress + final var nextParts = new LinkedHashSet(2); + final var parts = fetchEnumeration.parts().iterator(); + // Prefetch current and next part. + // Otherwise, whole segment would be pre-fetched with broker fetch request that includes start offset only. + // This is assuming parts are larger than fetch max bytes per partition on the consumer side, + // so when a consumer read finishes, and next batches are requested, the part is already prefetched. + int maxPartsToPrefetch = 2; + while (parts.hasNext() && maxPartsToPrefetch > 0) { + nextParts.add(parts.next()); + maxPartsToPrefetch--; + } + // only fetches parts not in cache already. non-blocking + ((FetchCache) fetchManager).prepareParts(key, segmentManifest, nextParts); + } + } + @Override public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final IndexType indexType) throws RemoteStorageException { diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java index 7cb3cc1ee..291e5ef88 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java @@ -20,10 +20,12 @@ import java.io.InputStream; import java.io.SequenceInputStream; import java.util.Enumeration; +import java.util.LinkedHashSet; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; +import java.util.Set; import io.aiven.kafka.tieredstorage.Chunk; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; @@ -102,6 +104,22 @@ private Chunk getLastChunk(final int endPosition) { } } + public Set parts() { + final var parts = new LinkedHashSet(); + var current = firstPart; + while (!current.equals(lastPart)) { + parts.add(current); + final var maybeNext = current.next(); + if (maybeNext.isPresent()) { + current = maybeNext.get(); + } else { + break; + } + } + parts.add(current); + return parts; + } + @Override public boolean hasMoreElements() { return !closed && currentPart.isPresent(); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java index de3b66eb6..8f5600091 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -120,6 +121,25 @@ public InputStream fetchPartContent(final ObjectKey objectKey, } } + + public void prepareParts(final ObjectKey objectKey, + final SegmentManifest manifest, + final Set parts) { + for (final var part : parts) { + final FetchPartKey fetchPartKey = new FetchPartKey(objectKey.value(), part.range); + cache.asMap() + .computeIfAbsent(fetchPartKey, key -> CompletableFuture.supplyAsync(() -> { + statsCounter.recordMiss(); + try { + final InputStream partContent = fetchManager.fetchPartContent(objectKey, manifest, part); + return this.cachePartContent(fetchPartKey, partContent); + } catch (final StorageBackendException | IOException e) { + throw new CompletionException(e); + } + }, executor)); + } + } + public abstract InputStream readCachedPartContent(final T cachedChunk); public abstract T cachePartContent(final FetchPartKey fetchPartKey, final InputStream chunk) throws IOException; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java index 25d923edb..c9be3707b 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java @@ -108,14 +108,19 @@ void endPositionIsOutsideIndex() { @Test void shouldReturnRangeFromSingleChunk() throws StorageBackendException { // Given a set of 10 chunks with 10 bytes each - // When final int from = 32; final int to = 34; + final var part0 = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); + final var part1 = part0.next().get(); + final var part2 = part1.next().get(); + final var part3 = part2.next().get(); + // When final FetchEnumeration fetchEnumeration = new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1); when(chunkManager.fetchPartContent(eq(SEGMENT_KEY), eq(manifest), any())) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then + assertThat(fetchEnumeration.parts()).containsExactly(part3); assertThat(fetchEnumeration.firstChunk.id).isEqualTo(fetchEnumeration.lastChunk.id); assertThat(fetchEnumeration.nextElement()).hasContent("234"); assertThat(fetchEnumeration.hasMoreElements()).isFalse(); @@ -126,22 +131,24 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException { @Test void shouldReturnRangeFromMultipleParts() throws StorageBackendException { // Given a set of 10 chunks with 10 bytes each - // When final int from = 15; final int to = 34; - final FetchEnumeration fetchEnumeration = - new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1); final var part0 = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); final var part1 = part0.next().get(); + final var part2 = part1.next().get(); + final var part3 = part2.next().get(); + // When + final FetchEnumeration fetchEnumeration = + new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1); + when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part1)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - final var part2 = part1.next().get(); when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part2)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - final var part3 = part2.next().get(); when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part3)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then + assertThat(fetchEnumeration.parts()).containsExactly(part1, part2, part3); assertThat(fetchEnumeration.firstChunk.id).isNotEqualTo(fetchEnumeration.lastChunk.id); assertThat(fetchEnumeration.nextElement()).hasContent("56789"); assertThat(fetchEnumeration.nextElement()).hasContent("0123456789"); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheTest.java index dd910d682..17205a96d 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheTest.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.time.Duration; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; @@ -210,6 +211,48 @@ void sizeBasedEviction() throws IOException, StorageBackendException { verify(fetchManager, times(3)).fetchPartContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any()); } + @Test + void preparingParts() throws Exception { + fetchCache.configure(Map.of( + "retention.ms", "-1", + "size", "-1" + )); + fetchCache.prepareParts(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, Set.of(firstPart, nextPart)); + await().pollInterval(Duration.ofMillis(5)).until(() -> fetchCache.statsCounter.snapshot().loadCount() == 2); + verify(fetchManager, times(2)).fetchPartContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any()); + + final InputStream cachedPart0 = + fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(cachedPart0).hasBinaryContent(CHUNK_0); + verifyNoMoreInteractions(fetchManager); + + final InputStream cachedPart1 = + fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + assertThat(cachedPart1).hasBinaryContent(CHUNK_1); + verifyNoMoreInteractions(fetchManager); + } + + @Test + void preparingFirstPart() throws Exception { + fetchCache.configure(Map.of( + "retention.ms", "-1", + "size", "-1" + )); + fetchCache.prepareParts(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, Set.of(firstPart)); + await().pollInterval(Duration.ofMillis(5)) + .until(() -> fetchCache.statsCounter.snapshot().loadCount() == 1); + verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + + final InputStream cachedPart0 = + fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(cachedPart0).hasBinaryContent(CHUNK_0); + verifyNoMoreInteractions(fetchManager); + + final InputStream cachedPart1 = + fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + assertThat(cachedPart1).hasBinaryContent(CHUNK_1); + verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + } } @Nested