Skip to content

Commit

Permalink
feat: eagerly fetch next chunk
Browse files Browse the repository at this point in the history
To improve fetching performance, next chunk can be fetched as soon as a full intermediate chunk is returned.
  • Loading branch information
jeqo committed Sep 20, 2023
1 parent 38831b0 commit 994fd3f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<suppress checks="ClassDataAbstractionCoupling" files="MetricCollector.java"/>
<suppress checks="CyclomaticComplexity" files="MetricCollector.java"/>
<suppress checks="CyclomaticComplexity" files="SingleBrokerTest.java"/>
<suppress checks="CyclomaticComplexity" files="FetchChunkEnumeration.java"/>
<suppress checks="NPathComplexity" files="SingleBrokerTest.java"/>
<suppress checks="JavaNCSSCheck" files="MetricsRegistry.java"/>
<suppress checks="JavaNCSSCheck" files="RemoteStorageManagerMetricsTest.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
Expand All @@ -43,13 +45,13 @@ public class FetchChunkEnumeration implements Enumeration<InputStream> {
private final ChunkIndex chunkIndex;
int currentChunkId;
public boolean closed;
CompletableFuture<InputStream> nextChunk = null;

/**
*
* @param chunkManager provides chunk input to fetch from
* @param chunkManager provides chunk input to fetch from
* @param objectKeyPath required by chunkManager
* @param manifest provides to index to build response from
* @param range original offset range start/end position
* @param manifest provides to index to build response from
* @param range original offset range start/end position
*/
public FetchChunkEnumeration(final ChunkManager chunkManager,
final String objectKeyPath,
Expand Down Expand Up @@ -99,7 +101,17 @@ public InputStream nextElement() {
throw new NoSuchElementException();
}

InputStream chunkContent = getChunkContent(currentChunkId);
InputStream chunkContent;
if (nextChunk == null) {
chunkContent = getChunkContent(currentChunkId);
} else {
try { // continue fetching
chunkContent = nextChunk.get();
nextChunk = null;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

final Chunk currentChunk = chunkIndex.chunks().get(currentChunkId);
final int chunkStartPosition = currentChunk.originalPosition;
Expand Down Expand Up @@ -131,6 +143,12 @@ public InputStream nextElement() {
}

currentChunkId += 1;

// eagerly fetching next chunk for caching
if (currentChunkId <= lastChunkId) {
nextChunk = CompletableFuture.supplyAsync(() -> getChunkContent(currentChunkId));
}

return chunkContent;
}

Expand All @@ -147,6 +165,7 @@ public InputStream toInputStream() {
}

public void close() {
nextChunk = null;
closed = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private static class TestObjectFetcher implements ObjectFetcher {

@Override
public InputStream fetch(final String key) throws StorageBackendException {
throw new RuntimeException("Should not be called");
throw new StorageBackendException("Should not be called");
}

@Override
Expand Down Expand Up @@ -162,7 +162,7 @@ public void assertAllStreamsWereClosed(final boolean readFully) throws IOExcepti
verify(is).close();
}
} else {
assertThat(openInputStreams).hasSize(1);
assertThat(openInputStreams).hasSizeBetween(1, 2);
verify(openInputStreams.get(0)).close();
}
}
Expand Down

0 comments on commit 994fd3f

Please sign in to comment.