From ca94a952295cfe0ad721f318000f5e68e5aed0e6 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 2 Oct 2023 11:12:30 +0300 Subject: [PATCH] feat: pre-fetch parts Parts are going to be fetched into cache asynchronously when fetch request is received, so if fetch request timeout before remote tier fetch completes, the next one should hit the cached value and make progress. --- .../tieredstorage/RemoteStorageManager.java | 32 ++++++++++++++-- .../tieredstorage/fetch/FetchEnumeration.java | 18 +++++++++ .../tieredstorage/fetch/cache/FetchCache.java | 20 ++++++++++ .../fetch/FetchEnumerationTest.java | 19 +++++++--- .../fetch/cache/FetchCacheTest.java | 38 +++++++++++++++++++ 5 files changed, 118 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index cc5657299..47aa713bf 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -23,6 +23,7 @@ import java.nio.file.Files; import java.security.KeyPair; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -45,7 +46,9 @@ import io.aiven.kafka.tieredstorage.fetch.FetchEnumeration; import io.aiven.kafka.tieredstorage.fetch.FetchManager; import io.aiven.kafka.tieredstorage.fetch.FetchManagerFactory; +import io.aiven.kafka.tieredstorage.fetch.FetchPart; import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException; +import io.aiven.kafka.tieredstorage.fetch.cache.FetchCache; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; @@ -383,10 +386,11 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata); final var suffix = ObjectKeyFactory.Suffix.LOG; - final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix); + final var key = objectKey(remoteLogSegmentMetadata, suffix); - return new FetchEnumeration(fetchManager, segmentKey, segmentManifest, range, partSize) - .toInputStream(); + final var fetchEnumeration = new FetchEnumeration(fetchManager, key, segmentManifest, range, partSize); + maybePreFetch(key, segmentManifest, fetchEnumeration); + return fetchEnumeration.toInputStream(); } catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) { throw new RemoteResourceNotFoundException(e); } catch (final Exception e) { @@ -394,6 +398,28 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme } } + private void maybePreFetch(final ObjectKey key, + final SegmentManifest segmentManifest, + final FetchEnumeration fetchEnumeration) { + if (fetchManager instanceof FetchCache) { + // when cache is available, prepare beforehand to avoid retries to fail on remote tiered fetch + // this also impacts latency as all fetches happen async without waiting for consumption to progress + final var nextParts = new LinkedHashSet(2); + final var parts = fetchEnumeration.parts().iterator(); + // Prefetch current and next part. + // Otherwise, whole segment would be pre-fetched with broker fetch request that includes start offset only. + // This is assuming parts are larger than fetch max bytes per partition on the consumer side, + // so when a consumer read finishes, and next batches are requested, the part is already prefetched. + int maxPartsToPrefetch = 2; + while (parts.hasNext() && maxPartsToPrefetch > 0) { + nextParts.add(parts.next()); + maxPartsToPrefetch--; + } + // only fetches parts not in cache already. non-blocking + ((FetchCache) fetchManager).prepareParts(key, segmentManifest, nextParts); + } + } + @Override public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final IndexType indexType) throws RemoteStorageException { diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java index 5f084daf4..9b220cdef 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java @@ -20,10 +20,12 @@ import java.io.InputStream; import java.io.SequenceInputStream; import java.util.Enumeration; +import java.util.LinkedHashSet; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; +import java.util.Set; import io.aiven.kafka.tieredstorage.Chunk; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; @@ -102,6 +104,22 @@ private Chunk getLastChunk(final int endPosition) { } } + public Set parts() { + final var parts = new LinkedHashSet(); + var current = firstPart; + while (!current.equals(lastPart)) { + parts.add(current); + final var maybeNext = current.next(); + if (maybeNext.isPresent()) { + current = maybeNext.get(); + } else { + break; + } + } + parts.add(current); + return parts; + } + @Override public boolean hasMoreElements() { return !closed && currentPart.isPresent(); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java index ce97fb46b..d0c0ed1e1 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -120,6 +121,25 @@ public InputStream partContent(final ObjectKey objectKey, } } + + public void prepareParts(final ObjectKey objectKey, + final SegmentManifest manifest, + final Set parts) { + for (final var part : parts) { + final FetchPartKey fetchPartKey = new FetchPartKey(objectKey.value(), part.range); + cache.asMap() + .computeIfAbsent(fetchPartKey, key -> CompletableFuture.supplyAsync(() -> { + statsCounter.recordMiss(); + try { + final InputStream partContent = fetchManager.partContent(objectKey, manifest, part); + return this.cachePartContent(fetchPartKey, partContent); + } catch (final StorageBackendException | IOException e) { + throw new CompletionException(e); + } + }, executor)); + } + } + public abstract InputStream readCachedPartContent(final T cachedChunk); public abstract T cachePartContent(final FetchPartKey fetchPartKey, final InputStream chunk) throws IOException; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java index 60c34d54f..d025f752c 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java @@ -97,14 +97,19 @@ void endPositionIsOutsideIndex() { @Test void shouldReturnRangeFromSingleChunk() throws StorageBackendException { // Given a set of 10 chunks with 10 bytes each - // When final int from = 32; final int to = 34; + final var part0 = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); + final var part1 = part0.next().get(); + final var part2 = part1.next().get(); + final var part3 = part2.next().get(); + // When final FetchEnumeration fetchEnumeration = new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1); when(chunkManager.partContent(eq(SEGMENT_KEY), eq(manifest), any())) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then + assertThat(fetchEnumeration.parts()).containsExactly(part3); assertThat(fetchEnumeration.firstChunk.id).isEqualTo(fetchEnumeration.lastChunk.id); assertThat(fetchEnumeration.nextElement()).hasContent("234"); assertThat(fetchEnumeration.hasMoreElements()).isFalse(); @@ -115,22 +120,24 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException { @Test void shouldReturnRangeFromMultipleParts() throws StorageBackendException { // Given a set of 10 chunks with 10 bytes each - // When final int from = 15; final int to = 34; - final FetchEnumeration fetchEnumeration = - new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1); final var part0 = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); final var part1 = part0.next().get(); + final var part2 = part1.next().get(); + final var part3 = part2.next().get(); + // When + final FetchEnumeration fetchEnumeration = + new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1); + when(chunkManager.partContent(SEGMENT_KEY, manifest, part1)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - final var part2 = part1.next().get(); when(chunkManager.partContent(SEGMENT_KEY, manifest, part2)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - final var part3 = part2.next().get(); when(chunkManager.partContent(SEGMENT_KEY, manifest, part3)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then + assertThat(fetchEnumeration.parts()).containsExactly(part1, part2, part3); assertThat(fetchEnumeration.firstChunk.id).isNotEqualTo(fetchEnumeration.lastChunk.id); assertThat(fetchEnumeration.nextElement()).hasContent("56789"); assertThat(fetchEnumeration.nextElement()).hasContent("0123456789"); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheTest.java index a83f77afb..1922a9a3d 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheTest.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.time.Duration; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import io.aiven.kafka.tieredstorage.fetch.FetchManager; @@ -196,6 +197,43 @@ void sizeBasedEviction() throws IOException, StorageBackendException { verify(fetchManager, times(3)).partContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any()); } + @Test + void preparingParts() throws Exception { + fetchCache.configure(Map.of( + "retention.ms", "-1", + "size", "-1" + )); + fetchCache.prepareParts(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, Set.of(firstPart, nextPart)); + await().pollInterval(Duration.ofMillis(5)).until(() -> fetchCache.statsCounter.snapshot().loadCount() == 2); + verify(fetchManager, times(2)).partContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any()); + + final InputStream cachedPart0 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(cachedPart0).hasBinaryContent(CHUNK_0); + verifyNoMoreInteractions(fetchManager); + + final InputStream cachedPart1 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + assertThat(cachedPart1).hasBinaryContent(CHUNK_1); + verifyNoMoreInteractions(fetchManager); + } + + @Test + void preparingFirstPart() throws Exception { + fetchCache.configure(Map.of( + "retention.ms", "-1", + "size", "-1" + )); + fetchCache.prepareParts(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, Set.of(firstPart)); + await().pollInterval(Duration.ofMillis(5)).until(() -> fetchCache.statsCounter.snapshot().loadCount() == 1); + verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + + final InputStream cachedPart0 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(cachedPart0).hasBinaryContent(CHUNK_0); + verifyNoMoreInteractions(fetchManager); + + final InputStream cachedPart1 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + assertThat(cachedPart1).hasBinaryContent(CHUNK_1); + verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + } } @Nested