Skip to content

Commit

Permalink
feat: pre-fetch parts
Browse files Browse the repository at this point in the history
Parts are going to be fetched into cache asynchronously when fetch request is received, so if fetch request timeout before remote tier fetch completes, the next one should hit the cached value and make progress.
  • Loading branch information
jeqo committed Oct 18, 2023
1 parent 3dbedec commit 00715a7
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -49,7 +50,9 @@
import io.aiven.kafka.tieredstorage.fetch.FetchEnumeration;
import io.aiven.kafka.tieredstorage.fetch.FetchManager;
import io.aiven.kafka.tieredstorage.fetch.FetchManagerFactory;
import io.aiven.kafka.tieredstorage.fetch.FetchPart;
import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException;
import io.aiven.kafka.tieredstorage.fetch.cache.FetchCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1;
Expand Down Expand Up @@ -432,6 +435,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);
final var key = objectKey(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
final var fetchEnumeration = new FetchEnumeration(fetchManager, key, segmentManifest, range, partSize);
maybePreFetch(key, segmentManifest, fetchEnumeration);
return fetchEnumeration.toInputStream();
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
throw new RemoteResourceNotFoundException(e);
Expand All @@ -440,6 +444,28 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
}
}

private void maybePreFetch(final ObjectKey key,
final SegmentManifest segmentManifest,
final FetchEnumeration fetchEnumeration) {
if (fetchManager instanceof FetchCache) {
// when cache is available, prepare beforehand to avoid retries to fail on remote tiered fetch
// this also impacts latency as all fetches happen async without waiting for consumption to progress
final var nextParts = new LinkedHashSet<FetchPart>(2);
final var parts = fetchEnumeration.parts().iterator();
// Prefetch current and next part.
// Otherwise, whole segment would be pre-fetched with broker fetch request that includes start offset only.
// This is assuming parts are larger than fetch max bytes per partition on the consumer side,
// so when a consumer read finishes, and next batches are requested, the part is already prefetched.
int maxPartsToPrefetch = 2;
while (parts.hasNext() && maxPartsToPrefetch > 0) {
nextParts.add(parts.next());
maxPartsToPrefetch--;
}
// only fetches parts not in cache already. non-blocking
((FetchCache<?>) fetchManager).prepareParts(key, segmentManifest, nextParts);
}
}

@Override
public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final IndexType indexType) throws RemoteStorageException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
Expand Down Expand Up @@ -102,6 +104,22 @@ private Chunk getLastChunk(final int endPosition) {
}
}

public Set<FetchPart> parts() {
final var parts = new LinkedHashSet<FetchPart>();
var current = firstPart;
while (!current.equals(lastPart)) {
parts.add(current);
final var maybeNext = current.next();
if (maybeNext.isPresent()) {
current = maybeNext.get();
} else {
break;
}
}
parts.add(current);
return parts;
}

@Override
public boolean hasMoreElements() {
return !closed && currentPart.isPresent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -120,6 +121,25 @@ public InputStream fetchPartContent(final ObjectKey objectKey,
}
}


public void prepareParts(final ObjectKey objectKey,
final SegmentManifest manifest,
final Set<FetchPart> parts) {
for (final var part : parts) {
final FetchPartKey fetchPartKey = new FetchPartKey(objectKey.value(), part.range);
cache.asMap()
.computeIfAbsent(fetchPartKey, key -> CompletableFuture.supplyAsync(() -> {
statsCounter.recordMiss();
try {
final InputStream partContent = fetchManager.fetchPartContent(objectKey, manifest, part);
return this.cachePartContent(fetchPartKey, partContent);
} catch (final StorageBackendException | IOException e) {
throw new CompletionException(e);
}
}, executor));
}
}

public abstract InputStream readCachedPartContent(final T cachedChunk);

public abstract T cachePartContent(final FetchPartKey fetchPartKey, final InputStream chunk) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,19 @@ void endPositionIsOutsideIndex() {
@Test
void shouldReturnRangeFromSingleChunk() throws StorageBackendException {
// Given a set of 10 chunks with 10 bytes each
// When
final int from = 32;
final int to = 34;
final var part0 = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1);
final var part1 = part0.next().get();
final var part2 = part1.next().get();
final var part3 = part2.next().get();
// When
final FetchEnumeration fetchEnumeration =
new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1);
when(chunkManager.fetchPartContent(eq(SEGMENT_KEY), eq(manifest), any()))
.thenReturn(new ByteArrayInputStream(CHUNK_CONTENT));
// Then
assertThat(fetchEnumeration.parts()).containsExactly(part3);
assertThat(fetchEnumeration.firstChunk.id).isEqualTo(fetchEnumeration.lastChunk.id);
assertThat(fetchEnumeration.nextElement()).hasContent("234");
assertThat(fetchEnumeration.hasMoreElements()).isFalse();
Expand All @@ -126,22 +131,24 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException {
@Test
void shouldReturnRangeFromMultipleParts() throws StorageBackendException {
// Given a set of 10 chunks with 10 bytes each
// When
final int from = 15;
final int to = 34;
final FetchEnumeration fetchEnumeration =
new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1);
final var part0 = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1);
final var part1 = part0.next().get();
final var part2 = part1.next().get();
final var part3 = part2.next().get();
// When
final FetchEnumeration fetchEnumeration =
new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1);

when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part1))
.thenReturn(new ByteArrayInputStream(CHUNK_CONTENT));
final var part2 = part1.next().get();
when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part2))
.thenReturn(new ByteArrayInputStream(CHUNK_CONTENT));
final var part3 = part2.next().get();
when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part3))
.thenReturn(new ByteArrayInputStream(CHUNK_CONTENT));
// Then
assertThat(fetchEnumeration.parts()).containsExactly(part1, part2, part3);
assertThat(fetchEnumeration.firstChunk.id).isNotEqualTo(fetchEnumeration.lastChunk.id);
assertThat(fetchEnumeration.nextElement()).hasContent("56789");
assertThat(fetchEnumeration.nextElement()).hasContent("0123456789");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.InputStream;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
Expand Down Expand Up @@ -210,6 +211,48 @@ void sizeBasedEviction() throws IOException, StorageBackendException {
verify(fetchManager, times(3)).fetchPartContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any());
}

@Test
void preparingParts() throws Exception {
fetchCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1"
));
fetchCache.prepareParts(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, Set.of(firstPart, nextPart));
await().pollInterval(Duration.ofMillis(5)).until(() -> fetchCache.statsCounter.snapshot().loadCount() == 2);
verify(fetchManager, times(2)).fetchPartContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any());

final InputStream cachedPart0 =
fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThat(cachedPart0).hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(fetchManager);

final InputStream cachedPart1 =
fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
assertThat(cachedPart1).hasBinaryContent(CHUNK_1);
verifyNoMoreInteractions(fetchManager);
}

@Test
void preparingFirstPart() throws Exception {
fetchCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1"
));
fetchCache.prepareParts(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, Set.of(firstPart));
await().pollInterval(Duration.ofMillis(5))
.until(() -> fetchCache.statsCounter.snapshot().loadCount() == 1);
verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);

final InputStream cachedPart0 =
fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThat(cachedPart0).hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(fetchManager);

final InputStream cachedPart1 =
fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
assertThat(cachedPart1).hasBinaryContent(CHUNK_1);
verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
}
}

@Nested
Expand Down

0 comments on commit 00715a7

Please sign in to comment.